opendal/services/ghac/
writer.rs1use std::str::FromStr;
19use std::sync::Arc;
20
21use super::core::*;
22use super::error::parse_error;
23use crate::raw::*;
24use crate::services::core::AzblobCore;
25use crate::services::writer::AzblobWriter;
26use crate::*;
27
28pub type GhacWriter = TwoWays<GhacWriterV1, GhacWriterV2>;
29
30impl GhacWriter {
31 pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
33 match core.service_version {
34 GhacVersion::V1 => Ok(TwoWays::One(GhacWriterV1 {
35 core,
36 path: write_path,
37 url,
38 size: 0,
39 })),
40 GhacVersion::V2 => {
41 let uri = http::Uri::from_str(&url)
42 .map_err(new_http_uri_invalid_error)?
43 .into_parts();
44 let (Some(scheme), Some(authority), Some(pq)) =
45 (uri.scheme, uri.authority, uri.path_and_query)
46 else {
47 return Err(Error::new(
48 ErrorKind::Unexpected,
49 "ghac returns invalid signed url",
50 )
51 .with_context("url", &url));
52 };
53 let endpoint = format!("{scheme}://{authority}");
54 let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else {
55 return Err(Error::new(
56 ErrorKind::Unexpected,
57 "ghac returns invalid signed url that bucket or path is missing",
58 )
59 .with_context("url", &url));
60 };
61 let Some(query) = pq.query() else {
62 return Err(Error::new(
63 ErrorKind::Unexpected,
64 "ghac returns invalid signed url that sas is missing",
65 )
66 .with_context("url", &url));
67 };
68 let azure_core = Arc::new(AzblobCore {
69 info: {
70 let am = AccessorInfo::default();
71 am.set_scheme(Scheme::Azblob)
72 .set_root("/")
73 .set_name(container)
74 .set_native_capability(Capability {
75 stat: true,
76 stat_with_if_match: true,
77 stat_with_if_none_match: true,
78
79 read: true,
80
81 read_with_if_match: true,
82 read_with_if_none_match: true,
83 read_with_override_content_disposition: true,
84 read_with_if_modified_since: true,
85 read_with_if_unmodified_since: true,
86
87 write: true,
88 write_can_append: true,
89 write_can_empty: true,
90 write_can_multi: true,
91 write_with_cache_control: true,
92 write_with_content_type: true,
93 write_with_if_not_exists: true,
94 write_with_if_none_match: true,
95 write_with_user_metadata: true,
96
97 copy: true,
98
99 list: true,
100 list_with_recursive: true,
101
102 shared: true,
103
104 ..Default::default()
105 });
106
107 am.into()
108 },
109 container: container.to_string(),
110 root: "/".to_string(),
111 endpoint,
112 encryption_key: None,
113 encryption_key_sha256: None,
114 encryption_algorithm: None,
115 loader: {
116 let config = reqsign::AzureStorageConfig {
117 sas_token: Some(query.to_string()),
118 ..Default::default()
119 };
120 reqsign::AzureStorageLoader::new(config)
121 },
122 signer: { reqsign::AzureStorageSigner::new() },
123 });
124 let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
125 let writer = oio::BlockWriter::new(core.info.clone(), w, 4);
126 Ok(TwoWays::Two(GhacWriterV2 {
127 core,
128 writer,
129 path: write_path,
130 url,
131 size: 0,
132 }))
133 }
134 }
135 }
136}
137
138pub struct GhacWriterV1 {
139 core: Arc<GhacCore>,
140
141 path: String,
142 url: String,
143 size: u64,
144}
145
146impl oio::Write for GhacWriterV1 {
147 async fn write(&mut self, bs: Buffer) -> Result<()> {
148 let size = bs.len() as u64;
149 let offset = self.size;
150
151 let resp = self.core.ghac_v1_write(&self.url, size, offset, bs).await?;
152 if !resp.status().is_success() {
153 return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_upload")));
154 }
155 self.size += size;
156 Ok(())
157 }
158
159 async fn abort(&mut self) -> Result<()> {
160 Ok(())
161 }
162
163 async fn close(&mut self) -> Result<Metadata> {
164 self.core
165 .ghac_finalize_upload(&self.path, &self.url, self.size)
166 .await?;
167 Ok(Metadata::default().with_content_length(self.size))
168 }
169}
170
171pub struct GhacWriterV2 {
172 core: Arc<GhacCore>,
173 writer: oio::BlockWriter<AzblobWriter>,
174
175 path: String,
176 url: String,
177 size: u64,
178}
179
180impl oio::Write for GhacWriterV2 {
181 async fn write(&mut self, bs: Buffer) -> Result<()> {
182 let size = bs.len() as u64;
183
184 self.writer.write(bs).await?;
185 self.size += size;
186 Ok(())
187 }
188
189 async fn close(&mut self) -> Result<Metadata> {
190 self.writer.close().await?;
191 let _ = self
192 .core
193 .ghac_finalize_upload(&self.path, &self.url, self.size)
194 .await;
195 Ok(Metadata::default().with_content_length(self.size))
196 }
197
198 async fn abort(&mut self) -> Result<()> {
199 Ok(())
200 }
201}