1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::AliyunConfig;
27use reqsign::AliyunLoader;
28use reqsign::AliyunOssSigner;
29
30use super::core::*;
31use super::delete::OssDeleter;
32use super::error::parse_error;
33use super::lister::OssLister;
34use super::lister::OssListers;
35use super::lister::OssObjectVersionsLister;
36use super::writer::OssWriter;
37use super::writer::OssWriters;
38use super::DEFAULT_SCHEME;
39use crate::raw::*;
40use crate::services::OssConfig;
41use crate::*;
42const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
43
44impl Configurator for OssConfig {
45 type Builder = OssBuilder;
46
47 #[allow(deprecated)]
48 fn into_builder(self) -> Self::Builder {
49 OssBuilder {
50 config: self,
51
52 http_client: None,
53 }
54 }
55}
56
57#[doc = include_str!("docs.md")]
59#[derive(Default)]
60pub struct OssBuilder {
61 config: OssConfig,
62
63 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
64 http_client: Option<HttpClient>,
65}
66
67impl Debug for OssBuilder {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let mut d = f.debug_struct("OssBuilder");
70
71 d.field("config", &self.config);
72 d.finish_non_exhaustive()
73 }
74}
75
76impl OssBuilder {
77 pub fn root(mut self, root: &str) -> Self {
81 self.config.root = if root.is_empty() {
82 None
83 } else {
84 Some(root.to_string())
85 };
86
87 self
88 }
89
90 pub fn bucket(mut self, bucket: &str) -> Self {
92 self.config.bucket = bucket.to_string();
93
94 self
95 }
96
97 pub fn endpoint(mut self, endpoint: &str) -> Self {
99 if !endpoint.is_empty() {
100 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
102 }
103
104 self
105 }
106
107 pub fn addressing_style(mut self, addressing_style: &str) -> Self {
117 self.config.addressing_style = Some(addressing_style.to_string());
118
119 self
120 }
121
122 pub fn enable_versioning(mut self, enabled: bool) -> Self {
124 self.config.enable_versioning = enabled;
125
126 self
127 }
128
129 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
138 if !endpoint.is_empty() {
139 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
141 }
142
143 self
144 }
145
146 pub fn presign_addressing_style(mut self, addressing_style: &str) -> Self {
154 self.config.presign_addressing_style = Some(addressing_style.to_string());
155
156 self
157 }
158
159 pub fn access_key_id(mut self, v: &str) -> Self {
164 if !v.is_empty() {
165 self.config.access_key_id = Some(v.to_string())
166 }
167
168 self
169 }
170
171 pub fn access_key_secret(mut self, v: &str) -> Self {
176 if !v.is_empty() {
177 self.config.access_key_secret = Some(v.to_string())
178 }
179
180 self
181 }
182
183 pub fn security_token(mut self, security_token: &str) -> Self {
188 if !security_token.is_empty() {
189 self.config.security_token = Some(security_token.to_string())
190 }
191
192 self
193 }
194
195 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
202 #[allow(deprecated)]
203 pub fn http_client(mut self, client: HttpClient) -> Self {
204 self.http_client = Some(client);
205 self
206 }
207
208 fn parse_endpoint(
210 &self,
211 endpoint: &Option<String>,
212 bucket: &str,
213 addressing_style: AddressingStyle,
214 ) -> Result<(String, String)> {
215 let (endpoint, host) = match endpoint.clone() {
216 Some(ep) => {
217 let uri = ep.parse::<Uri>().map_err(|err| {
218 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
219 .with_context("service", Scheme::Oss)
220 .with_context("endpoint", &ep)
221 .set_source(err)
222 })?;
223 let host = uri.host().ok_or_else(|| {
224 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
225 .with_context("service", Scheme::Oss)
226 .with_context("endpoint", &ep)
227 })?;
228 let full_host = match addressing_style {
229 AddressingStyle::Virtual => {
230 if let Some(port) = uri.port_u16() {
231 format!("{bucket}.{host}:{port}")
232 } else {
233 format!("{bucket}.{host}")
234 }
235 }
236 AddressingStyle::Cname | AddressingStyle::Path => {
237 if let Some(port) = uri.port_u16() {
238 format!("{host}:{port}")
239 } else {
240 host.to_string()
241 }
242 }
243 };
244 if let Some(port) = uri.port_u16() {
245 format!("{bucket}.{host}:{port}")
246 } else {
247 format!("{bucket}.{host}")
248 };
249 let endpoint = match uri.scheme_str() {
250 Some(scheme_str) => match scheme_str {
251 "http" | "https" => format!("{scheme_str}://{full_host}"),
252 _ => {
253 return Err(Error::new(
254 ErrorKind::ConfigInvalid,
255 "endpoint protocol is invalid",
256 )
257 .with_context("service", Scheme::Oss));
258 }
259 },
260 None => format!("https://{full_host}"),
261 };
262 let endpoint = match addressing_style {
263 AddressingStyle::Path => format!("{}/{}", endpoint, bucket),
264 AddressingStyle::Cname | AddressingStyle::Virtual => endpoint,
265 };
266 (endpoint, full_host)
267 }
268 None => {
269 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
270 .with_context("service", Scheme::Oss));
271 }
272 };
273 Ok((endpoint, host))
274 }
275
276 pub fn server_side_encryption(mut self, v: &str) -> Self {
293 if !v.is_empty() {
294 self.config.server_side_encryption = Some(v.to_string())
295 }
296 self
297 }
298
299 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
305 if !v.is_empty() {
306 self.config.server_side_encryption_key_id = Some(v.to_string())
307 }
308 self
309 }
310
311 #[deprecated(
313 since = "0.52.0",
314 note = "Please use `delete_max_size` instead of `batch_max_operations`"
315 )]
316 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
317 self.config.delete_max_size = Some(delete_max_size);
318
319 self
320 }
321
322 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
324 self.config.delete_max_size = Some(delete_max_size);
325
326 self
327 }
328
329 pub fn allow_anonymous(mut self) -> Self {
332 self.config.allow_anonymous = true;
333 self
334 }
335
336 pub fn role_arn(mut self, role_arn: &str) -> Self {
341 if !role_arn.is_empty() {
342 self.config.role_arn = Some(role_arn.to_string())
343 }
344
345 self
346 }
347
348 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
350 if !role_session_name.is_empty() {
351 self.config.role_session_name = Some(role_session_name.to_string())
352 }
353
354 self
355 }
356
357 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
359 if !oidc_provider_arn.is_empty() {
360 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
361 }
362
363 self
364 }
365
366 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
368 if !oidc_token_file.is_empty() {
369 self.config.oidc_token_file = Some(oidc_token_file.to_string())
370 }
371
372 self
373 }
374
375 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
377 if !sts_endpoint.is_empty() {
378 self.config.sts_endpoint = Some(sts_endpoint.to_string())
379 }
380
381 self
382 }
383}
384
385enum AddressingStyle {
386 Path,
387 Cname,
388 Virtual,
389}
390
391impl TryFrom<&Option<String>> for AddressingStyle {
392 type Error = Error;
393
394 fn try_from(value: &Option<String>) -> Result<Self> {
395 match value.as_deref() {
396 None | Some("virtual") => Ok(AddressingStyle::Virtual),
397 Some("path") => Ok(AddressingStyle::Path),
398 Some("cname") => Ok(AddressingStyle::Cname),
399 Some(v) => Err(Error::new(
400 ErrorKind::ConfigInvalid,
401 "Invalid addressing style, available: `virtual`, `path`, `cname`",
402 )
403 .with_context("service", Scheme::Oss)
404 .with_context("addressing_style", v)),
405 }
406 }
407}
408
409impl Builder for OssBuilder {
410 type Config = OssConfig;
411
412 fn build(self) -> Result<impl Access> {
413 debug!("backend build started: {:?}", &self);
414
415 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
416 debug!("backend use root {}", &root);
417
418 let bucket = match self.config.bucket.is_empty() {
420 false => Ok(&self.config.bucket),
421 true => Err(
422 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
423 .with_context("service", Scheme::Oss),
424 ),
425 }?;
426
427 let (endpoint, host) = self.parse_endpoint(
430 &self.config.endpoint,
431 bucket,
432 (&self.config.addressing_style).try_into()?,
433 )?;
434 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
435
436 let presign_endpoint = if self.config.presign_endpoint.is_some() {
437 self.parse_endpoint(
438 &self.config.presign_endpoint,
439 bucket,
440 (&self.config.presign_addressing_style).try_into()?,
441 )?
442 .0
443 } else {
444 endpoint.clone()
445 };
446 debug!("backend use presign_endpoint: {}", &presign_endpoint);
447
448 let server_side_encryption = match &self.config.server_side_encryption {
449 None => None,
450 Some(v) => Some(
451 build_header_value(v)
452 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
453 ),
454 };
455
456 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
457 None => None,
458 Some(v) => Some(
459 build_header_value(v)
460 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
461 ),
462 };
463
464 let mut cfg = AliyunConfig::default();
465 cfg = cfg.from_env();
467
468 if let Some(v) = self.config.access_key_id {
469 cfg.access_key_id = Some(v);
470 }
471
472 if let Some(v) = self.config.access_key_secret {
473 cfg.access_key_secret = Some(v);
474 }
475
476 if let Some(v) = self.config.security_token {
477 cfg.security_token = Some(v);
478 }
479
480 if let Some(v) = self.config.role_arn {
481 cfg.role_arn = Some(v);
482 }
483
484 if let Some(v) = self.config.role_session_name {
486 cfg.role_session_name = v;
487 }
488
489 if let Some(v) = self.config.oidc_provider_arn {
490 cfg.oidc_provider_arn = Some(v);
491 }
492
493 if let Some(v) = self.config.oidc_token_file {
494 cfg.oidc_token_file = Some(v);
495 }
496
497 if let Some(v) = self.config.sts_endpoint {
498 cfg.sts_endpoint = Some(v);
499 }
500
501 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
502
503 let signer = AliyunOssSigner::new(bucket);
504
505 let delete_max_size = self
506 .config
507 .delete_max_size
508 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
509
510 Ok(OssBackend {
511 core: Arc::new(OssCore {
512 info: {
513 let am = AccessorInfo::default();
514 am.set_scheme(DEFAULT_SCHEME)
515 .set_root(&root)
516 .set_name(bucket)
517 .set_native_capability(Capability {
518 stat: true,
519 stat_with_if_match: true,
520 stat_with_if_none_match: true,
521 stat_with_version: self.config.enable_versioning,
522
523 read: true,
524
525 read_with_if_match: true,
526 read_with_if_none_match: true,
527 read_with_version: self.config.enable_versioning,
528 read_with_if_modified_since: true,
529 read_with_if_unmodified_since: true,
530
531 write: true,
532 write_can_empty: true,
533 write_can_append: true,
534 write_can_multi: true,
535 write_with_cache_control: true,
536 write_with_content_type: true,
537 write_with_content_disposition: true,
538 write_with_if_not_exists: !self.config.enable_versioning,
540
541 write_multi_min_size: Some(100 * 1024),
545 write_multi_max_size: if cfg!(target_pointer_width = "64") {
549 Some(5 * 1024 * 1024 * 1024)
550 } else {
551 Some(usize::MAX)
552 },
553 write_with_user_metadata: true,
554
555 delete: true,
556 delete_with_version: self.config.enable_versioning,
557 delete_max_size: Some(delete_max_size),
558
559 copy: true,
560
561 list: true,
562 list_with_limit: true,
563 list_with_start_after: true,
564 list_with_recursive: true,
565 list_with_versions: self.config.enable_versioning,
566 list_with_deleted: self.config.enable_versioning,
567
568 presign: true,
569 presign_stat: true,
570 presign_read: true,
571 presign_write: true,
572
573 shared: true,
574
575 ..Default::default()
576 });
577
578 #[allow(deprecated)]
580 if let Some(client) = self.http_client {
581 am.update_http_client(|_| client);
582 }
583
584 am.into()
585 },
586 root,
587 bucket: bucket.to_owned(),
588 endpoint,
589 host,
590 presign_endpoint,
591 allow_anonymous: self.config.allow_anonymous,
592 signer,
593 loader,
594 server_side_encryption,
595 server_side_encryption_key_id,
596 }),
597 })
598 }
599}
600
601#[derive(Debug, Clone)]
602pub struct OssBackend {
604 core: Arc<OssCore>,
605}
606
607impl Access for OssBackend {
608 type Reader = HttpBody;
609 type Writer = OssWriters;
610 type Lister = OssListers;
611 type Deleter = oio::BatchDeleter<OssDeleter>;
612
613 fn info(&self) -> Arc<AccessorInfo> {
614 self.core.info.clone()
615 }
616
617 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
618 let resp = self.core.oss_head_object(path, &args).await?;
619
620 let status = resp.status();
621
622 match status {
623 StatusCode::OK => {
624 let headers = resp.headers();
625 let mut meta = self.core.parse_metadata(path, resp.headers())?;
626
627 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
628 meta.set_version(v);
629 }
630
631 Ok(RpStat::new(meta))
632 }
633 _ => Err(parse_error(resp)),
634 }
635 }
636
637 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
638 let resp = self.core.oss_get_object(path, &args).await?;
639
640 let status = resp.status();
641
642 match status {
643 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
644 Ok((RpRead::default(), resp.into_body()))
645 }
646 _ => {
647 let (part, mut body) = resp.into_parts();
648 let buf = body.to_buffer().await?;
649 Err(parse_error(Response::from_parts(part, buf)))
650 }
651 }
652 }
653
654 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
655 let writer = OssWriter::new(self.core.clone(), path, args.clone());
656
657 let w = if args.append() {
658 OssWriters::Two(oio::AppendWriter::new(writer))
659 } else {
660 OssWriters::One(oio::MultipartWriter::new(
661 self.core.info.clone(),
662 writer,
663 args.concurrent(),
664 ))
665 };
666
667 Ok((RpWrite::default(), w))
668 }
669
670 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
671 Ok((
672 RpDelete::default(),
673 oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
674 ))
675 }
676
677 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
678 let l = if args.versions() || args.deleted() {
679 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
680 self.core.clone(),
681 path,
682 args,
683 )))
684 } else {
685 TwoWays::One(oio::PageLister::new(OssLister::new(
686 self.core.clone(),
687 path,
688 args.recursive(),
689 args.limit(),
690 args.start_after(),
691 )))
692 };
693
694 Ok((RpList::default(), l))
695 }
696
697 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
698 let resp = self.core.oss_copy_object(from, to).await?;
699 let status = resp.status();
700
701 match status {
702 StatusCode::OK => Ok(RpCopy::default()),
703 _ => Err(parse_error(resp)),
704 }
705 }
706
707 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
708 let req = match args.operation() {
710 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
711 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
712 PresignOperation::Write(v) => {
713 self.core
714 .oss_put_object_request(path, None, v, Buffer::new(), true)
715 }
716 PresignOperation::Delete(_) => Err(Error::new(
717 ErrorKind::Unsupported,
718 "operation is not supported",
719 )),
720 };
721 let mut req = req?;
722
723 self.core.sign_query(&mut req, args.expire()).await?;
724
725 let (parts, _) = req.into_parts();
727
728 Ok(RpPresign::new(PresignedRequest::new(
729 parts.method,
730 parts.uri,
731 parts.headers,
732 )))
733 }
734}