opendal/services/cos/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use http::Uri;
24use log::debug;
25use reqsign::TencentCosConfig;
26use reqsign::TencentCosCredentialLoader;
27use reqsign::TencentCosSigner;
28
29use super::COS_SCHEME;
30use super::config::CosConfig;
31use super::core::*;
32use super::deleter::CosDeleter;
33use super::error::parse_error;
34use super::lister::CosLister;
35use super::lister::CosListers;
36use super::lister::CosObjectVersionsLister;
37use super::writer::CosWriter;
38use super::writer::CosWriters;
39use crate::raw::*;
40use crate::*;
41
42/// Tencent-Cloud COS services support.
43#[doc = include_str!("docs.md")]
44#[derive(Default)]
45pub struct CosBuilder {
46    pub(super) config: CosConfig,
47
48    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
49    pub(super) http_client: Option<HttpClient>,
50}
51
52impl Debug for CosBuilder {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("CosBuilder")
55            .field("config", &self.config)
56            .finish_non_exhaustive()
57    }
58}
59
60impl CosBuilder {
61    /// Set root of this backend.
62    ///
63    /// All operations will happen under this root.
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint of this backend.
75    ///
76    /// NOTE: no bucket or account id in endpoint, we will trim them if exists.
77    ///
78    /// # Examples
79    ///
80    /// - `https://cos.ap-singapore.myqcloud.com`
81    pub fn endpoint(mut self, endpoint: &str) -> Self {
82        if !endpoint.is_empty() {
83            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
84        }
85
86        self
87    }
88
89    /// Set secret_id of this backend.
90    /// - If it is set, we will take user's input first.
91    /// - If not, we will try to load it from environment.
92    pub fn secret_id(mut self, secret_id: &str) -> Self {
93        if !secret_id.is_empty() {
94            self.config.secret_id = Some(secret_id.to_string());
95        }
96
97        self
98    }
99
100    /// Set secret_key of this backend.
101    /// - If it is set, we will take user's input first.
102    /// - If not, we will try to load it from environment.
103    pub fn secret_key(mut self, secret_key: &str) -> Self {
104        if !secret_key.is_empty() {
105            self.config.secret_key = Some(secret_key.to_string());
106        }
107
108        self
109    }
110
111    /// Set bucket of this backend.
112    /// The param is required.
113    pub fn bucket(mut self, bucket: &str) -> Self {
114        if !bucket.is_empty() {
115            self.config.bucket = Some(bucket.to_string());
116        }
117
118        self
119    }
120
121    /// Set bucket versioning status for this backend
122    pub fn enable_versioning(mut self, enabled: bool) -> Self {
123        self.config.enable_versioning = enabled;
124
125        self
126    }
127
128    /// Disable config load so that opendal will not load config from
129    /// environment.
130    ///
131    /// For examples:
132    ///
133    /// - envs like `TENCENTCLOUD_SECRET_ID`
134    pub fn disable_config_load(mut self) -> Self {
135        self.config.disable_config_load = true;
136        self
137    }
138
139    /// Specify the http client that used by this service.
140    ///
141    /// # Notes
142    ///
143    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
144    /// during minor updates.
145    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
146    #[allow(deprecated)]
147    pub fn http_client(mut self, client: HttpClient) -> Self {
148        self.http_client = Some(client);
149        self
150    }
151}
152
153impl Builder for CosBuilder {
154    type Config = CosConfig;
155
156    fn build(self) -> Result<impl Access> {
157        debug!("backend build started: {:?}", &self);
158
159        let root = normalize_root(&self.config.root.unwrap_or_default());
160        debug!("backend use root {root}");
161
162        let bucket = match &self.config.bucket {
163            Some(bucket) => Ok(bucket.to_string()),
164            None => Err(
165                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
166                    .with_context("service", COS_SCHEME),
167            ),
168        }?;
169        debug!("backend use bucket {}", &bucket);
170
171        let uri = match &self.config.endpoint {
172            Some(endpoint) => endpoint.parse::<Uri>().map_err(|err| {
173                Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
174                    .with_context("service", COS_SCHEME)
175                    .with_context("endpoint", endpoint)
176                    .set_source(err)
177            }),
178            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
179                .with_context("service", COS_SCHEME)),
180        }?;
181
182        let scheme = match uri.scheme_str() {
183            Some(scheme) => scheme.to_string(),
184            None => "https".to_string(),
185        };
186
187        // If endpoint contains bucket name, we should trim them.
188        let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."), "//");
189        debug!("backend use endpoint {}", &endpoint);
190
191        let mut cfg = TencentCosConfig::default();
192        if !self.config.disable_config_load {
193            cfg = cfg.from_env();
194        }
195
196        if let Some(v) = self.config.secret_id {
197            cfg.secret_id = Some(v);
198        }
199        if let Some(v) = self.config.secret_key {
200            cfg.secret_key = Some(v);
201        }
202
203        let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
204
205        let signer = TencentCosSigner::new();
206
207        Ok(CosBackend {
208            core: Arc::new(CosCore {
209                info: {
210                    let am = AccessorInfo::default();
211                    am.set_scheme(COS_SCHEME)
212                        .set_root(&root)
213                        .set_name(&bucket)
214                        .set_native_capability(Capability {
215                            stat: true,
216                            stat_with_if_match: true,
217                            stat_with_if_none_match: true,
218                            stat_with_version: self.config.enable_versioning,
219
220                            read: true,
221
222                            read_with_if_match: true,
223                            read_with_if_none_match: true,
224                            read_with_if_modified_since: true,
225                            read_with_if_unmodified_since: true,
226                            read_with_version: self.config.enable_versioning,
227
228                            write: true,
229                            write_can_empty: true,
230                            write_can_append: true,
231                            write_can_multi: true,
232                            write_with_content_type: true,
233                            write_with_cache_control: true,
234                            write_with_content_disposition: true,
235                            // Cos doesn't support forbid overwrite while version has been enabled.
236                            write_with_if_not_exists: !self.config.enable_versioning,
237                            // The min multipart size of COS is 1 MiB.
238                            //
239                            // ref: <https://www.tencentcloud.com/document/product/436/14112>
240                            write_multi_min_size: Some(1024 * 1024),
241                            // The max multipart size of COS is 5 GiB.
242                            //
243                            // ref: <https://www.tencentcloud.com/document/product/436/14112>
244                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
245                                Some(5 * 1024 * 1024 * 1024)
246                            } else {
247                                Some(usize::MAX)
248                            },
249                            write_with_user_metadata: true,
250
251                            delete: true,
252                            delete_with_version: self.config.enable_versioning,
253                            copy: true,
254
255                            list: true,
256                            list_with_recursive: true,
257                            list_with_versions: self.config.enable_versioning,
258                            list_with_deleted: self.config.enable_versioning,
259
260                            presign: true,
261                            presign_stat: true,
262                            presign_read: true,
263                            presign_write: true,
264
265                            shared: true,
266
267                            ..Default::default()
268                        });
269
270                    // allow deprecated api here for compatibility
271                    #[allow(deprecated)]
272                    if let Some(client) = self.http_client {
273                        am.update_http_client(|_| client);
274                    }
275
276                    am.into()
277                },
278                bucket: bucket.clone(),
279                root,
280                endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
281                signer,
282                loader: cred_loader,
283            }),
284        })
285    }
286}
287
288/// Backend for Tencent-Cloud COS services.
289#[derive(Debug, Clone)]
290pub struct CosBackend {
291    core: Arc<CosCore>,
292}
293
294impl Access for CosBackend {
295    type Reader = HttpBody;
296    type Writer = CosWriters;
297    type Lister = CosListers;
298    type Deleter = oio::OneShotDeleter<CosDeleter>;
299
300    fn info(&self) -> Arc<AccessorInfo> {
301        self.core.info.clone()
302    }
303
304    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
305        let resp = self.core.cos_head_object(path, &args).await?;
306
307        let status = resp.status();
308
309        match status {
310            StatusCode::OK => {
311                let headers = resp.headers();
312                let mut meta = parse_into_metadata(path, headers)?;
313
314                let user_meta = parse_prefixed_headers(headers, "x-cos-meta-");
315                if !user_meta.is_empty() {
316                    meta = meta.with_user_metadata(user_meta);
317                }
318
319                if let Some(v) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
320                    if v != "null" {
321                        meta.set_version(v);
322                    }
323                }
324
325                Ok(RpStat::new(meta))
326            }
327            _ => Err(parse_error(resp)),
328        }
329    }
330
331    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
332        let resp = self.core.cos_get_object(path, args.range(), &args).await?;
333
334        let status = resp.status();
335
336        match status {
337            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
338                Ok((RpRead::default(), resp.into_body()))
339            }
340            _ => {
341                let (part, mut body) = resp.into_parts();
342                let buf = body.to_buffer().await?;
343                Err(parse_error(Response::from_parts(part, buf)))
344            }
345        }
346    }
347
348    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
349        let writer = CosWriter::new(self.core.clone(), path, args.clone());
350
351        let w = if args.append() {
352            CosWriters::Two(oio::AppendWriter::new(writer))
353        } else {
354            CosWriters::One(oio::MultipartWriter::new(
355                self.core.info.clone(),
356                writer,
357                args.concurrent(),
358            ))
359        };
360
361        Ok((RpWrite::default(), w))
362    }
363
364    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
365        Ok((
366            RpDelete::default(),
367            oio::OneShotDeleter::new(CosDeleter::new(self.core.clone())),
368        ))
369    }
370
371    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
372        let l = if args.versions() || args.deleted() {
373            TwoWays::Two(oio::PageLister::new(CosObjectVersionsLister::new(
374                self.core.clone(),
375                path,
376                args,
377            )))
378        } else {
379            TwoWays::One(oio::PageLister::new(CosLister::new(
380                self.core.clone(),
381                path,
382                args.recursive(),
383                args.limit(),
384            )))
385        };
386
387        Ok((RpList::default(), l))
388    }
389
390    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
391        let resp = self.core.cos_copy_object(from, to).await?;
392
393        let status = resp.status();
394
395        match status {
396            StatusCode::OK => Ok(RpCopy::default()),
397            _ => Err(parse_error(resp)),
398        }
399    }
400
401    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
402        let req = match args.operation() {
403            PresignOperation::Stat(v) => self.core.cos_head_object_request(path, v),
404            PresignOperation::Read(v) => {
405                self.core
406                    .cos_get_object_request(path, BytesRange::default(), v)
407            }
408            PresignOperation::Write(v) => {
409                self.core
410                    .cos_put_object_request(path, None, v, Buffer::new())
411            }
412            PresignOperation::Delete(_) => Err(Error::new(
413                ErrorKind::Unsupported,
414                "operation is not supported",
415            )),
416        };
417        let mut req = req?;
418        self.core.sign_query(&mut req, args.expire()).await?;
419
420        // We don't need this request anymore, consume it directly.
421        let (parts, _) = req.into_parts();
422
423        Ok(RpPresign::new(PresignedRequest::new(
424            parts.method,
425            parts.uri,
426            parts.headers,
427        )))
428    }
429}