opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::sync::LazyLock;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use constants::X_AMZ_META_PREFIX;
30use constants::X_AMZ_VERSION_ID;
31use http::Response;
32use http::StatusCode;
33use http::Uri;
34use log::debug;
35use log::warn;
36use md5::Digest;
37use md5::Md5;
38use percent_encoding::percent_decode_str;
39use reqsign::AwsAssumeRoleLoader;
40use reqsign::AwsConfig;
41use reqsign::AwsCredentialLoad;
42use reqsign::AwsDefaultLoader;
43use reqsign::AwsV4Signer;
44use reqwest::Url;
45
46use super::core::*;
47use super::delete::S3Deleter;
48use super::error::parse_error;
49use super::lister::S3ListerV1;
50use super::lister::S3ListerV2;
51use super::lister::S3Listers;
52use super::lister::S3ObjectVersionsLister;
53use super::writer::S3Writer;
54use super::writer::S3Writers;
55use super::S3_SCHEME;
56use crate::raw::oio::PageLister;
57use crate::raw::*;
58use crate::services::S3Config;
59use crate::*;
60
/// Maps well-known global endpoints to their region-specific templates so that a
/// correct regional endpoint can be constructed when the user supplies a global one.
///
/// The `{region}` placeholder in each template is substituted with the signing region.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});

