1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::GoogleCredentialLoader;
26use reqsign::GoogleSigner;
27use reqsign::GoogleTokenLoad;
28use reqsign::GoogleTokenLoader;
29
30use super::GCS_SCHEME;
31use super::core::*;
32use super::delete::GcsDeleter;
33use super::error::parse_error;
34use super::lister::GcsLister;
35use super::writer::GcsWriter;
36use super::writer::GcsWriters;
37use crate::raw::oio::BatchDeleter;
38use crate::raw::*;
39use crate::services::GcsConfig;
40use crate::*;
41const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
42const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
43
44#[doc = include_str!("docs.md")]
46#[derive(Default)]
47pub struct GcsBuilder {
48 pub(super) config: GcsConfig,
49
50 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
51 pub(super) http_client: Option<HttpClient>,
52 pub(super) customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
53}
54
55impl Debug for GcsBuilder {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 let mut ds = f.debug_struct("GcsBuilder");
58
59 ds.field("config", &self.config);
60 ds.finish_non_exhaustive()
61 }
62}
63
64impl GcsBuilder {
65 pub fn root(mut self, root: &str) -> Self {
67 self.config.root = if root.is_empty() {
68 None
69 } else {
70 Some(root.to_string())
71 };
72
73 self
74 }
75
76 pub fn bucket(mut self, bucket: &str) -> Self {
78 self.config.bucket = bucket.to_string();
79 self
80 }
81
82 pub fn scope(mut self, scope: &str) -> Self {
94 if !scope.is_empty() {
95 self.config.scope = Some(scope.to_string())
96 };
97 self
98 }
99
100 pub fn service_account(mut self, service_account: &str) -> Self {
105 if !service_account.is_empty() {
106 self.config.service_account = Some(service_account.to_string())
107 };
108 self
109 }
110
111 pub fn endpoint(mut self, endpoint: &str) -> Self {
113 if !endpoint.is_empty() {
114 self.config.endpoint = Some(endpoint.to_string())
115 };
116 self
117 }
118
119 pub fn credential(mut self, credential: &str) -> Self {
127 if !credential.is_empty() {
128 self.config.credential = Some(credential.to_string())
129 };
130 self
131 }
132
133 pub fn credential_path(mut self, path: &str) -> Self {
140 if !path.is_empty() {
141 self.config.credential_path = Some(path.to_string())
142 };
143 self
144 }
145
146 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
153 #[allow(deprecated)]
154 pub fn http_client(mut self, client: HttpClient) -> Self {
155 self.http_client = Some(client);
156 self
157 }
158
159 pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
161 self.customized_token_loader = Some(token_load);
162 self
163 }
164
165 pub fn token(mut self, token: String) -> Self {
167 self.config.token = Some(token);
168 self
169 }
170
171 pub fn disable_vm_metadata(mut self) -> Self {
173 self.config.disable_vm_metadata = true;
174 self
175 }
176
177 pub fn disable_config_load(mut self) -> Self {
179 self.config.disable_config_load = true;
180 self
181 }
182
183 pub fn predefined_acl(mut self, acl: &str) -> Self {
193 if !acl.is_empty() {
194 self.config.predefined_acl = Some(acl.to_string())
195 };
196 self
197 }
198
199 pub fn default_storage_class(mut self, class: &str) -> Self {
207 if !class.is_empty() {
208 self.config.default_storage_class = Some(class.to_string())
209 };
210 self
211 }
212
213 pub fn allow_anonymous(mut self) -> Self {
218 self.config.allow_anonymous = true;
219 self
220 }
221}
222
223impl Builder for GcsBuilder {
224 type Config = GcsConfig;
225
226 fn build(self) -> Result<impl Access> {
227 debug!("backend build started: {self:?}");
228
229 let root = normalize_root(&self.config.root.unwrap_or_default());
230 debug!("backend use root {root}");
231
232 let bucket = match self.config.bucket.is_empty() {
234 false => Ok(&self.config.bucket),
235 true => Err(
236 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
237 .with_operation("Builder::build")
238 .with_context("service", Scheme::Gcs),
239 ),
240 }?;
241
242 let endpoint = self
245 .config
246 .endpoint
247 .clone()
248 .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
249 debug!("backend use endpoint: {endpoint}");
250
251 let mut cred_loader = GoogleCredentialLoader::default();
252 if let Some(cred) = &self.config.credential {
253 cred_loader = cred_loader.with_content(cred);
254 }
255 if let Some(cred) = &self.config.credential_path {
256 cred_loader = cred_loader.with_path(cred);
257 }
258 #[cfg(target_arch = "wasm32")]
259 {
260 cred_loader = cred_loader.with_disable_env();
261 cred_loader = cred_loader.with_disable_well_known_location();
262 }
263
264 if self.config.disable_config_load {
265 cred_loader = cred_loader
266 .with_disable_env()
267 .with_disable_well_known_location();
268 }
269
270 let scope = if let Some(scope) = &self.config.scope {
271 scope
272 } else {
273 DEFAULT_GCS_SCOPE
274 };
275
276 let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
277 if let Some(account) = &self.config.service_account {
278 token_loader = token_loader.with_service_account(account);
279 }
280 if let Ok(Some(cred)) = cred_loader.load() {
281 token_loader = token_loader.with_credentials(cred)
282 }
283 if let Some(loader) = self.customized_token_loader {
284 token_loader = token_loader.with_customized_token_loader(loader)
285 }
286
287 if self.config.disable_vm_metadata {
288 token_loader = token_loader.with_disable_vm_metadata(true);
289 }
290
291 let signer = GoogleSigner::new("storage");
292
293 let backend = GcsBackend {
294 core: Arc::new(GcsCore {
295 info: {
296 let am = AccessorInfo::default();
297 am.set_scheme(GCS_SCHEME)
298 .set_root(&root)
299 .set_name(bucket)
300 .set_native_capability(Capability {
301 stat: true,
302 stat_with_if_match: true,
303 stat_with_if_none_match: true,
304
305 read: true,
306
307 read_with_if_match: true,
308 read_with_if_none_match: true,
309
310 write: true,
311 write_can_empty: true,
312 write_can_multi: true,
313 write_with_cache_control: true,
314 write_with_content_type: true,
315 write_with_content_encoding: true,
316 write_with_user_metadata: true,
317 write_with_if_not_exists: true,
318
319 write_multi_min_size: Some(5 * 1024 * 1024),
323 write_multi_max_size: if cfg!(target_pointer_width = "64") {
327 Some(5 * 1024 * 1024 * 1024)
328 } else {
329 Some(usize::MAX)
330 },
331
332 delete: true,
333 delete_max_size: Some(100),
334 copy: true,
335
336 list: true,
337 list_with_limit: true,
338 list_with_start_after: true,
339 list_with_recursive: true,
340
341 presign: true,
342 presign_stat: true,
343 presign_read: true,
344 presign_write: true,
345
346 shared: true,
347
348 ..Default::default()
349 });
350
351 #[allow(deprecated)]
353 if let Some(client) = self.http_client {
354 am.update_http_client(|_| client);
355 }
356
357 am.into()
358 },
359 endpoint,
360 bucket: bucket.to_string(),
361 root,
362 signer,
363 token_loader,
364 token: self.config.token,
365 scope: scope.to_string(),
366 credential_loader: cred_loader,
367 predefined_acl: self.config.predefined_acl.clone(),
368 default_storage_class: self.config.default_storage_class.clone(),
369 allow_anonymous: self.config.allow_anonymous,
370 }),
371 };
372
373 Ok(backend)
374 }
375}
376
377#[derive(Clone, Debug)]
379pub struct GcsBackend {
380 core: Arc<GcsCore>,
381}
382
383impl Access for GcsBackend {
384 type Reader = HttpBody;
385 type Writer = GcsWriters;
386 type Lister = oio::PageLister<GcsLister>;
387 type Deleter = oio::BatchDeleter<GcsDeleter>;
388
389 fn info(&self) -> Arc<AccessorInfo> {
390 self.core.info.clone()
391 }
392
393 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
394 let resp = self.core.gcs_get_object_metadata(path, &args).await?;
395
396 if !resp.status().is_success() {
397 return Err(parse_error(resp));
398 }
399
400 let slc = resp.into_body();
401 let m = GcsCore::build_metadata_from_object_response(path, slc)?;
402
403 Ok(RpStat::new(m))
404 }
405
406 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
407 let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
408
409 let status = resp.status();
410
411 match status {
412 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
413 Ok((RpRead::default(), resp.into_body()))
414 }
415 _ => {
416 let (part, mut body) = resp.into_parts();
417 let buf = body.to_buffer().await?;
418 Err(parse_error(Response::from_parts(part, buf)))
419 }
420 }
421 }
422
423 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
424 let concurrent = args.concurrent();
425 let w = GcsWriter::new(self.core.clone(), path, args);
426 let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
427
428 Ok((RpWrite::default(), w))
429 }
430
431 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
432 Ok((
433 RpDelete::default(),
434 BatchDeleter::new(GcsDeleter::new(self.core.clone())),
435 ))
436 }
437
438 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
439 let l = GcsLister::new(
440 self.core.clone(),
441 path,
442 args.recursive(),
443 args.limit(),
444 args.start_after(),
445 );
446
447 Ok((RpList::default(), oio::PageLister::new(l)))
448 }
449
450 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
451 let resp = self.core.gcs_copy_object(from, to).await?;
452
453 if resp.status().is_success() {
454 Ok(RpCopy::default())
455 } else {
456 Err(parse_error(resp))
457 }
458 }
459
460 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
461 let req = match args.operation() {
463 PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
464 PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
465 PresignOperation::Write(v) => {
466 self.core
467 .gcs_insert_object_xml_request(path, v, Buffer::new())
468 }
469 PresignOperation::Delete(_) => Err(Error::new(
470 ErrorKind::Unsupported,
471 "operation is not supported",
472 )),
473 };
474 let mut req = req?;
475 self.core.sign_query(&mut req, args.expire())?;
476
477 let (parts, _) = req.into_parts();
479
480 Ok(RpPresign::new(PresignedRequest::new(
481 parts.method,
482 parts.uri,
483 parts.headers,
484 )))
485 }
486}