opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::sync::LazyLock;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use constants::X_AMZ_META_PREFIX;
30use constants::X_AMZ_VERSION_ID;
31use http::Response;
32use http::StatusCode;
33use log::debug;
34use log::warn;
35use md5::Digest;
36use md5::Md5;
37use reqsign::AwsAssumeRoleLoader;
38use reqsign::AwsConfig;
39use reqsign::AwsCredentialLoad;
40use reqsign::AwsDefaultLoader;
41use reqsign::AwsV4Signer;
42use reqwest::Url;
43
44use super::core::*;
45use super::delete::S3Deleter;
46use super::error::parse_error;
47use super::lister::S3ListerV1;
48use super::lister::S3ListerV2;
49use super::lister::S3Listers;
50use super::lister::S3ObjectVersionsLister;
51use super::writer::S3Writer;
52use super::writer::S3Writers;
53use super::DEFAULT_SCHEME;
54use crate::raw::oio::PageLister;
55use crate::raw::*;
56use crate::services::S3Config;
57use crate::*;
58
/// Lookup table from a global endpoint to its region-aware template.
///
/// When the user supplies a global endpoint listed here, the `{region}`
/// placeholder of the mapped template is substituted to obtain the correct
/// regional endpoint.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});
69
/// Fallback for `delete_max_size` when the user has not configured one.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
71
72impl Configurator for S3Config {
73    type Builder = S3Builder;
74
75    #[allow(deprecated)]
76    fn into_builder(self) -> Self::Builder {
77        S3Builder {
78            config: self,
79            customized_credential_load: None,
80
81            http_client: None,
82        }
83    }
84}
85
/// AWS S3 and compatible services (including MinIO, DigitalOcean Spaces, Tencent Cloud Object Storage (COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    // User-facing configuration that the builder methods mutate.
    config: S3Config,

    // Optional user-supplied credential loader, set via
    // `customized_credential_load`.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    // Optional user-supplied HTTP client, set via the deprecated
    // `http_client` method.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