/// Default upper bound on the number of operations sent in a single batch request.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
73
74impl Configurator for S3Config {
75    type Builder = S3Builder;
76
77    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
78        let mut map = options.clone();
79
80        let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
81        if bucket_missing {
82            let bucket = uri
83                .authority()
84                .map(|authority| authority.host())
85                .filter(|host| !host.is_empty())
86                .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
87            map.insert("bucket".to_string(), bucket.to_string());
88        }
89
90        if !map.contains_key("root") {
91            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
92            let trimmed = path.trim_matches('/');
93            if !trimmed.is_empty() {
94                map.insert("root".to_string(), trimmed.to_string());
95            }
96        }
97
98        Self::from_iter(map)
99    }
100
101    #[allow(deprecated)]
102    fn into_builder(self) -> Self::Builder {
103        S3Builder {
104            config: self,
105            customized_credential_load: None,
106
107            http_client: None,
108        }
109    }
110}
111
112/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
113/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
114#[doc = include_str!("docs.md")]
115#[doc = include_str!("compatible_services.md")]
116#[derive(Default)]
117pub struct S3Builder {
118    config: S3Config,
119
120    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
121
122    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
123    http_client: Option<HttpClient>,
124}
125
126impl Debug for S3Builder {
127    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
128        let mut d = f.debug_struct("S3Builder");
129
130        d.field("config", &self.config);
131        d.finish_non_exhaustive()
132    }
133}
134
135impl S3Builder {
136    /// Set root of this backend.
137    ///
138    /// All operations will happen under this root.
139    pub fn root(mut self, root: &str) -> Self {
140        self.config.root = if root.is_empty() {
141            None
142        } else {
143            Some(root.to_string())
144        };
145
146        self
147    }
148
149    /// Set bucket name of this backend.
150    pub fn bucket(mut self, bucket: &str) -> Self {
151        self.config.bucket = bucket.to_string();
152
153        self
154    }
155
156    /// Set endpoint of this backend.
157    ///
158    /// Endpoint must be full uri, e.g.
159    ///
160    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
161    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
162    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
163    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
164    /// - Minio: `http://127.0.0.1:9000`
165    ///
166    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
167    /// will prepend "https://" before it.
168    pub fn endpoint(mut self, endpoint: &str) -> Self {
169        if !endpoint.is_empty() {
170            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
171            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
172        }
173
174        self
175    }
176
177    /// Region represent the signing region of this endpoint. This is required
178    /// if you are using the default AWS S3 endpoint.
179    ///
180    /// If using a custom endpoint,
181    /// - If region is set, we will take user's input first.
182    /// - If not, we will try to load it from environment.
183    pub fn region(mut self, region: &str) -> Self {
184        if !region.is_empty() {
185            self.config.region = Some(region.to_string())
186        }
187
188        self
189    }
190
191    /// Set access_key_id of this backend.
192    ///
193    /// - If access_key_id is set, we will take user's input first.
194    /// - If not, we will try to load it from environment.
195    pub fn access_key_id(mut self, v: &str) -> Self {
196        if !v.is_empty() {
197            self.config.access_key_id = Some(v.to_string())
198        }
199
200        self
201    }
202
203    /// Set secret_access_key of this backend.
204    ///
205    /// - If secret_access_key is set, we will take user's input first.
206    /// - If not, we will try to load it from environment.
207    pub fn secret_access_key(mut self, v: &str) -> Self {
208        if !v.is_empty() {
209            self.config.secret_access_key = Some(v.to_string())
210        }
211
212        self
213    }
214
215    /// Set role_arn for this backend.
216    ///
217    /// If `role_arn` is set, we will use already known config as source
218    /// credential to assume role with `role_arn`.
219    pub fn role_arn(mut self, v: &str) -> Self {
220        if !v.is_empty() {
221            self.config.role_arn = Some(v.to_string())
222        }
223
224        self
225    }
226
227    /// Set external_id for this backend.
228    pub fn external_id(mut self, v: &str) -> Self {
229        if !v.is_empty() {
230            self.config.external_id = Some(v.to_string())
231        }
232
233        self
234    }
235
236    /// Set role_session_name for this backend.
237    pub fn role_session_name(mut self, v: &str) -> Self {
238        if !v.is_empty() {
239            self.config.role_session_name = Some(v.to_string())
240        }
241
242        self
243    }
244
245    /// Set default storage_class for this backend.
246    ///
247    /// Available values:
248    /// - `DEEP_ARCHIVE`
249    /// - `GLACIER`
250    /// - `GLACIER_IR`
251    /// - `INTELLIGENT_TIERING`
252    /// - `ONEZONE_IA`
253    /// - `OUTPOSTS`
254    /// - `REDUCED_REDUNDANCY`
255    /// - `STANDARD`
256    /// - `STANDARD_IA`
257    pub fn default_storage_class(mut self, v: &str) -> Self {
258        if !v.is_empty() {
259            self.config.default_storage_class = Some(v.to_string())
260        }
261
262        self
263    }
264
265    /// Set server_side_encryption for this backend.
266    ///
267    /// Available values: `AES256`, `aws:kms`.
268    ///
269    /// # Note
270    ///
271    /// This function is the low-level setting for SSE related features.
272    ///
273    /// SSE related options should be set carefully to make them works.
274    /// Please use `server_side_encryption_with_*` helpers if even possible.
275    pub fn server_side_encryption(mut self, v: &str) -> Self {
276        if !v.is_empty() {
277            self.config.server_side_encryption = Some(v.to_string())
278        }
279
280        self
281    }
282
283    /// Set server_side_encryption_aws_kms_key_id for this backend
284    ///
285    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
286    ///   is not set, S3 will use aws managed kms key to encrypt data.
287    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
288    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
289    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
290    ///   returned.
291    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
292    ///
293    /// # Note
294    ///
295    /// This function is the low-level setting for SSE related features.
296    ///
297    /// SSE related options should be set carefully to make them works.
298    /// Please use `server_side_encryption_with_*` helpers if even possible.
299    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
300        if !v.is_empty() {
301            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
302        }
303
304        self
305    }
306
307    /// Set server_side_encryption_customer_algorithm for this backend.
308    ///
309    /// Available values: `AES256`.
310    ///
311    /// # Note
312    ///
313    /// This function is the low-level setting for SSE related features.
314    ///
315    /// SSE related options should be set carefully to make them works.
316    /// Please use `server_side_encryption_with_*` helpers if even possible.
317    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
318        if !v.is_empty() {
319            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
320        }
321
322        self
323    }
324
325    /// Set server_side_encryption_customer_key for this backend.
326    ///
327    /// # Args
328    ///
329    /// `v`: base64 encoded key that matches algorithm specified in
330    /// `server_side_encryption_customer_algorithm`.
331    ///
332    /// # Note
333    ///
334    /// This function is the low-level setting for SSE related features.
335    ///
336    /// SSE related options should be set carefully to make them works.
337    /// Please use `server_side_encryption_with_*` helpers if even possible.
338    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
339        if !v.is_empty() {
340            self.config.server_side_encryption_customer_key = Some(v.to_string())
341        }
342
343        self
344    }
345
346    /// Set server_side_encryption_customer_key_md5 for this backend.
347    ///
348    /// # Args
349    ///
350    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
351    ///
352    /// # Note
353    ///
354    /// This function is the low-level setting for SSE related features.
355    ///
356    /// SSE related options should be set carefully to make them works.
357    /// Please use `server_side_encryption_with_*` helpers if even possible.
358    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
359        if !v.is_empty() {
360            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
361        }
362
363        self
364    }
365
366    /// Enable server side encryption with aws managed kms key
367    ///
368    /// As known as: SSE-KMS
369    ///
370    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
371    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
372        self.config.server_side_encryption = Some("aws:kms".to_string());
373        self
374    }
375
376    /// Enable server side encryption with customer managed kms key
377    ///
378    /// As known as: SSE-KMS
379    ///
380    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
381    pub fn server_side_encryption_with_customer_managed_kms_key(
382        mut self,
383        aws_kms_key_id: &str,
384    ) -> Self {
385        self.config.server_side_encryption = Some("aws:kms".to_string());
386        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
387        self
388    }
389
390    /// Enable server side encryption with s3 managed key
391    ///
392    /// As known as: SSE-S3
393    ///
394    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
395    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
396        self.config.server_side_encryption = Some("AES256".to_string());
397        self
398    }
399
400    /// Enable server side encryption with customer key.
401    ///
402    /// As known as: SSE-C
403    ///
404    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
405    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
406        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
407        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
408        self.config.server_side_encryption_customer_key_md5 =
409            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
410        self
411    }
412
413    /// Set temporary credential used in AWS S3 connections
414    ///
415    /// # Warning
416    ///
417    /// session token's lifetime is short and requires users to refresh in time.
418    pub fn session_token(mut self, token: &str) -> Self {
419        if !token.is_empty() {
420            self.config.session_token = Some(token.to_string());
421        }
422        self
423    }
424
425    /// Set temporary credential used in AWS S3 connections
426    #[deprecated(note = "Please use `session_token` instead")]
427    pub fn security_token(self, token: &str) -> Self {
428        self.session_token(token)
429    }
430
431    /// Disable config load so that opendal will not load config from
432    /// environment.
433    ///
434    /// For examples:
435    ///
436    /// - envs like `AWS_ACCESS_KEY_ID`
437    /// - files like `~/.aws/config`
438    pub fn disable_config_load(mut self) -> Self {
439        self.config.disable_config_load = true;
440        self
441    }
442
443    /// Disable list objects v2 so that opendal will not use the older
444    /// List Objects V1 to list objects.
445    ///
446    /// By default, OpenDAL uses List Objects V2 to list objects. However,
447    /// some legacy services do not yet support V2.
448    pub fn disable_list_objects_v2(mut self) -> Self {
449        self.config.disable_list_objects_v2 = true;
450        self
451    }
452
453    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
454    ///
455    /// With this option the client accepts to pay for the request and data transfer costs.
456    pub fn enable_request_payer(mut self) -> Self {
457        self.config.enable_request_payer = true;
458        self
459    }
460
461    /// Disable load credential from ec2 metadata.
462    ///
463    /// This option is used to disable the default behavior of opendal
464    /// to load credential from ec2 metadata, a.k.a, IMDSv2
465    pub fn disable_ec2_metadata(mut self) -> Self {
466        self.config.disable_ec2_metadata = true;
467        self
468    }
469
470    /// Allow anonymous will allow opendal to send request without signing
471    /// when credential is not loaded.
472    pub fn allow_anonymous(mut self) -> Self {
473        self.config.allow_anonymous = true;
474        self
475    }
476
477    /// Enable virtual host style so that opendal will send API requests
478    /// in virtual host style instead of path style.
479    ///
480    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
481    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
482    pub fn enable_virtual_host_style(mut self) -> Self {
483        self.config.enable_virtual_host_style = true;
484        self
485    }
486
487    /// Disable stat with override so that opendal will not send stat request with override queries.
488    ///
489    /// For example, R2 doesn't support stat with `response_content_type` query.
490    pub fn disable_stat_with_override(mut self) -> Self {
491        self.config.disable_stat_with_override = true;
492        self
493    }
494
495    /// Adding a customized credential load for service.
496    ///
497    /// If customized_credential_load has been set, we will ignore all other
498    /// credential load methods.
499    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
500        self.customized_credential_load = Some(cred);
501        self
502    }
503
504    /// Specify the http client that used by this service.
505    ///
506    /// # Notes
507    ///
508    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
509    /// during minor updates.
510    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
511    #[allow(deprecated)]
512    pub fn http_client(mut self, client: HttpClient) -> Self {
513        self.http_client = Some(client);
514        self
515    }
516
517    /// Set bucket versioning status for this backend
518    pub fn enable_versioning(mut self, enabled: bool) -> Self {
519        self.config.enable_versioning = enabled;
520
521        self
522    }
523
524    /// Check if `bucket` is valid
525    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
526    /// it couldn't contain dot(.) character
527    fn is_bucket_valid(&self) -> bool {
528        if self.config.bucket.is_empty() {
529            return false;
530        }
531        // If enable virtual host style, `bucket` will reside in domain part,
532        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
533        // so `bucket` with dot can't be recognized correctly for this format.
534        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
535            return false;
536        }
537        true
538    }
539
540    /// Build endpoint with given region.
541    fn build_endpoint(&self, region: &str) -> String {
542        let bucket = {
543            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
544
545            self.config.bucket.as_str()
546        };
547
548        let mut endpoint = match &self.config.endpoint {
549            Some(endpoint) => {
550                if endpoint.starts_with("http") {
551                    endpoint.to_string()
552                } else {
553                    // Prefix https if endpoint doesn't start with scheme.
554                    format!("https://{endpoint}")
555                }
556            }
557            None => "https://s3.amazonaws.com".to_string(),
558        };
559
560        // If endpoint contains bucket name, we should trim them.
561        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
562
563        // Omit default ports if specified.
564        if let Ok(url) = Url::from_str(&endpoint) {
565            // Remove the trailing `/` of root path.
566            endpoint = url.to_string().trim_end_matches('/').to_string();
567        }
568
569        // Update with endpoint templates.
570        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
571            template.replace("{region}", region)
572        } else {
573            // If we don't know where about this endpoint, just leave
574            // them as it.
575            endpoint.to_string()
576        };
577
578        // Apply virtual host style.
579        if self.config.enable_virtual_host_style {
580            endpoint = endpoint.replace("//", &format!("//{bucket}."))
581        } else {
582            write!(endpoint, "/{bucket}").expect("write into string must succeed");
583        };
584
585        endpoint
586    }
587
588    /// Set maximum batch operations of this backend.
589    #[deprecated(
590        since = "0.52.0",
591        note = "Please use `delete_max_size` instead of `batch_max_operations`"
592    )]
593    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
594        self.config.delete_max_size = Some(batch_max_operations);
595
596        self
597    }
598
599    /// Set maximum delete operations of this backend.
600    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
601        self.config.delete_max_size = Some(delete_max_size);
602
603        self
604    }
605
606    /// Set checksum algorithm of this backend.
607    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
608    ///
609    /// Available options:
610    /// - "crc32c"
611    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
612        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
613
614        self
615    }
616
617    /// Disable write with if match so that opendal will not send write request with if match headers.
618    pub fn disable_write_with_if_match(mut self) -> Self {
619        self.config.disable_write_with_if_match = true;
620        self
621    }
622
623    /// Enable write with append so that opendal will send write request with append headers.
624    pub fn enable_write_with_append(mut self) -> Self {
625        self.config.enable_write_with_append = true;
626        self
627    }
628
629    /// Detect region of S3 bucket.
630    ///
631    /// # Args
632    ///
633    /// - endpoint: the endpoint of S3 service
634    /// - bucket: the bucket of S3 service
635    ///
636    /// # Return
637    ///
638    /// - `Some(region)` means we detect the region successfully
639    /// - `None` means we can't detect the region or meeting errors.
640    ///
641    /// # Notes
642    ///
643    /// We will try to detect region by the following methods.
644    ///
645    /// - Match endpoint with given rules to get region
646    ///   - Cloudflare R2
647    ///   - AWS S3
648    ///   - Aliyun OSS
649    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
650    ///
651    /// # Examples
652    ///
653    /// ```no_run
654    /// use opendal::services::S3;
655    ///
656    /// # async fn example() {
657    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
658    /// # }
659    /// ```
660    ///
661    /// # Reference
662    ///
663    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
664    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
665        // Remove the possible trailing `/` in endpoint.
666        let endpoint = endpoint.trim_end_matches('/');
667
668        // Make sure the endpoint contains the scheme.
669        let mut endpoint = if endpoint.starts_with("http") {
670            endpoint.to_string()
671        } else {
672            // Prefix https if endpoint doesn't start with scheme.
673            format!("https://{endpoint}")
674        };
675
676        // Remove bucket name from endpoint.
677        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
678        let url = format!("{endpoint}/{bucket}");
679
680        debug!("detect region with url: {url}");
681
682        // Try to detect region by endpoint.
683
684        // If this bucket is R2, we can return auto directly.
685        //
686        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
687        if endpoint.ends_with("r2.cloudflarestorage.com") {
688            return Some("auto".to_string());
689        }
690
691        // If this bucket is AWS, we can try to match the endpoint.
692        if let Some(v) = endpoint.strip_prefix("https://s3.") {
693            if let Some(region) = v.strip_suffix(".amazonaws.com") {
694                return Some(region.to_string());
695            }
696        }
697
698        // If this bucket is OSS, we can try to match the endpoint.
699        //
700        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
701        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
702        if let Some(v) = endpoint.strip_prefix("https://") {
703            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
704                return Some(region.to_string());
705            }
706
707            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
708                return Some(region.to_string());
709            }
710        }
711
712        // Try to detect region by HeadBucket.
713        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
714
715        let client = HttpClient::new().ok()?;
716        let res = client
717            .send(req)
718            .await
719            .map_err(|err| warn!("detect region failed for: {err:?}"))
720            .ok()?;
721
722        debug!(
723            "auto detect region got response: status {:?}, header: {:?}",
724            res.status(),
725            res.headers()
726        );
727
728        // Get region from response header no matter status code.
729        if let Some(header) = res.headers().get("x-amz-bucket-region") {
730            if let Ok(regin) = header.to_str() {
731                return Some(regin.to_string());
732            }
733        }
734
735        // Status code is 403 or 200 means we already visit the correct
736        // region, we can use the default region directly.
737        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
738            return Some("us-east-1".to_string());
739        }
740
741        None
742    }
743}
744
745impl Builder for S3Builder {
746    type Config = S3Config;
747
748    fn build(mut self) -> Result<impl Access> {
749        debug!("backend build started: {:?}", &self);
750
751        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
752        debug!("backend use root {}", &root);
753
754        // Handle bucket name.
755        let bucket = if self.is_bucket_valid() {
756            Ok(&self.config.bucket)
757        } else {
758            Err(
759                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
760                    .with_context("service", Scheme::S3),
761            )
762        }?;
763        debug!("backend use bucket {}", &bucket);
764
765        let default_storage_class = match &self.config.default_storage_class {
766            None => None,
767            Some(v) => Some(
768                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
769            ),
770        };
771
772        let server_side_encryption = match &self.config.server_side_encryption {
773            None => None,
774            Some(v) => Some(
775                build_header_value(v)
776                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
777            ),
778        };
779
780        let server_side_encryption_aws_kms_key_id =
781            match &self.config.server_side_encryption_aws_kms_key_id {
782                None => None,
783                Some(v) => Some(build_header_value(v).map_err(|err| {
784                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
785                })?),
786            };
787
788        let server_side_encryption_customer_algorithm =
789            match &self.config.server_side_encryption_customer_algorithm {
790                None => None,
791                Some(v) => Some(build_header_value(v).map_err(|err| {
792                    err.with_context("key", "server_side_encryption_customer_algorithm")
793                })?),
794            };
795
796        let server_side_encryption_customer_key =
797            match &self.config.server_side_encryption_customer_key {
798                None => None,
799                Some(v) => Some(build_header_value(v).map_err(|err| {
800                    err.with_context("key", "server_side_encryption_customer_key")
801                })?),
802            };
803
804        let server_side_encryption_customer_key_md5 =
805            match &self.config.server_side_encryption_customer_key_md5 {
806                None => None,
807                Some(v) => Some(build_header_value(v).map_err(|err| {
808                    err.with_context("key", "server_side_encryption_customer_key_md5")
809                })?),
810            };
811
812        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
813            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
814            None => None,
815            v => {
816                return Err(Error::new(
817                    ErrorKind::ConfigInvalid,
818                    format!("{v:?} is not a supported checksum_algorithm."),
819                ))
820            }
821        };
822
823        // This is our current config.
824        let mut cfg = AwsConfig::default();
825        if !self.config.disable_config_load {
826            #[cfg(not(target_arch = "wasm32"))]
827            {
828                cfg = cfg.from_profile();
829                cfg = cfg.from_env();
830            }
831        }
832
833        if let Some(ref v) = self.config.region {
834            cfg.region = Some(v.to_string());
835        }
836
837        if cfg.region.is_none() {
838            return Err(Error::new(
839                ErrorKind::ConfigInvalid,
840                "region is missing. Please find it by S3::detect_region() or set them in env.",
841            )
842            .with_operation("Builder::build")
843            .with_context("service", Scheme::S3));
844        }
845
846        let region = cfg.region.to_owned().unwrap();
847        debug!("backend use region: {region}");
848
849        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
850        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
851
852        // Building endpoint.
853        let endpoint = self.build_endpoint(&region);
854        debug!("backend use endpoint: {endpoint}");
855
856        // Setting all value from user input if available.
857        if let Some(v) = self.config.access_key_id {
858            cfg.access_key_id = Some(v)
859        }
860        if let Some(v) = self.config.secret_access_key {
861            cfg.secret_access_key = Some(v)
862        }
863        if let Some(v) = self.config.session_token {
864            cfg.session_token = Some(v)
865        }
866
867        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
868        // If customized_credential_load is set, we will use it.
869        if let Some(v) = self.customized_credential_load {
870            loader = Some(v);
871        }
872
873        // If role_arn is set, we must use AssumeRoleLoad.
874        if let Some(role_arn) = self.config.role_arn {
875            // use current env as source credential loader.
876            let default_loader =
877                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
878
879            // Build the config for assume role.
880            let mut assume_role_cfg = AwsConfig {
881                region: Some(region.clone()),
882                role_arn: Some(role_arn),
883                external_id: self.config.external_id.clone(),
884                sts_regional_endpoints: "regional".to_string(),
885                ..Default::default()
886            };
887
888            // override default role_session_name if set
889            if let Some(name) = self.config.role_session_name {
890                assume_role_cfg.role_session_name = name;
891            }
892
893            let assume_role_loader = AwsAssumeRoleLoader::new(
894                GLOBAL_REQWEST_CLIENT.clone().clone(),
895                assume_role_cfg,
896                Box::new(default_loader),
897            )
898            .map_err(|err| {
899                Error::new(
900                    ErrorKind::ConfigInvalid,
901                    "The assume_role_loader is misconfigured",
902                )
903                .with_context("service", Scheme::S3)
904                .set_source(err)
905            })?;
906            loader = Some(Box::new(assume_role_loader));
907        }
908        // If loader is not set, we will use default loader.
909        let loader = match loader {
910            Some(v) => v,
911            None => {
912                let mut default_loader =
913                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
914                if self.config.disable_ec2_metadata {
915                    default_loader = default_loader.with_disable_ec2_metadata();
916                }
917
918                Box::new(default_loader)
919            }
920        };
921
922        let signer = AwsV4Signer::new("s3", &region);
923
924        let delete_max_size = self
925            .config
926            .delete_max_size
927            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
928
929        Ok(S3Backend {
930            core: Arc::new(S3Core {
931                info: {
932                    let am = AccessorInfo::default();
933                    am.set_scheme(S3_SCHEME)
934                        .set_root(&root)
935                        .set_name(bucket)
936                        .set_native_capability(Capability {
937                            stat: true,
938                            stat_with_if_match: true,
939                            stat_with_if_none_match: true,
940                            stat_with_if_modified_since: true,
941                            stat_with_if_unmodified_since: true,
942                            stat_with_override_cache_control: !self
943                                .config
944                                .disable_stat_with_override,
945                            stat_with_override_content_disposition: !self
946                                .config
947                                .disable_stat_with_override,
948                            stat_with_override_content_type: !self
949                                .config
950                                .disable_stat_with_override,
951                            stat_with_version: self.config.enable_versioning,
952
953                            read: true,
954                            read_with_if_match: true,
955                            read_with_if_none_match: true,
956                            read_with_if_modified_since: true,
957                            read_with_if_unmodified_since: true,
958                            read_with_override_cache_control: true,
959                            read_with_override_content_disposition: true,
960                            read_with_override_content_type: true,
961                            read_with_version: self.config.enable_versioning,
962
963                            write: true,
964                            write_can_empty: true,
965                            write_can_multi: true,
966                            write_can_append: self.config.enable_write_with_append,
967
968                            write_with_cache_control: true,
969                            write_with_content_type: true,
970                            write_with_content_encoding: true,
971                            write_with_if_match: !self.config.disable_write_with_if_match,
972                            write_with_if_not_exists: true,
973                            write_with_user_metadata: true,
974
975                            // The min multipart size of S3 is 5 MiB.
976                            //
977                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
978                            write_multi_min_size: Some(5 * 1024 * 1024),
979                            // The max multipart size of S3 is 5 GiB.
980                            //
981                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
982                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
983                                Some(5 * 1024 * 1024 * 1024)
984                            } else {
985                                Some(usize::MAX)
986                            },
987
988                            delete: true,
989                            delete_max_size: Some(delete_max_size),
990                            delete_with_version: self.config.enable_versioning,
991
992                            copy: true,
993
994                            list: true,
995                            list_with_limit: true,
996                            list_with_start_after: true,
997                            list_with_recursive: true,
998                            list_with_versions: self.config.enable_versioning,
999                            list_with_deleted: self.config.enable_versioning,
1000
1001                            presign: true,
1002                            presign_stat: true,
1003                            presign_read: true,
1004                            presign_write: true,
1005
1006                            shared: true,
1007
1008                            ..Default::default()
1009                        });
1010
1011                    // allow deprecated api here for compatibility
1012                    #[allow(deprecated)]
1013                    if let Some(client) = self.http_client {
1014                        am.update_http_client(|_| client);
1015                    }
1016
1017                    am.into()
1018                },
1019                bucket: bucket.to_string(),
1020                endpoint,
1021                root,
1022                server_side_encryption,
1023                server_side_encryption_aws_kms_key_id,
1024                server_side_encryption_customer_algorithm,
1025                server_side_encryption_customer_key,
1026                server_side_encryption_customer_key_md5,
1027                default_storage_class,
1028                allow_anonymous: self.config.allow_anonymous,
1029                disable_list_objects_v2: self.config.disable_list_objects_v2,
1030                enable_request_payer: self.config.enable_request_payer,
1031                signer,
1032                loader,
1033                credential_loaded: AtomicBool::new(false),
1034                checksum_algorithm,
1035            }),
1036        })
1037    }
1038}
1039
/// Backend for s3 services.
///
/// All state lives in a shared `S3Core` behind an `Arc`, so cloning the backend is
/// cheap. Every clone talks to the same bucket with the same signer and credential
/// loader.
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// Shared configuration, signer, credential loader and request helpers,
    /// constructed once when the builder produces this backend.
    core: Arc<S3Core>,
}
1045
1046impl Access for S3Backend {
1047    type Reader = HttpBody;
1048    type Writer = S3Writers;
1049    type Lister = S3Listers;
1050    type Deleter = oio::BatchDeleter<S3Deleter>;
1051
1052    fn info(&self) -> Arc<AccessorInfo> {
1053        self.core.info.clone()
1054    }
1055
1056    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1057        let resp = self.core.s3_head_object(path, args).await?;
1058
1059        let status = resp.status();
1060
1061        match status {
1062            StatusCode::OK => {
1063                let headers = resp.headers();
1064                let mut meta = parse_into_metadata(path, headers)?;
1065
1066                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1067                if !user_meta.is_empty() {
1068                    meta = meta.with_user_metadata(user_meta);
1069                }
1070
1071                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1072                    meta.set_version(v);
1073                }
1074
1075                Ok(RpStat::new(meta))
1076            }
1077            _ => Err(parse_error(resp)),
1078        }
1079    }
1080
1081    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1082        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1083
1084        let status = resp.status();
1085        match status {
1086            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1087                Ok((RpRead::default(), resp.into_body()))
1088            }
1089            _ => {
1090                let (part, mut body) = resp.into_parts();
1091                let buf = body.to_buffer().await?;
1092                Err(parse_error(Response::from_parts(part, buf)))
1093            }
1094        }
1095    }
1096
1097    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1098        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1099
1100        let w = if args.append() {
1101            S3Writers::Two(oio::AppendWriter::new(writer))
1102        } else {
1103            S3Writers::One(oio::MultipartWriter::new(
1104                self.core.info.clone(),
1105                writer,
1106                args.concurrent(),
1107            ))
1108        };
1109
1110        Ok((RpWrite::default(), w))
1111    }
1112
1113    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1114        Ok((
1115            RpDelete::default(),
1116            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1117        ))
1118    }
1119
1120    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1121        let l = if args.versions() || args.deleted() {
1122            ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1123                self.core.clone(),
1124                path,
1125                args,
1126            )))
1127        } else if self.core.disable_list_objects_v2 {
1128            ThreeWays::One(PageLister::new(S3ListerV1::new(
1129                self.core.clone(),
1130                path,
1131                args,
1132            )))
1133        } else {
1134            ThreeWays::Two(PageLister::new(S3ListerV2::new(
1135                self.core.clone(),
1136                path,
1137                args,
1138            )))
1139        };
1140
1141        Ok((RpList::default(), l))
1142    }
1143
1144    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1145        let resp = self.core.s3_copy_object(from, to).await?;
1146
1147        let status = resp.status();
1148
1149        match status {
1150            StatusCode::OK => Ok(RpCopy::default()),
1151            _ => Err(parse_error(resp)),
1152        }
1153    }
1154
1155    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1156        let (expire, op) = args.into_parts();
1157        // We will not send this request out, just for signing.
1158        let req = match op {
1159            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1160            PresignOperation::Read(v) => {
1161                self.core
1162                    .s3_get_object_request(path, BytesRange::default(), &v)
1163            }
1164            PresignOperation::Write(_) => {
1165                self.core
1166                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1167            }
1168            PresignOperation::Delete(_) => Err(Error::new(
1169                ErrorKind::Unsupported,
1170                "operation is not supported",
1171            )),
1172        };
1173        let mut req = req?;
1174
1175        self.core.sign_query(&mut req, expire).await?;
1176
1177        // We don't need this request anymore, consume it directly.
1178        let (parts, _) = req.into_parts();
1179
1180        Ok(RpPresign::new(PresignedRequest::new(
1181            parts.method,
1182            parts.uri,
1183            parts.headers,
1184        )))
1185    }
1186}
1187
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket names containing dots are only rejected in virtual-host style; the
    /// empty name is always invalid.
    #[test]
    fn test_is_valid_bucket() {
        // (bucket, enable_virtual_host_style, expected validity)
        let cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (name, virtual_host, want) in cases {
            let builder = if virtual_host {
                S3Builder::default().bucket(name).enable_virtual_host_style()
            } else {
                S3Builder::default().bucket(name)
            };
            assert_eq!(builder.is_bucket_valid(), want)
        }
    }

    /// Every endpoint variant (bare host, global, regional, unset) must resolve to
    /// the regional `us-east-2` endpoint, in both path and virtual-host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let endpoints = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];
        // (enable_virtual_host_style, expected endpoint); path style is checked first.
        let styles = [
            (false, "https://s3.us-east-2.amazonaws.com/test"),
            (true, "https://test.s3.us-east-2.amazonaws.com"),
        ];

        for (virtual_host, want) in styles {
            for candidate in endpoints {
                let mut builder = S3Builder::default().bucket("test");
                if virtual_host {
                    builder = builder.enable_virtual_host_style();
                }
                if let Some(ep) = candidate {
                    builder = builder.endpoint(ep);
                }

                let resolved = builder.build_endpoint("us-east-2");
                assert_eq!(resolved, want);
            }
        }
    }

    /// Region detection across AWS, Aliyun OSS, Cloudflare R2 and an unknown host.
    #[tokio::test]
    async fn test_detect_region() {
        // (description, endpoint, bucket, expected region)
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (name, endpoint, bucket, want) in cases {
            let detected = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(detected.as_deref(), want, "{name}");
        }
    }
}