opendal/services/dbfs/backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use http::StatusCode;
use log::debug;
use serde::Deserialize;

use super::core::DbfsCore;
use super::delete::DbfsDeleter;
use super::error::parse_error;
use super::lister::DbfsLister;
use super::writer::DbfsWriter;
use crate::raw::*;
use crate::services::DbfsConfig;
use crate::*;

impl Configurator for DbfsConfig {
    type Builder = DbfsBuilder;
    fn into_builder(self) -> Self::Builder {
        DbfsBuilder { config: self }
    }
}

/// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct DbfsBuilder {
    config: DbfsConfig,
}

impl Debug for DbfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut ds = f.debug_struct("DbfsBuilder");

        ds.field("config", &self.config);

        ds.finish()
    }
}

impl DbfsBuilder {
    /// Set root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set endpoint of this backend.
    ///
    /// Endpoint must be a full URI, e.g.
    ///
    /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net`
    /// - AWS: `https://dbc-123a5678-90bc.cloud.databricks.com`
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        self.config.endpoint = if endpoint.is_empty() {
            None
        } else {
            Some(endpoint.trim_end_matches('/').to_string())
        };
        self
    }

    /// Set the token of this backend.
    pub fn token(mut self, token: &str) -> Self {
        if !token.is_empty() {
            self.config.token = Some(token.to_string());
        }
        self
    }
}
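
// A minimal usage sketch (not part of the original file): chaining the setters
// above and handing the builder to `Operator::new`. The root, endpoint, and
// token values below are placeholders.
#[allow(dead_code)]
fn dbfs_builder_usage_sketch() -> Result<Operator> {
    let builder = DbfsBuilder::default()
        .root("/opendal")
        .endpoint("https://adb-1234567890123456.78.azuredatabricks.net")
        .token("<databricks-personal-access-token>");
    Ok(Operator::new(builder)?.finish())
}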

impl Builder for DbfsBuilder {
    const SCHEME: Scheme = Scheme::Dbfs;
    type Config = DbfsConfig;

    /// Build a DbfsBackend.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let endpoint = match &self.config.endpoint {
            Some(endpoint) => Ok(endpoint.clone()),
            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
                .with_operation("Builder::build")
                .with_context("service", Scheme::Dbfs)),
        }?;
        debug!("backend use endpoint: {}", &endpoint);

        let token = match self.config.token {
            Some(token) => token,
            None => {
                return Err(Error::new(
                    ErrorKind::ConfigInvalid,
                    "missing token for Dbfs",
                ));
            }
        };

        let client = HttpClient::new()?;
        Ok(DbfsBackend {
            core: Arc::new(DbfsCore {
                root,
                endpoint: endpoint.to_string(),
                token,
                client,
            }),
        })
    }
}
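
// A hedged sketch (not in the original file): `build()` above rejects a
// configuration without `endpoint` or `token`, so `Operator::new` is expected
// to fail with `ErrorKind::ConfigInvalid` for an unconfigured builder.
#[allow(dead_code)]
fn dbfs_missing_config_sketch() {
    let err = Operator::new(DbfsBuilder::default())
        .map(|_| ())
        .expect_err("endpoint and token are required");
    assert_eq!(err.kind(), ErrorKind::ConfigInvalid);
}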

/// Backend for DBFS service
#[derive(Debug, Clone)]
pub struct DbfsBackend {
    core: Arc<DbfsCore>,
}

impl Access for DbfsBackend {
    type Reader = ();
    type Writer = oio::OneShotWriter<DbfsWriter>;
    type Lister = oio::PageLister<DbfsLister>;
    type Deleter = oio::OneShotDeleter<DbfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        let am = AccessorInfo::default();
        am.set_scheme(Scheme::Dbfs)
            .set_root(&self.core.root)
            .set_native_capability(Capability {
                stat: true,

                write: true,
                create_dir: true,
                delete: true,
                rename: true,

                list: true,

                shared: true,

                ..Default::default()
            });
        am.into()
    }

    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let resp = self.core.dbfs_create_dir(path).await?;

        let status = resp.status();

        match status {
            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
            _ => Err(parse_error(resp)),
        }
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        // Stat root always returns a DIR.
        if path == "/" {
            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
        }

        let resp = self.core.dbfs_get_status(path).await?;

        let status = resp.status();

        match status {
            StatusCode::OK => {
                let mut meta = parse_into_metadata(path, resp.headers())?;
                let bs = resp.into_body();
                let decoded_response: DbfsStatus =
                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
                meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
                    decoded_response.modification_time,
                )?);
                match decoded_response.is_dir {
                    true => meta.set_mode(EntryMode::DIR),
                    false => {
                        meta.set_mode(EntryMode::FILE);
                        meta.set_content_length(decoded_response.file_size as u64)
                    }
                };
                Ok(RpStat::new(meta))
            }
            // A path ending with `/` that DBFS reports as missing is still
            // treated as a directory.
            StatusCode::NOT_FOUND if path.ends_with('/') => {
                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
            }
            _ => Err(parse_error(resp)),
        }
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        Ok((
            RpWrite::default(),
            oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
        ))
    }

    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
        let l = DbfsLister::new(self.core.clone(), path.to_string());

        Ok((RpList::default(), oio::PageLister::new(l)))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        self.core.dbfs_ensure_parent_path(to).await?;

        let resp = self.core.dbfs_rename(from, to).await?;

        let status = resp.status();

        match status {
            StatusCode::OK => Ok(RpRename::default()),
            _ => Err(parse_error(resp)),
        }
    }
}

#[derive(Deserialize)]
struct DbfsStatus {
    // Unused fields:
    // path: String,
    is_dir: bool,
    file_size: i64,
    modification_time: i64,
}
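
// A hedged sketch (not in the original file): the JSON shape `DbfsStatus` is
// expected to deserialize from in `stat`, mirroring the DBFS `get-status`
// response. The sample values are placeholders.
#[cfg(test)]
mod dbfs_status_sketch_tests {
    use super::*;

    #[test]
    fn deserialize_sample_status() {
        let sample = r#"{
            "path": "/opendal/sample.txt",
            "is_dir": false,
            "file_size": 261,
            "modification_time": 1580063152000
        }"#;
        let status: DbfsStatus =
            serde_json::from_str(sample).expect("sample should deserialize");
        assert!(!status.is_dir);
        assert_eq!(status.file_size, 261);
        assert_eq!(status.modification_time, 1580063152000);
    }
}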