99
100impl Debug for S3Builder {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        let mut d = f.debug_struct("S3Builder");
103
104        d.field("config", &self.config);
105        d.finish_non_exhaustive()
106    }
107}
108
109impl S3Builder {
110    /// Set root of this backend.
111    ///
112    /// All operations will happen under this root.
113    pub fn root(mut self, root: &str) -> Self {
114        self.config.root = if root.is_empty() {
115            None
116        } else {
117            Some(root.to_string())
118        };
119
120        self
121    }
122
123    /// Set bucket name of this backend.
124    pub fn bucket(mut self, bucket: &str) -> Self {
125        self.config.bucket = bucket.to_string();
126
127        self
128    }
129
130    /// Set endpoint of this backend.
131    ///
132    /// Endpoint must be full uri, e.g.
133    ///
134    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
135    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
136    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
137    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
138    /// - Minio: `http://127.0.0.1:9000`
139    ///
140    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
141    /// will prepend "https://" before it.
142    pub fn endpoint(mut self, endpoint: &str) -> Self {
143        if !endpoint.is_empty() {
144            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
145            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
146        }
147
148        self
149    }
150
151    /// Region represent the signing region of this endpoint. This is required
152    /// if you are using the default AWS S3 endpoint.
153    ///
154    /// If using a custom endpoint,
155    /// - If region is set, we will take user's input first.
156    /// - If not, we will try to load it from environment.
157    pub fn region(mut self, region: &str) -> Self {
158        if !region.is_empty() {
159            self.config.region = Some(region.to_string())
160        }
161
162        self
163    }
164
165    /// Set access_key_id of this backend.
166    ///
167    /// - If access_key_id is set, we will take user's input first.
168    /// - If not, we will try to load it from environment.
169    pub fn access_key_id(mut self, v: &str) -> Self {
170        if !v.is_empty() {
171            self.config.access_key_id = Some(v.to_string())
172        }
173
174        self
175    }
176
177    /// Set secret_access_key of this backend.
178    ///
179    /// - If secret_access_key is set, we will take user's input first.
180    /// - If not, we will try to load it from environment.
181    pub fn secret_access_key(mut self, v: &str) -> Self {
182        if !v.is_empty() {
183            self.config.secret_access_key = Some(v.to_string())
184        }
185
186        self
187    }
188
189    /// Set role_arn for this backend.
190    ///
191    /// If `role_arn` is set, we will use already known config as source
192    /// credential to assume role with `role_arn`.
193    pub fn role_arn(mut self, v: &str) -> Self {
194        if !v.is_empty() {
195            self.config.role_arn = Some(v.to_string())
196        }
197
198        self
199    }
200
201    /// Set external_id for this backend.
202    pub fn external_id(mut self, v: &str) -> Self {
203        if !v.is_empty() {
204            self.config.external_id = Some(v.to_string())
205        }
206
207        self
208    }
209
210    /// Set role_session_name for this backend.
211    pub fn role_session_name(mut self, v: &str) -> Self {
212        if !v.is_empty() {
213            self.config.role_session_name = Some(v.to_string())
214        }
215
216        self
217    }
218
219    /// Set default storage_class for this backend.
220    ///
221    /// Available values:
222    /// - `DEEP_ARCHIVE`
223    /// - `GLACIER`
224    /// - `GLACIER_IR`
225    /// - `INTELLIGENT_TIERING`
226    /// - `ONEZONE_IA`
227    /// - `OUTPOSTS`
228    /// - `REDUCED_REDUNDANCY`
229    /// - `STANDARD`
230    /// - `STANDARD_IA`
231    pub fn default_storage_class(mut self, v: &str) -> Self {
232        if !v.is_empty() {
233            self.config.default_storage_class = Some(v.to_string())
234        }
235
236        self
237    }
238
239    /// Set server_side_encryption for this backend.
240    ///
241    /// Available values: `AES256`, `aws:kms`.
242    ///
243    /// # Note
244    ///
245    /// This function is the low-level setting for SSE related features.
246    ///
247    /// SSE related options should be set carefully to make them works.
248    /// Please use `server_side_encryption_with_*` helpers if even possible.
249    pub fn server_side_encryption(mut self, v: &str) -> Self {
250        if !v.is_empty() {
251            self.config.server_side_encryption = Some(v.to_string())
252        }
253
254        self
255    }
256
257    /// Set server_side_encryption_aws_kms_key_id for this backend
258    ///
259    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
260    ///   is not set, S3 will use aws managed kms key to encrypt data.
261    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
262    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
263    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
264    ///   returned.
265    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
266    ///
267    /// # Note
268    ///
269    /// This function is the low-level setting for SSE related features.
270    ///
271    /// SSE related options should be set carefully to make them works.
272    /// Please use `server_side_encryption_with_*` helpers if even possible.
273    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
274        if !v.is_empty() {
275            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
276        }
277
278        self
279    }
280
281    /// Set server_side_encryption_customer_algorithm for this backend.
282    ///
283    /// Available values: `AES256`.
284    ///
285    /// # Note
286    ///
287    /// This function is the low-level setting for SSE related features.
288    ///
289    /// SSE related options should be set carefully to make them works.
290    /// Please use `server_side_encryption_with_*` helpers if even possible.
291    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
292        if !v.is_empty() {
293            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
294        }
295
296        self
297    }
298
299    /// Set server_side_encryption_customer_key for this backend.
300    ///
301    /// # Args
302    ///
303    /// `v`: base64 encoded key that matches algorithm specified in
304    /// `server_side_encryption_customer_algorithm`.
305    ///
306    /// # Note
307    ///
308    /// This function is the low-level setting for SSE related features.
309    ///
310    /// SSE related options should be set carefully to make them works.
311    /// Please use `server_side_encryption_with_*` helpers if even possible.
312    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
313        if !v.is_empty() {
314            self.config.server_side_encryption_customer_key = Some(v.to_string())
315        }
316
317        self
318    }
319
320    /// Set server_side_encryption_customer_key_md5 for this backend.
321    ///
322    /// # Args
323    ///
324    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
325    ///
326    /// # Note
327    ///
328    /// This function is the low-level setting for SSE related features.
329    ///
330    /// SSE related options should be set carefully to make them works.
331    /// Please use `server_side_encryption_with_*` helpers if even possible.
332    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
333        if !v.is_empty() {
334            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
335        }
336
337        self
338    }
339
340    /// Enable server side encryption with aws managed kms key
341    ///
342    /// As known as: SSE-KMS
343    ///
344    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
345    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
346        self.config.server_side_encryption = Some("aws:kms".to_string());
347        self
348    }
349
350    /// Enable server side encryption with customer managed kms key
351    ///
352    /// As known as: SSE-KMS
353    ///
354    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
355    pub fn server_side_encryption_with_customer_managed_kms_key(
356        mut self,
357        aws_kms_key_id: &str,
358    ) -> Self {
359        self.config.server_side_encryption = Some("aws:kms".to_string());
360        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
361        self
362    }
363
364    /// Enable server side encryption with s3 managed key
365    ///
366    /// As known as: SSE-S3
367    ///
368    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
369    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
370        self.config.server_side_encryption = Some("AES256".to_string());
371        self
372    }
373
374    /// Enable server side encryption with customer key.
375    ///
376    /// As known as: SSE-C
377    ///
378    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
379    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
380        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
381        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
382        self.config.server_side_encryption_customer_key_md5 =
383            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
384        self
385    }
386
387    /// Set temporary credential used in AWS S3 connections
388    ///
389    /// # Warning
390    ///
391    /// session token's lifetime is short and requires users to refresh in time.
392    pub fn session_token(mut self, token: &str) -> Self {
393        if !token.is_empty() {
394            self.config.session_token = Some(token.to_string());
395        }
396        self
397    }
398
399    /// Set temporary credential used in AWS S3 connections
400    #[deprecated(note = "Please use `session_token` instead")]
401    pub fn security_token(self, token: &str) -> Self {
402        self.session_token(token)
403    }
404
405    /// Disable config load so that opendal will not load config from
406    /// environment.
407    ///
408    /// For examples:
409    ///
410    /// - envs like `AWS_ACCESS_KEY_ID`
411    /// - files like `~/.aws/config`
412    pub fn disable_config_load(mut self) -> Self {
413        self.config.disable_config_load = true;
414        self
415    }
416
417    /// Disable list objects v2 so that opendal will not use the older
418    /// List Objects V1 to list objects.
419    ///
420    /// By default, OpenDAL uses List Objects V2 to list objects. However,
421    /// some legacy services do not yet support V2.
422    pub fn disable_list_objects_v2(mut self) -> Self {
423        self.config.disable_list_objects_v2 = true;
424        self
425    }
426
427    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
428    ///
429    /// With this option the client accepts to pay for the request and data transfer costs.
430    pub fn enable_request_payer(mut self) -> Self {
431        self.config.enable_request_payer = true;
432        self
433    }
434
435    /// Disable load credential from ec2 metadata.
436    ///
437    /// This option is used to disable the default behavior of opendal
438    /// to load credential from ec2 metadata, a.k.a, IMDSv2
439    pub fn disable_ec2_metadata(mut self) -> Self {
440        self.config.disable_ec2_metadata = true;
441        self
442    }
443
444    /// Allow anonymous will allow opendal to send request without signing
445    /// when credential is not loaded.
446    pub fn allow_anonymous(mut self) -> Self {
447        self.config.allow_anonymous = true;
448        self
449    }
450
451    /// Enable virtual host style so that opendal will send API requests
452    /// in virtual host style instead of path style.
453    ///
454    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
455    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
456    pub fn enable_virtual_host_style(mut self) -> Self {
457        self.config.enable_virtual_host_style = true;
458        self
459    }
460
461    /// Disable stat with override so that opendal will not send stat request with override queries.
462    ///
463    /// For example, R2 doesn't support stat with `response_content_type` query.
464    pub fn disable_stat_with_override(mut self) -> Self {
465        self.config.disable_stat_with_override = true;
466        self
467    }
468
469    /// Adding a customized credential load for service.
470    ///
471    /// If customized_credential_load has been set, we will ignore all other
472    /// credential load methods.
473    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
474        self.customized_credential_load = Some(cred);
475        self
476    }
477
478    /// Specify the http client that used by this service.
479    ///
480    /// # Notes
481    ///
482    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
483    /// during minor updates.
484    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
485    #[allow(deprecated)]
486    pub fn http_client(mut self, client: HttpClient) -> Self {
487        self.http_client = Some(client);
488        self
489    }
490
491    /// Set bucket versioning status for this backend
492    pub fn enable_versioning(mut self, enabled: bool) -> Self {
493        self.config.enable_versioning = enabled;
494
495        self
496    }
497
498    /// Check if `bucket` is valid
499    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
500    /// it couldn't contain dot(.) character
501    fn is_bucket_valid(&self) -> bool {
502        if self.config.bucket.is_empty() {
503            return false;
504        }
505        // If enable virtual host style, `bucket` will reside in domain part,
506        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
507        // so `bucket` with dot can't be recognized correctly for this format.
508        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
509            return false;
510        }
511        true
512    }
513
514    /// Build endpoint with given region.
515    fn build_endpoint(&self, region: &str) -> String {
516        let bucket = {
517            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
518
519            self.config.bucket.as_str()
520        };
521
522        let mut endpoint = match &self.config.endpoint {
523            Some(endpoint) => {
524                if endpoint.starts_with("http") {
525                    endpoint.to_string()
526                } else {
527                    // Prefix https if endpoint doesn't start with scheme.
528                    format!("https://{endpoint}")
529                }
530            }
531            None => "https://s3.amazonaws.com".to_string(),
532        };
533
534        // If endpoint contains bucket name, we should trim them.
535        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
536
537        // Omit default ports if specified.
538        if let Ok(url) = Url::from_str(&endpoint) {
539            // Remove the trailing `/` of root path.
540            endpoint = url.to_string().trim_end_matches('/').to_string();
541        }
542
543        // Update with endpoint templates.
544        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
545            template.replace("{region}", region)
546        } else {
547            // If we don't know where about this endpoint, just leave
548            // them as it.
549            endpoint.to_string()
550        };
551
552        // Apply virtual host style.
553        if self.config.enable_virtual_host_style {
554            endpoint = endpoint.replace("//", &format!("//{bucket}."))
555        } else {
556            write!(endpoint, "/{bucket}").expect("write into string must succeed");
557        };
558
559        endpoint
560    }
561
562    /// Set maximum batch operations of this backend.
563    #[deprecated(
564        since = "0.52.0",
565        note = "Please use `delete_max_size` instead of `batch_max_operations`"
566    )]
567    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
568        self.config.delete_max_size = Some(batch_max_operations);
569
570        self
571    }
572
573    /// Set maximum delete operations of this backend.
574    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
575        self.config.delete_max_size = Some(delete_max_size);
576
577        self
578    }
579
580    /// Set checksum algorithm of this backend.
581    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
582    ///
583    /// Available options:
584    /// - "crc32c"
585    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
586        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
587
588        self
589    }
590
591    /// Disable write with if match so that opendal will not send write request with if match headers.
592    pub fn disable_write_with_if_match(mut self) -> Self {
593        self.config.disable_write_with_if_match = true;
594        self
595    }
596
597    /// Enable write with append so that opendal will send write request with append headers.
598    pub fn enable_write_with_append(mut self) -> Self {
599        self.config.enable_write_with_append = true;
600        self
601    }
602
603    /// Detect region of S3 bucket.
604    ///
605    /// # Args
606    ///
607    /// - endpoint: the endpoint of S3 service
608    /// - bucket: the bucket of S3 service
609    ///
610    /// # Return
611    ///
612    /// - `Some(region)` means we detect the region successfully
613    /// - `None` means we can't detect the region or meeting errors.
614    ///
615    /// # Notes
616    ///
617    /// We will try to detect region by the following methods.
618    ///
619    /// - Match endpoint with given rules to get region
620    ///   - Cloudflare R2
621    ///   - AWS S3
622    ///   - Aliyun OSS
623    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
624    ///
625    /// # Examples
626    ///
627    /// ```no_run
628    /// use opendal::services::S3;
629    ///
630    /// # async fn example() {
631    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
632    /// # }
633    /// ```
634    ///
635    /// # Reference
636    ///
637    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
638    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
639        // Remove the possible trailing `/` in endpoint.
640        let endpoint = endpoint.trim_end_matches('/');
641
642        // Make sure the endpoint contains the scheme.
643        let mut endpoint = if endpoint.starts_with("http") {
644            endpoint.to_string()
645        } else {
646            // Prefix https if endpoint doesn't start with scheme.
647            format!("https://{endpoint}")
648        };
649
650        // Remove bucket name from endpoint.
651        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
652        let url = format!("{endpoint}/{bucket}");
653
654        debug!("detect region with url: {url}");
655
656        // Try to detect region by endpoint.
657
658        // If this bucket is R2, we can return auto directly.
659        //
660        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
661        if endpoint.ends_with("r2.cloudflarestorage.com") {
662            return Some("auto".to_string());
663        }
664
665        // If this bucket is AWS, we can try to match the endpoint.
666        if let Some(v) = endpoint.strip_prefix("https://s3.") {
667            if let Some(region) = v.strip_suffix(".amazonaws.com") {
668                return Some(region.to_string());
669            }
670        }
671
672        // If this bucket is OSS, we can try to match the endpoint.
673        //
674        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
675        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
676        if let Some(v) = endpoint.strip_prefix("https://") {
677            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
678                return Some(region.to_string());
679            }
680
681            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
682                return Some(region.to_string());
683            }
684        }
685
686        // Try to detect region by HeadBucket.
687        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
688
689        let client = HttpClient::new().ok()?;
690        let res = client
691            .send(req)
692            .await
693            .map_err(|err| warn!("detect region failed for: {err:?}"))
694            .ok()?;
695
696        debug!(
697            "auto detect region got response: status {:?}, header: {:?}",
698            res.status(),
699            res.headers()
700        );
701
702        // Get region from response header no matter status code.
703        if let Some(header) = res.headers().get("x-amz-bucket-region") {
704            if let Ok(regin) = header.to_str() {
705                return Some(regin.to_string());
706            }
707        }
708
709        // Status code is 403 or 200 means we already visit the correct
710        // region, we can use the default region directly.
711        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
712            return Some("us-east-1".to_string());
713        }
714
715        None
716    }
717}
718
719impl Builder for S3Builder {
720    type Config = S3Config;
721
722    fn build(mut self) -> Result<impl Access> {
723        debug!("backend build started: {:?}", &self);
724
725        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
726        debug!("backend use root {}", &root);
727
728        // Handle bucket name.
729        let bucket = if self.is_bucket_valid() {
730            Ok(&self.config.bucket)
731        } else {
732            Err(
733                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
734                    .with_context("service", Scheme::S3),
735            )
736        }?;
737        debug!("backend use bucket {}", &bucket);
738
739        let default_storage_class = match &self.config.default_storage_class {
740            None => None,
741            Some(v) => Some(
742                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
743            ),
744        };
745
746        let server_side_encryption = match &self.config.server_side_encryption {
747            None => None,
748            Some(v) => Some(
749                build_header_value(v)
750                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
751            ),
752        };
753
754        let server_side_encryption_aws_kms_key_id =
755            match &self.config.server_side_encryption_aws_kms_key_id {
756                None => None,
757                Some(v) => Some(build_header_value(v).map_err(|err| {
758                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
759                })?),
760            };
761
762        let server_side_encryption_customer_algorithm =
763            match &self.config.server_side_encryption_customer_algorithm {
764                None => None,
765                Some(v) => Some(build_header_value(v).map_err(|err| {
766                    err.with_context("key", "server_side_encryption_customer_algorithm")
767                })?),
768            };
769
770        let server_side_encryption_customer_key =
771            match &self.config.server_side_encryption_customer_key {
772                None => None,
773                Some(v) => Some(build_header_value(v).map_err(|err| {
774                    err.with_context("key", "server_side_encryption_customer_key")
775                })?),
776            };
777
778        let server_side_encryption_customer_key_md5 =
779            match &self.config.server_side_encryption_customer_key_md5 {
780                None => None,
781                Some(v) => Some(build_header_value(v).map_err(|err| {
782                    err.with_context("key", "server_side_encryption_customer_key_md5")
783                })?),
784            };
785
786        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
787            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
788            None => None,
789            v => {
790                return Err(Error::new(
791                    ErrorKind::ConfigInvalid,
792                    format!("{v:?} is not a supported checksum_algorithm."),
793                ))
794            }
795        };
796
797        // This is our current config.
798        let mut cfg = AwsConfig::default();
799        if !self.config.disable_config_load {
800            #[cfg(not(target_arch = "wasm32"))]
801            {
802                cfg = cfg.from_profile();
803                cfg = cfg.from_env();
804            }
805        }
806
807        if let Some(ref v) = self.config.region {
808            cfg.region = Some(v.to_string());
809        }
810
811        if cfg.region.is_none() {
812            return Err(Error::new(
813                ErrorKind::ConfigInvalid,
814                "region is missing. Please find it by S3::detect_region() or set them in env.",
815            )
816            .with_operation("Builder::build")
817            .with_context("service", Scheme::S3));
818        }
819
820        let region = cfg.region.to_owned().unwrap();
821        debug!("backend use region: {region}");
822
823        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
824        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
825
826        // Building endpoint.
827        let endpoint = self.build_endpoint(&region);
828        debug!("backend use endpoint: {endpoint}");
829
830        // Setting all value from user input if available.
831        if let Some(v) = self.config.access_key_id {
832            cfg.access_key_id = Some(v)
833        }
834        if let Some(v) = self.config.secret_access_key {
835            cfg.secret_access_key = Some(v)
836        }
837        if let Some(v) = self.config.session_token {
838            cfg.session_token = Some(v)
839        }
840
841        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
842        // If customized_credential_load is set, we will use it.
843        if let Some(v) = self.customized_credential_load {
844            loader = Some(v);
845        }
846
847        // If role_arn is set, we must use AssumeRoleLoad.
848        if let Some(role_arn) = self.config.role_arn {
849            // use current env as source credential loader.
850            let default_loader =
851                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
852
853            // Build the config for assume role.
854            let mut assume_role_cfg = AwsConfig {
855                region: Some(region.clone()),
856                role_arn: Some(role_arn),
857                external_id: self.config.external_id.clone(),
858                sts_regional_endpoints: "regional".to_string(),
859                ..Default::default()
860            };
861
862            // override default role_session_name if set
863            if let Some(name) = self.config.role_session_name {
864                assume_role_cfg.role_session_name = name;
865            }
866
867            let assume_role_loader = AwsAssumeRoleLoader::new(
868                GLOBAL_REQWEST_CLIENT.clone().clone(),
869                assume_role_cfg,
870                Box::new(default_loader),
871            )
872            .map_err(|err| {
873                Error::new(
874                    ErrorKind::ConfigInvalid,
875                    "The assume_role_loader is misconfigured",
876                )
877                .with_context("service", Scheme::S3)
878                .set_source(err)
879            })?;
880            loader = Some(Box::new(assume_role_loader));
881        }
882        // If loader is not set, we will use default loader.
883        let loader = match loader {
884            Some(v) => v,
885            None => {
886                let mut default_loader =
887                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
888                if self.config.disable_ec2_metadata {
889                    default_loader = default_loader.with_disable_ec2_metadata();
890                }
891
892                Box::new(default_loader)
893            }
894        };
895
896        let signer = AwsV4Signer::new("s3", &region);
897
898        let delete_max_size = self
899            .config
900            .delete_max_size
901            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
902
903        Ok(S3Backend {
904            core: Arc::new(S3Core {
905                info: {
906                    let am = AccessorInfo::default();
907                    am.set_scheme(DEFAULT_SCHEME)
908                        .set_root(&root)
909                        .set_name(bucket)
910                        .set_native_capability(Capability {
911                            stat: true,
912                            stat_with_if_match: true,
913                            stat_with_if_none_match: true,
914                            stat_with_if_modified_since: true,
915                            stat_with_if_unmodified_since: true,
916                            stat_with_override_cache_control: !self
917                                .config
918                                .disable_stat_with_override,
919                            stat_with_override_content_disposition: !self
920                                .config
921                                .disable_stat_with_override,
922                            stat_with_override_content_type: !self
923                                .config
924                                .disable_stat_with_override,
925                            stat_with_version: self.config.enable_versioning,
926
927                            read: true,
928                            read_with_if_match: true,
929                            read_with_if_none_match: true,
930                            read_with_if_modified_since: true,
931                            read_with_if_unmodified_since: true,
932                            read_with_override_cache_control: true,
933                            read_with_override_content_disposition: true,
934                            read_with_override_content_type: true,
935                            read_with_version: self.config.enable_versioning,
936
937                            write: true,
938                            write_can_empty: true,
939                            write_can_multi: true,
940                            write_can_append: self.config.enable_write_with_append,
941
942                            write_with_cache_control: true,
943                            write_with_content_type: true,
944                            write_with_content_encoding: true,
945                            write_with_if_match: !self.config.disable_write_with_if_match,
946                            write_with_if_not_exists: true,
947                            write_with_user_metadata: true,
948
949                            // The min multipart size of S3 is 5 MiB.
950                            //
951                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
952                            write_multi_min_size: Some(5 * 1024 * 1024),
953                            // The max multipart size of S3 is 5 GiB.
954                            //
955                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
956                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
957                                Some(5 * 1024 * 1024 * 1024)
958                            } else {
959                                Some(usize::MAX)
960                            },
961
962                            delete: true,
963                            delete_max_size: Some(delete_max_size),
964                            delete_with_version: self.config.enable_versioning,
965
966                            copy: true,
967
968                            list: true,
969                            list_with_limit: true,
970                            list_with_start_after: true,
971                            list_with_recursive: true,
972                            list_with_versions: self.config.enable_versioning,
973                            list_with_deleted: self.config.enable_versioning,
974
975                            presign: true,
976                            presign_stat: true,
977                            presign_read: true,
978                            presign_write: true,
979
980                            shared: true,
981
982                            ..Default::default()
983                        });
984
985                    // allow deprecated api here for compatibility
986                    #[allow(deprecated)]
987                    if let Some(client) = self.http_client {
988                        am.update_http_client(|_| client);
989                    }
990
991                    am.into()
992                },
993                bucket: bucket.to_string(),
994                endpoint,
995                root,
996                server_side_encryption,
997                server_side_encryption_aws_kms_key_id,
998                server_side_encryption_customer_algorithm,
999                server_side_encryption_customer_key,
1000                server_side_encryption_customer_key_md5,
1001                default_storage_class,
1002                allow_anonymous: self.config.allow_anonymous,
1003                disable_list_objects_v2: self.config.disable_list_objects_v2,
1004                enable_request_payer: self.config.enable_request_payer,
1005                signer,
1006                loader,
1007                credential_loaded: AtomicBool::new(false),
1008                checksum_algorithm,
1009            }),
1010        })
1011    }
1012}
1013
1014/// Backend for s3 services.
1015#[derive(Debug, Clone)]
1016pub struct S3Backend {
1017    core: Arc<S3Core>,
1018}
1019
1020impl Access for S3Backend {
1021    type Reader = HttpBody;
1022    type Writer = S3Writers;
1023    type Lister = S3Listers;
1024    type Deleter = oio::BatchDeleter<S3Deleter>;
1025
1026    fn info(&self) -> Arc<AccessorInfo> {
1027        self.core.info.clone()
1028    }
1029
1030    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1031        let resp = self.core.s3_head_object(path, args).await?;
1032
1033        let status = resp.status();
1034
1035        match status {
1036            StatusCode::OK => {
1037                let headers = resp.headers();
1038                let mut meta = parse_into_metadata(path, headers)?;
1039
1040                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1041                if !user_meta.is_empty() {
1042                    meta = meta.with_user_metadata(user_meta);
1043                }
1044
1045                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1046                    meta.set_version(v);
1047                }
1048
1049                Ok(RpStat::new(meta))
1050            }
1051            _ => Err(parse_error(resp)),
1052        }
1053    }
1054
1055    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1056        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1057
1058        let status = resp.status();
1059        match status {
1060            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1061                Ok((RpRead::default(), resp.into_body()))
1062            }
1063            _ => {
1064                let (part, mut body) = resp.into_parts();
1065                let buf = body.to_buffer().await?;
1066                Err(parse_error(Response::from_parts(part, buf)))
1067            }
1068        }
1069    }
1070
1071    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1072        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1073
1074        let w = if args.append() {
1075            S3Writers::Two(oio::AppendWriter::new(writer))
1076        } else {
1077            S3Writers::One(oio::MultipartWriter::new(
1078                self.core.info.clone(),
1079                writer,
1080                args.concurrent(),
1081            ))
1082        };
1083
1084        Ok((RpWrite::default(), w))
1085    }
1086
1087    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1088        Ok((
1089            RpDelete::default(),
1090            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1091        ))
1092    }
1093
1094    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1095        let l = if args.versions() || args.deleted() {
1096            ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1097                self.core.clone(),
1098                path,
1099                args,
1100            )))
1101        } else if self.core.disable_list_objects_v2 {
1102            ThreeWays::One(PageLister::new(S3ListerV1::new(
1103                self.core.clone(),
1104                path,
1105                args,
1106            )))
1107        } else {
1108            ThreeWays::Two(PageLister::new(S3ListerV2::new(
1109                self.core.clone(),
1110                path,
1111                args,
1112            )))
1113        };
1114
1115        Ok((RpList::default(), l))
1116    }
1117
1118    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1119        let resp = self.core.s3_copy_object(from, to).await?;
1120
1121        let status = resp.status();
1122
1123        match status {
1124            StatusCode::OK => Ok(RpCopy::default()),
1125            _ => Err(parse_error(resp)),
1126        }
1127    }
1128
1129    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1130        let (expire, op) = args.into_parts();
1131        // We will not send this request out, just for signing.
1132        let req = match op {
1133            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1134            PresignOperation::Read(v) => {
1135                self.core
1136                    .s3_get_object_request(path, BytesRange::default(), &v)
1137            }
1138            PresignOperation::Write(_) => {
1139                self.core
1140                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1141            }
1142            PresignOperation::Delete(_) => Err(Error::new(
1143                ErrorKind::Unsupported,
1144                "operation is not supported",
1145            )),
1146        };
1147        let mut req = req?;
1148
1149        self.core.sign_query(&mut req, expire).await?;
1150
1151        // We don't need this request anymore, consume it directly.
1152        let (parts, _) = req.into_parts();
1153
1154        Ok(RpPresign::new(PresignedRequest::new(
1155            parts.method,
1156            parts.uri,
1157            parts.headers,
1158        )))
1159    }
1160}
1161
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `S3Builder::is_bucket_valid` against a table of bucket names,
    /// with and without virtual host style enabled.
    #[test]
    fn test_is_valid_bucket() {
        // (bucket, enable_virtual_host_style, expected validity)
        let bucket_cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (bucket, enable_virtual_host_style, expected) in bucket_cases {
            let mut b = S3Builder::default().bucket(bucket);
            if enable_virtual_host_style {
                b = b.enable_virtual_host_style();
            }

            // Name the failing case so a broken row is identifiable from the
            // test output alone.
            assert_eq!(
                b.is_bucket_valid(),
                expected,
                "bucket: {bucket:?}, virtual host style: {enable_virtual_host_style}"
            );
        }
    }

    /// Checks that `S3Builder::build_endpoint` normalizes every configured
    /// endpoint variant (including none at all) to the regional endpoint, in
    /// both path style and virtual host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let endpoint_cases = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        // Path style: the bucket is appended to the endpoint as a path.
        for case in &endpoint_cases {
            let mut b = S3Builder::default().bucket("test");
            if let Some(endpoint) = case {
                b = b.endpoint(endpoint);
            }

            let built = b.build_endpoint("us-east-2");
            assert_eq!(
                built, "https://s3.us-east-2.amazonaws.com/test",
                "path style, configured endpoint: {case:?}"
            );
        }

        // Virtual host style: the bucket becomes part of the host name.
        for case in &endpoint_cases {
            let mut b = S3Builder::default()
                .bucket("test")
                .enable_virtual_host_style();
            if let Some(endpoint) = case {
                b = b.endpoint(endpoint);
            }

            let built = b.build_endpoint("us-east-2");
            assert_eq!(
                built, "https://test.s3.us-east-2.amazonaws.com",
                "virtual host style, configured endpoint: {case:?}"
            );
        }
    }

    /// Checks `S3Builder::detect_region` against a table of endpoints.
    ///
    /// NOTE(review): `detect_region` is awaited with real endpoint URLs, so
    /// this test presumably needs network access — confirm.
    #[tokio::test]
    async fn test_detect_region() {
        // (case name, endpoint, bucket, expected region)
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (name, endpoint, bucket, expected) in cases {
            let region = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(region.as_deref(), expected, "{name}");
        }
    }
}