opendal/services/compfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35    type Builder = CompfsBuilder;
36    fn into_builder(self) -> Self::Builder {
37        CompfsBuilder { config: self }
38    }
39}
40
41/// [`compio`]-based file system support.
42#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44    config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48    /// Set root for Compfs
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58}
59
60impl Builder for CompfsBuilder {
61    const SCHEME: Scheme = Scheme::Compfs;
62    type Config = CompfsConfig;
63
64    fn build(self) -> Result<impl Access> {
65        let root = match self.config.root {
66            Some(root) => Ok(root),
67            None => Err(Error::new(
68                ErrorKind::ConfigInvalid,
69                "root is not specified",
70            )),
71        }?;
72
73        // If root dir does not exist, we must create it.
74        if let Err(e) = std::fs::metadata(&root) {
75            if e.kind() == std::io::ErrorKind::NotFound {
76                std::fs::create_dir_all(&root).map_err(|e| {
77                    Error::new(ErrorKind::Unexpected, "create root dir failed")
78                        .with_operation("Builder::build")
79                        .with_context("root", root.as_str())
80                        .set_source(e)
81                })?;
82            }
83        }
84
85        let dispatcher = Dispatcher::new().map_err(|_| {
86            Error::new(
87                ErrorKind::Unexpected,
88                "failed to initiate compio dispatcher",
89            )
90        })?;
91        let core = CompfsCore {
92            info: {
93                let am = AccessorInfo::default();
94                am.set_scheme(Scheme::Compfs)
95                    .set_root(&root)
96                    .set_native_capability(Capability {
97                        stat: true,
98
99                        read: true,
100
101                        write: true,
102                        write_can_empty: true,
103                        write_can_multi: true,
104                        create_dir: true,
105                        delete: true,
106
107                        list: true,
108
109                        copy: true,
110                        rename: true,
111
112                        shared: true,
113
114                        ..Default::default()
115                    });
116
117                am.into()
118            },
119            root: root.into(),
120            dispatcher,
121            buf_pool: oio::PooledBuf::new(16),
122        };
123        Ok(CompfsBackend {
124            core: Arc::new(core),
125        })
126    }
127}
128
129#[derive(Debug)]
130pub struct CompfsBackend {
131    core: Arc<CompfsCore>,
132}
133
134impl Access for CompfsBackend {
135    type Reader = CompfsReader;
136    type Writer = CompfsWriter;
137    type Lister = Option<CompfsLister>;
138    type Deleter = OneShotDeleter<CompfsDeleter>;
139
140    fn info(&self) -> Arc<AccessorInfo> {
141        self.core.info.clone()
142    }
143
144    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
145        let path = self.core.prepare_path(path);
146
147        self.core
148            .exec(move || async move { compio::fs::create_dir_all(path).await })
149            .await?;
150
151        Ok(RpCreateDir::default())
152    }
153
154    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
155        let path = self.core.prepare_path(path);
156
157        let meta = self
158            .core
159            .exec(move || async move { compio::fs::metadata(path).await })
160            .await?;
161        let ty = meta.file_type();
162        let mode = if ty.is_dir() {
163            EntryMode::DIR
164        } else if ty.is_file() {
165            EntryMode::FILE
166        } else {
167            EntryMode::Unknown
168        };
169        let last_mod = meta.modified().map_err(new_std_io_error)?.into();
170        let ret = Metadata::new(mode)
171            .with_last_modified(last_mod)
172            .with_content_length(meta.len());
173
174        Ok(RpStat::new(ret))
175    }
176
177    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
178        Ok((
179            RpDelete::default(),
180            OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
181        ))
182    }
183
184    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
185        let from = self.core.prepare_path(from);
186        let to = self.core.prepare_path(to);
187
188        self.core
189            .exec(move || async move {
190                let from = OpenOptions::new().read(true).open(from).await?;
191                if let Some(parent) = to.parent() {
192                    compio::fs::create_dir_all(parent).await?;
193                }
194                let to = OpenOptions::new()
195                    .write(true)
196                    .create(true)
197                    .truncate(true)
198                    .open(to)
199                    .await?;
200
201                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
202                compio::io::copy(&mut from, &mut to).await?;
203
204                Ok(())
205            })
206            .await?;
207
208        Ok(RpCopy::default())
209    }
210
211    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
212        let from = self.core.prepare_path(from);
213        let to = self.core.prepare_path(to);
214
215        self.core
216            .exec(move || async move {
217                if let Some(parent) = to.parent() {
218                    compio::fs::create_dir_all(parent).await?;
219                }
220                compio::fs::rename(from, to).await
221            })
222            .await?;
223
224        Ok(RpRename::default())
225    }
226
227    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
228        let path = self.core.prepare_path(path);
229
230        let file = self
231            .core
232            .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
233            .await?;
234
235        let r = CompfsReader::new(self.core.clone(), file, op.range());
236        Ok((RpRead::new(), r))
237    }
238
239    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
240        let path = self.core.prepare_path(path);
241        let append = args.append();
242        let file = self
243            .core
244            .exec(move || async move {
245                if let Some(parent) = path.parent() {
246                    compio::fs::create_dir_all(parent).await?;
247                }
248                let file = compio::fs::OpenOptions::new()
249                    .create(true)
250                    .write(true)
251                    .truncate(!append)
252                    .open(path)
253                    .await?;
254                let mut file = Cursor::new(file);
255                if append {
256                    let len = file.get_ref().metadata().await?.len();
257                    file.set_position(len);
258                }
259                Ok(file)
260            })
261            .await?;
262
263        let w = CompfsWriter::new(self.core.clone(), file);
264        Ok((RpWrite::new(), w))
265    }
266
267    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
268        let path = self.core.prepare_path(path);
269
270        let read_dir = match self
271            .core
272            .exec_blocking({
273                let path = path.clone();
274                move || std::fs::read_dir(path)
275            })
276            .await?
277        {
278            Ok(rd) => rd,
279            Err(e) => {
280                return if e.kind() == std::io::ErrorKind::NotFound {
281                    Ok((RpList::default(), None))
282                } else {
283                    Err(new_std_io_error(e))
284                };
285            }
286        };
287
288        let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
289        Ok((RpList::default(), Some(lister)))
290    }
291}