opendal/services/compfs/
backend.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35 type Builder = CompfsBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CompfsBuilder { config: self }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44 config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58}
59
60impl Builder for CompfsBuilder {
61 const SCHEME: Scheme = Scheme::Compfs;
62 type Config = CompfsConfig;
63
64 fn build(self) -> Result<impl Access> {
65 let root = match self.config.root {
66 Some(root) => Ok(root),
67 None => Err(Error::new(
68 ErrorKind::ConfigInvalid,
69 "root is not specified",
70 )),
71 }?;
72
73 if let Err(e) = std::fs::metadata(&root) {
75 if e.kind() == std::io::ErrorKind::NotFound {
76 std::fs::create_dir_all(&root).map_err(|e| {
77 Error::new(ErrorKind::Unexpected, "create root dir failed")
78 .with_operation("Builder::build")
79 .with_context("root", root.as_str())
80 .set_source(e)
81 })?;
82 }
83 }
84
85 let dispatcher = Dispatcher::new().map_err(|_| {
86 Error::new(
87 ErrorKind::Unexpected,
88 "failed to initiate compio dispatcher",
89 )
90 })?;
91 let core = CompfsCore {
92 info: {
93 let am = AccessorInfo::default();
94 am.set_scheme(Scheme::Compfs)
95 .set_root(&root)
96 .set_native_capability(Capability {
97 stat: true,
98
99 read: true,
100
101 write: true,
102 write_can_empty: true,
103 write_can_multi: true,
104 create_dir: true,
105 delete: true,
106
107 list: true,
108
109 copy: true,
110 rename: true,
111
112 shared: true,
113
114 ..Default::default()
115 });
116
117 am.into()
118 },
119 root: root.into(),
120 dispatcher,
121 buf_pool: oio::PooledBuf::new(16),
122 };
123 Ok(CompfsBackend {
124 core: Arc::new(core),
125 })
126 }
127}
128
129#[derive(Debug)]
130pub struct CompfsBackend {
131 core: Arc<CompfsCore>,
132}
133
134impl Access for CompfsBackend {
135 type Reader = CompfsReader;
136 type Writer = CompfsWriter;
137 type Lister = Option<CompfsLister>;
138 type Deleter = OneShotDeleter<CompfsDeleter>;
139
140 fn info(&self) -> Arc<AccessorInfo> {
141 self.core.info.clone()
142 }
143
144 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
145 let path = self.core.prepare_path(path);
146
147 self.core
148 .exec(move || async move { compio::fs::create_dir_all(path).await })
149 .await?;
150
151 Ok(RpCreateDir::default())
152 }
153
154 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
155 let path = self.core.prepare_path(path);
156
157 let meta = self
158 .core
159 .exec(move || async move { compio::fs::metadata(path).await })
160 .await?;
161 let ty = meta.file_type();
162 let mode = if ty.is_dir() {
163 EntryMode::DIR
164 } else if ty.is_file() {
165 EntryMode::FILE
166 } else {
167 EntryMode::Unknown
168 };
169 let last_mod = meta.modified().map_err(new_std_io_error)?.into();
170 let ret = Metadata::new(mode)
171 .with_last_modified(last_mod)
172 .with_content_length(meta.len());
173
174 Ok(RpStat::new(ret))
175 }
176
177 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
178 Ok((
179 RpDelete::default(),
180 OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
181 ))
182 }
183
184 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
185 let from = self.core.prepare_path(from);
186 let to = self.core.prepare_path(to);
187
188 self.core
189 .exec(move || async move {
190 let from = OpenOptions::new().read(true).open(from).await?;
191 if let Some(parent) = to.parent() {
192 compio::fs::create_dir_all(parent).await?;
193 }
194 let to = OpenOptions::new()
195 .write(true)
196 .create(true)
197 .truncate(true)
198 .open(to)
199 .await?;
200
201 let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
202 compio::io::copy(&mut from, &mut to).await?;
203
204 Ok(())
205 })
206 .await?;
207
208 Ok(RpCopy::default())
209 }
210
211 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
212 let from = self.core.prepare_path(from);
213 let to = self.core.prepare_path(to);
214
215 self.core
216 .exec(move || async move {
217 if let Some(parent) = to.parent() {
218 compio::fs::create_dir_all(parent).await?;
219 }
220 compio::fs::rename(from, to).await
221 })
222 .await?;
223
224 Ok(RpRename::default())
225 }
226
227 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
228 let path = self.core.prepare_path(path);
229
230 let file = self
231 .core
232 .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
233 .await?;
234
235 let r = CompfsReader::new(self.core.clone(), file, op.range());
236 Ok((RpRead::new(), r))
237 }
238
239 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
240 let path = self.core.prepare_path(path);
241 let append = args.append();
242 let file = self
243 .core
244 .exec(move || async move {
245 if let Some(parent) = path.parent() {
246 compio::fs::create_dir_all(parent).await?;
247 }
248 let file = compio::fs::OpenOptions::new()
249 .create(true)
250 .write(true)
251 .truncate(!append)
252 .open(path)
253 .await?;
254 let mut file = Cursor::new(file);
255 if append {
256 let len = file.get_ref().metadata().await?.len();
257 file.set_position(len);
258 }
259 Ok(file)
260 })
261 .await?;
262
263 let w = CompfsWriter::new(self.core.clone(), file);
264 Ok((RpWrite::new(), w))
265 }
266
267 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
268 let path = self.core.prepare_path(path);
269
270 let read_dir = match self
271 .core
272 .exec_blocking({
273 let path = path.clone();
274 move || std::fs::read_dir(path)
275 })
276 .await?
277 {
278 Ok(rd) => rd,
279 Err(e) => {
280 return if e.kind() == std::io::ErrorKind::NotFound {
281 Ok((RpList::default(), None))
282 } else {
283 Err(new_std_io_error(e))
284 };
285 }
286 };
287
288 let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
289 Ok((RpList::default(), Some(lister)))
290 }
291}