1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::CopyMode as ObjectStoreCopyMode;
35use object_store::CopyOptions as ObjectStoreCopyOptions;
36use object_store::ListResult;
37use object_store::MultipartUpload;
38use object_store::ObjectMeta;
39use object_store::ObjectStore;
40use object_store::PutMultipartOptions;
41use object_store::PutOptions;
42use object_store::PutPayload;
43use object_store::PutResult;
44use object_store::path::Path;
45use object_store::{GetOptions, UploadPart};
46use object_store::{GetRange, GetResultPayload};
47use object_store::{GetResult, PutMode};
48use opendal::Buffer;
49use opendal::Writer;
50use opendal::options::CopyOptions;
51use opendal::raw::percent_decode_path;
52use opendal::{Operator, OperatorInfo};
53use std::collections::HashMap;
54
/// Adapter that exposes an OpenDAL [`Operator`] through the
/// [`object_store::ObjectStore`] trait.
///
/// Build one with [`OpendalStore::new`] or via `From<Operator>`.
#[derive(Clone)]
pub struct OpendalStore {
    /// Operator info captured from `inner` when the store is constructed.
    info: Arc<OperatorInfo>,
    /// The OpenDAL operator every request is issued against.
    inner: Operator,
}
107
108impl OpendalStore {
109 pub fn new(op: Operator) -> Self {
111 Self {
112 info: op.info().into(),
113 inner: op,
114 }
115 }
116
117 pub fn info(&self) -> &OperatorInfo {
119 self.info.as_ref()
120 }
121
122 async fn copy_request(
124 &self,
125 from: &Path,
126 to: &Path,
127 if_not_exists: bool,
128 ) -> object_store::Result<()> {
129 let mut copy_options = CopyOptions::default();
130 if if_not_exists {
131 copy_options.if_not_exists = true;
132 }
133
134 self.inner
136 .copy_options(
137 &percent_decode_path(from.as_ref()),
138 &percent_decode_path(to.as_ref()),
139 copy_options,
140 )
141 .into_send()
142 .await
143 .map_err(|err| {
144 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
145 object_store::Error::AlreadyExists {
146 path: to.to_string(),
147 source: Box::new(err),
148 }
149 } else {
150 format_object_store_error(err, from.as_ref())
151 }
152 })?;
153
154 Ok(())
155 }
156}
157
158impl Debug for OpendalStore {
159 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
160 f.debug_struct("OpendalStore")
161 .field("scheme", &self.info.scheme())
162 .field("name", &self.info.name())
163 .field("root", &self.info.root())
164 .field("capability", &self.info.full_capability())
165 .finish()
166 }
167}
168
169impl Display for OpendalStore {
170 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
171 let info = self.inner.info();
172 write!(
173 f,
174 "Opendal({}, bucket={}, root={})",
175 info.scheme(),
176 info.name(),
177 info.root()
178 )
179 }
180}
181
182impl From<Operator> for OpendalStore {
183 fn from(value: Operator) -> Self {
184 Self::new(value)
185 }
186}
187
188#[async_trait]
189impl ObjectStore for OpendalStore {
190 async fn put_opts(
191 &self,
192 location: &Path,
193 bytes: PutPayload,
194 opts: PutOptions,
195 ) -> object_store::Result<PutResult> {
196 let decoded_location = percent_decode_path(location.as_ref());
197 let mut future_write = self
198 .inner
199 .write_with(&decoded_location, Buffer::from_iter(bytes));
200 let opts_mode = opts.mode.clone();
201 match opts.mode {
202 PutMode::Overwrite => {}
203 PutMode::Create => {
204 future_write = future_write.if_not_exists(true);
205 }
206 PutMode::Update(update_version) => {
207 let Some(etag) = update_version.e_tag else {
208 Err(object_store::Error::NotSupported {
209 source: Box::new(opendal::Error::new(
210 opendal::ErrorKind::Unsupported,
211 "etag is required for conditional put",
212 )),
213 })?
214 };
215 future_write = future_write.if_match(etag.as_str());
216 }
217 }
218 let rp = future_write.into_send().await.map_err(|err| {
219 match format_object_store_error(err, location.as_ref()) {
220 object_store::Error::Precondition { path, source }
221 if opts_mode == PutMode::Create =>
222 {
223 object_store::Error::AlreadyExists { path, source }
224 }
225 e => e,
226 }
227 })?;
228
229 let e_tag = rp.etag().map(|s| s.to_string());
230 let version = rp.version().map(|s| s.to_string());
231
232 Ok(PutResult { e_tag, version })
233 }
234
235 async fn put_multipart_opts(
236 &self,
237 location: &Path,
238 opts: PutMultipartOptions,
239 ) -> object_store::Result<Box<dyn MultipartUpload>> {
240 const DEFAULT_CONCURRENT: usize = 8;
241
242 let mut options = opendal::options::WriteOptions {
243 concurrent: DEFAULT_CONCURRENT,
244 ..Default::default()
245 };
246
247 let mut user_metadata = HashMap::new();
249
250 for (key, value) in opts.attributes.iter() {
252 match key {
253 object_store::Attribute::CacheControl => {
254 options.cache_control = Some(value.to_string());
255 }
256 object_store::Attribute::ContentDisposition => {
257 options.content_disposition = Some(value.to_string());
258 }
259 object_store::Attribute::ContentEncoding => {
260 options.content_encoding = Some(value.to_string());
261 }
262 object_store::Attribute::ContentLanguage => {
263 continue;
265 }
266 object_store::Attribute::ContentType => {
267 options.content_type = Some(value.to_string());
268 }
269 object_store::Attribute::Metadata(k) => {
270 user_metadata.insert(k.to_string(), value.to_string());
271 }
272 _ => {}
273 }
274 }
275
276 if !user_metadata.is_empty() {
278 options.user_metadata = Some(user_metadata);
279 }
280
281 let decoded_location = percent_decode_path(location.as_ref());
282 let writer = self
283 .inner
284 .writer_options(&decoded_location, options)
285 .into_send()
286 .await
287 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
288 let upload = OpendalMultipartUpload::new(writer, location.clone());
289
290 Ok(Box::new(upload))
291 }
292
293 async fn get_opts(
294 &self,
295 location: &Path,
296 options: GetOptions,
297 ) -> object_store::Result<GetResult> {
298 let raw_location = percent_decode_path(location.as_ref());
299 let meta = {
300 let mut s = self.inner.stat_with(&raw_location);
301 if let Some(version) = &options.version {
302 s = s.version(version.as_str())
303 }
304 if let Some(if_match) = &options.if_match {
305 s = s.if_match(if_match.as_str());
306 }
307 if let Some(if_none_match) = &options.if_none_match {
308 s = s.if_none_match(if_none_match.as_str());
309 }
310 if let Some(if_modified_since) =
311 options.if_modified_since.and_then(datetime_to_timestamp)
312 {
313 s = s.if_modified_since(if_modified_since);
314 }
315 if let Some(if_unmodified_since) =
316 options.if_unmodified_since.and_then(datetime_to_timestamp)
317 {
318 s = s.if_unmodified_since(if_unmodified_since);
319 }
320 s.into_send()
321 .await
322 .map_err(|err| format_object_store_error(err, location.as_ref()))?
323 };
324
325 let mut attributes = object_store::Attributes::new();
327 if let Some(user_meta) = meta.user_metadata() {
328 for (key, value) in user_meta {
329 attributes.insert(
330 object_store::Attribute::Metadata(key.clone().into()),
331 value.clone().into(),
332 );
333 }
334 }
335
336 let meta = ObjectMeta {
337 location: location.clone(),
338 last_modified: meta
339 .last_modified()
340 .and_then(timestamp_to_datetime)
341 .unwrap_or_default(),
342 size: meta.content_length(),
343 e_tag: meta.etag().map(|x| x.to_string()),
344 version: meta.version().map(|x| x.to_string()),
345 };
346
347 if options.head {
348 return Ok(GetResult {
349 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
350 range: 0..0,
351 meta,
352 attributes,
353 });
354 }
355
356 let reader = {
357 let mut r = self.inner.reader_with(raw_location.as_ref());
358 if let Some(version) = options.version {
359 r = r.version(version.as_str());
360 }
361 if let Some(if_match) = options.if_match {
362 r = r.if_match(if_match.as_str());
363 }
364 if let Some(if_none_match) = options.if_none_match {
365 r = r.if_none_match(if_none_match.as_str());
366 }
367 if let Some(if_modified_since) =
368 options.if_modified_since.and_then(datetime_to_timestamp)
369 {
370 r = r.if_modified_since(if_modified_since);
371 }
372 if let Some(if_unmodified_since) =
373 options.if_unmodified_since.and_then(datetime_to_timestamp)
374 {
375 r = r.if_unmodified_since(if_unmodified_since);
376 }
377 r.into_send()
378 .await
379 .map_err(|err| format_object_store_error(err, location.as_ref()))?
380 };
381
382 let read_range = match options.range {
383 Some(GetRange::Bounded(r)) => {
384 if r.start >= r.end || r.start >= meta.size {
385 0..0
386 } else {
387 let end = r.end.min(meta.size);
388 r.start..end
389 }
390 }
391 Some(GetRange::Offset(r)) => {
392 if r < meta.size {
393 r..meta.size
394 } else {
395 0..0
396 }
397 }
398 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
399 _ => 0..meta.size,
400 };
401
402 let stream = reader
403 .into_bytes_stream(read_range.start..read_range.end)
404 .into_send()
405 .await
406 .map_err(|err| format_object_store_error(err, location.as_ref()))?
407 .into_send()
408 .map_err(|err: io::Error| object_store::Error::Generic {
409 store: "IoError",
410 source: Box::new(err),
411 });
412
413 Ok(GetResult {
414 payload: GetResultPayload::Stream(Box::pin(stream)),
415 range: read_range.start..read_range.end,
416 meta,
417 attributes,
418 })
419 }
420
421 async fn get_ranges(
422 &self,
423 location: &Path,
424 ranges: &[Range<u64>],
425 ) -> object_store::Result<Vec<Bytes>> {
426 let raw_location = percent_decode_path(location.as_ref());
427 let reader = self
428 .inner
429 .reader_with(&raw_location)
430 .into_send()
431 .await
432 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
433
434 let mut results = Vec::with_capacity(ranges.len());
435 for range in ranges {
436 let data = reader
437 .read(range.start..range.end)
438 .into_send()
439 .await
440 .map(|buf| buf.to_bytes())
441 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
442 results.push(data);
443 }
444 Ok(results)
445 }
446
447 fn delete_stream(
448 &self,
449 locations: BoxStream<'static, object_store::Result<Path>>,
450 ) -> BoxStream<'static, object_store::Result<Path>> {
451 let this = self.clone();
452 locations
453 .and_then(move |location| {
454 let this = this.clone();
455 async move {
456 let decoded = percent_decode_path(location.as_ref());
457 this.inner
458 .delete(&decoded)
459 .into_send()
460 .await
461 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
462 Ok(location)
463 }
464 .into_send()
465 })
466 .boxed()
467 }
468
469 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
470 let path = prefix.map_or("".into(), |x| {
473 format!("{}/", percent_decode_path(x.as_ref()))
474 });
475
476 let this = self.clone();
477 let fut = async move {
478 let stream = this
479 .inner
480 .lister_with(&path)
481 .recursive(true)
482 .await
483 .map_err(|err| format_object_store_error(err, &path))?;
484
485 let stream = stream.then(|res| async {
486 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
487 let meta = entry.metadata();
488
489 Ok(format_object_meta(entry.path(), meta))
490 });
491 Ok::<_, object_store::Error>(stream)
492 };
493
494 fut.into_stream().try_flatten().into_send().boxed()
495 }
496
497 fn list_with_offset(
498 &self,
499 prefix: Option<&Path>,
500 offset: &Path,
501 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
502 let path = prefix.map_or("".into(), |x| {
503 format!("{}/", percent_decode_path(x.as_ref()))
504 });
505 let offset = offset.clone();
506
507 let this = self.clone();
510
511 let fut = async move {
512 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
513 let mut fut = this.inner.lister_with(&path).recursive(true);
514
515 if list_with_start_after {
517 fut = fut.start_after(offset.as_ref());
518 }
519
520 let lister = fut
521 .await
522 .map_err(|err| format_object_store_error(err, &path))?
523 .then(move |entry| {
524 let path = path.clone();
525 let this = this.clone();
526 async move {
527 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
528 let (path, metadata) = entry.into_parts();
529
530 if metadata.is_dir() || metadata.last_modified().is_some() {
532 let object_meta = format_object_meta(&path, &metadata);
533 return Ok(object_meta);
534 }
535
536 let metadata = this
537 .inner
538 .stat(&path)
539 .await
540 .map_err(|err| format_object_store_error(err, &path))?;
541 let object_meta = format_object_meta(&path, &metadata);
542 Ok::<_, object_store::Error>(object_meta)
543 }
544 })
545 .into_send()
546 .boxed();
547
548 let stream = if list_with_start_after {
549 lister
550 } else {
551 lister
552 .try_filter(move |entry| futures::future::ready(entry.location > offset))
553 .into_send()
554 .boxed()
555 };
556
557 Ok::<_, object_store::Error>(stream)
558 };
559
560 fut.into_stream().into_send().try_flatten().boxed()
561 }
562
563 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
564 let path = prefix.map_or("".into(), |x| {
565 format!("{}/", percent_decode_path(x.as_ref()))
566 });
567 let mut stream = self
568 .inner
569 .lister_with(&path)
570 .into_future()
571 .into_send()
572 .await
573 .map_err(|err| format_object_store_error(err, &path))?
574 .into_send();
575
576 let mut common_prefixes = Vec::new();
577 let mut objects = Vec::new();
578
579 while let Some(res) = stream.next().into_send().await {
580 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
581 let meta = entry.metadata();
582
583 if meta.is_dir() {
584 common_prefixes.push(entry.path().into());
585 } else if meta.last_modified().is_some() {
586 objects.push(format_object_meta(entry.path(), meta));
587 } else {
588 let meta = self
589 .inner
590 .stat(entry.path())
591 .into_send()
592 .await
593 .map_err(|err| format_object_store_error(err, entry.path()))?;
594 objects.push(format_object_meta(entry.path(), &meta));
595 }
596 }
597
598 Ok(ListResult {
599 common_prefixes,
600 objects,
601 })
602 }
603
604 async fn copy_opts(
605 &self,
606 from: &Path,
607 to: &Path,
608 options: ObjectStoreCopyOptions,
609 ) -> object_store::Result<()> {
610 let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
611 self.copy_request(from, to, if_not_exists).await
612 }
613}
614
/// [`MultipartUpload`] implementation that feeds parts into an OpenDAL
/// [`Writer`].
struct OpendalMultipartUpload {
    /// Writer shared between the part futures handed out by `put_part`.
    writer: Arc<Mutex<Writer>>,
    /// Destination path, used when formatting errors and in `Debug` output.
    location: Path,
    /// Receiver the next `put_part` future awaits before it starts writing;
    /// replaced on every `put_part` call.
    next_notify: oneshot::Receiver<()>,
}
628
629impl OpendalMultipartUpload {
630 fn new(writer: Writer, location: Path) -> Self {
631 let (_, rx) = oneshot::channel();
633
634 Self {
635 writer: Arc::new(Mutex::new(writer)),
636 location,
637 next_notify: rx,
638 }
639 }
640}
641
642#[async_trait]
643impl MultipartUpload for OpendalMultipartUpload {
644 fn put_part(&mut self, data: PutPayload) -> UploadPart {
645 let writer = self.writer.clone();
646 let location = self.location.clone();
647
648 let (tx, rx) = oneshot::channel();
650 let last_rx = std::mem::replace(&mut self.next_notify, rx);
652
653 async move {
654 let _ = last_rx.await;
656
657 let mut writer = writer.lock().await;
658 let result = writer
659 .write(Buffer::from_iter(data.into_iter()))
660 .await
661 .map_err(|err| format_object_store_error(err, location.as_ref()));
662
663 drop(tx);
665
666 result
667 }
668 .into_send()
669 .boxed()
670 }
671
672 async fn complete(&mut self) -> object_store::Result<PutResult> {
673 let mut writer = self.writer.lock().await;
674 let metadata = writer
675 .close()
676 .into_send()
677 .await
678 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
679
680 let e_tag = metadata.etag().map(|s| s.to_string());
681 let version = metadata.version().map(|s| s.to_string());
682
683 Ok(PutResult { e_tag, version })
684 }
685
686 async fn abort(&mut self) -> object_store::Result<()> {
687 let mut writer = self.writer.lock().await;
688 writer
689 .abort()
690 .into_send()
691 .await
692 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
693 }
694}
695
696impl Debug for OpendalMultipartUpload {
697 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
698 f.debug_struct("OpendalMultipartUpload")
699 .field("location", &self.location)
700 .finish()
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 use bytes::Bytes;
707 use object_store::path::Path;
708 use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
709 use opendal::services;
710 use rand::prelude::*;
711 use std::sync::Arc;
712
713 use super::*;
714
715 async fn create_test_object_store() -> Arc<dyn ObjectStore> {
716 let op = Operator::new(services::Memory::default()).unwrap().finish();
717 let object_store = Arc::new(OpendalStore::new(op));
718
719 let path: Path = "data/test.txt".into();
720 let bytes = Bytes::from_static(b"hello, world!");
721 object_store.put(&path, bytes.into()).await.unwrap();
722
723 let path: Path = "data/nested/test.txt".into();
724 let bytes = Bytes::from_static(b"hello, world! I am nested.");
725 object_store.put(&path, bytes.into()).await.unwrap();
726
727 object_store
728 }
729
730 #[tokio::test]
731 async fn test_basic() {
732 let op = Operator::new(services::Memory::default()).unwrap().finish();
733 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
734
735 let path: Path = "data/test.txt".into();
737
738 let bytes = Bytes::from_static(b"hello, world!");
739 object_store.put(&path, bytes.clone().into()).await.unwrap();
740
741 let meta = object_store.head(&path).await.unwrap();
742
743 assert_eq!(meta.size, 13);
744
745 assert_eq!(
746 object_store
747 .get(&path)
748 .await
749 .unwrap()
750 .bytes()
751 .await
752 .unwrap(),
753 bytes
754 );
755 }
756
757 #[tokio::test]
758 async fn test_put_multipart() {
759 let op = Operator::new(services::Memory::default()).unwrap().finish();
760 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
761
762 let mut rng = thread_rng();
763
764 let path: Path = "data/test_complete.txt".into();
766 let upload = object_store.put_multipart(&path).await.unwrap();
767
768 let mut write = WriteMultipart::new(upload);
769
770 let mut all_bytes = vec![];
771 let round = rng.gen_range(1..=1024);
772 for _ in 0..round {
773 let size = rng.gen_range(1..=1024);
774 let mut bytes = vec![0; size];
775 rng.fill_bytes(&mut bytes);
776
777 all_bytes.extend_from_slice(&bytes);
778 write.put(bytes.into());
779 }
780
781 let _ = write.finish().await.unwrap();
782
783 let meta = object_store.head(&path).await.unwrap();
784
785 assert_eq!(meta.size, all_bytes.len() as u64);
786
787 assert_eq!(
788 object_store
789 .get(&path)
790 .await
791 .unwrap()
792 .bytes()
793 .await
794 .unwrap(),
795 Bytes::from(all_bytes)
796 );
797
798 let path: Path = "data/test_abort.txt".into();
800 let mut upload = object_store.put_multipart(&path).await.unwrap();
801 upload.put_part(vec![1; 1024].into()).await.unwrap();
802 upload.abort().await.unwrap();
803
804 let res = object_store.head(&path).await;
805 let err = res.unwrap_err();
806
807 assert!(matches!(err, object_store::Error::NotFound { .. }))
808 }
809
810 #[tokio::test]
811 async fn test_list() {
812 let object_store = create_test_object_store().await;
813 let path: Path = "data/".into();
814 let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
815 assert_eq!(results.len(), 2);
816 let mut locations = results
817 .iter()
818 .map(|x| x.as_ref().unwrap().location.as_ref())
819 .collect::<Vec<_>>();
820
821 let expected_files = vec![
822 (
823 "data/nested/test.txt",
824 Bytes::from_static(b"hello, world! I am nested."),
825 ),
826 ("data/test.txt", Bytes::from_static(b"hello, world!")),
827 ];
828
829 let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
830
831 locations.sort();
832 assert_eq!(locations, expected_locations);
833
834 for (location, bytes) in expected_files {
835 let path: Path = location.into();
836 assert_eq!(
837 object_store
838 .get(&path)
839 .await
840 .unwrap()
841 .bytes()
842 .await
843 .unwrap(),
844 bytes
845 );
846 }
847 }
848
849 #[tokio::test]
850 async fn test_list_with_delimiter() {
851 let object_store = create_test_object_store().await;
852 let path: Path = "data/".into();
853 let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
854 assert_eq!(result.objects.len(), 1);
855 assert_eq!(result.common_prefixes.len(), 1);
856 assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
857 assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
858 }
859
860 #[tokio::test]
861 async fn test_list_with_offset() {
862 let object_store = create_test_object_store().await;
863 let path: Path = "data/".into();
864 let offset: Path = "data/nested/test.txt".into();
865 let result = object_store
866 .list_with_offset(Some(&path), &offset)
867 .collect::<Vec<_>>()
868 .await;
869 assert_eq!(result.len(), 1);
870 assert_eq!(
871 result[0].as_ref().unwrap().location.as_ref(),
872 "data/test.txt"
873 );
874 }
875
876 mod stat_counter {
878 use super::*;
879 use std::sync::atomic::{AtomicUsize, Ordering};
880
881 #[derive(Debug, Clone)]
882 pub struct StatCounterLayer {
883 count: Arc<AtomicUsize>,
884 }
885
886 impl StatCounterLayer {
887 pub fn new(count: Arc<AtomicUsize>) -> Self {
888 Self { count }
889 }
890 }
891
892 impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
893 type LayeredAccess = StatCounterAccessor<A>;
894
895 fn layer(&self, inner: A) -> Self::LayeredAccess {
896 StatCounterAccessor {
897 inner,
898 count: self.count.clone(),
899 }
900 }
901 }
902
903 #[derive(Debug, Clone)]
904 pub struct StatCounterAccessor<A> {
905 inner: A,
906 count: Arc<AtomicUsize>,
907 }
908
909 impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
910 type Inner = A;
911 type Reader = A::Reader;
912 type Writer = A::Writer;
913 type Lister = A::Lister;
914 type Deleter = A::Deleter;
915
916 fn inner(&self) -> &Self::Inner {
917 &self.inner
918 }
919
920 async fn stat(
921 &self,
922 path: &str,
923 args: opendal::raw::OpStat,
924 ) -> opendal::Result<opendal::raw::RpStat> {
925 self.count.fetch_add(1, Ordering::SeqCst);
926 self.inner.stat(path, args).await
927 }
928
929 async fn read(
930 &self,
931 path: &str,
932 args: opendal::raw::OpRead,
933 ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
934 self.inner.read(path, args).await
935 }
936
937 async fn write(
938 &self,
939 path: &str,
940 args: opendal::raw::OpWrite,
941 ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
942 self.inner.write(path, args).await
943 }
944
945 async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
946 self.inner.delete().await
947 }
948
949 async fn list(
950 &self,
951 path: &str,
952 args: opendal::raw::OpList,
953 ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
954 self.inner.list(path, args).await
955 }
956
957 async fn copy(
958 &self,
959 from: &str,
960 to: &str,
961 args: opendal::raw::OpCopy,
962 ) -> opendal::Result<opendal::raw::RpCopy> {
963 self.inner.copy(from, to, args).await
964 }
965
966 async fn rename(
967 &self,
968 from: &str,
969 to: &str,
970 args: opendal::raw::OpRename,
971 ) -> opendal::Result<opendal::raw::RpRename> {
972 self.inner.rename(from, to, args).await
973 }
974 }
975 }
976
977 #[tokio::test]
978 async fn test_get_range_no_stat() {
979 use std::sync::atomic::{AtomicUsize, Ordering};
980
981 let stat_count = Arc::new(AtomicUsize::new(0));
983 let op = Operator::new(opendal::services::Memory::default())
984 .unwrap()
985 .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
986 .finish();
987 let store = OpendalStore::new(op);
988
989 let location = "test_get_range.txt".into();
991 let value = Bytes::from_static(b"Hello, world!");
992 store.put(&location, value.clone().into()).await.unwrap();
993
994 stat_count.store(0, Ordering::SeqCst);
996
997 #[allow(clippy::single_range_in_vec_init)]
999 let ranges = [0u64..5];
1000 let ret = store.get_ranges(&location, &ranges).await.unwrap();
1001 assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1002 assert_eq!(
1003 stat_count.load(Ordering::SeqCst),
1004 0,
1005 "get_ranges should not call stat()"
1006 );
1007
1008 stat_count.store(0, Ordering::SeqCst);
1010
1011 let opts = object_store::GetOptions {
1013 range: Some(object_store::GetRange::Bounded(0..5)),
1014 ..Default::default()
1015 };
1016 let ret = store.get_opts(&location, opts).await.unwrap();
1017 let data = ret.bytes().await.unwrap();
1018 assert_eq!(Bytes::from_static(b"Hello"), data);
1019 assert!(
1020 stat_count.load(Ordering::SeqCst) > 0,
1021 "get_opts should call stat() to get metadata"
1022 );
1023
1024 store.delete(&location).await.unwrap();
1026 }
1027}