1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::AliyunConfig;
27use reqsign::AliyunLoader;
28use reqsign::AliyunOssSigner;
29
30use super::core::*;
31use super::delete::OssDeleter;
32use super::error::parse_error;
33use super::lister::OssLister;
34use super::lister::OssListers;
35use super::lister::OssObjectVersionsLister;
36use super::writer::OssWriter;
37use super::writer::OssWriters;
38use crate::raw::*;
39use crate::services::OssConfig;
40use crate::*;
41
/// Fallback for the `delete_max_size` capability, used by the builder when the
/// config leaves `delete_max_size` unset.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
43
44impl Configurator for OssConfig {
45 type Builder = OssBuilder;
46
47 #[allow(deprecated)]
48 fn into_builder(self) -> Self::Builder {
49 OssBuilder {
50 config: self,
51
52 http_client: None,
53 }
54 }
55}
56
/// Builder for the Aliyun OSS backend.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct OssBuilder {
    /// Service configuration populated by the setter methods of this builder.
    config: OssConfig,

    /// Optional custom HTTP client, installed into the accessor info at build
    /// time when present. Deprecated; see the attribute note for the replacement.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
66
67impl Debug for OssBuilder {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let mut d = f.debug_struct("OssBuilder");
70
71 d.field("config", &self.config);
72 d.finish_non_exhaustive()
73 }
74}
75
76impl OssBuilder {
77 pub fn root(mut self, root: &str) -> Self {
81 self.config.root = if root.is_empty() {
82 None
83 } else {
84 Some(root.to_string())
85 };
86
87 self
88 }
89
90 pub fn bucket(mut self, bucket: &str) -> Self {
92 self.config.bucket = bucket.to_string();
93
94 self
95 }
96
97 pub fn endpoint(mut self, endpoint: &str) -> Self {
99 if !endpoint.is_empty() {
100 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
102 }
103
104 self
105 }
106
107 pub fn enable_versioning(mut self, enabled: bool) -> Self {
109 self.config.enable_versioning = enabled;
110
111 self
112 }
113
114 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
123 if !endpoint.is_empty() {
124 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
126 }
127
128 self
129 }
130
131 pub fn access_key_id(mut self, v: &str) -> Self {
136 if !v.is_empty() {
137 self.config.access_key_id = Some(v.to_string())
138 }
139
140 self
141 }
142
143 pub fn access_key_secret(mut self, v: &str) -> Self {
148 if !v.is_empty() {
149 self.config.access_key_secret = Some(v.to_string())
150 }
151
152 self
153 }
154
155 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
162 #[allow(deprecated)]
163 pub fn http_client(mut self, client: HttpClient) -> Self {
164 self.http_client = Some(client);
165 self
166 }
167
168 fn parse_endpoint(&self, endpoint: &Option<String>, bucket: &str) -> Result<(String, String)> {
170 let (endpoint, host) = match endpoint.clone() {
171 Some(ep) => {
172 let uri = ep.parse::<Uri>().map_err(|err| {
173 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
174 .with_context("service", Scheme::Oss)
175 .with_context("endpoint", &ep)
176 .set_source(err)
177 })?;
178 let host = uri.host().ok_or_else(|| {
179 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
180 .with_context("service", Scheme::Oss)
181 .with_context("endpoint", &ep)
182 })?;
183 let full_host = if let Some(port) = uri.port_u16() {
184 format!("{bucket}.{host}:{port}")
185 } else {
186 format!("{bucket}.{host}")
187 };
188 let endpoint = match uri.scheme_str() {
189 Some(scheme_str) => match scheme_str {
190 "http" | "https" => format!("{scheme_str}://{full_host}"),
191 _ => {
192 return Err(Error::new(
193 ErrorKind::ConfigInvalid,
194 "endpoint protocol is invalid",
195 )
196 .with_context("service", Scheme::Oss));
197 }
198 },
199 None => format!("https://{full_host}"),
200 };
201 (endpoint, full_host)
202 }
203 None => {
204 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
205 .with_context("service", Scheme::Oss));
206 }
207 };
208 Ok((endpoint, host))
209 }
210
211 pub fn server_side_encryption(mut self, v: &str) -> Self {
228 if !v.is_empty() {
229 self.config.server_side_encryption = Some(v.to_string())
230 }
231 self
232 }
233
234 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
240 if !v.is_empty() {
241 self.config.server_side_encryption_key_id = Some(v.to_string())
242 }
243 self
244 }
245
246 #[deprecated(
248 since = "0.52.0",
249 note = "Please use `delete_max_size` instead of `batch_max_operations`"
250 )]
251 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
252 self.config.delete_max_size = Some(delete_max_size);
253
254 self
255 }
256
257 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
259 self.config.delete_max_size = Some(delete_max_size);
260
261 self
262 }
263
264 pub fn allow_anonymous(mut self) -> Self {
267 self.config.allow_anonymous = true;
268 self
269 }
270
271 pub fn role_arn(mut self, role_arn: &str) -> Self {
276 if !role_arn.is_empty() {
277 self.config.role_arn = Some(role_arn.to_string())
278 }
279
280 self
281 }
282
283 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
285 if !role_session_name.is_empty() {
286 self.config.role_session_name = Some(role_session_name.to_string())
287 }
288
289 self
290 }
291
292 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
294 if !oidc_provider_arn.is_empty() {
295 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
296 }
297
298 self
299 }
300
301 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
303 if !oidc_token_file.is_empty() {
304 self.config.oidc_token_file = Some(oidc_token_file.to_string())
305 }
306
307 self
308 }
309
310 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
312 if !sts_endpoint.is_empty() {
313 self.config.sts_endpoint = Some(sts_endpoint.to_string())
314 }
315
316 self
317 }
318}
319
320impl Builder for OssBuilder {
321 const SCHEME: Scheme = Scheme::Oss;
322 type Config = OssConfig;
323
324 fn build(self) -> Result<impl Access> {
325 debug!("backend build started: {:?}", &self);
326
327 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
328 debug!("backend use root {}", &root);
329
330 let bucket = match self.config.bucket.is_empty() {
332 false => Ok(&self.config.bucket),
333 true => Err(
334 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
335 .with_context("service", Scheme::Oss),
336 ),
337 }?;
338
339 let (endpoint, host) = self.parse_endpoint(&self.config.endpoint, bucket)?;
342 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
343
344 let presign_endpoint = if self.config.presign_endpoint.is_some() {
345 self.parse_endpoint(&self.config.presign_endpoint, bucket)?
346 .0
347 } else {
348 endpoint.clone()
349 };
350 debug!("backend use presign_endpoint: {}", &presign_endpoint);
351
352 let server_side_encryption = match &self.config.server_side_encryption {
353 None => None,
354 Some(v) => Some(
355 build_header_value(v)
356 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
357 ),
358 };
359
360 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
361 None => None,
362 Some(v) => Some(
363 build_header_value(v)
364 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
365 ),
366 };
367
368 let mut cfg = AliyunConfig::default();
369 cfg = cfg.from_env();
371
372 if let Some(v) = self.config.access_key_id {
373 cfg.access_key_id = Some(v);
374 }
375
376 if let Some(v) = self.config.access_key_secret {
377 cfg.access_key_secret = Some(v);
378 }
379
380 if let Some(v) = self.config.role_arn {
381 cfg.role_arn = Some(v);
382 }
383
384 if let Some(v) = self.config.role_session_name {
386 cfg.role_session_name = v;
387 }
388
389 if let Some(v) = self.config.oidc_provider_arn {
390 cfg.oidc_provider_arn = Some(v);
391 }
392
393 if let Some(v) = self.config.oidc_token_file {
394 cfg.oidc_token_file = Some(v);
395 }
396
397 if let Some(v) = self.config.sts_endpoint {
398 cfg.sts_endpoint = Some(v);
399 }
400
401 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
402
403 let signer = AliyunOssSigner::new(bucket);
404
405 let delete_max_size = self
406 .config
407 .delete_max_size
408 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
409
410 Ok(OssBackend {
411 core: Arc::new(OssCore {
412 info: {
413 let am = AccessorInfo::default();
414 am.set_scheme(Scheme::Oss)
415 .set_root(&root)
416 .set_name(bucket)
417 .set_native_capability(Capability {
418 stat: true,
419 stat_with_if_match: true,
420 stat_with_if_none_match: true,
421 stat_with_version: self.config.enable_versioning,
422
423 read: true,
424
425 read_with_if_match: true,
426 read_with_if_none_match: true,
427 read_with_version: self.config.enable_versioning,
428 read_with_if_modified_since: true,
429 read_with_if_unmodified_since: true,
430
431 write: true,
432 write_can_empty: true,
433 write_can_append: true,
434 write_can_multi: true,
435 write_with_cache_control: true,
436 write_with_content_type: true,
437 write_with_content_disposition: true,
438 write_with_if_not_exists: !self.config.enable_versioning,
440
441 write_multi_min_size: Some(100 * 1024),
445 write_multi_max_size: if cfg!(target_pointer_width = "64") {
449 Some(5 * 1024 * 1024 * 1024)
450 } else {
451 Some(usize::MAX)
452 },
453 write_with_user_metadata: true,
454
455 delete: true,
456 delete_with_version: self.config.enable_versioning,
457 delete_max_size: Some(delete_max_size),
458
459 copy: true,
460
461 list: true,
462 list_with_limit: true,
463 list_with_start_after: true,
464 list_with_recursive: true,
465 list_with_versions: self.config.enable_versioning,
466 list_with_deleted: self.config.enable_versioning,
467
468 presign: true,
469 presign_stat: true,
470 presign_read: true,
471 presign_write: true,
472
473 shared: true,
474
475 ..Default::default()
476 });
477
478 #[allow(deprecated)]
480 if let Some(client) = self.http_client {
481 am.update_http_client(|_| client);
482 }
483
484 am.into()
485 },
486 root,
487 bucket: bucket.to_owned(),
488 endpoint,
489 host,
490 presign_endpoint,
491 allow_anonymous: self.config.allow_anonymous,
492 signer,
493 loader,
494 server_side_encryption,
495 server_side_encryption_key_id,
496 }),
497 })
498 }
499}
500
/// Aliyun OSS backend; every operation is delegated to the shared `OssCore`.
#[derive(Debug, Clone)]
pub struct OssBackend {
    /// Shared backend state. Held in an `Arc`, so cloning the backend only
    /// clones the pointer.
    core: Arc<OssCore>,
}
506
507impl Access for OssBackend {
508 type Reader = HttpBody;
509 type Writer = OssWriters;
510 type Lister = OssListers;
511 type Deleter = oio::BatchDeleter<OssDeleter>;
512
513 fn info(&self) -> Arc<AccessorInfo> {
514 self.core.info.clone()
515 }
516
517 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
518 let resp = self.core.oss_head_object(path, &args).await?;
519
520 let status = resp.status();
521
522 match status {
523 StatusCode::OK => {
524 let headers = resp.headers();
525 let mut meta = self.core.parse_metadata(path, resp.headers())?;
526
527 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
528 meta.set_version(v);
529 }
530
531 Ok(RpStat::new(meta))
532 }
533 _ => Err(parse_error(resp)),
534 }
535 }
536
537 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
538 let resp = self.core.oss_get_object(path, &args).await?;
539
540 let status = resp.status();
541
542 match status {
543 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
544 Ok((RpRead::default(), resp.into_body()))
545 }
546 _ => {
547 let (part, mut body) = resp.into_parts();
548 let buf = body.to_buffer().await?;
549 Err(parse_error(Response::from_parts(part, buf)))
550 }
551 }
552 }
553
554 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
555 let writer = OssWriter::new(self.core.clone(), path, args.clone());
556
557 let w = if args.append() {
558 OssWriters::Two(oio::AppendWriter::new(writer))
559 } else {
560 OssWriters::One(oio::MultipartWriter::new(
561 self.core.info.clone(),
562 writer,
563 args.concurrent(),
564 ))
565 };
566
567 Ok((RpWrite::default(), w))
568 }
569
570 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
571 Ok((
572 RpDelete::default(),
573 oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
574 ))
575 }
576
577 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
578 let l = if args.versions() || args.deleted() {
579 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
580 self.core.clone(),
581 path,
582 args,
583 )))
584 } else {
585 TwoWays::One(oio::PageLister::new(OssLister::new(
586 self.core.clone(),
587 path,
588 args.recursive(),
589 args.limit(),
590 args.start_after(),
591 )))
592 };
593
594 Ok((RpList::default(), l))
595 }
596
597 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
598 let resp = self.core.oss_copy_object(from, to).await?;
599 let status = resp.status();
600
601 match status {
602 StatusCode::OK => Ok(RpCopy::default()),
603 _ => Err(parse_error(resp)),
604 }
605 }
606
607 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
608 let req = match args.operation() {
610 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
611 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
612 PresignOperation::Write(v) => {
613 self.core
614 .oss_put_object_request(path, None, v, Buffer::new(), true)
615 }
616 PresignOperation::Delete(_) => Err(Error::new(
617 ErrorKind::Unsupported,
618 "operation is not supported",
619 )),
620 };
621 let mut req = req?;
622
623 self.core.sign_query(&mut req, args.expire()).await?;
624
625 let (parts, _) = req.into_parts();
627
628 Ok(RpPresign::new(PresignedRequest::new(
629 parts.method,
630 parts.uri,
631 parts.headers,
632 )))
633 }
634}