1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37 type Builder = FsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 FsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47 config: FsConfig,
48}
49
50impl FsBuilder {
51 pub fn root(mut self, root: &str) -> Self {
53 self.config.root = if root.is_empty() {
54 None
55 } else {
56 Some(root.to_string())
57 };
58
59 self
60 }
61
62 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69 if !dir.is_empty() {
70 self.config.atomic_write_dir = Some(dir.to_string());
71 }
72
73 self
74 }
75}
76
77impl Builder for FsBuilder {
78 const SCHEME: Scheme = Scheme::Fs;
79 type Config = FsConfig;
80
81 fn build(self) -> Result<impl Access> {
82 debug!("backend build started: {:?}", &self);
83
84 let root = match self.config.root.map(PathBuf::from) {
85 Some(root) => Ok(root),
86 None => Err(Error::new(
87 ErrorKind::ConfigInvalid,
88 "root is not specified",
89 )),
90 }?;
91 debug!("backend use root {}", root.to_string_lossy());
92
93 if let Err(e) = std::fs::metadata(&root) {
95 if e.kind() == std::io::ErrorKind::NotFound {
96 std::fs::create_dir_all(&root).map_err(|e| {
97 Error::new(ErrorKind::Unexpected, "create root dir failed")
98 .with_operation("Builder::build")
99 .with_context("root", root.to_string_lossy())
100 .set_source(e)
101 })?;
102 }
103 }
104
105 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107 if let Some(d) = &atomic_write_dir {
109 if let Err(e) = std::fs::metadata(d) {
110 if e.kind() == std::io::ErrorKind::NotFound {
111 std::fs::create_dir_all(d).map_err(|e| {
112 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113 .with_operation("Builder::build")
114 .with_context("atomic_write_dir", d.to_string_lossy())
115 .set_source(e)
116 })?;
117 }
118 }
119 }
120
121 let root = root.canonicalize().map_err(|e| {
124 Error::new(
125 ErrorKind::Unexpected,
126 "canonicalize of root directory failed",
127 )
128 .with_operation("Builder::build")
129 .with_context("root", root.to_string_lossy())
130 .set_source(e)
131 })?;
132
133 let atomic_write_dir = atomic_write_dir
136 .map(|p| {
137 p.canonicalize().map(Some).map_err(|e| {
138 Error::new(
139 ErrorKind::Unexpected,
140 "canonicalize of atomic_write_dir directory failed",
141 )
142 .with_operation("Builder::build")
143 .with_context("root", root.to_string_lossy())
144 .set_source(e)
145 })
146 })
147 .unwrap_or(Ok(None))?;
148
149 Ok(FsBackend {
150 core: Arc::new(FsCore {
151 info: {
152 let am = AccessorInfo::default();
153 am.set_scheme(Scheme::Fs)
154 .set_root(&root.to_string_lossy())
155 .set_native_capability(Capability {
156 stat: true,
157 stat_has_content_length: true,
158 stat_has_last_modified: true,
159
160 read: true,
161
162 write: true,
163 write_can_empty: true,
164 write_can_append: true,
165 write_can_multi: true,
166 write_with_if_not_exists: true,
167
168 create_dir: true,
169 delete: true,
170
171 list: true,
172
173 copy: true,
174 rename: true,
175 blocking: true,
176
177 shared: true,
178
179 ..Default::default()
180 });
181
182 am.into()
183 },
184 root,
185 atomic_write_dir,
186 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187 }),
188 })
189 }
190}
191
192#[derive(Debug, Clone)]
194pub struct FsBackend {
195 core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199 type Reader = FsReader<tokio::fs::File>;
200 type Writer = FsWriters;
201 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202 type Deleter = oio::OneShotDeleter<FsDeleter>;
203 type BlockingReader = FsReader<std::fs::File>;
204 type BlockingWriter = FsWriter<std::fs::File>;
205 type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206 type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208 fn info(&self) -> Arc<AccessorInfo> {
209 self.core.info.clone()
210 }
211
212 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213 let p = self.core.root.join(path.trim_end_matches('/'));
214
215 tokio::fs::create_dir_all(&p)
216 .await
217 .map_err(new_std_io_error)?;
218
219 Ok(RpCreateDir::default())
220 }
221
222 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223 let p = self.core.root.join(path.trim_end_matches('/'));
224
225 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227 let mode = if meta.is_dir() {
228 EntryMode::DIR
229 } else if meta.is_file() {
230 EntryMode::FILE
231 } else {
232 EntryMode::Unknown
233 };
234 let m = Metadata::new(mode)
235 .with_content_length(meta.len())
236 .with_last_modified(
237 meta.modified()
238 .map(DateTime::from)
239 .map_err(new_std_io_error)?,
240 );
241
242 Ok(RpStat::new(m))
243 }
244
245 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255 let p = self.core.root.join(path.trim_end_matches('/'));
256
257 let mut f = tokio::fs::OpenOptions::new()
258 .read(true)
259 .open(&p)
260 .await
261 .map_err(new_std_io_error)?;
262
263 if args.range().offset() != 0 {
264 use tokio::io::AsyncSeekExt;
265
266 f.seek(SeekFrom::Start(args.range().offset()))
267 .await
268 .map_err(new_std_io_error)?;
269 }
270
271 let r = FsReader::new(
272 self.core.clone(),
273 f,
274 args.range().size().unwrap_or(u64::MAX) as _,
275 );
276 Ok((RpRead::new(), r))
277 }
278
279 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281 let target_path = self
282 .core
283 .ensure_write_abs_path(&self.core.root, path)
284 .await?;
285 let tmp_path = self
286 .core
287 .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288 .await?;
289
290 if op.append()
292 && tokio::fs::try_exists(&target_path)
293 .await
294 .map_err(new_std_io_error)?
295 {
296 (target_path, None)
297 } else {
298 (target_path, Some(tmp_path))
299 }
300 } else {
301 let p = self
302 .core
303 .ensure_write_abs_path(&self.core.root, path)
304 .await?;
305
306 (p, None)
307 };
308
309 let mut open_options = tokio::fs::OpenOptions::new();
310 if op.if_not_exists() {
311 open_options.create_new(true);
312 } else {
313 open_options.create(true);
314 }
315
316 open_options.write(true);
317
318 if op.append() {
319 open_options.append(true);
320 } else {
321 open_options.truncate(true);
322 }
323
324 let f = open_options
325 .open(tmp_path.as_ref().unwrap_or(&target_path))
326 .await
327 .map_err(|e| {
328 match e.kind() {
329 std::io::ErrorKind::AlreadyExists => {
330 Error::new(
332 ErrorKind::ConditionNotMatch,
333 "The file already exists in the filesystem",
334 )
335 .set_source(e)
336 }
337 _ => new_std_io_error(e),
338 }
339 })?;
340
341 let w = FsWriter::new(target_path, tmp_path, f);
342
343 let w = if op.append() {
344 FsWriters::One(w)
345 } else {
346 FsWriters::Two(oio::PositionWriter::new(
347 self.info().clone(),
348 w,
349 op.concurrent(),
350 ))
351 };
352
353 Ok((RpWrite::default(), w))
354 }
355
356 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357 Ok((
358 RpDelete::default(),
359 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360 ))
361 }
362
363 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364 let p = self.core.root.join(path.trim_end_matches('/'));
365
366 let f = match tokio::fs::read_dir(&p).await {
367 Ok(rd) => rd,
368 Err(e) => {
369 return match e.kind() {
370 std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
372 std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
379 _ => Err(new_std_io_error(e)),
380 };
381 }
382 };
383
384 let rd = FsLister::new(&self.core.root, path, f);
385 Ok((RpList::default(), Some(rd)))
386 }
387
388 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
389 let from = self.core.root.join(from.trim_end_matches('/'));
390
391 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
393
394 let to = self
395 .core
396 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
397 .await?;
398
399 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
400
401 Ok(RpCopy::default())
402 }
403
404 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
405 let from = self.core.root.join(from.trim_end_matches('/'));
406
407 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
409
410 let to = self
411 .core
412 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
413 .await?;
414
415 tokio::fs::rename(from, to)
416 .await
417 .map_err(new_std_io_error)?;
418
419 Ok(RpRename::default())
420 }
421
422 fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
423 let p = self.core.root.join(path.trim_end_matches('/'));
424
425 std::fs::create_dir_all(p).map_err(new_std_io_error)?;
426
427 Ok(RpCreateDir::default())
428 }
429
430 fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
431 let p = self.core.root.join(path.trim_end_matches('/'));
432
433 let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
434
435 let mode = if meta.is_dir() {
436 EntryMode::DIR
437 } else if meta.is_file() {
438 EntryMode::FILE
439 } else {
440 EntryMode::Unknown
441 };
442 let m = Metadata::new(mode)
443 .with_content_length(meta.len())
444 .with_last_modified(
445 meta.modified()
446 .map(DateTime::from)
447 .map_err(new_std_io_error)?,
448 );
449
450 Ok(RpStat::new(m))
451 }
452
453 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
454 let p = self.core.root.join(path.trim_end_matches('/'));
455
456 let mut f = std::fs::OpenOptions::new()
457 .read(true)
458 .open(p)
459 .map_err(new_std_io_error)?;
460
461 if args.range().offset() != 0 {
462 use std::io::Seek;
463
464 f.seek(SeekFrom::Start(args.range().offset()))
465 .map_err(new_std_io_error)?;
466 }
467
468 let r = FsReader::new(
469 self.core.clone(),
470 f,
471 args.range().size().unwrap_or(u64::MAX) as _,
472 );
473 Ok((RpRead::new(), r))
474 }
475
476 fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
477 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
478 let target_path = self
479 .core
480 .blocking_ensure_write_abs_path(&self.core.root, path)?;
481 let tmp_path = self
482 .core
483 .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
484
485 if op.append()
487 && Path::new(&target_path)
488 .try_exists()
489 .map_err(new_std_io_error)?
490 {
491 (target_path, None)
492 } else {
493 (target_path, Some(tmp_path))
494 }
495 } else {
496 let p = self
497 .core
498 .blocking_ensure_write_abs_path(&self.core.root, path)?;
499
500 (p, None)
501 };
502
503 let mut f = std::fs::OpenOptions::new();
504
505 if op.if_not_exists() {
506 f.create_new(true);
507 } else {
508 f.create(true);
509 }
510
511 f.write(true);
512
513 if op.append() {
514 f.append(true);
515 } else {
516 f.truncate(true);
517 }
518
519 let f = f
520 .open(tmp_path.as_ref().unwrap_or(&target_path))
521 .map_err(|e| {
522 match e.kind() {
523 std::io::ErrorKind::AlreadyExists => {
524 Error::new(
526 ErrorKind::ConditionNotMatch,
527 "The file already exists in the filesystem",
528 )
529 .set_source(e)
530 }
531 _ => new_std_io_error(e),
532 }
533 })?;
534
535 Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
536 }
537
538 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
539 Ok((
540 RpDelete::default(),
541 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
542 ))
543 }
544
545 fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
546 let p = self.core.root.join(path.trim_end_matches('/'));
547
548 let f = match std::fs::read_dir(p) {
549 Ok(rd) => rd,
550 Err(e) => {
551 return match e.kind() {
552 std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
554 std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
561 _ => Err(new_std_io_error(e)),
562 };
563 }
564 };
565
566 let rd = FsLister::new(&self.core.root, path, f);
567 Ok((RpList::default(), Some(rd)))
568 }
569
570 fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
571 let from = self.core.root.join(from.trim_end_matches('/'));
572
573 std::fs::metadata(&from).map_err(new_std_io_error)?;
575
576 let to = self
577 .core
578 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
579
580 std::fs::copy(from, to).map_err(new_std_io_error)?;
581
582 Ok(RpCopy::default())
583 }
584
585 fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
586 let from = self.core.root.join(from.trim_end_matches('/'));
587
588 std::fs::metadata(&from).map_err(new_std_io_error)?;
590
591 let to = self
592 .core
593 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
594
595 std::fs::rename(from, to).map_err(new_std_io_error)?;
596
597 Ok(RpRename::default())
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604
605 #[test]
606 fn test_tmp_file_of() {
607 let cases = vec![
608 ("hello.txt", "hello.txt"),
609 ("/tmp/opendal.log", "opendal.log"),
610 ("/abc/def/hello.parquet", "hello.parquet"),
611 ];
612
613 for (path, expected_prefix) in cases {
614 let tmp_file = tmp_file_of(path);
615 assert!(tmp_file.len() > expected_prefix.len());
616 assert!(tmp_file.starts_with(expected_prefix));
617 }
618 }
619}