opendal/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use super::CLOUDFLARE_KV_SCHEME;
24use crate::ErrorKind;
25use crate::raw::*;
26use crate::services::CloudflareKvConfig;
27use crate::services::cloudflare_kv::core::CloudflareKvCore;
28use crate::services::cloudflare_kv::delete::CloudflareKvDeleter;
29use crate::services::cloudflare_kv::error::parse_error;
30use crate::services::cloudflare_kv::lister::CloudflareKvLister;
31use crate::services::cloudflare_kv::model::*;
32use crate::services::cloudflare_kv::writer::CloudflareWriter;
33use crate::*;
34use bytes::Buf;
35use http::StatusCode;
36
37#[doc = include_str!("docs.md")]
38#[derive(Default)]
39pub struct CloudflareKvBuilder {
40 pub(super) config: CloudflareKvConfig,
41
42 pub(super) http_client: Option<HttpClient>,
44}
45
46impl Debug for CloudflareKvBuilder {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("CloudFlareKvBuilder")
49 .field("config", &self.config)
50 .finish()
51 }
52}
53
54impl CloudflareKvBuilder {
55 pub fn api_token(mut self, api_token: &str) -> Self {
57 if !api_token.is_empty() {
58 self.config.api_token = Some(api_token.to_string())
59 }
60 self
61 }
62
63 pub fn account_id(mut self, account_id: &str) -> Self {
65 if !account_id.is_empty() {
66 self.config.account_id = Some(account_id.to_string())
67 }
68 self
69 }
70
71 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
73 if !namespace_id.is_empty() {
74 self.config.namespace_id = Some(namespace_id.to_string())
75 }
76 self
77 }
78
79 pub fn default_ttl(mut self, ttl: Duration) -> Self {
83 self.config.default_ttl = Some(ttl);
84 self
85 }
86
87 pub fn root(mut self, root: &str) -> Self {
89 self.config.root = if root.is_empty() {
90 None
91 } else {
92 Some(root.to_string())
93 };
94
95 self
96 }
97}
98
99impl Builder for CloudflareKvBuilder {
100 type Config = CloudflareKvConfig;
101
102 fn build(self) -> Result<impl Access> {
103 let api_token = match &self.config.api_token {
104 Some(api_token) => format_authorization_by_bearer(api_token)?,
105 None => {
106 return Err(Error::new(
107 ErrorKind::ConfigInvalid,
108 "api_token is required",
109 ));
110 }
111 };
112
113 let Some(account_id) = self.config.account_id.clone() else {
114 return Err(Error::new(
115 ErrorKind::ConfigInvalid,
116 "account_id is required",
117 ));
118 };
119
120 let Some(namespace_id) = self.config.namespace_id.clone() else {
121 return Err(Error::new(
122 ErrorKind::ConfigInvalid,
123 "namespace_id is required",
124 ));
125 };
126
127 if let Some(ttl) = self.config.default_ttl {
129 if ttl < Duration::from_secs(60) {
130 return Err(Error::new(
131 ErrorKind::ConfigInvalid,
132 "Default TTL must be at least 60 seconds",
133 ));
134 }
135 }
136
137 let root = normalize_root(
138 self.config
139 .root
140 .clone()
141 .unwrap_or_else(|| "/".to_string())
142 .as_str(),
143 );
144
145 Ok(CloudflareKvAccessor {
146 core: Arc::new(CloudflareKvCore {
147 api_token,
148 account_id,
149 namespace_id,
150 expiration_ttl: self.config.default_ttl,
151 info: {
152 let am = AccessorInfo::default();
153 am.set_scheme(CLOUDFLARE_KV_SCHEME)
154 .set_root(&root)
155 .set_native_capability(Capability {
156 create_dir: true,
157
158 stat: true,
159 stat_with_if_match: true,
160 stat_with_if_none_match: true,
161 stat_with_if_modified_since: true,
162 stat_with_if_unmodified_since: true,
163
164 read: true,
165 read_with_if_match: true,
166 read_with_if_none_match: true,
167 read_with_if_modified_since: true,
168 read_with_if_unmodified_since: true,
169
170 write: true,
171 write_can_empty: true,
172 write_total_max_size: Some(25 * 1024 * 1024),
173
174 list: true,
175 list_with_limit: true,
176 list_with_recursive: true,
177
178 delete: true,
179 delete_max_size: Some(10000),
180
181 shared: false,
182
183 ..Default::default()
184 });
185
186 #[allow(deprecated)]
188 if let Some(client) = self.http_client {
189 am.update_http_client(|_| client);
190 }
191
192 am.into()
193 },
194 }),
195 })
196 }
197}
198
199#[derive(Debug, Clone)]
200pub struct CloudflareKvAccessor {
201 core: std::sync::Arc<CloudflareKvCore>,
202}
203
204impl Access for CloudflareKvAccessor {
205 type Reader = Buffer;
206 type Writer = oio::OneShotWriter<CloudflareWriter>;
207 type Lister = oio::PageLister<CloudflareKvLister>;
208 type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
209
210 fn info(&self) -> std::sync::Arc<AccessorInfo> {
211 self.core.info.clone()
212 }
213
214 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
215 let path = build_abs_path(&self.core.info.root(), path);
216
217 if path == build_abs_path(&self.core.info.root(), "") {
218 return Ok(RpCreateDir::default());
219 }
220
221 let segments: Vec<&str> = path
223 .trim_start_matches('/')
224 .trim_end_matches('/')
225 .split('/')
226 .collect();
227
228 let mut current_path = String::from("/");
230 for segment in segments {
231 if !current_path.ends_with('/') {
233 current_path.push('/');
234 }
235 current_path.push_str(segment);
236 current_path.push('/');
237
238 let cf_kv_metadata = CfKvMetadata {
240 etag: build_tmp_path_of(¤t_path),
241 last_modified: Timestamp::now().to_string(),
242 content_length: 0,
243 is_dir: true,
244 };
245
246 self.core
248 .set(¤t_path, Buffer::new(), cf_kv_metadata)
249 .await?;
250 }
251
252 Ok(RpCreateDir::default())
253 }
254
255 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
256 let path = build_abs_path(&self.core.info.root(), path);
257 let new_path = path.trim_end_matches('/');
258
259 let resp = self.core.metadata(new_path).await?;
260
261 if resp.status() != StatusCode::OK {
263 if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
265 let list_resp = self.core.list(&path, None, None).await?;
267
268 if list_resp.status() == StatusCode::OK {
269 let list_body = list_resp.into_body();
270 let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
271 .map_err(new_json_deserialize_error)?;
272
273 if let Some(entries) = list_result.result {
275 if !entries.is_empty() {
276 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
277 }
278 }
279
280 return Err(Error::new(
282 ErrorKind::NotFound,
283 "key not found in CloudFlare KV",
284 ));
285 }
286 }
287
288 return Err(parse_error(resp));
290 }
291
292 let resp_body = resp.into_body();
293 let cf_response: CfKvStatResponse =
294 serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
295
296 if !cf_response.success {
297 return Err(Error::new(
298 ErrorKind::Unexpected,
299 "cloudflare_kv stat this key failed for reason we don't know",
300 ));
301 }
302
303 let metadata = match cf_response.result {
304 Some(metadata) => {
305 if path.ends_with('/') && !metadata.is_dir {
306 return Err(Error::new(
307 ErrorKind::NotFound,
308 "key not found in CloudFlare KV",
309 ));
310 } else {
311 metadata
312 }
313 }
314 None => {
315 return Err(Error::new(
316 ErrorKind::NotFound,
317 "key not found in CloudFlare KV",
318 ));
319 }
320 };
321
322 if let Some(if_match) = &args.if_match() {
324 if if_match != &metadata.etag {
325 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
326 }
327 }
328
329 if let Some(if_none_match) = &args.if_none_match() {
331 if if_none_match == &metadata.etag {
332 return Err(Error::new(
333 ErrorKind::ConditionNotMatch,
334 "etag match when expected none match",
335 ));
336 }
337 }
338
339 let last_modified = metadata
341 .last_modified
342 .parse::<Timestamp>()
343 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
344
345 if let Some(modified_since) = &args.if_modified_since() {
347 if !last_modified.gt(modified_since) {
348 return Err(Error::new(
349 ErrorKind::ConditionNotMatch,
350 "not modified since specified time",
351 ));
352 }
353 }
354
355 if let Some(unmodified_since) = &args.if_unmodified_since() {
357 if !last_modified.le(unmodified_since) {
358 return Err(Error::new(
359 ErrorKind::ConditionNotMatch,
360 "modified since specified time",
361 ));
362 }
363 }
364
365 let meta = Metadata::new(if metadata.is_dir {
366 EntryMode::DIR
367 } else {
368 EntryMode::FILE
369 })
370 .with_etag(metadata.etag)
371 .with_content_length(metadata.content_length as u64)
372 .with_last_modified(metadata.last_modified.parse::<Timestamp>()?);
373
374 Ok(RpStat::new(meta))
375 }
376
377 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
378 let path = build_abs_path(&self.core.info.root(), path);
379 let resp = self.core.get(&path).await?;
380
381 let status = resp.status();
382
383 if status != StatusCode::OK {
384 return Err(parse_error(resp));
385 }
386
387 let resp_body = resp.into_body();
388
389 if args.if_match().is_some()
390 || args.if_none_match().is_some()
391 || args.if_modified_since().is_some()
392 || args.if_unmodified_since().is_some()
393 {
394 let meta_resp = self.core.metadata(&path).await?;
395
396 if meta_resp.status() != StatusCode::OK {
397 return Err(parse_error(meta_resp));
398 }
399
400 let cf_response: CfKvStatResponse =
401 serde_json::from_reader(meta_resp.into_body().reader())
402 .map_err(new_json_deserialize_error)?;
403
404 if !cf_response.success && cf_response.result.is_some() {
405 return Err(Error::new(
406 ErrorKind::Unexpected,
407 "cloudflare_kv read this key failed for reason we don't know",
408 ));
409 }
410
411 let metadata = cf_response.result.unwrap();
412
413 if let Some(if_match) = &args.if_match() {
415 if if_match != &metadata.etag {
416 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
417 }
418 }
419
420 if let Some(if_none_match) = &args.if_none_match() {
422 if if_none_match == &metadata.etag {
423 return Err(Error::new(
424 ErrorKind::ConditionNotMatch,
425 "etag match when expected none match",
426 ));
427 }
428 }
429
430 let last_modified = metadata
432 .last_modified
433 .parse::<Timestamp>()
434 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
435
436 if let Some(modified_since) = &args.if_modified_since() {
438 if !last_modified.gt(modified_since) {
439 return Err(Error::new(
440 ErrorKind::ConditionNotMatch,
441 "not modified since specified time",
442 ));
443 }
444 }
445
446 if let Some(unmodified_since) = &args.if_unmodified_since() {
448 if !last_modified.le(unmodified_since) {
449 return Err(Error::new(
450 ErrorKind::ConditionNotMatch,
451 "modified since specified time",
452 ));
453 }
454 }
455 }
456
457 let range = args.range();
458 let buffer = if range.is_full() {
459 resp_body
460 } else {
461 let start = range.offset() as usize;
462 let end = match range.size() {
463 Some(size) => (range.offset() + size) as usize,
464 None => resp_body.len(),
465 };
466 resp_body.slice(start..end.min(resp_body.len()))
467 };
468 Ok((RpRead::new(), buffer))
469 }
470
471 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
472 let path = build_abs_path(&self.core.info.root(), path);
473 let writer = CloudflareWriter::new(self.core.clone(), path);
474
475 let w = oio::OneShotWriter::new(writer);
476
477 Ok((RpWrite::default(), w))
478 }
479
480 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
481 Ok((
482 RpDelete::default(),
483 oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
484 ))
485 }
486
487 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
488 let path = build_abs_path(&self.core.info.root(), path);
489
490 let limit = match args.limit() {
491 Some(limit) => {
492 if !(10..=1000).contains(&limit) {
494 1000
495 } else {
496 limit
497 }
498 }
499 None => 1000,
500 };
501
502 let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
503
504 Ok((RpList::default(), oio::PageLister::new(l)))
505 }
506}