opendal/services/azdls/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::core::DIRECTORY;
31use super::delete::AzdlsDeleter;
32use super::error::parse_error;
33use super::lister::AzdlsLister;
34use super::writer::AzdlsWriter;
35use super::writer::AzdlsWriters;
36use crate::raw::*;
37use crate::services::AzdlsConfig;
38use crate::*;
39
40impl From<AzureStorageConfig> for AzdlsConfig {
41    fn from(config: AzureStorageConfig) -> Self {
42        AzdlsConfig {
43            endpoint: config.endpoint,
44            account_name: config.account_name,
45            account_key: config.account_key,
46            client_secret: config.client_secret,
47            tenant_id: config.tenant_id,
48            client_id: config.client_id,
49            sas_token: config.sas_token,
50            authority_host: config.authority_host,
51            ..Default::default()
52        }
53    }
54}
55
56impl Configurator for AzdlsConfig {
57    type Builder = AzdlsBuilder;
58
59    #[allow(deprecated)]
60    fn into_builder(self) -> Self::Builder {
61        AzdlsBuilder {
62            config: self,
63            http_client: None,
64        }
65    }
66}
67
/// Azure Data Lake Storage Gen2 Support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzdlsBuilder {
    /// Service configuration filled in by the builder's setter methods.
    config: AzdlsConfig,

    /// Optional user-supplied HTTP client; deprecated in favour of
    /// `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
