opendal/services/webhdfs/
backend.rs1use core::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use tokio::sync::OnceCell;
27
28use super::core::WebhdfsCore;
29use super::delete::WebhdfsDeleter;
30use super::error::parse_error;
31use super::lister::WebhdfsLister;
32use super::message::BooleanResp;
33use super::message::FileStatusType;
34use super::message::FileStatusWrapper;
35use super::writer::WebhdfsWriter;
36use super::writer::WebhdfsWriters;
37use crate::raw::*;
38use crate::services::WebhdfsConfig;
39use crate::*;
40
/// Endpoint used by `build` when no endpoint has been configured.
const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";
42
43impl Configurator for WebhdfsConfig {
44 type Builder = WebhdfsBuilder;
45 fn into_builder(self) -> Self::Builder {
46 WebhdfsBuilder { config: self }
47 }
48}
49
50#[doc = include_str!("docs.md")]
52#[derive(Default, Clone)]
53pub struct WebhdfsBuilder {
54 config: WebhdfsConfig,
55}
56
57impl Debug for WebhdfsBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let mut d = f.debug_struct("WebhdfsBuilder");
60 d.field("config", &self.config);
61 d.finish_non_exhaustive()
62 }
63}
64
65impl WebhdfsBuilder {
66 pub fn root(mut self, root: &str) -> Self {
74 self.config.root = if root.is_empty() {
75 None
76 } else {
77 Some(root.to_string())
78 };
79
80 self
81 }
82
83 pub fn endpoint(mut self, endpoint: &str) -> Self {
94 if !endpoint.is_empty() {
95 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
97 }
98 self
99 }
100
101 pub fn user_name(mut self, user_name: &str) -> Self {
104 if !user_name.is_empty() {
105 self.config.user_name = Some(user_name.to_string());
106 }
107 self
108 }
109
110 pub fn delegation(mut self, delegation: &str) -> Self {
117 if !delegation.is_empty() {
118 self.config.delegation = Some(delegation.to_string());
119 }
120 self
121 }
122
123 pub fn disable_list_batch(mut self) -> Self {
130 self.config.disable_list_batch = true;
131 self
132 }
133
134 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
140 self.config.atomic_write_dir = if dir.is_empty() {
141 None
142 } else {
143 Some(String::from(dir))
144 };
145 self
146 }
147}
148
149impl Builder for WebhdfsBuilder {
150 const SCHEME: Scheme = Scheme::Webhdfs;
151 type Config = WebhdfsConfig;
152
153 fn build(self) -> Result<impl Access> {
161 debug!("start building backend: {self:?}");
162
163 let root = normalize_root(&self.config.root.unwrap_or_default());
164 debug!("backend use root {root}");
165
166 let endpoint = match self.config.endpoint {
168 Some(endpoint) => {
169 if endpoint.starts_with("http") {
170 endpoint
171 } else {
172 format!("http://{endpoint}")
173 }
174 }
175 None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
176 };
177 debug!("backend use endpoint {endpoint}");
178
179 let atomic_write_dir = self.config.atomic_write_dir;
180
181 let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));
182
183 let info = AccessorInfo::default();
184 info.set_scheme(Scheme::Webhdfs)
185 .set_root(&root)
186 .set_native_capability(Capability {
187 stat: true,
188
189 read: true,
190
191 write: true,
192 write_can_append: true,
193 write_can_multi: atomic_write_dir.is_some(),
194
195 create_dir: true,
196 delete: true,
197
198 list: true,
199
200 shared: true,
201
202 ..Default::default()
203 });
204
205 let accessor_info = Arc::new(info);
206 let core = Arc::new(WebhdfsCore {
207 info: accessor_info,
208 root,
209 endpoint,
210 user_name: self.config.user_name,
211 auth,
212 root_checker: OnceCell::new(),
213 atomic_write_dir,
214 disable_list_batch: self.config.disable_list_batch,
215 });
216
217 Ok(WebhdfsBackend { core })
218 }
219}
220
221#[derive(Debug, Clone)]
223pub struct WebhdfsBackend {
224 core: Arc<WebhdfsCore>,
225}
226
227impl WebhdfsBackend {
228 async fn check_root(&self) -> Result<()> {
229 let resp = self.core.webhdfs_get_file_status("/").await?;
230 match resp.status() {
231 StatusCode::OK => {
232 let bs = resp.into_body();
233
234 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
235 .map_err(new_json_deserialize_error)?
236 .file_status;
237
238 if file_status.ty == FileStatusType::File {
239 return Err(Error::new(
240 ErrorKind::ConfigInvalid,
241 "root path must be dir",
242 ));
243 }
244 }
245 StatusCode::NOT_FOUND => {
246 self.create_dir("/", OpCreateDir::new()).await?;
247 }
248 _ => return Err(parse_error(resp)),
249 }
250 Ok(())
251 }
252}
253
254impl Access for WebhdfsBackend {
255 type Reader = HttpBody;
256 type Writer = WebhdfsWriters;
257 type Lister = oio::PageLister<WebhdfsLister>;
258 type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
259
260 fn info(&self) -> Arc<AccessorInfo> {
261 self.core.info.clone()
262 }
263
264 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
266 let resp = self.core.webhdfs_create_dir(path).await?;
267
268 let status = resp.status();
269 match status {
274 StatusCode::CREATED | StatusCode::OK => {
275 let bs = resp.into_body();
276
277 let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
278 .map_err(new_json_deserialize_error)?;
279
280 if resp.boolean {
281 Ok(RpCreateDir::default())
282 } else {
283 Err(Error::new(
284 ErrorKind::Unexpected,
285 "webhdfs create dir failed",
286 ))
287 }
288 }
289 _ => Err(parse_error(resp)),
290 }
291 }
292
293 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
294 self.core
296 .root_checker
297 .get_or_try_init(|| async { self.check_root().await })
298 .await?;
299
300 let resp = self.core.webhdfs_get_file_status(path).await?;
301 let status = resp.status();
302 match status {
303 StatusCode::OK => {
304 let bs = resp.into_body();
305
306 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
307 .map_err(new_json_deserialize_error)?
308 .file_status;
309
310 let meta = match file_status.ty {
311 FileStatusType::Directory => Metadata::new(EntryMode::DIR),
312 FileStatusType::File => Metadata::new(EntryMode::FILE)
313 .with_content_length(file_status.length)
314 .with_last_modified(parse_datetime_from_from_timestamp_millis(
315 file_status.modification_time,
316 )?),
317 };
318
319 Ok(RpStat::new(meta))
320 }
321
322 _ => Err(parse_error(resp)),
323 }
324 }
325
326 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
327 let resp = self.core.webhdfs_read_file(path, args.range()).await?;
328
329 let status = resp.status();
330 match status {
331 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
332 Ok((RpRead::default(), resp.into_body()))
333 }
334 _ => {
335 let (part, mut body) = resp.into_parts();
336 let buf = body.to_buffer().await?;
337 Err(parse_error(Response::from_parts(part, buf)))
338 }
339 }
340 }
341
342 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
343 let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());
344
345 let w = if args.append() {
346 WebhdfsWriters::Two(oio::AppendWriter::new(w))
347 } else {
348 WebhdfsWriters::One(oio::BlockWriter::new(
349 self.info().clone(),
350 w,
351 args.concurrent(),
352 ))
353 };
354
355 Ok((RpWrite::default(), w))
356 }
357
358 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
359 Ok((
360 RpDelete::default(),
361 oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
362 ))
363 }
364
365 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
366 if args.recursive() {
367 return Err(Error::new(
368 ErrorKind::Unsupported,
369 "WebHDFS doesn't support list with recursive",
370 ));
371 }
372
373 let path = path.trim_end_matches('/');
374 let l = WebhdfsLister::new(self.core.clone(), path);
375 Ok((RpList::default(), oio::PageLister::new(l)))
376 }
377}