opendal/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use http::StatusCode;
25
26use super::DEFAULT_SCHEME;
27use crate::raw::*;
28use crate::services::cloudflare_kv::core::CloudflareKvCore;
29use crate::services::cloudflare_kv::delete::CloudflareKvDeleter;
30use crate::services::cloudflare_kv::error::parse_error;
31use crate::services::cloudflare_kv::lister::CloudflareKvLister;
32use crate::services::cloudflare_kv::model::*;
33use crate::services::cloudflare_kv::writer::CloudflareWriter;
34use crate::services::CloudflareKvConfig;
35use crate::ErrorKind;
36use crate::*;
37
38impl Configurator for CloudflareKvConfig {
39 type Builder = CloudflareKvBuilder;
40 fn into_builder(self) -> Self::Builder {
41 CloudflareKvBuilder {
42 config: self,
43 http_client: None,
44 }
45 }
46}
47
48#[doc = include_str!("docs.md")]
49#[derive(Default)]
50pub struct CloudflareKvBuilder {
51 config: CloudflareKvConfig,
52
53 http_client: Option<HttpClient>,
55}
56
57impl Debug for CloudflareKvBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("CloudFlareKvBuilder")
60 .field("config", &self.config)
61 .finish()
62 }
63}
64
65impl CloudflareKvBuilder {
66 pub fn api_token(mut self, api_token: &str) -> Self {
68 if !api_token.is_empty() {
69 self.config.api_token = Some(api_token.to_string())
70 }
71 self
72 }
73
74 pub fn account_id(mut self, account_id: &str) -> Self {
76 if !account_id.is_empty() {
77 self.config.account_id = Some(account_id.to_string())
78 }
79 self
80 }
81
82 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
84 if !namespace_id.is_empty() {
85 self.config.namespace_id = Some(namespace_id.to_string())
86 }
87 self
88 }
89
90 pub fn default_ttl(mut self, ttl: Duration) -> Self {
94 self.config.default_ttl = Some(ttl);
95 self
96 }
97
98 pub fn root(mut self, root: &str) -> Self {
100 self.config.root = if root.is_empty() {
101 None
102 } else {
103 Some(root.to_string())
104 };
105
106 self
107 }
108}
109
110impl Builder for CloudflareKvBuilder {
111 type Config = CloudflareKvConfig;
112
113 fn build(self) -> Result<impl Access> {
114 let api_token = match &self.config.api_token {
115 Some(api_token) => format_authorization_by_bearer(api_token)?,
116 None => {
117 return Err(Error::new(
118 ErrorKind::ConfigInvalid,
119 "api_token is required",
120 ))
121 }
122 };
123
124 let Some(account_id) = self.config.account_id.clone() else {
125 return Err(Error::new(
126 ErrorKind::ConfigInvalid,
127 "account_id is required",
128 ));
129 };
130
131 let Some(namespace_id) = self.config.namespace_id.clone() else {
132 return Err(Error::new(
133 ErrorKind::ConfigInvalid,
134 "namespace_id is required",
135 ));
136 };
137
138 if let Some(ttl) = self.config.default_ttl {
140 if ttl < Duration::from_secs(60) {
141 return Err(Error::new(
142 ErrorKind::ConfigInvalid,
143 "Default TTL must be at least 60 seconds",
144 ));
145 }
146 }
147
148 let root = normalize_root(
149 self.config
150 .root
151 .clone()
152 .unwrap_or_else(|| "/".to_string())
153 .as_str(),
154 );
155
156 Ok(CloudflareKvAccessor {
157 core: Arc::new(CloudflareKvCore {
158 api_token,
159 account_id,
160 namespace_id,
161 expiration_ttl: self.config.default_ttl,
162 info: {
163 let am = AccessorInfo::default();
164 am.set_scheme(DEFAULT_SCHEME)
165 .set_root(&root)
166 .set_native_capability(Capability {
167 create_dir: true,
168
169 stat: true,
170 stat_with_if_match: true,
171 stat_with_if_none_match: true,
172 stat_with_if_modified_since: true,
173 stat_with_if_unmodified_since: true,
174
175 read: true,
176 read_with_if_match: true,
177 read_with_if_none_match: true,
178 read_with_if_modified_since: true,
179 read_with_if_unmodified_since: true,
180
181 write: true,
182 write_can_empty: true,
183 write_total_max_size: Some(25 * 1024 * 1024),
184
185 list: true,
186 list_with_limit: true,
187 list_with_recursive: true,
188
189 delete: true,
190 delete_max_size: Some(10000),
191
192 shared: false,
193
194 ..Default::default()
195 });
196
197 #[allow(deprecated)]
199 if let Some(client) = self.http_client {
200 am.update_http_client(|_| client);
201 }
202
203 am.into()
204 },
205 }),
206 })
207 }
208}
209
210#[derive(Debug, Clone)]
211pub struct CloudflareKvAccessor {
212 core: std::sync::Arc<CloudflareKvCore>,
213}
214
215impl Access for CloudflareKvAccessor {
216 type Reader = Buffer;
217 type Writer = oio::OneShotWriter<CloudflareWriter>;
218 type Lister = oio::PageLister<CloudflareKvLister>;
219 type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
220
221 fn info(&self) -> std::sync::Arc<AccessorInfo> {
222 self.core.info.clone()
223 }
224
225 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
226 let path = build_abs_path(&self.core.info.root(), path);
227
228 if path == build_abs_path(&self.core.info.root(), "") {
229 return Ok(RpCreateDir::default());
230 }
231
232 let segments: Vec<&str> = path
234 .trim_start_matches('/')
235 .trim_end_matches('/')
236 .split('/')
237 .collect();
238
239 let mut current_path = String::from("/");
241 for segment in segments {
242 if !current_path.ends_with('/') {
244 current_path.push('/');
245 }
246 current_path.push_str(segment);
247 current_path.push('/');
248
249 let cf_kv_metadata = CfKvMetadata {
251 etag: build_tmp_path_of(¤t_path),
252 last_modified: chrono::Local::now().to_rfc3339(),
253 content_length: 0,
254 is_dir: true,
255 };
256
257 self.core
259 .set(¤t_path, Buffer::new(), cf_kv_metadata)
260 .await?;
261 }
262
263 Ok(RpCreateDir::default())
264 }
265
266 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
267 let path = build_abs_path(&self.core.info.root(), path);
268 let new_path = path.trim_end_matches('/');
269
270 let resp = self.core.metadata(new_path).await?;
271
272 if resp.status() != StatusCode::OK {
274 if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
276 let list_resp = self.core.list(&path, None, None).await?;
278
279 if list_resp.status() == StatusCode::OK {
280 let list_body = list_resp.into_body();
281 let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
282 .map_err(new_json_deserialize_error)?;
283
284 if let Some(entries) = list_result.result {
286 if !entries.is_empty() {
287 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
288 }
289 }
290
291 return Err(Error::new(
293 ErrorKind::NotFound,
294 "key not found in CloudFlare KV",
295 ));
296 }
297 }
298
299 return Err(parse_error(resp));
301 }
302
303 let resp_body = resp.into_body();
304 let cf_response: CfKvStatResponse =
305 serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
306
307 if !cf_response.success {
308 return Err(Error::new(
309 ErrorKind::Unexpected,
310 "cloudflare_kv stat this key failed for reason we don't know",
311 ));
312 }
313
314 let metadata = match cf_response.result {
315 Some(metadata) => {
316 if path.ends_with('/') && !metadata.is_dir {
317 return Err(Error::new(
318 ErrorKind::NotFound,
319 "key not found in CloudFlare KV",
320 ));
321 } else {
322 metadata
323 }
324 }
325 None => {
326 return Err(Error::new(
327 ErrorKind::NotFound,
328 "key not found in CloudFlare KV",
329 ));
330 }
331 };
332
333 if let Some(if_match) = &args.if_match() {
335 if if_match != &metadata.etag {
336 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
337 }
338 }
339
340 if let Some(if_none_match) = &args.if_none_match() {
342 if if_none_match == &metadata.etag {
343 return Err(Error::new(
344 ErrorKind::ConditionNotMatch,
345 "etag match when expected none match",
346 ));
347 }
348 }
349
350 let last_modified = chrono::DateTime::parse_from_rfc3339(&metadata.last_modified)
352 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
353
354 if let Some(modified_since) = &args.if_modified_since() {
356 if !last_modified.gt(modified_since) {
357 return Err(Error::new(
358 ErrorKind::ConditionNotMatch,
359 "not modified since specified time",
360 ));
361 }
362 }
363
364 if let Some(unmodified_since) = &args.if_unmodified_since() {
366 if !last_modified.le(unmodified_since) {
367 return Err(Error::new(
368 ErrorKind::ConditionNotMatch,
369 "modified since specified time",
370 ));
371 }
372 }
373
374 let meta = Metadata::new(if metadata.is_dir {
375 EntryMode::DIR
376 } else {
377 EntryMode::FILE
378 })
379 .with_etag(metadata.etag)
380 .with_content_length(metadata.content_length as u64)
381 .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?);
382
383 Ok(RpStat::new(meta))
384 }
385
386 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
387 let path = build_abs_path(&self.core.info.root(), path);
388 let resp = self.core.get(&path).await?;
389
390 let status = resp.status();
391
392 if status != StatusCode::OK {
393 return Err(parse_error(resp));
394 }
395
396 let resp_body = resp.into_body();
397
398 if args.if_match().is_some()
399 || args.if_none_match().is_some()
400 || args.if_modified_since().is_some()
401 || args.if_unmodified_since().is_some()
402 {
403 let meta_resp = self.core.metadata(&path).await?;
404
405 if meta_resp.status() != StatusCode::OK {
406 return Err(parse_error(meta_resp));
407 }
408
409 let cf_response: CfKvStatResponse =
410 serde_json::from_reader(meta_resp.into_body().reader())
411 .map_err(new_json_deserialize_error)?;
412
413 if !cf_response.success && cf_response.result.is_some() {
414 return Err(Error::new(
415 ErrorKind::Unexpected,
416 "cloudflare_kv read this key failed for reason we don't know",
417 ));
418 }
419
420 let metadata = cf_response.result.unwrap();
421
422 if let Some(if_match) = &args.if_match() {
424 if if_match != &metadata.etag {
425 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
426 }
427 }
428
429 if let Some(if_none_match) = &args.if_none_match() {
431 if if_none_match == &metadata.etag {
432 return Err(Error::new(
433 ErrorKind::ConditionNotMatch,
434 "etag match when expected none match",
435 ));
436 }
437 }
438
439 let last_modified = chrono::DateTime::parse_from_rfc3339(&metadata.last_modified)
441 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
442
443 if let Some(modified_since) = &args.if_modified_since() {
445 if !last_modified.gt(modified_since) {
446 return Err(Error::new(
447 ErrorKind::ConditionNotMatch,
448 "not modified since specified time",
449 ));
450 }
451 }
452
453 if let Some(unmodified_since) = &args.if_unmodified_since() {
455 if !last_modified.le(unmodified_since) {
456 return Err(Error::new(
457 ErrorKind::ConditionNotMatch,
458 "modified since specified time",
459 ));
460 }
461 }
462 }
463
464 let range = args.range();
465 let buffer = if range.is_full() {
466 resp_body
467 } else {
468 let start = range.offset() as usize;
469 let end = match range.size() {
470 Some(size) => (range.offset() + size) as usize,
471 None => resp_body.len(),
472 };
473 resp_body.slice(start..end.min(resp_body.len()))
474 };
475 Ok((RpRead::new(), buffer))
476 }
477
478 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
479 let path = build_abs_path(&self.core.info.root(), path);
480 let writer = CloudflareWriter::new(self.core.clone(), path);
481
482 let w = oio::OneShotWriter::new(writer);
483
484 Ok((RpWrite::default(), w))
485 }
486
487 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
488 Ok((
489 RpDelete::default(),
490 oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
491 ))
492 }
493
494 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
495 let path = build_abs_path(&self.core.info.root(), path);
496
497 let limit = match args.limit() {
498 Some(limit) => {
499 if !(10..=1000).contains(&limit) {
501 1000
502 } else {
503 limit
504 }
505 }
506 None => 1000,
507 };
508
509 let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
510
511 Ok((RpList::default(), oio::PageLister::new(l)))
512 }
513}