1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::sync::LazyLock;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use constants::X_AMZ_META_PREFIX;
30use constants::X_AMZ_VERSION_ID;
31use http::Response;
32use http::StatusCode;
33use http::Uri;
34use log::debug;
35use log::warn;
36use md5::Digest;
37use md5::Md5;
38use percent_encoding::percent_decode_str;
39use reqsign::AwsAssumeRoleLoader;
40use reqsign::AwsConfig;
41use reqsign::AwsCredentialLoad;
42use reqsign::AwsDefaultLoader;
43use reqsign::AwsV4Signer;
44use reqwest::Url;
45
46use super::core::*;
47use super::delete::S3Deleter;
48use super::error::parse_error;
49use super::lister::S3ListerV1;
50use super::lister::S3ListerV2;
51use super::lister::S3Listers;
52use super::lister::S3ObjectVersionsLister;
53use super::writer::S3Writer;
54use super::writer::S3Writers;
55use super::S3_SCHEME;
56use crate::raw::oio::PageLister;
57use crate::raw::*;
58use crate::services::S3Config;
59use crate::*;
60
61static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
63 let mut m = HashMap::new();
64 m.insert(
66 "https://s3.amazonaws.com",
67 "https://s3.{region}.amazonaws.com",
68 );
69 m
70});
71
72const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
73
74impl Configurator for S3Config {
75 type Builder = S3Builder;
76
77 fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
78 let mut map = options.clone();
79
80 let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
81 if bucket_missing {
82 let bucket = uri
83 .authority()
84 .map(|authority| authority.host())
85 .filter(|host| !host.is_empty())
86 .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
87 map.insert("bucket".to_string(), bucket.to_string());
88 }
89
90 if !map.contains_key("root") {
91 let path = percent_decode_str(uri.path()).decode_utf8_lossy();
92 let trimmed = path.trim_matches('/');
93 if !trimmed.is_empty() {
94 map.insert("root".to_string(), trimmed.to_string());
95 }
96 }
97
98 Self::from_iter(map)
99 }
100
101 #[allow(deprecated)]
102 fn into_builder(self) -> Self::Builder {
103 S3Builder {
104 config: self,
105 customized_credential_load: None,
106
107 http_client: None,
108 }
109 }
110}
111
112#[doc = include_str!("docs.md")]
115#[doc = include_str!("compatible_services.md")]
116#[derive(Default)]
117pub struct S3Builder {
118 config: S3Config,
119
120 customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
121
122 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
123 http_client: Option<HttpClient>,
124}
125
126impl Debug for S3Builder {
127 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
128 let mut d = f.debug_struct("S3Builder");
129
130 d.field("config", &self.config);
131 d.finish_non_exhaustive()
132 }
133}
134
135impl S3Builder {
136 pub fn root(mut self, root: &str) -> Self {
140 self.config.root = if root.is_empty() {
141 None
142 } else {
143 Some(root.to_string())
144 };
145
146 self
147 }
148
149 pub fn bucket(mut self, bucket: &str) -> Self {
151 self.config.bucket = bucket.to_string();
152
153 self
154 }
155
156 pub fn endpoint(mut self, endpoint: &str) -> Self {
169 if !endpoint.is_empty() {
170 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
172 }
173
174 self
175 }
176
177 pub fn region(mut self, region: &str) -> Self {
184 if !region.is_empty() {
185 self.config.region = Some(region.to_string())
186 }
187
188 self
189 }
190
191 pub fn access_key_id(mut self, v: &str) -> Self {
196 if !v.is_empty() {
197 self.config.access_key_id = Some(v.to_string())
198 }
199
200 self
201 }
202
203 pub fn secret_access_key(mut self, v: &str) -> Self {
208 if !v.is_empty() {
209 self.config.secret_access_key = Some(v.to_string())
210 }
211
212 self
213 }
214
215 pub fn role_arn(mut self, v: &str) -> Self {
220 if !v.is_empty() {
221 self.config.role_arn = Some(v.to_string())
222 }
223
224 self
225 }
226
227 pub fn external_id(mut self, v: &str) -> Self {
229 if !v.is_empty() {
230 self.config.external_id = Some(v.to_string())
231 }
232
233 self
234 }
235
236 pub fn role_session_name(mut self, v: &str) -> Self {
238 if !v.is_empty() {
239 self.config.role_session_name = Some(v.to_string())
240 }
241
242 self
243 }
244
245 pub fn default_storage_class(mut self, v: &str) -> Self {
258 if !v.is_empty() {
259 self.config.default_storage_class = Some(v.to_string())
260 }
261
262 self
263 }
264
265 pub fn server_side_encryption(mut self, v: &str) -> Self {
276 if !v.is_empty() {
277 self.config.server_side_encryption = Some(v.to_string())
278 }
279
280 self
281 }
282
283 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
300 if !v.is_empty() {
301 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
302 }
303
304 self
305 }
306
307 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
318 if !v.is_empty() {
319 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
320 }
321
322 self
323 }
324
325 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
339 if !v.is_empty() {
340 self.config.server_side_encryption_customer_key = Some(v.to_string())
341 }
342
343 self
344 }
345
346 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
359 if !v.is_empty() {
360 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
361 }
362
363 self
364 }
365
366 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
372 self.config.server_side_encryption = Some("aws:kms".to_string());
373 self
374 }
375
376 pub fn server_side_encryption_with_customer_managed_kms_key(
382 mut self,
383 aws_kms_key_id: &str,
384 ) -> Self {
385 self.config.server_side_encryption = Some("aws:kms".to_string());
386 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
387 self
388 }
389
390 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
396 self.config.server_side_encryption = Some("AES256".to_string());
397 self
398 }
399
400 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
406 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
407 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
408 self.config.server_side_encryption_customer_key_md5 =
409 Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
410 self
411 }
412
413 pub fn session_token(mut self, token: &str) -> Self {
419 if !token.is_empty() {
420 self.config.session_token = Some(token.to_string());
421 }
422 self
423 }
424
425 #[deprecated(note = "Please use `session_token` instead")]
427 pub fn security_token(self, token: &str) -> Self {
428 self.session_token(token)
429 }
430
431 pub fn disable_config_load(mut self) -> Self {
439 self.config.disable_config_load = true;
440 self
441 }
442
443 pub fn disable_list_objects_v2(mut self) -> Self {
449 self.config.disable_list_objects_v2 = true;
450 self
451 }
452
453 pub fn enable_request_payer(mut self) -> Self {
457 self.config.enable_request_payer = true;
458 self
459 }
460
461 pub fn disable_ec2_metadata(mut self) -> Self {
466 self.config.disable_ec2_metadata = true;
467 self
468 }
469
470 pub fn allow_anonymous(mut self) -> Self {
473 self.config.allow_anonymous = true;
474 self
475 }
476
477 pub fn enable_virtual_host_style(mut self) -> Self {
483 self.config.enable_virtual_host_style = true;
484 self
485 }
486
487 pub fn disable_stat_with_override(mut self) -> Self {
491 self.config.disable_stat_with_override = true;
492 self
493 }
494
495 pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
500 self.customized_credential_load = Some(cred);
501 self
502 }
503
504 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
511 #[allow(deprecated)]
512 pub fn http_client(mut self, client: HttpClient) -> Self {
513 self.http_client = Some(client);
514 self
515 }
516
517 pub fn enable_versioning(mut self, enabled: bool) -> Self {
519 self.config.enable_versioning = enabled;
520
521 self
522 }
523
524 fn is_bucket_valid(&self) -> bool {
528 if self.config.bucket.is_empty() {
529 return false;
530 }
531 if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
535 return false;
536 }
537 true
538 }
539
540 fn build_endpoint(&self, region: &str) -> String {
542 let bucket = {
543 debug_assert!(self.is_bucket_valid(), "bucket must be valid");
544
545 self.config.bucket.as_str()
546 };
547
548 let mut endpoint = match &self.config.endpoint {
549 Some(endpoint) => {
550 if endpoint.starts_with("http") {
551 endpoint.to_string()
552 } else {
553 format!("https://{endpoint}")
555 }
556 }
557 None => "https://s3.amazonaws.com".to_string(),
558 };
559
560 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
562
563 if let Ok(url) = Url::from_str(&endpoint) {
565 endpoint = url.to_string().trim_end_matches('/').to_string();
567 }
568
569 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
571 template.replace("{region}", region)
572 } else {
573 endpoint.to_string()
576 };
577
578 if self.config.enable_virtual_host_style {
580 endpoint = endpoint.replace("//", &format!("//{bucket}."))
581 } else {
582 write!(endpoint, "/{bucket}").expect("write into string must succeed");
583 };
584
585 endpoint
586 }
587
588 #[deprecated(
590 since = "0.52.0",
591 note = "Please use `delete_max_size` instead of `batch_max_operations`"
592 )]
593 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
594 self.config.delete_max_size = Some(batch_max_operations);
595
596 self
597 }
598
599 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
601 self.config.delete_max_size = Some(delete_max_size);
602
603 self
604 }
605
606 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
612 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
613
614 self
615 }
616
617 pub fn disable_write_with_if_match(mut self) -> Self {
619 self.config.disable_write_with_if_match = true;
620 self
621 }
622
623 pub fn enable_write_with_append(mut self) -> Self {
625 self.config.enable_write_with_append = true;
626 self
627 }
628
629 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
665 let endpoint = endpoint.trim_end_matches('/');
667
668 let mut endpoint = if endpoint.starts_with("http") {
670 endpoint.to_string()
671 } else {
672 format!("https://{endpoint}")
674 };
675
676 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
678 let url = format!("{endpoint}/{bucket}");
679
680 debug!("detect region with url: {url}");
681
682 if endpoint.ends_with("r2.cloudflarestorage.com") {
688 return Some("auto".to_string());
689 }
690
691 if let Some(v) = endpoint.strip_prefix("https://s3.") {
693 if let Some(region) = v.strip_suffix(".amazonaws.com") {
694 return Some(region.to_string());
695 }
696 }
697
698 if let Some(v) = endpoint.strip_prefix("https://") {
703 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
704 return Some(region.to_string());
705 }
706
707 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
708 return Some(region.to_string());
709 }
710 }
711
712 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
714
715 let client = HttpClient::new().ok()?;
716 let res = client
717 .send(req)
718 .await
719 .map_err(|err| warn!("detect region failed for: {err:?}"))
720 .ok()?;
721
722 debug!(
723 "auto detect region got response: status {:?}, header: {:?}",
724 res.status(),
725 res.headers()
726 );
727
728 if let Some(header) = res.headers().get("x-amz-bucket-region") {
730 if let Ok(regin) = header.to_str() {
731 return Some(regin.to_string());
732 }
733 }
734
735 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
738 return Some("us-east-1".to_string());
739 }
740
741 None
742 }
743}
744
745impl Builder for S3Builder {
746 type Config = S3Config;
747
748 fn build(mut self) -> Result<impl Access> {
749 debug!("backend build started: {:?}", &self);
750
751 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
752 debug!("backend use root {}", &root);
753
754 let bucket = if self.is_bucket_valid() {
756 Ok(&self.config.bucket)
757 } else {
758 Err(
759 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
760 .with_context("service", Scheme::S3),
761 )
762 }?;
763 debug!("backend use bucket {}", &bucket);
764
765 let default_storage_class = match &self.config.default_storage_class {
766 None => None,
767 Some(v) => Some(
768 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
769 ),
770 };
771
772 let server_side_encryption = match &self.config.server_side_encryption {
773 None => None,
774 Some(v) => Some(
775 build_header_value(v)
776 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
777 ),
778 };
779
780 let server_side_encryption_aws_kms_key_id =
781 match &self.config.server_side_encryption_aws_kms_key_id {
782 None => None,
783 Some(v) => Some(build_header_value(v).map_err(|err| {
784 err.with_context("key", "server_side_encryption_aws_kms_key_id")
785 })?),
786 };
787
788 let server_side_encryption_customer_algorithm =
789 match &self.config.server_side_encryption_customer_algorithm {
790 None => None,
791 Some(v) => Some(build_header_value(v).map_err(|err| {
792 err.with_context("key", "server_side_encryption_customer_algorithm")
793 })?),
794 };
795
796 let server_side_encryption_customer_key =
797 match &self.config.server_side_encryption_customer_key {
798 None => None,
799 Some(v) => Some(build_header_value(v).map_err(|err| {
800 err.with_context("key", "server_side_encryption_customer_key")
801 })?),
802 };
803
804 let server_side_encryption_customer_key_md5 =
805 match &self.config.server_side_encryption_customer_key_md5 {
806 None => None,
807 Some(v) => Some(build_header_value(v).map_err(|err| {
808 err.with_context("key", "server_side_encryption_customer_key_md5")
809 })?),
810 };
811
812 let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
813 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
814 None => None,
815 v => {
816 return Err(Error::new(
817 ErrorKind::ConfigInvalid,
818 format!("{v:?} is not a supported checksum_algorithm."),
819 ))
820 }
821 };
822
823 let mut cfg = AwsConfig::default();
825 if !self.config.disable_config_load {
826 #[cfg(not(target_arch = "wasm32"))]
827 {
828 cfg = cfg.from_profile();
829 cfg = cfg.from_env();
830 }
831 }
832
833 if let Some(ref v) = self.config.region {
834 cfg.region = Some(v.to_string());
835 }
836
837 if cfg.region.is_none() {
838 return Err(Error::new(
839 ErrorKind::ConfigInvalid,
840 "region is missing. Please find it by S3::detect_region() or set them in env.",
841 )
842 .with_operation("Builder::build")
843 .with_context("service", Scheme::S3));
844 }
845
846 let region = cfg.region.to_owned().unwrap();
847 debug!("backend use region: {region}");
848
849 self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
851
852 let endpoint = self.build_endpoint(®ion);
854 debug!("backend use endpoint: {endpoint}");
855
856 if let Some(v) = self.config.access_key_id {
858 cfg.access_key_id = Some(v)
859 }
860 if let Some(v) = self.config.secret_access_key {
861 cfg.secret_access_key = Some(v)
862 }
863 if let Some(v) = self.config.session_token {
864 cfg.session_token = Some(v)
865 }
866
867 let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
868 if let Some(v) = self.customized_credential_load {
870 loader = Some(v);
871 }
872
873 if let Some(role_arn) = self.config.role_arn {
875 let default_loader =
877 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
878
879 let mut assume_role_cfg = AwsConfig {
881 region: Some(region.clone()),
882 role_arn: Some(role_arn),
883 external_id: self.config.external_id.clone(),
884 sts_regional_endpoints: "regional".to_string(),
885 ..Default::default()
886 };
887
888 if let Some(name) = self.config.role_session_name {
890 assume_role_cfg.role_session_name = name;
891 }
892
893 let assume_role_loader = AwsAssumeRoleLoader::new(
894 GLOBAL_REQWEST_CLIENT.clone().clone(),
895 assume_role_cfg,
896 Box::new(default_loader),
897 )
898 .map_err(|err| {
899 Error::new(
900 ErrorKind::ConfigInvalid,
901 "The assume_role_loader is misconfigured",
902 )
903 .with_context("service", Scheme::S3)
904 .set_source(err)
905 })?;
906 loader = Some(Box::new(assume_role_loader));
907 }
908 let loader = match loader {
910 Some(v) => v,
911 None => {
912 let mut default_loader =
913 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
914 if self.config.disable_ec2_metadata {
915 default_loader = default_loader.with_disable_ec2_metadata();
916 }
917
918 Box::new(default_loader)
919 }
920 };
921
922 let signer = AwsV4Signer::new("s3", ®ion);
923
924 let delete_max_size = self
925 .config
926 .delete_max_size
927 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
928
929 Ok(S3Backend {
930 core: Arc::new(S3Core {
931 info: {
932 let am = AccessorInfo::default();
933 am.set_scheme(S3_SCHEME)
934 .set_root(&root)
935 .set_name(bucket)
936 .set_native_capability(Capability {
937 stat: true,
938 stat_with_if_match: true,
939 stat_with_if_none_match: true,
940 stat_with_if_modified_since: true,
941 stat_with_if_unmodified_since: true,
942 stat_with_override_cache_control: !self
943 .config
944 .disable_stat_with_override,
945 stat_with_override_content_disposition: !self
946 .config
947 .disable_stat_with_override,
948 stat_with_override_content_type: !self
949 .config
950 .disable_stat_with_override,
951 stat_with_version: self.config.enable_versioning,
952
953 read: true,
954 read_with_if_match: true,
955 read_with_if_none_match: true,
956 read_with_if_modified_since: true,
957 read_with_if_unmodified_since: true,
958 read_with_override_cache_control: true,
959 read_with_override_content_disposition: true,
960 read_with_override_content_type: true,
961 read_with_version: self.config.enable_versioning,
962
963 write: true,
964 write_can_empty: true,
965 write_can_multi: true,
966 write_can_append: self.config.enable_write_with_append,
967
968 write_with_cache_control: true,
969 write_with_content_type: true,
970 write_with_content_encoding: true,
971 write_with_if_match: !self.config.disable_write_with_if_match,
972 write_with_if_not_exists: true,
973 write_with_user_metadata: true,
974
975 write_multi_min_size: Some(5 * 1024 * 1024),
979 write_multi_max_size: if cfg!(target_pointer_width = "64") {
983 Some(5 * 1024 * 1024 * 1024)
984 } else {
985 Some(usize::MAX)
986 },
987
988 delete: true,
989 delete_max_size: Some(delete_max_size),
990 delete_with_version: self.config.enable_versioning,
991
992 copy: true,
993
994 list: true,
995 list_with_limit: true,
996 list_with_start_after: true,
997 list_with_recursive: true,
998 list_with_versions: self.config.enable_versioning,
999 list_with_deleted: self.config.enable_versioning,
1000
1001 presign: true,
1002 presign_stat: true,
1003 presign_read: true,
1004 presign_write: true,
1005
1006 shared: true,
1007
1008 ..Default::default()
1009 });
1010
1011 #[allow(deprecated)]
1013 if let Some(client) = self.http_client {
1014 am.update_http_client(|_| client);
1015 }
1016
1017 am.into()
1018 },
1019 bucket: bucket.to_string(),
1020 endpoint,
1021 root,
1022 server_side_encryption,
1023 server_side_encryption_aws_kms_key_id,
1024 server_side_encryption_customer_algorithm,
1025 server_side_encryption_customer_key,
1026 server_side_encryption_customer_key_md5,
1027 default_storage_class,
1028 allow_anonymous: self.config.allow_anonymous,
1029 disable_list_objects_v2: self.config.disable_list_objects_v2,
1030 enable_request_payer: self.config.enable_request_payer,
1031 signer,
1032 loader,
1033 credential_loaded: AtomicBool::new(false),
1034 checksum_algorithm,
1035 }),
1036 })
1037 }
1038}
1039
1040#[derive(Debug, Clone)]
1042pub struct S3Backend {
1043 core: Arc<S3Core>,
1044}
1045
1046impl Access for S3Backend {
1047 type Reader = HttpBody;
1048 type Writer = S3Writers;
1049 type Lister = S3Listers;
1050 type Deleter = oio::BatchDeleter<S3Deleter>;
1051
1052 fn info(&self) -> Arc<AccessorInfo> {
1053 self.core.info.clone()
1054 }
1055
1056 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1057 let resp = self.core.s3_head_object(path, args).await?;
1058
1059 let status = resp.status();
1060
1061 match status {
1062 StatusCode::OK => {
1063 let headers = resp.headers();
1064 let mut meta = parse_into_metadata(path, headers)?;
1065
1066 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1067 if !user_meta.is_empty() {
1068 meta = meta.with_user_metadata(user_meta);
1069 }
1070
1071 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1072 meta.set_version(v);
1073 }
1074
1075 Ok(RpStat::new(meta))
1076 }
1077 _ => Err(parse_error(resp)),
1078 }
1079 }
1080
1081 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1082 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1083
1084 let status = resp.status();
1085 match status {
1086 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1087 Ok((RpRead::default(), resp.into_body()))
1088 }
1089 _ => {
1090 let (part, mut body) = resp.into_parts();
1091 let buf = body.to_buffer().await?;
1092 Err(parse_error(Response::from_parts(part, buf)))
1093 }
1094 }
1095 }
1096
1097 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1098 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1099
1100 let w = if args.append() {
1101 S3Writers::Two(oio::AppendWriter::new(writer))
1102 } else {
1103 S3Writers::One(oio::MultipartWriter::new(
1104 self.core.info.clone(),
1105 writer,
1106 args.concurrent(),
1107 ))
1108 };
1109
1110 Ok((RpWrite::default(), w))
1111 }
1112
1113 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1114 Ok((
1115 RpDelete::default(),
1116 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1117 ))
1118 }
1119
1120 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1121 let l = if args.versions() || args.deleted() {
1122 ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1123 self.core.clone(),
1124 path,
1125 args,
1126 )))
1127 } else if self.core.disable_list_objects_v2 {
1128 ThreeWays::One(PageLister::new(S3ListerV1::new(
1129 self.core.clone(),
1130 path,
1131 args,
1132 )))
1133 } else {
1134 ThreeWays::Two(PageLister::new(S3ListerV2::new(
1135 self.core.clone(),
1136 path,
1137 args,
1138 )))
1139 };
1140
1141 Ok((RpList::default(), l))
1142 }
1143
1144 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1145 let resp = self.core.s3_copy_object(from, to).await?;
1146
1147 let status = resp.status();
1148
1149 match status {
1150 StatusCode::OK => Ok(RpCopy::default()),
1151 _ => Err(parse_error(resp)),
1152 }
1153 }
1154
1155 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1156 let (expire, op) = args.into_parts();
1157 let req = match op {
1159 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1160 PresignOperation::Read(v) => {
1161 self.core
1162 .s3_get_object_request(path, BytesRange::default(), &v)
1163 }
1164 PresignOperation::Write(_) => {
1165 self.core
1166 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1167 }
1168 PresignOperation::Delete(_) => Err(Error::new(
1169 ErrorKind::Unsupported,
1170 "operation is not supported",
1171 )),
1172 };
1173 let mut req = req?;
1174
1175 self.core.sign_query(&mut req, expire).await?;
1176
1177 let (parts, _) = req.into_parts();
1179
1180 Ok(RpPresign::new(PresignedRequest::new(
1181 parts.method,
1182 parts.uri,
1183 parts.headers,
1184 )))
1185 }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::*;
1191
1192 #[test]
1193 fn test_is_valid_bucket() {
1194 let bucket_cases = vec![
1195 ("", false, false),
1196 ("test", false, true),
1197 ("test.xyz", false, true),
1198 ("", true, false),
1199 ("test", true, true),
1200 ("test.xyz", true, false),
1201 ];
1202
1203 for (bucket, enable_virtual_host_style, expected) in bucket_cases {
1204 let mut b = S3Builder::default();
1205 b = b.bucket(bucket);
1206 if enable_virtual_host_style {
1207 b = b.enable_virtual_host_style();
1208 }
1209 assert_eq!(b.is_bucket_valid(), expected)
1210 }
1211 }
1212
1213 #[test]
1214 fn test_build_endpoint() {
1215 let _ = tracing_subscriber::fmt().with_test_writer().try_init();
1216
1217 let endpoint_cases = vec![
1218 Some("s3.amazonaws.com"),
1219 Some("https://s3.amazonaws.com"),
1220 Some("https://s3.us-east-2.amazonaws.com"),
1221 None,
1222 ];
1223
1224 for endpoint in &endpoint_cases {
1225 let mut b = S3Builder::default().bucket("test");
1226 if let Some(endpoint) = endpoint {
1227 b = b.endpoint(endpoint);
1228 }
1229
1230 let endpoint = b.build_endpoint("us-east-2");
1231 assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test");
1232 }
1233
1234 for endpoint in &endpoint_cases {
1235 let mut b = S3Builder::default()
1236 .bucket("test")
1237 .enable_virtual_host_style();
1238 if let Some(endpoint) = endpoint {
1239 b = b.endpoint(endpoint);
1240 }
1241
1242 let endpoint = b.build_endpoint("us-east-2");
1243 assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com");
1244 }
1245 }
1246
1247 #[tokio::test]
1248 async fn test_detect_region() {
1249 let cases = vec![
1250 (
1251 "aws s3 without region in endpoint",
1252 "https://s3.amazonaws.com",
1253 "example",
1254 Some("us-east-1"),
1255 ),
1256 (
1257 "aws s3 with region in endpoint",
1258 "https://s3.us-east-1.amazonaws.com",
1259 "example",
1260 Some("us-east-1"),
1261 ),
1262 (
1263 "oss with public endpoint",
1264 "https://oss-ap-southeast-1.aliyuncs.com",
1265 "example",
1266 Some("oss-ap-southeast-1"),
1267 ),
1268 (
1269 "oss with internal endpoint",
1270 "https://oss-cn-hangzhou-internal.aliyuncs.com",
1271 "example",
1272 Some("oss-cn-hangzhou-internal"),
1273 ),
1274 (
1275 "r2",
1276 "https://abc.xxxxx.r2.cloudflarestorage.com",
1277 "example",
1278 Some("auto"),
1279 ),
1280 (
1281 "invalid service",
1282 "https://opendal.apache.org",
1283 "example",
1284 None,
1285 ),
1286 ];
1287
1288 for (name, endpoint, bucket, expected) in cases {
1289 let region = S3Builder::detect_region(endpoint, bucket).await;
1290 assert_eq!(region.as_deref(), expected, "{name}");
1291 }
1292 }
1293}