opendal/services/fs/
backend.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use log::debug;
22
23use super::core::*;
24use super::delete::FsDeleter;
25use super::lister::FsLister;
26use super::reader::FsReader;
27use super::writer::FsWriter;
28use super::writer::FsWriters;
29use crate::raw::*;
30use crate::services::FsConfig;
31use crate::*;
32
33impl Configurator for FsConfig {
34 type Builder = FsBuilder;
35 fn into_builder(self) -> Self::Builder {
36 FsBuilder { config: self }
37 }
38}
39
40#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct FsBuilder {
44 config: FsConfig,
45}
46
47impl FsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58
59 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
66 if !dir.is_empty() {
67 self.config.atomic_write_dir = Some(dir.to_string());
68 }
69
70 self
71 }
72}
73
74impl Builder for FsBuilder {
75 const SCHEME: Scheme = Scheme::Fs;
76 type Config = FsConfig;
77
78 fn build(self) -> Result<impl Access> {
79 debug!("backend build started: {:?}", &self);
80
81 let root = match self.config.root.map(PathBuf::from) {
82 Some(root) => Ok(root),
83 None => Err(Error::new(
84 ErrorKind::ConfigInvalid,
85 "root is not specified",
86 )),
87 }?;
88 debug!("backend use root {}", root.to_string_lossy());
89
90 if let Err(e) = std::fs::metadata(&root) {
92 if e.kind() == std::io::ErrorKind::NotFound {
93 std::fs::create_dir_all(&root).map_err(|e| {
94 Error::new(ErrorKind::Unexpected, "create root dir failed")
95 .with_operation("Builder::build")
96 .with_context("root", root.to_string_lossy())
97 .set_source(e)
98 })?;
99 }
100 }
101
102 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
103
104 if let Some(d) = &atomic_write_dir {
106 if let Err(e) = std::fs::metadata(d) {
107 if e.kind() == std::io::ErrorKind::NotFound {
108 std::fs::create_dir_all(d).map_err(|e| {
109 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
110 .with_operation("Builder::build")
111 .with_context("atomic_write_dir", d.to_string_lossy())
112 .set_source(e)
113 })?;
114 }
115 }
116 }
117
118 let root = root.canonicalize().map_err(|e| {
121 Error::new(
122 ErrorKind::Unexpected,
123 "canonicalize of root directory failed",
124 )
125 .set_source(e)
126 })?;
127
128 let atomic_write_dir = atomic_write_dir
131 .map(|p| {
132 p.canonicalize().map(Some).map_err(|e| {
133 Error::new(
134 ErrorKind::Unexpected,
135 "canonicalize of atomic_write_dir directory failed",
136 )
137 .with_operation("Builder::build")
138 .with_context("root", root.to_string_lossy())
139 .set_source(e)
140 })
141 })
142 .unwrap_or(Ok(None))?;
143
144 Ok(FsBackend {
145 core: Arc::new(FsCore {
146 info: {
147 let am = AccessorInfo::default();
148 am.set_scheme(Scheme::Fs)
149 .set_root(&root.to_string_lossy())
150 .set_native_capability(Capability {
151 stat: true,
152
153 read: true,
154
155 write: true,
156 write_can_empty: true,
157 write_can_append: true,
158 write_can_multi: true,
159 write_with_if_not_exists: true,
160
161 create_dir: true,
162 delete: true,
163
164 list: true,
165
166 copy: true,
167 rename: true,
168
169 shared: true,
170
171 ..Default::default()
172 });
173
174 am.into()
175 },
176 root,
177 atomic_write_dir,
178 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
179 }),
180 })
181 }
182}
183
184#[derive(Debug, Clone)]
186pub struct FsBackend {
187 core: Arc<FsCore>,
188}
189
190impl Access for FsBackend {
191 type Reader = FsReader<tokio::fs::File>;
192 type Writer = FsWriters;
193 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
194 type Deleter = oio::OneShotDeleter<FsDeleter>;
195
196 fn info(&self) -> Arc<AccessorInfo> {
197 self.core.info.clone()
198 }
199
200 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
201 self.core.fs_create_dir(path).await?;
202 Ok(RpCreateDir::default())
203 }
204
205 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
206 let m = self.core.fs_stat(path).await?;
207 Ok(RpStat::new(m))
208 }
209
210 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
220 let f = self.core.fs_read(path, &args).await?;
221 let r = FsReader::new(
222 self.core.clone(),
223 f,
224 args.range().size().unwrap_or(u64::MAX) as _,
225 );
226 Ok((RpRead::new(), r))
227 }
228
229 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
230 let is_append = op.append();
231 let concurrent = op.concurrent();
232
233 let writer = FsWriter::create(self.core.clone(), path, op).await?;
234
235 let writer = if is_append {
236 FsWriters::One(writer)
237 } else {
238 FsWriters::Two(oio::PositionWriter::new(
239 self.info().clone(),
240 writer,
241 concurrent,
242 ))
243 };
244
245 Ok((RpWrite::default(), writer))
246 }
247
248 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
249 Ok((
250 RpDelete::default(),
251 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
252 ))
253 }
254
255 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
256 match self.core.fs_list(path).await? {
257 Some(f) => {
258 let rd = FsLister::new(&self.core.root, path, f);
259 Ok((RpList::default(), Some(rd)))
260 }
261 None => Ok((RpList::default(), None)),
262 }
263 }
264
265 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
266 self.core.fs_copy(from, to).await?;
267 Ok(RpCopy::default())
268 }
269
270 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
271 self.core.fs_rename(from, to).await?;
272 Ok(RpRename::default())
273 }
274}