opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37    type Builder = FsBuilder;
38    fn into_builder(self) -> Self::Builder {
39        FsBuilder { config: self }
40    }
41}
42
43/// POSIX file system support.
44#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47    config: FsConfig,
48}
49
50impl FsBuilder {
51    /// Set root for backend.
52    pub fn root(mut self, root: &str) -> Self {
53        self.config.root = if root.is_empty() {
54            None
55        } else {
56            Some(root.to_string())
57        };
58
59        self
60    }
61
62    /// Set temp dir for atomic write.
63    ///
64    /// # Notes
65    ///
66    /// - When append is enabled, we will not use atomic write
67    ///   to avoid data loss and performance issue.
68    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69        if !dir.is_empty() {
70            self.config.atomic_write_dir = Some(dir.to_string());
71        }
72
73        self
74    }
75}
76
77impl Builder for FsBuilder {
78    const SCHEME: Scheme = Scheme::Fs;
79    type Config = FsConfig;
80
81    fn build(self) -> Result<impl Access> {
82        debug!("backend build started: {:?}", &self);
83
84        let root = match self.config.root.map(PathBuf::from) {
85            Some(root) => Ok(root),
86            None => Err(Error::new(
87                ErrorKind::ConfigInvalid,
88                "root is not specified",
89            )),
90        }?;
91        debug!("backend use root {}", root.to_string_lossy());
92
93        // If root dir is not exist, we must create it.
94        if let Err(e) = std::fs::metadata(&root) {
95            if e.kind() == std::io::ErrorKind::NotFound {
96                std::fs::create_dir_all(&root).map_err(|e| {
97                    Error::new(ErrorKind::Unexpected, "create root dir failed")
98                        .with_operation("Builder::build")
99                        .with_context("root", root.to_string_lossy())
100                        .set_source(e)
101                })?;
102            }
103        }
104
105        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107        // If atomic write dir is not exist, we must create it.
108        if let Some(d) = &atomic_write_dir {
109            if let Err(e) = std::fs::metadata(d) {
110                if e.kind() == std::io::ErrorKind::NotFound {
111                    std::fs::create_dir_all(d).map_err(|e| {
112                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113                            .with_operation("Builder::build")
114                            .with_context("atomic_write_dir", d.to_string_lossy())
115                            .set_source(e)
116                    })?;
117                }
118            }
119        }
120
121        // Canonicalize the root directory. This should work since we already know that we can
122        // get the metadata of the path.
123        let root = root.canonicalize().map_err(|e| {
124            Error::new(
125                ErrorKind::Unexpected,
126                "canonicalize of root directory failed",
127            )
128            .with_operation("Builder::build")
129            .with_context("root", root.to_string_lossy())
130            .set_source(e)
131        })?;
132
133        // Canonicalize the atomic_write_dir directory. This should work since we already know that
134        // we can get the metadata of the path.
135        let atomic_write_dir = atomic_write_dir
136            .map(|p| {
137                p.canonicalize().map(Some).map_err(|e| {
138                    Error::new(
139                        ErrorKind::Unexpected,
140                        "canonicalize of atomic_write_dir directory failed",
141                    )
142                    .with_operation("Builder::build")
143                    .with_context("root", root.to_string_lossy())
144                    .set_source(e)
145                })
146            })
147            .unwrap_or(Ok(None))?;
148
149        Ok(FsBackend {
150            core: Arc::new(FsCore {
151                info: {
152                    let am = AccessorInfo::default();
153                    am.set_scheme(Scheme::Fs)
154                        .set_root(&root.to_string_lossy())
155                        .set_native_capability(Capability {
156                            stat: true,
157                            stat_has_content_length: true,
158                            stat_has_last_modified: true,
159
160                            read: true,
161
162                            write: true,
163                            write_can_empty: true,
164                            write_can_append: true,
165                            write_can_multi: true,
166                            write_with_if_not_exists: true,
167
168                            create_dir: true,
169                            delete: true,
170
171                            list: true,
172
173                            copy: true,
174                            rename: true,
175                            blocking: true,
176
177                            shared: true,
178
179                            ..Default::default()
180                        });
181
182                    am.into()
183                },
184                root,
185                atomic_write_dir,
186                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187            }),
188        })
189    }
190}
191
192/// Backend is used to serve `Accessor` support for posix-like fs.
193#[derive(Debug, Clone)]
194pub struct FsBackend {
195    core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199    type Reader = FsReader<tokio::fs::File>;
200    type Writer = FsWriters;
201    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202    type Deleter = oio::OneShotDeleter<FsDeleter>;
203    type BlockingReader = FsReader<std::fs::File>;
204    type BlockingWriter = FsWriter<std::fs::File>;
205    type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206    type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208    fn info(&self) -> Arc<AccessorInfo> {
209        self.core.info.clone()
210    }
211
212    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213        let p = self.core.root.join(path.trim_end_matches('/'));
214
215        tokio::fs::create_dir_all(&p)
216            .await
217            .map_err(new_std_io_error)?;
218
219        Ok(RpCreateDir::default())
220    }
221
222    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223        let p = self.core.root.join(path.trim_end_matches('/'));
224
225        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227        let mode = if meta.is_dir() {
228            EntryMode::DIR
229        } else if meta.is_file() {
230            EntryMode::FILE
231        } else {
232            EntryMode::Unknown
233        };
234        let m = Metadata::new(mode)
235            .with_content_length(meta.len())
236            .with_last_modified(
237                meta.modified()
238                    .map(DateTime::from)
239                    .map_err(new_std_io_error)?,
240            );
241
242        Ok(RpStat::new(m))
243    }
244
245    /// # Notes
246    ///
247    /// There are three ways to get the total file length:
248    ///
249    /// - call std::fs::metadata directly and then open. (400ns)
250    /// - open file first, and then use `f.metadata()` (300ns)
251    /// - open file first, and then use `seek`. (100ns)
252    ///
253    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
254    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255        let p = self.core.root.join(path.trim_end_matches('/'));
256
257        let mut f = tokio::fs::OpenOptions::new()
258            .read(true)
259            .open(&p)
260            .await
261            .map_err(new_std_io_error)?;
262
263        if args.range().offset() != 0 {
264            use tokio::io::AsyncSeekExt;
265
266            f.seek(SeekFrom::Start(args.range().offset()))
267                .await
268                .map_err(new_std_io_error)?;
269        }
270
271        let r = FsReader::new(
272            self.core.clone(),
273            f,
274            args.range().size().unwrap_or(u64::MAX) as _,
275        );
276        Ok((RpRead::new(), r))
277    }
278
279    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281            let target_path = self
282                .core
283                .ensure_write_abs_path(&self.core.root, path)
284                .await?;
285            let tmp_path = self
286                .core
287                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288                .await?;
289
290            // If the target file exists, we should append to the end of it directly.
291            if op.append()
292                && tokio::fs::try_exists(&target_path)
293                    .await
294                    .map_err(new_std_io_error)?
295            {
296                (target_path, None)
297            } else {
298                (target_path, Some(tmp_path))
299            }
300        } else {
301            let p = self
302                .core
303                .ensure_write_abs_path(&self.core.root, path)
304                .await?;
305
306            (p, None)
307        };
308
309        let mut open_options = tokio::fs::OpenOptions::new();
310        if op.if_not_exists() {
311            open_options.create_new(true);
312        } else {
313            open_options.create(true);
314        }
315
316        open_options.write(true);
317
318        if op.append() {
319            open_options.append(true);
320        } else {
321            open_options.truncate(true);
322        }
323
324        let f = open_options
325            .open(tmp_path.as_ref().unwrap_or(&target_path))
326            .await
327            .map_err(|e| {
328                match e.kind() {
329                    std::io::ErrorKind::AlreadyExists => {
330                        // Map io AlreadyExists to opendal ConditionNotMatch
331                        Error::new(
332                            ErrorKind::ConditionNotMatch,
333                            "The file already exists in the filesystem",
334                        )
335                        .set_source(e)
336                    }
337                    _ => new_std_io_error(e),
338                }
339            })?;
340
341        let w = FsWriter::new(target_path, tmp_path, f);
342
343        let w = if op.append() {
344            FsWriters::One(w)
345        } else {
346            FsWriters::Two(oio::PositionWriter::new(
347                self.info().clone(),
348                w,
349                op.concurrent(),
350            ))
351        };
352
353        Ok((RpWrite::default(), w))
354    }
355
356    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357        Ok((
358            RpDelete::default(),
359            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360        ))
361    }
362
363    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364        let p = self.core.root.join(path.trim_end_matches('/'));
365
366        let f = match tokio::fs::read_dir(&p).await {
367            Ok(rd) => rd,
368            Err(e) => {
369                return match e.kind() {
370                    // Return empty list if the directory not found
371                    std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
372                    // If the path is not a directory, return an empty list
373                    //
374                    // The path could be a file or a symbolic link in this case.
375                    // Returning a NotADirectory error to the user isn't helpful; instead,
376                    // providing an empty directory is a more user-friendly. In fact, the dir
377                    // `path/` does not exist.
378                    std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
379                    _ => Err(new_std_io_error(e)),
380                };
381            }
382        };
383
384        let rd = FsLister::new(&self.core.root, path, f);
385        Ok((RpList::default(), Some(rd)))
386    }
387
388    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
389        let from = self.core.root.join(from.trim_end_matches('/'));
390
391        // try to get the metadata of the source file to ensure it exists
392        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
393
394        let to = self
395            .core
396            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
397            .await?;
398
399        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
400
401        Ok(RpCopy::default())
402    }
403
404    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
405        let from = self.core.root.join(from.trim_end_matches('/'));
406
407        // try to get the metadata of the source file to ensure it exists
408        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
409
410        let to = self
411            .core
412            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
413            .await?;
414
415        tokio::fs::rename(from, to)
416            .await
417            .map_err(new_std_io_error)?;
418
419        Ok(RpRename::default())
420    }
421
422    fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
423        let p = self.core.root.join(path.trim_end_matches('/'));
424
425        std::fs::create_dir_all(p).map_err(new_std_io_error)?;
426
427        Ok(RpCreateDir::default())
428    }
429
430    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
431        let p = self.core.root.join(path.trim_end_matches('/'));
432
433        let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
434
435        let mode = if meta.is_dir() {
436            EntryMode::DIR
437        } else if meta.is_file() {
438            EntryMode::FILE
439        } else {
440            EntryMode::Unknown
441        };
442        let m = Metadata::new(mode)
443            .with_content_length(meta.len())
444            .with_last_modified(
445                meta.modified()
446                    .map(DateTime::from)
447                    .map_err(new_std_io_error)?,
448            );
449
450        Ok(RpStat::new(m))
451    }
452
453    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
454        let p = self.core.root.join(path.trim_end_matches('/'));
455
456        let mut f = std::fs::OpenOptions::new()
457            .read(true)
458            .open(p)
459            .map_err(new_std_io_error)?;
460
461        if args.range().offset() != 0 {
462            use std::io::Seek;
463
464            f.seek(SeekFrom::Start(args.range().offset()))
465                .map_err(new_std_io_error)?;
466        }
467
468        let r = FsReader::new(
469            self.core.clone(),
470            f,
471            args.range().size().unwrap_or(u64::MAX) as _,
472        );
473        Ok((RpRead::new(), r))
474    }
475
476    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
477        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
478            let target_path = self
479                .core
480                .blocking_ensure_write_abs_path(&self.core.root, path)?;
481            let tmp_path = self
482                .core
483                .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
484
485            // If the target file exists, we should append to the end of it directly.
486            if op.append()
487                && Path::new(&target_path)
488                    .try_exists()
489                    .map_err(new_std_io_error)?
490            {
491                (target_path, None)
492            } else {
493                (target_path, Some(tmp_path))
494            }
495        } else {
496            let p = self
497                .core
498                .blocking_ensure_write_abs_path(&self.core.root, path)?;
499
500            (p, None)
501        };
502
503        let mut f = std::fs::OpenOptions::new();
504
505        if op.if_not_exists() {
506            f.create_new(true);
507        } else {
508            f.create(true);
509        }
510
511        f.write(true);
512
513        if op.append() {
514            f.append(true);
515        } else {
516            f.truncate(true);
517        }
518
519        let f = f
520            .open(tmp_path.as_ref().unwrap_or(&target_path))
521            .map_err(|e| {
522                match e.kind() {
523                    std::io::ErrorKind::AlreadyExists => {
524                        // Map io AlreadyExists to opendal ConditionNotMatch
525                        Error::new(
526                            ErrorKind::ConditionNotMatch,
527                            "The file already exists in the filesystem",
528                        )
529                        .set_source(e)
530                    }
531                    _ => new_std_io_error(e),
532                }
533            })?;
534
535        Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
536    }
537
538    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
539        Ok((
540            RpDelete::default(),
541            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
542        ))
543    }
544
545    fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
546        let p = self.core.root.join(path.trim_end_matches('/'));
547
548        let f = match std::fs::read_dir(p) {
549            Ok(rd) => rd,
550            Err(e) => {
551                return match e.kind() {
552                    // Return empty list if the directory not found
553                    std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
554                    // If the path is not a directory, return an empty list
555                    //
556                    // The path could be a file or a symbolic link in this case.
557                    // Returning a NotADirectory error to the user isn't helpful; instead,
558                    // providing an empty directory is a more user-friendly. In fact, the dir
559                    // `path/` does not exist.
560                    std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
561                    _ => Err(new_std_io_error(e)),
562                };
563            }
564        };
565
566        let rd = FsLister::new(&self.core.root, path, f);
567        Ok((RpList::default(), Some(rd)))
568    }
569
570    fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
571        let from = self.core.root.join(from.trim_end_matches('/'));
572
573        // try to get the metadata of the source file to ensure it exists
574        std::fs::metadata(&from).map_err(new_std_io_error)?;
575
576        let to = self
577            .core
578            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
579
580        std::fs::copy(from, to).map_err(new_std_io_error)?;
581
582        Ok(RpCopy::default())
583    }
584
585    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
586        let from = self.core.root.join(from.trim_end_matches('/'));
587
588        // try to get the metadata of the source file to ensure it exists
589        std::fs::metadata(&from).map_err(new_std_io_error)?;
590
591        let to = self
592            .core
593            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
594
595        std::fs::rename(from, to).map_err(new_std_io_error)?;
596
597        Ok(RpRename::default())
598    }
599}
600
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated temp file name must start with the file name of the
    /// input path and be strictly longer than it.
    #[test]
    fn test_tmp_file_of() {
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        for &(input, prefix) in &cases {
            let generated = tmp_file_of(input);
            assert!(generated.len() > prefix.len());
            assert!(generated.starts_with(prefix));
        }
    }
}