1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use http::Uri;
24use log::debug;
25use reqsign::TencentCosConfig;
26use reqsign::TencentCosCredentialLoader;
27use reqsign::TencentCosSigner;
28
29use super::COS_SCHEME;
30use super::config::CosConfig;
31use super::core::*;
32use super::deleter::CosDeleter;
33use super::error::parse_error;
34use super::lister::CosLister;
35use super::lister::CosListers;
36use super::lister::CosObjectVersionsLister;
37use super::writer::CosWriter;
38use super::writer::CosWriters;
39use crate::raw::*;
40use crate::*;
41
/// Builder for the Tencent COS backend.
///
/// The setter methods on this type fill in a [`CosConfig`]; `Builder::build`
/// then validates it and produces the backend.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct CosBuilder {
    /// Service configuration assembled by the builder's setter methods.
    pub(super) config: CosConfig,

    /// Optional HTTP client override; when present, `build` installs it on the
    /// accessor info. Deprecated in favour of `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
51
52impl Debug for CosBuilder {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("CosBuilder")
55 .field("config", &self.config)
56 .finish_non_exhaustive()
57 }
58}
59
60impl CosBuilder {
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
82 if !endpoint.is_empty() {
83 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
84 }
85
86 self
87 }
88
89 pub fn secret_id(mut self, secret_id: &str) -> Self {
93 if !secret_id.is_empty() {
94 self.config.secret_id = Some(secret_id.to_string());
95 }
96
97 self
98 }
99
100 pub fn secret_key(mut self, secret_key: &str) -> Self {
104 if !secret_key.is_empty() {
105 self.config.secret_key = Some(secret_key.to_string());
106 }
107
108 self
109 }
110
111 pub fn bucket(mut self, bucket: &str) -> Self {
114 if !bucket.is_empty() {
115 self.config.bucket = Some(bucket.to_string());
116 }
117
118 self
119 }
120
121 pub fn enable_versioning(mut self, enabled: bool) -> Self {
123 self.config.enable_versioning = enabled;
124
125 self
126 }
127
128 pub fn disable_config_load(mut self) -> Self {
135 self.config.disable_config_load = true;
136 self
137 }
138
139 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
146 #[allow(deprecated)]
147 pub fn http_client(mut self, client: HttpClient) -> Self {
148 self.http_client = Some(client);
149 self
150 }
151}
152
153impl Builder for CosBuilder {
154 type Config = CosConfig;
155
156 fn build(self) -> Result<impl Access> {
157 debug!("backend build started: {:?}", &self);
158
159 let root = normalize_root(&self.config.root.unwrap_or_default());
160 debug!("backend use root {root}");
161
162 let bucket = match &self.config.bucket {
163 Some(bucket) => Ok(bucket.to_string()),
164 None => Err(
165 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
166 .with_context("service", COS_SCHEME),
167 ),
168 }?;
169 debug!("backend use bucket {}", &bucket);
170
171 let uri = match &self.config.endpoint {
172 Some(endpoint) => endpoint.parse::<Uri>().map_err(|err| {
173 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
174 .with_context("service", COS_SCHEME)
175 .with_context("endpoint", endpoint)
176 .set_source(err)
177 }),
178 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
179 .with_context("service", COS_SCHEME)),
180 }?;
181
182 let scheme = match uri.scheme_str() {
183 Some(scheme) => scheme.to_string(),
184 None => "https".to_string(),
185 };
186
187 let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."), "//");
189 debug!("backend use endpoint {}", &endpoint);
190
191 let mut cfg = TencentCosConfig::default();
192 if !self.config.disable_config_load {
193 cfg = cfg.from_env();
194 }
195
196 if let Some(v) = self.config.secret_id {
197 cfg.secret_id = Some(v);
198 }
199 if let Some(v) = self.config.secret_key {
200 cfg.secret_key = Some(v);
201 }
202
203 let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
204
205 let signer = TencentCosSigner::new();
206
207 Ok(CosBackend {
208 core: Arc::new(CosCore {
209 info: {
210 let am = AccessorInfo::default();
211 am.set_scheme(COS_SCHEME)
212 .set_root(&root)
213 .set_name(&bucket)
214 .set_native_capability(Capability {
215 stat: true,
216 stat_with_if_match: true,
217 stat_with_if_none_match: true,
218 stat_with_version: self.config.enable_versioning,
219
220 read: true,
221
222 read_with_if_match: true,
223 read_with_if_none_match: true,
224 read_with_if_modified_since: true,
225 read_with_if_unmodified_since: true,
226 read_with_version: self.config.enable_versioning,
227
228 write: true,
229 write_can_empty: true,
230 write_can_append: true,
231 write_can_multi: true,
232 write_with_content_type: true,
233 write_with_cache_control: true,
234 write_with_content_disposition: true,
235 write_with_if_not_exists: !self.config.enable_versioning,
237 write_multi_min_size: Some(1024 * 1024),
241 write_multi_max_size: if cfg!(target_pointer_width = "64") {
245 Some(5 * 1024 * 1024 * 1024)
246 } else {
247 Some(usize::MAX)
248 },
249 write_with_user_metadata: true,
250
251 delete: true,
252 delete_with_version: self.config.enable_versioning,
253 copy: true,
254
255 list: true,
256 list_with_recursive: true,
257 list_with_versions: self.config.enable_versioning,
258 list_with_deleted: self.config.enable_versioning,
259
260 presign: true,
261 presign_stat: true,
262 presign_read: true,
263 presign_write: true,
264
265 shared: true,
266
267 ..Default::default()
268 });
269
270 #[allow(deprecated)]
272 if let Some(client) = self.http_client {
273 am.update_http_client(|_| client);
274 }
275
276 am.into()
277 },
278 bucket: bucket.clone(),
279 root,
280 endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
281 signer,
282 loader: cred_loader,
283 }),
284 })
285 }
286}
287
/// Backend that implements `Access` for Tencent COS on top of a shared [`CosCore`].
#[derive(Debug, Clone)]
pub struct CosBackend {
    /// Shared core state; it sits behind an `Arc`, so cloning the backend
    /// does not copy it.
    core: Arc<CosCore>,
}
293
294impl Access for CosBackend {
295 type Reader = HttpBody;
296 type Writer = CosWriters;
297 type Lister = CosListers;
298 type Deleter = oio::OneShotDeleter<CosDeleter>;
299
300 fn info(&self) -> Arc<AccessorInfo> {
301 self.core.info.clone()
302 }
303
304 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
305 let resp = self.core.cos_head_object(path, &args).await?;
306
307 let status = resp.status();
308
309 match status {
310 StatusCode::OK => {
311 let headers = resp.headers();
312 let mut meta = parse_into_metadata(path, headers)?;
313
314 let user_meta = parse_prefixed_headers(headers, "x-cos-meta-");
315 if !user_meta.is_empty() {
316 meta = meta.with_user_metadata(user_meta);
317 }
318
319 if let Some(v) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
320 if v != "null" {
321 meta.set_version(v);
322 }
323 }
324
325 Ok(RpStat::new(meta))
326 }
327 _ => Err(parse_error(resp)),
328 }
329 }
330
331 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
332 let resp = self.core.cos_get_object(path, args.range(), &args).await?;
333
334 let status = resp.status();
335
336 match status {
337 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
338 Ok((RpRead::default(), resp.into_body()))
339 }
340 _ => {
341 let (part, mut body) = resp.into_parts();
342 let buf = body.to_buffer().await?;
343 Err(parse_error(Response::from_parts(part, buf)))
344 }
345 }
346 }
347
348 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
349 let writer = CosWriter::new(self.core.clone(), path, args.clone());
350
351 let w = if args.append() {
352 CosWriters::Two(oio::AppendWriter::new(writer))
353 } else {
354 CosWriters::One(oio::MultipartWriter::new(
355 self.core.info.clone(),
356 writer,
357 args.concurrent(),
358 ))
359 };
360
361 Ok((RpWrite::default(), w))
362 }
363
364 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
365 Ok((
366 RpDelete::default(),
367 oio::OneShotDeleter::new(CosDeleter::new(self.core.clone())),
368 ))
369 }
370
371 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
372 let l = if args.versions() || args.deleted() {
373 TwoWays::Two(oio::PageLister::new(CosObjectVersionsLister::new(
374 self.core.clone(),
375 path,
376 args,
377 )))
378 } else {
379 TwoWays::One(oio::PageLister::new(CosLister::new(
380 self.core.clone(),
381 path,
382 args.recursive(),
383 args.limit(),
384 )))
385 };
386
387 Ok((RpList::default(), l))
388 }
389
390 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
391 let resp = self.core.cos_copy_object(from, to).await?;
392
393 let status = resp.status();
394
395 match status {
396 StatusCode::OK => Ok(RpCopy::default()),
397 _ => Err(parse_error(resp)),
398 }
399 }
400
401 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
402 let req = match args.operation() {
403 PresignOperation::Stat(v) => self.core.cos_head_object_request(path, v),
404 PresignOperation::Read(v) => {
405 self.core
406 .cos_get_object_request(path, BytesRange::default(), v)
407 }
408 PresignOperation::Write(v) => {
409 self.core
410 .cos_put_object_request(path, None, v, Buffer::new())
411 }
412 PresignOperation::Delete(_) => Err(Error::new(
413 ErrorKind::Unsupported,
414 "operation is not supported",
415 )),
416 };
417 let mut req = req?;
418 self.core.sign_query(&mut req, args.expire()).await?;
419
420 let (parts, _) = req.into_parts();
422
423 Ok(RpPresign::new(PresignedRequest::new(
424 parts.method,
425 parts.uri,
426 parts.headers,
427 )))
428 }
429}