opendal/services/compfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use compio::buf::{IoBuf, IoBuffer, IoVectoredBuf};
23use compio::dispatcher::Dispatcher;
24
25use crate::raw::*;
26use crate::*;
27
28unsafe impl IoBuf for Buffer {
29    fn as_buf_ptr(&self) -> *const u8 {
30        self.current().as_ptr()
31    }
32
33    fn buf_len(&self) -> usize {
34        self.current().len()
35    }
36
37    fn buf_capacity(&self) -> usize {
38        // `Bytes` doesn't expose uninitialized capacity, so treat it as the same as `len`
39        self.current().len()
40    }
41}
42
43#[derive(Debug)]
44pub(super) struct CompfsCore {
45    pub info: Arc<AccessorInfo>,
46
47    pub root: PathBuf,
48    pub dispatcher: Dispatcher,
49    pub buf_pool: oio::PooledBuf,
50}
51
52impl CompfsCore {
53    pub fn prepare_path(&self, path: &str) -> PathBuf {
54        self.root.join(path.trim_end_matches('/'))
55    }
56
57    pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
58    where
59        Fn: FnOnce() -> Fut + Send + 'static,
60        Fut: Future<Output = std::io::Result<R>> + 'static,
61        R: Send + 'static,
62    {
63        self.dispatcher
64            .dispatch(f)
65            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn io task failed"))?
66            .await
67            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))?
68            .map_err(new_std_io_error)
69    }
70
71    pub async fn exec_blocking<Fn, R>(&self, f: Fn) -> Result<R>
72    where
73        Fn: FnOnce() -> R + Send + 'static,
74        R: Send + 'static,
75    {
76        self.dispatcher
77            .dispatch_blocking(f)
78            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn blocking task failed"))?
79            .await
80            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))
81    }
82}
83
84impl IoVectoredBuf for Buffer {
85    unsafe fn iter_io_buffer(&self) -> impl Iterator<Item = IoBuffer> {
86        self.clone().map(|b| unsafe { b.as_io_buffer() })
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use bytes::Buf;
93    use bytes::Bytes;
94    use rand::Rng;
95    use rand::thread_rng;
96
97    use super::*;
98
99    fn setup_buffer() -> (Buffer, usize, Bytes) {
100        let mut rng = thread_rng();
101
102        let bs = (0..100)
103            .map(|_| {
104                let len = rng.gen_range(1..100);
105                let mut buf = vec![0; len];
106                rng.fill(&mut buf[..]);
107                Bytes::from(buf)
108            })
109            .collect::<Vec<_>>();
110
111        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
112        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
113        let buf = Buffer::from(bs);
114
115        (buf, total_size, total_content)
116    }
117
118    #[test]
119    fn test_io_buf() {
120        let (buf, _len, _bytes) = setup_buffer();
121        let slice = IoBuf::as_slice(&buf);
122
123        assert_eq!(slice, buf.current().chunk())
124    }
125}