opendal_core/services/b2/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Request;
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use mea::rwlock::RwLock;
26
27use super::B2_SCHEME;
28use super::config::B2Config;
29use super::core::B2Core;
30use super::core::B2Signer;
31use super::core::constants;
32use super::core::parse_file_info;
33use super::deleter::B2Deleter;
34use super::error::parse_error;
35use super::lister::B2Lister;
36use super::writer::B2Writer;
37use super::writer::B2Writers;
38use crate::raw::*;
39use crate::*;
40
41#[doc = include_str!("docs.md")]
43#[derive(Default)]
44pub struct B2Builder {
45 pub(super) config: B2Config,
46}
47
48impl Debug for B2Builder {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("B2Builder")
51 .field("config", &self.config)
52 .finish_non_exhaustive()
53 }
54}
55
56impl B2Builder {
57 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn application_key_id(mut self, application_key_id: &str) -> Self {
72 self.config.application_key_id = if application_key_id.is_empty() {
73 None
74 } else {
75 Some(application_key_id.to_string())
76 };
77
78 self
79 }
80
81 pub fn application_key(mut self, application_key: &str) -> Self {
83 self.config.application_key = if application_key.is_empty() {
84 None
85 } else {
86 Some(application_key.to_string())
87 };
88
89 self
90 }
91
92 pub fn bucket(mut self, bucket: &str) -> Self {
95 self.config.bucket = bucket.to_string();
96
97 self
98 }
99
100 pub fn bucket_id(mut self, bucket_id: &str) -> Self {
103 self.config.bucket_id = bucket_id.to_string();
104
105 self
106 }
107}
108
109impl Builder for B2Builder {
110 type Config = B2Config;
111
112 fn build(self) -> Result<impl Access> {
114 debug!("backend build started: {:?}", &self);
115
116 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
117 debug!("backend use root {}", &root);
118
119 if self.config.bucket.is_empty() {
121 return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
122 .with_operation("Builder::build")
123 .with_context("service", B2_SCHEME));
124 }
125
126 debug!("backend use bucket {}", &self.config.bucket);
127
128 if self.config.bucket_id.is_empty() {
130 return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is empty")
131 .with_operation("Builder::build")
132 .with_context("service", B2_SCHEME));
133 }
134
135 debug!("backend bucket_id {}", &self.config.bucket_id);
136
137 let application_key_id = match &self.config.application_key_id {
138 Some(application_key_id) => Ok(application_key_id.clone()),
139 None => Err(
140 Error::new(ErrorKind::ConfigInvalid, "application_key_id is empty")
141 .with_operation("Builder::build")
142 .with_context("service", B2_SCHEME),
143 ),
144 }?;
145
146 let application_key = match &self.config.application_key {
147 Some(key_id) => Ok(key_id.clone()),
148 None => Err(
149 Error::new(ErrorKind::ConfigInvalid, "application_key is empty")
150 .with_operation("Builder::build")
151 .with_context("service", B2_SCHEME),
152 ),
153 }?;
154
155 let signer = B2Signer {
156 application_key_id,
157 application_key,
158 ..Default::default()
159 };
160
161 Ok(B2Backend {
162 core: Arc::new(B2Core {
163 info: {
164 let am = AccessorInfo::default();
165 am.set_scheme(B2_SCHEME)
166 .set_root(&root)
167 .set_native_capability(Capability {
168 stat: true,
169
170 read: true,
171
172 write: true,
173 write_can_empty: true,
174 write_can_multi: true,
175 write_with_content_type: true,
176 write_multi_min_size: Some(5 * 1024 * 1024),
180 write_multi_max_size: if cfg!(target_pointer_width = "64") {
184 Some(5 * 1024 * 1024 * 1024)
185 } else {
186 Some(usize::MAX)
187 },
188
189 delete: true,
190 copy: true,
191
192 list: true,
193 list_with_limit: true,
194 list_with_start_after: true,
195 list_with_recursive: true,
196
197 presign: true,
198 presign_read: true,
199 presign_write: true,
200 presign_stat: true,
201
202 shared: true,
203
204 ..Default::default()
205 });
206
207 am.into()
208 },
209 signer: Arc::new(RwLock::new(signer)),
210 root,
211
212 bucket: self.config.bucket.clone(),
213 bucket_id: self.config.bucket_id.clone(),
214 }),
215 })
216 }
217}
218
219#[derive(Debug, Clone)]
221pub struct B2Backend {
222 core: Arc<B2Core>,
223}
224
225impl Access for B2Backend {
226 type Reader = HttpBody;
227 type Writer = B2Writers;
228 type Lister = oio::PageLister<B2Lister>;
229 type Deleter = oio::OneShotDeleter<B2Deleter>;
230
231 fn info(&self) -> Arc<AccessorInfo> {
232 self.core.info.clone()
233 }
234
235 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
238 if path == "/" {
240 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
241 }
242
243 let delimiter = if path.ends_with('/') { Some("/") } else { None };
244
245 let file_info = self.core.get_file_info(path, delimiter).await?;
246 let meta = parse_file_info(&file_info);
247 Ok(RpStat::new(meta))
248 }
249
250 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
251 let resp = self
252 .core
253 .download_file_by_name(path, args.range(), &args)
254 .await?;
255
256 let status = resp.status();
257 match status {
258 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
259 Ok((RpRead::default(), resp.into_body()))
260 }
261 _ => {
262 let (part, mut body) = resp.into_parts();
263 let buf = body.to_buffer().await?;
264 Err(parse_error(Response::from_parts(part, buf)))
265 }
266 }
267 }
268
269 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
270 let concurrent = args.concurrent();
271 let writer = B2Writer::new(self.core.clone(), path, args);
272
273 let w = oio::MultipartWriter::new(self.core.info.clone(), writer, concurrent);
274
275 Ok((RpWrite::default(), w))
276 }
277
278 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
279 Ok((
280 RpDelete::default(),
281 oio::OneShotDeleter::new(B2Deleter::new(self.core.clone())),
282 ))
283 }
284
285 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
286 Ok((
287 RpList::default(),
288 oio::PageLister::new(B2Lister::new(
289 self.core.clone(),
290 path,
291 args.recursive(),
292 args.limit(),
293 args.start_after(),
294 )),
295 ))
296 }
297
298 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
299 let file_info = self.core.get_file_info(from, None).await?;
300
301 let source_file_id = file_info.file_id;
302
303 let Some(source_file_id) = source_file_id else {
304 return Err(Error::new(ErrorKind::IsADirectory, "is a directory"));
305 };
306
307 let resp = self.core.copy_file(source_file_id, to).await?;
308
309 let status = resp.status();
310
311 match status {
312 StatusCode::OK => Ok(RpCopy::default()),
313 _ => Err(parse_error(resp)),
314 }
315 }
316
317 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
318 match args.operation() {
319 PresignOperation::Stat(_) => {
320 let resp = self
321 .core
322 .get_download_authorization(path, args.expire())
323 .await?;
324 let path = build_abs_path(&self.core.root, path);
325
326 let auth_info = self.core.get_auth_info().await?;
327
328 let url = format!(
329 "{}/file/{}/{}?Authorization={}",
330 auth_info.download_url, self.core.bucket, path, resp.authorization_token
331 );
332
333 let req = Request::get(url);
334
335 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
336
337 let (parts, _) = req.into_parts();
339
340 Ok(RpPresign::new(PresignedRequest::new(
341 parts.method,
342 parts.uri,
343 parts.headers,
344 )))
345 }
346 PresignOperation::Read(_) => {
347 let resp = self
348 .core
349 .get_download_authorization(path, args.expire())
350 .await?;
351 let path = build_abs_path(&self.core.root, path);
352
353 let auth_info = self.core.get_auth_info().await?;
354
355 let url = format!(
356 "{}/file/{}/{}?Authorization={}",
357 auth_info.download_url, self.core.bucket, path, resp.authorization_token
358 );
359
360 let req = Request::get(url);
361
362 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
363
364 let (parts, _) = req.into_parts();
366
367 Ok(RpPresign::new(PresignedRequest::new(
368 parts.method,
369 parts.uri,
370 parts.headers,
371 )))
372 }
373 PresignOperation::Write(_) => {
374 let resp = self.core.get_upload_url().await?;
375
376 let mut req = Request::post(&resp.upload_url);
377
378 req = req.header(http::header::AUTHORIZATION, resp.authorization_token);
379 req = req.header("X-Bz-File-Name", build_abs_path(&self.core.root, path));
380 req = req.header(http::header::CONTENT_TYPE, "b2/x-auto");
381 req = req.header(constants::X_BZ_CONTENT_SHA1, "do_not_verify");
382
383 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
384 let (parts, _) = req.into_parts();
386
387 Ok(RpPresign::new(PresignedRequest::new(
388 parts.method,
389 parts.uri,
390 parts.headers,
391 )))
392 }
393 PresignOperation::Delete(_) => Err(Error::new(
394 ErrorKind::Unsupported,
395 "operation is not supported",
396 )),
397 }
398 }
399}