opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use log::debug;
22
23use super::core::*;
24use super::delete::FsDeleter;
25use super::lister::FsLister;
26use super::reader::FsReader;
27use super::writer::FsWriter;
28use super::writer::FsWriters;
29use crate::raw::*;
30use crate::services::FsConfig;
31use crate::*;
32
33impl Configurator for FsConfig {
34    type Builder = FsBuilder;
35    fn into_builder(self) -> Self::Builder {
36        FsBuilder { config: self }
37    }
38}
39
/// POSIX file system support.
#[doc = include_str!("docs.md")]
#[derive(Default, Debug)]
pub struct FsBuilder {
    // Configuration accumulated by the builder methods and consumed by `build`.
    config: FsConfig,
}
46
47impl FsBuilder {
48    /// Set root for backend.
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58
59    /// Set temp dir for atomic write.
60    ///
61    /// # Notes
62    ///
63    /// - When append is enabled, we will not use atomic write
64    ///   to avoid data loss and performance issue.
65    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
66        if !dir.is_empty() {
67            self.config.atomic_write_dir = Some(dir.to_string());
68        }
69
70        self
71    }
72}
73
74impl Builder for FsBuilder {
75    const SCHEME: Scheme = Scheme::Fs;
76    type Config = FsConfig;
77
78    fn build(self) -> Result<impl Access> {
79        debug!("backend build started: {:?}", &self);
80
81        let root = match self.config.root.map(PathBuf::from) {
82            Some(root) => Ok(root),
83            None => Err(Error::new(
84                ErrorKind::ConfigInvalid,
85                "root is not specified",
86            )),
87        }?;
88        debug!("backend use root {}", root.to_string_lossy());
89
90        // If root dir is not exist, we must create it.
91        if let Err(e) = std::fs::metadata(&root) {
92            if e.kind() == std::io::ErrorKind::NotFound {
93                std::fs::create_dir_all(&root).map_err(|e| {
94                    Error::new(ErrorKind::Unexpected, "create root dir failed")
95                        .with_operation("Builder::build")
96                        .with_context("root", root.to_string_lossy())
97                        .set_source(e)
98                })?;
99            }
100        }
101
102        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
103
104        // If atomic write dir is not exist, we must create it.
105        if let Some(d) = &atomic_write_dir {
106            if let Err(e) = std::fs::metadata(d) {
107                if e.kind() == std::io::ErrorKind::NotFound {
108                    std::fs::create_dir_all(d).map_err(|e| {
109                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
110                            .with_operation("Builder::build")
111                            .with_context("atomic_write_dir", d.to_string_lossy())
112                            .set_source(e)
113                    })?;
114                }
115            }
116        }
117
118        // Canonicalize the root directory. This should work since we already know that we can
119        // get the metadata of the path.
120        let root = root.canonicalize().map_err(|e| {
121            Error::new(
122                ErrorKind::Unexpected,
123                "canonicalize of root directory failed",
124            )
125            .set_source(e)
126        })?;
127
128        // Canonicalize the atomic_write_dir directory. This should work since we already know that
129        // we can get the metadata of the path.
130        let atomic_write_dir = atomic_write_dir
131            .map(|p| {
132                p.canonicalize().map(Some).map_err(|e| {
133                    Error::new(
134                        ErrorKind::Unexpected,
135                        "canonicalize of atomic_write_dir directory failed",
136                    )
137                    .with_operation("Builder::build")
138                    .with_context("root", root.to_string_lossy())
139                    .set_source(e)
140                })
141            })
142            .unwrap_or(Ok(None))?;
143
144        Ok(FsBackend {
145            core: Arc::new(FsCore {
146                info: {
147                    let am = AccessorInfo::default();
148                    am.set_scheme(Scheme::Fs)
149                        .set_root(&root.to_string_lossy())
150                        .set_native_capability(Capability {
151                            stat: true,
152
153                            read: true,
154
155                            write: true,
156                            write_can_empty: true,
157                            write_can_append: true,
158                            write_can_multi: true,
159                            write_with_if_not_exists: true,
160
161                            create_dir: true,
162                            delete: true,
163
164                            list: true,
165
166                            copy: true,
167                            rename: true,
168
169                            shared: true,
170
171                            ..Default::default()
172                        });
173
174                    am.into()
175                },
176                root,
177                atomic_write_dir,
178                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
179            }),
180        })
181    }
182}
183
/// Backend is used to serve `Accessor` support for posix-like fs.
#[derive(Debug, Clone)]
pub struct FsBackend {
    // Reference-counted core that the `Access` methods delegate to; cloning the
    // backend clones this `Arc` rather than the core itself.
    core: Arc<FsCore>,
}
189
190impl Access for FsBackend {
191    type Reader = FsReader<tokio::fs::File>;
192    type Writer = FsWriters;
193    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
194    type Deleter = oio::OneShotDeleter<FsDeleter>;
195
196    fn info(&self) -> Arc<AccessorInfo> {
197        self.core.info.clone()
198    }
199
200    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
201        self.core.fs_create_dir(path).await?;
202        Ok(RpCreateDir::default())
203    }
204
205    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
206        let m = self.core.fs_stat(path).await?;
207        Ok(RpStat::new(m))
208    }
209
210    /// # Notes
211    ///
212    /// There are three ways to get the total file length:
213    ///
214    /// - call std::fs::metadata directly and then open. (400ns)
215    /// - open file first, and then use `f.metadata()` (300ns)
216    /// - open file first, and then use `seek`. (100ns)
217    ///
218    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
219    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
220        let f = self.core.fs_read(path, &args).await?;
221        let r = FsReader::new(
222            self.core.clone(),
223            f,
224            args.range().size().unwrap_or(u64::MAX) as _,
225        );
226        Ok((RpRead::new(), r))
227    }
228
229    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
230        let is_append = op.append();
231        let concurrent = op.concurrent();
232
233        let writer = FsWriter::create(self.core.clone(), path, op).await?;
234
235        let writer = if is_append {
236            FsWriters::One(writer)
237        } else {
238            FsWriters::Two(oio::PositionWriter::new(
239                self.info().clone(),
240                writer,
241                concurrent,
242            ))
243        };
244
245        Ok((RpWrite::default(), writer))
246    }
247
248    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
249        Ok((
250            RpDelete::default(),
251            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
252        ))
253    }
254
255    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
256        match self.core.fs_list(path).await? {
257            Some(f) => {
258                let rd = FsLister::new(&self.core.root, path, f);
259                Ok((RpList::default(), Some(rd)))
260            }
261            None => Ok((RpList::default(), None)),
262        }
263    }
264
265    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
266        self.core.fs_copy(from, to).await?;
267        Ok(RpCopy::default())
268    }
269
270    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
271        self.core.fs_rename(from, to).await?;
272        Ok(RpRename::default())
273    }
274}