opendal/services/compfs/
core.rs1use std::future::Future;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use compio::buf::{IoBuf, IoBuffer, IoVectoredBuf};
23use compio::dispatcher::Dispatcher;
24
25use crate::raw::*;
26use crate::*;
27
28unsafe impl IoBuf for Buffer {
29 fn as_buf_ptr(&self) -> *const u8 {
30 self.current().as_ptr()
31 }
32
33 fn buf_len(&self) -> usize {
34 self.current().len()
35 }
36
37 fn buf_capacity(&self) -> usize {
38 self.current().len()
40 }
41}
42
43#[derive(Debug)]
44pub(super) struct CompfsCore {
45 pub info: Arc<AccessorInfo>,
46
47 pub root: PathBuf,
48 pub dispatcher: Dispatcher,
49 pub buf_pool: oio::PooledBuf,
50}
51
52impl CompfsCore {
53 pub fn prepare_path(&self, path: &str) -> PathBuf {
54 self.root.join(path.trim_end_matches('/'))
55 }
56
57 pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
58 where
59 Fn: FnOnce() -> Fut + Send + 'static,
60 Fut: Future<Output = std::io::Result<R>> + 'static,
61 R: Send + 'static,
62 {
63 self.dispatcher
64 .dispatch(f)
65 .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn io task failed"))?
66 .await
67 .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))?
68 .map_err(new_std_io_error)
69 }
70
71 pub async fn exec_blocking<Fn, R>(&self, f: Fn) -> Result<R>
72 where
73 Fn: FnOnce() -> R + Send + 'static,
74 R: Send + 'static,
75 {
76 self.dispatcher
77 .dispatch_blocking(f)
78 .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn blocking task failed"))?
79 .await
80 .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))
81 }
82}
83
84impl IoVectoredBuf for Buffer {
85 unsafe fn iter_io_buffer(&self) -> impl Iterator<Item = IoBuffer> {
86 self.clone().map(|b| unsafe { b.as_io_buffer() })
87 }
88}
89
#[cfg(test)]
mod tests {
    use bytes::Buf;
    use bytes::Bytes;
    use rand::Rng;
    use rand::thread_rng;

    use super::*;

    /// Builds a `Buffer` out of 100 randomly filled chunks, each 1..100 bytes
    /// long.
    ///
    /// Returns the buffer, the summed length of all chunks, and the chunks'
    /// bytes concatenated in order.
    fn setup_buffer() -> (Buffer, usize, Bytes) {
        let mut rng = thread_rng();

        let mut chunks = Vec::with_capacity(100);
        for _ in 0..100 {
            let len = rng.gen_range(1..100);
            let mut data = vec![0; len];
            rng.fill(&mut data[..]);
            chunks.push(Bytes::from(data));
        }

        let total_size: usize = chunks.iter().map(Bytes::len).sum();
        let total_content = chunks
            .iter()
            .flat_map(|chunk| chunk.iter().copied())
            .collect::<Bytes>();

        (Buffer::from(chunks), total_size, total_content)
    }

    /// The `IoBuf` slice view of a buffer must match its current chunk.
    #[test]
    fn test_io_buf() {
        let (buffer, _, _) = setup_buffer();

        let expected = buffer.current();
        let actual = IoBuf::as_slice(&buffer);

        assert_eq!(actual, expected.chunk());
    }
}