opendal/services/dbfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::StatusCode;
24use log::debug;
25use serde::Deserialize;
26
27use super::core::DbfsCore;
28use super::delete::DbfsDeleter;
29use super::error::parse_error;
30use super::lister::DbfsLister;
31use super::writer::DbfsWriter;
32use crate::raw::*;
33use crate::services::DbfsConfig;
34use crate::*;
35
36impl Configurator for DbfsConfig {
37 type Builder = DbfsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 DbfsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Clone)]
46pub struct DbfsBuilder {
47 config: DbfsConfig,
48}
49
50impl Debug for DbfsBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 let mut ds = f.debug_struct("DbfsBuilder");
53
54 ds.field("config", &self.config);
55
56 ds.finish()
57 }
58}
59
60impl DbfsBuilder {
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
81 self.config.endpoint = if endpoint.is_empty() {
82 None
83 } else {
84 Some(endpoint.trim_end_matches('/').to_string())
85 };
86 self
87 }
88
89 pub fn token(mut self, token: &str) -> Self {
91 if !token.is_empty() {
92 self.config.token = Some(token.to_string());
93 }
94 self
95 }
96}
97
98impl Builder for DbfsBuilder {
99 const SCHEME: Scheme = Scheme::Dbfs;
100 type Config = DbfsConfig;
101
102 fn build(self) -> Result<impl Access> {
104 debug!("backend build started: {:?}", &self);
105
106 let root = normalize_root(&self.config.root.unwrap_or_default());
107 debug!("backend use root {root}");
108
109 let endpoint = match &self.config.endpoint {
110 Some(endpoint) => Ok(endpoint.clone()),
111 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
112 .with_operation("Builder::build")
113 .with_context("service", Scheme::Dbfs)),
114 }?;
115 debug!("backend use endpoint: {}", &endpoint);
116
117 let token = match self.config.token {
118 Some(token) => token,
119 None => {
120 return Err(Error::new(
121 ErrorKind::ConfigInvalid,
122 "missing token for Dbfs",
123 ));
124 }
125 };
126
127 let client = HttpClient::new()?;
128 Ok(DbfsBackend {
129 core: Arc::new(DbfsCore {
130 root,
131 endpoint: endpoint.to_string(),
132 token,
133 client,
134 }),
135 })
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct DbfsBackend {
142 core: Arc<DbfsCore>,
143}
144
145impl Access for DbfsBackend {
146 type Reader = ();
147 type Writer = oio::OneShotWriter<DbfsWriter>;
148 type Lister = oio::PageLister<DbfsLister>;
149 type Deleter = oio::OneShotDeleter<DbfsDeleter>;
150
151 fn info(&self) -> Arc<AccessorInfo> {
152 let am = AccessorInfo::default();
153 am.set_scheme(Scheme::Dbfs)
154 .set_root(&self.core.root)
155 .set_native_capability(Capability {
156 stat: true,
157
158 write: true,
159 create_dir: true,
160 delete: true,
161 rename: true,
162
163 list: true,
164
165 shared: true,
166
167 ..Default::default()
168 });
169 am.into()
170 }
171
172 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
173 let resp = self.core.dbfs_create_dir(path).await?;
174
175 let status = resp.status();
176
177 match status {
178 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
179 _ => Err(parse_error(resp)),
180 }
181 }
182
183 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
184 if path == "/" {
186 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
187 }
188
189 let resp = self.core.dbfs_get_status(path).await?;
190
191 let status = resp.status();
192
193 match status {
194 StatusCode::OK => {
195 let mut meta = parse_into_metadata(path, resp.headers())?;
196 let bs = resp.into_body();
197 let decoded_response: DbfsStatus =
198 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
199 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
200 decoded_response.modification_time,
201 )?);
202 match decoded_response.is_dir {
203 true => meta.set_mode(EntryMode::DIR),
204 false => {
205 meta.set_mode(EntryMode::FILE);
206 meta.set_content_length(decoded_response.file_size as u64)
207 }
208 };
209 Ok(RpStat::new(meta))
210 }
211 StatusCode::NOT_FOUND if path.ends_with('/') => {
212 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
213 }
214 _ => Err(parse_error(resp)),
215 }
216 }
217
218 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
219 Ok((
220 RpWrite::default(),
221 oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
222 ))
223 }
224
225 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
226 Ok((
227 RpDelete::default(),
228 oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
229 ))
230 }
231
232 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
233 let l = DbfsLister::new(self.core.clone(), path.to_string());
234
235 Ok((RpList::default(), oio::PageLister::new(l)))
236 }
237
238 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
239 self.core.dbfs_ensure_parent_path(to).await?;
240
241 let resp = self.core.dbfs_rename(from, to).await?;
242
243 let status = resp.status();
244
245 match status {
246 StatusCode::OK => Ok(RpRename::default()),
247 _ => Err(parse_error(resp)),
248 }
249 }
250}
251
252#[derive(Deserialize)]
253struct DbfsStatus {
254 is_dir: bool,
257 file_size: i64,
258 modification_time: i64,
259}