77
78impl Debug for AzdlsBuilder {
79    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
80        let mut ds = f.debug_struct("AzdlsBuilder");
81
82        ds.field("config", &self.config);
83
84        ds.finish()
85    }
86}
87
88impl AzdlsBuilder {
89    /// Set root of this backend.
90    ///
91    /// All operations will happen under this root.
92    pub fn root(mut self, root: &str) -> Self {
93        self.config.root = if root.is_empty() {
94            None
95        } else {
96            Some(root.to_string())
97        };
98
99        self
100    }
101
102    /// Set filesystem name of this backend.
103    pub fn filesystem(mut self, filesystem: &str) -> Self {
104        self.config.filesystem = filesystem.to_string();
105
106        self
107    }
108
109    /// Set endpoint of this backend.
110    ///
111    /// Endpoint must be full uri, e.g.
112    ///
113    /// - Azblob: `https://accountname.blob.core.windows.net`
114    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
115    pub fn endpoint(mut self, endpoint: &str) -> Self {
116        if !endpoint.is_empty() {
117            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
118            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
119        }
120
121        self
122    }
123
124    /// Set account_name of this backend.
125    ///
126    /// - If account_name is set, we will take user's input first.
127    /// - If not, we will try to load it from environment.
128    pub fn account_name(mut self, account_name: &str) -> Self {
129        if !account_name.is_empty() {
130            self.config.account_name = Some(account_name.to_string());
131        }
132
133        self
134    }
135
136    /// Set account_key of this backend.
137    ///
138    /// - If account_key is set, we will take user's input first.
139    /// - If not, we will try to load it from environment.
140    pub fn account_key(mut self, account_key: &str) -> Self {
141        if !account_key.is_empty() {
142            self.config.account_key = Some(account_key.to_string());
143        }
144
145        self
146    }
147
148    /// Set client_secret of this backend.
149    ///
150    /// - If client_secret is set, we will take user's input first.
151    /// - If not, we will try to load it from environment.
152    /// - required for client_credentials authentication
153    pub fn client_secret(mut self, client_secret: &str) -> Self {
154        if !client_secret.is_empty() {
155            self.config.client_secret = Some(client_secret.to_string());
156        }
157
158        self
159    }
160
161    /// Set tenant_id of this backend.
162    ///
163    /// - If tenant_id is set, we will take user's input first.
164    /// - If not, we will try to load it from environment.
165    /// - required for client_credentials authentication
166    pub fn tenant_id(mut self, tenant_id: &str) -> Self {
167        if !tenant_id.is_empty() {
168            self.config.tenant_id = Some(tenant_id.to_string());
169        }
170
171        self
172    }
173
174    /// Set client_id of this backend.
175    ///
176    /// - If client_id is set, we will take user's input first.
177    /// - If not, we will try to load it from environment.
178    /// - required for client_credentials authentication
179    pub fn client_id(mut self, client_id: &str) -> Self {
180        if !client_id.is_empty() {
181            self.config.client_id = Some(client_id.to_string());
182        }
183
184        self
185    }
186
187    /// Set the sas_token of this backend.
188    pub fn sas_token(mut self, sas_token: &str) -> Self {
189        if !sas_token.is_empty() {
190            self.config.sas_token = Some(sas_token.to_string());
191        }
192
193        self
194    }
195
196    /// Set authority_host of this backend.
197    ///
198    /// - If authority_host is set, we will take user's input first.
199    /// - If not, we will try to load it from environment.
200    /// - default value: `https://login.microsoftonline.com`
201    pub fn authority_host(mut self, authority_host: &str) -> Self {
202        if !authority_host.is_empty() {
203            self.config.authority_host = Some(authority_host.to_string());
204        }
205
206        self
207    }
208
209    /// Specify the http client that used by this service.
210    ///
211    /// # Notes
212    ///
213    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
214    /// during minor updates.
215    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
216    #[allow(deprecated)]
217    pub fn http_client(mut self, client: HttpClient) -> Self {
218        self.http_client = Some(client);
219        self
220    }
221
222    /// Create a new `AzdlsBuilder` instance from an [Azure Storage connection string][1].
223    ///
224    /// [1]: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
225    ///
226    /// # Example
227    /// ```
228    /// use opendal::Builder;
229    /// use opendal::services::Azdls;
230    ///
231    /// let conn_str = "AccountName=example;DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net";
232    ///
233    /// let mut config = Azdls::from_connection_string(&conn_str)
234    ///     .unwrap()
235    ///     // Add additional configuration if needed
236    ///     .filesystem("myFilesystem")
237    ///     .client_id("myClientId")
238    ///     .client_secret("myClientSecret")
239    ///     .tenant_id("myTenantId")
240    ///     .build()
241    ///     .unwrap();
242    /// ```
243    pub fn from_connection_string(conn_str: &str) -> Result<Self> {
244        let config =
245            raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::Adls)?;
246
247        Ok(AzdlsConfig::from(config).into_builder())
248    }
249}
250
251impl Builder for AzdlsBuilder {
252    const SCHEME: Scheme = Scheme::Azdls;
253    type Config = AzdlsConfig;
254
255    fn build(self) -> Result<impl Access> {
256        debug!("backend build started: {:?}", &self);
257
258        let root = normalize_root(&self.config.root.unwrap_or_default());
259        debug!("backend use root {root}");
260
261        // Handle endpoint, region and container name.
262        let filesystem = match self.config.filesystem.is_empty() {
263            false => Ok(&self.config.filesystem),
264            true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
265                .with_operation("Builder::build")
266                .with_context("service", Scheme::Azdls)),
267        }?;
268        debug!("backend use filesystem {}", &filesystem);
269
270        let endpoint = match &self.config.endpoint {
271            Some(endpoint) => Ok(endpoint.clone().trim_end_matches('/').to_string()),
272            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
273                .with_operation("Builder::build")
274                .with_context("service", Scheme::Azdls)),
275        }?;
276        debug!("backend use endpoint {}", &endpoint);
277
278        let config_loader = AzureStorageConfig {
279            account_name: self
280                .config
281                .account_name
282                .clone()
283                .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str())),
284            account_key: self.config.account_key.clone(),
285            sas_token: self.config.sas_token,
286            client_id: self.config.client_id.clone(),
287            client_secret: self.config.client_secret.clone(),
288            tenant_id: self.config.tenant_id.clone(),
289            authority_host: self.config.authority_host.clone(),
290            ..Default::default()
291        };
292
293        let cred_loader = AzureStorageLoader::new(config_loader);
294        let signer = AzureStorageSigner::new();
295        Ok(AzdlsBackend {
296            core: Arc::new(AzdlsCore {
297                info: {
298                    let am = AccessorInfo::default();
299                    am.set_scheme(Scheme::Azdls)
300                        .set_root(&root)
301                        .set_name(filesystem)
302                        .set_native_capability(Capability {
303                            stat: true,
304
305                            read: true,
306
307                            write: true,
308                            write_can_append: true,
309                            write_with_if_none_match: true,
310                            write_with_if_not_exists: true,
311
312                            create_dir: true,
313                            delete: true,
314                            rename: true,
315
316                            list: true,
317
318                            shared: true,
319
320                            ..Default::default()
321                        });
322
323                    // allow deprecated api here for compatibility
324                    #[allow(deprecated)]
325                    if let Some(client) = self.http_client {
326                        am.update_http_client(|_| client);
327                    }
328
329                    am.into()
330                },
331                filesystem: self.config.filesystem.clone(),
332                root,
333                endpoint,
334                loader: cred_loader,
335                signer,
336            }),
337        })
338    }
339}
340
/// Backend for Azure Data Lake Storage Gen2 (azdls) services.
#[derive(Debug, Clone)]
pub struct AzdlsBackend {
    /// Shared core (accessor info, filesystem, root, endpoint, credential
    /// loader and signer) that every operation delegates to.
    core: Arc<AzdlsCore>,
}
346
347impl Access for AzdlsBackend {
348    type Reader = HttpBody;
349    type Writer = AzdlsWriters;
350    type Lister = oio::PageLister<AzdlsLister>;
351    type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
352
353    fn info(&self) -> Arc<AccessorInfo> {
354        self.core.info.clone()
355    }
356
357    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
358        let resp = self
359            .core
360            .azdls_create(path, DIRECTORY, &OpWrite::default())
361            .await?;
362
363        let status = resp.status();
364        match status {
365            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
366            _ => Err(parse_error(resp)),
367        }
368    }
369
370    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
371        // Stat root always returns a DIR.
372        // TODO: include metadata for the root (#4746)
373        if path == "/" {
374            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
375        }
376
377        let metadata = self.core.azdls_stat_metadata(path).await?;
378        Ok(RpStat::new(metadata))
379    }
380
381    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
382        let resp = self.core.azdls_read(path, args.range()).await?;
383
384        let status = resp.status();
385        match status {
386            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
387            _ => {
388                let (part, mut body) = resp.into_parts();
389                let buf = body.to_buffer().await?;
390                Err(parse_error(Response::from_parts(part, buf)))
391            }
392        }
393    }
394
395    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
396        let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
397        let w = if args.append() {
398            AzdlsWriters::Two(oio::AppendWriter::new(w))
399        } else {
400            AzdlsWriters::One(oio::OneShotWriter::new(w))
401        };
402        Ok((RpWrite::default(), w))
403    }
404
405    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
406        Ok((
407            RpDelete::default(),
408            oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
409        ))
410    }
411
412    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
413        let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
414
415        Ok((RpList::default(), oio::PageLister::new(l)))
416    }
417
418    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
419        if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
420            let status = resp.status();
421            match status {
422                StatusCode::CREATED | StatusCode::CONFLICT => {}
423                _ => return Err(parse_error(resp)),
424            }
425        }
426
427        let resp = self.core.azdls_rename(from, to).await?;
428
429        let status = resp.status();
430
431        match status {
432            StatusCode::CREATED => Ok(RpRename::default()),
433            _ => Err(parse_error(resp)),
434        }
435    }
436}