object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::CopyMode as ObjectStoreCopyMode;
35use object_store::CopyOptions as ObjectStoreCopyOptions;
36use object_store::ListResult;
37use object_store::MultipartUpload;
38use object_store::ObjectMeta;
39use object_store::ObjectStore;
40use object_store::PutMultipartOptions;
41use object_store::PutOptions;
42use object_store::PutPayload;
43use object_store::PutResult;
44use object_store::path::Path;
45use object_store::{GetOptions, UploadPart};
46use object_store::{GetRange, GetResultPayload};
47use object_store::{GetResult, PutMode};
48use opendal::Buffer;
49use opendal::Writer;
50use opendal::options::CopyOptions;
51use opendal::raw::percent_decode_path;
52use opendal::{Operator, OperatorInfo};
53use std::collections::HashMap;
54
55/// OpendalStore implements ObjectStore trait by using opendal.
56///
57/// This allows users to use opendal as an object store without extra cost.
58///
59/// Visit [`opendal::services`] for more information about supported services.
60///
61/// ```no_run
62/// use std::sync::Arc;
63///
64/// use bytes::Bytes;
65/// use object_store::path::Path;
66/// use object_store::ObjectStore;
67/// use object_store::ObjectStoreExt;
68/// use object_store_opendal::OpendalStore;
69/// use opendal::services::S3;
70/// use opendal::{Builder, Operator};
71///
72/// #[tokio::main]
73/// async fn main() {
74///    let builder = S3::default()
75///     .access_key_id("my_access_key")
76///     .secret_access_key("my_secret_key")
77///     .endpoint("my_endpoint")
78///     .region("my_region");
79///
80///     // Create a new operator
81///     let operator = Operator::new(builder).unwrap().finish();
82///
83///     // Create a new object store
84///     let object_store = Arc::new(OpendalStore::new(operator));
85///
86///     let path = Path::from("data/nested/test.txt");
87///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
88///
89///     object_store.put(&path, bytes.clone().into()).await.unwrap();
90///
91///     let content = object_store
92///         .get(&path)
93///         .await
94///         .unwrap()
95///         .bytes()
96///         .await
97///         .unwrap();
98///
99///     assert_eq!(content, bytes);
100/// }
101/// ```
102#[derive(Clone)]
103pub struct OpendalStore {
104    info: Arc<OperatorInfo>,
105    inner: Operator,
106}
107
108impl OpendalStore {
109    /// Create OpendalStore by given Operator.
110    pub fn new(op: Operator) -> Self {
111        Self {
112            info: op.info().into(),
113            inner: op,
114        }
115    }
116
117    /// Get the Operator info.
118    pub fn info(&self) -> &OperatorInfo {
119        self.info.as_ref()
120    }
121
122    /// Copy a file from one location to another
123    async fn copy_request(
124        &self,
125        from: &Path,
126        to: &Path,
127        if_not_exists: bool,
128    ) -> object_store::Result<()> {
129        let mut copy_options = CopyOptions::default();
130        if if_not_exists {
131            copy_options.if_not_exists = true;
132        }
133
134        // Perform the copy operation
135        self.inner
136            .copy_options(
137                &percent_decode_path(from.as_ref()),
138                &percent_decode_path(to.as_ref()),
139                copy_options,
140            )
141            .into_send()
142            .await
143            .map_err(|err| {
144                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
145                    object_store::Error::AlreadyExists {
146                        path: to.to_string(),
147                        source: Box::new(err),
148                    }
149                } else {
150                    format_object_store_error(err, from.as_ref())
151                }
152            })?;
153
154        Ok(())
155    }
156}
157
158impl Debug for OpendalStore {
159    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
160        f.debug_struct("OpendalStore")
161            .field("scheme", &self.info.scheme())
162            .field("name", &self.info.name())
163            .field("root", &self.info.root())
164            .field("capability", &self.info.full_capability())
165            .finish()
166    }
167}
168
169impl Display for OpendalStore {
170    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
171        let info = self.inner.info();
172        write!(
173            f,
174            "Opendal({}, bucket={}, root={})",
175            info.scheme(),
176            info.name(),
177            info.root()
178        )
179    }
180}
181
182impl From<Operator> for OpendalStore {
183    fn from(value: Operator) -> Self {
184        Self::new(value)
185    }
186}
187
188#[async_trait]
189impl ObjectStore for OpendalStore {
190    async fn put_opts(
191        &self,
192        location: &Path,
193        bytes: PutPayload,
194        opts: PutOptions,
195    ) -> object_store::Result<PutResult> {
196        let decoded_location = percent_decode_path(location.as_ref());
197        let mut future_write = self
198            .inner
199            .write_with(&decoded_location, Buffer::from_iter(bytes));
200        let opts_mode = opts.mode.clone();
201        match opts.mode {
202            PutMode::Overwrite => {}
203            PutMode::Create => {
204                future_write = future_write.if_not_exists(true);
205            }
206            PutMode::Update(update_version) => {
207                let Some(etag) = update_version.e_tag else {
208                    Err(object_store::Error::NotSupported {
209                        source: Box::new(opendal::Error::new(
210                            opendal::ErrorKind::Unsupported,
211                            "etag is required for conditional put",
212                        )),
213                    })?
214                };
215                future_write = future_write.if_match(etag.as_str());
216            }
217        }
218        let rp = future_write.into_send().await.map_err(|err| {
219            match format_object_store_error(err, location.as_ref()) {
220                object_store::Error::Precondition { path, source }
221                    if opts_mode == PutMode::Create =>
222                {
223                    object_store::Error::AlreadyExists { path, source }
224                }
225                e => e,
226            }
227        })?;
228
229        let e_tag = rp.etag().map(|s| s.to_string());
230        let version = rp.version().map(|s| s.to_string());
231
232        Ok(PutResult { e_tag, version })
233    }
234
235    async fn put_multipart_opts(
236        &self,
237        location: &Path,
238        opts: PutMultipartOptions,
239    ) -> object_store::Result<Box<dyn MultipartUpload>> {
240        const DEFAULT_CONCURRENT: usize = 8;
241
242        let mut options = opendal::options::WriteOptions {
243            concurrent: DEFAULT_CONCURRENT,
244            ..Default::default()
245        };
246
247        // Collect user metadata separately to handle multiple entries
248        let mut user_metadata = HashMap::new();
249
250        // Handle attributes if provided
251        for (key, value) in opts.attributes.iter() {
252            match key {
253                object_store::Attribute::CacheControl => {
254                    options.cache_control = Some(value.to_string());
255                }
256                object_store::Attribute::ContentDisposition => {
257                    options.content_disposition = Some(value.to_string());
258                }
259                object_store::Attribute::ContentEncoding => {
260                    options.content_encoding = Some(value.to_string());
261                }
262                object_store::Attribute::ContentLanguage => {
263                    // no support
264                    continue;
265                }
266                object_store::Attribute::ContentType => {
267                    options.content_type = Some(value.to_string());
268                }
269                object_store::Attribute::Metadata(k) => {
270                    user_metadata.insert(k.to_string(), value.to_string());
271                }
272                _ => {}
273            }
274        }
275
276        // Apply user metadata if any entries were collected
277        if !user_metadata.is_empty() {
278            options.user_metadata = Some(user_metadata);
279        }
280
281        let decoded_location = percent_decode_path(location.as_ref());
282        let writer = self
283            .inner
284            .writer_options(&decoded_location, options)
285            .into_send()
286            .await
287            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
288        let upload = OpendalMultipartUpload::new(writer, location.clone());
289
290        Ok(Box::new(upload))
291    }
292
293    async fn get_opts(
294        &self,
295        location: &Path,
296        options: GetOptions,
297    ) -> object_store::Result<GetResult> {
298        let raw_location = percent_decode_path(location.as_ref());
299        let meta = {
300            let mut s = self.inner.stat_with(&raw_location);
301            if let Some(version) = &options.version {
302                s = s.version(version.as_str())
303            }
304            if let Some(if_match) = &options.if_match {
305                s = s.if_match(if_match.as_str());
306            }
307            if let Some(if_none_match) = &options.if_none_match {
308                s = s.if_none_match(if_none_match.as_str());
309            }
310            if let Some(if_modified_since) =
311                options.if_modified_since.and_then(datetime_to_timestamp)
312            {
313                s = s.if_modified_since(if_modified_since);
314            }
315            if let Some(if_unmodified_since) =
316                options.if_unmodified_since.and_then(datetime_to_timestamp)
317            {
318                s = s.if_unmodified_since(if_unmodified_since);
319            }
320            s.into_send()
321                .await
322                .map_err(|err| format_object_store_error(err, location.as_ref()))?
323        };
324
325        // Convert user defined metadata from OpenDAL to object_store attributes
326        let mut attributes = object_store::Attributes::new();
327        if let Some(user_meta) = meta.user_metadata() {
328            for (key, value) in user_meta {
329                attributes.insert(
330                    object_store::Attribute::Metadata(key.clone().into()),
331                    value.clone().into(),
332                );
333            }
334        }
335
336        let meta = ObjectMeta {
337            location: location.clone(),
338            last_modified: meta
339                .last_modified()
340                .and_then(timestamp_to_datetime)
341                .unwrap_or_default(),
342            size: meta.content_length(),
343            e_tag: meta.etag().map(|x| x.to_string()),
344            version: meta.version().map(|x| x.to_string()),
345        };
346
347        if options.head {
348            return Ok(GetResult {
349                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
350                range: 0..0,
351                meta,
352                attributes,
353            });
354        }
355
356        let reader = {
357            let mut r = self.inner.reader_with(raw_location.as_ref());
358            if let Some(version) = options.version {
359                r = r.version(version.as_str());
360            }
361            if let Some(if_match) = options.if_match {
362                r = r.if_match(if_match.as_str());
363            }
364            if let Some(if_none_match) = options.if_none_match {
365                r = r.if_none_match(if_none_match.as_str());
366            }
367            if let Some(if_modified_since) =
368                options.if_modified_since.and_then(datetime_to_timestamp)
369            {
370                r = r.if_modified_since(if_modified_since);
371            }
372            if let Some(if_unmodified_since) =
373                options.if_unmodified_since.and_then(datetime_to_timestamp)
374            {
375                r = r.if_unmodified_since(if_unmodified_since);
376            }
377            r.into_send()
378                .await
379                .map_err(|err| format_object_store_error(err, location.as_ref()))?
380        };
381
382        let read_range = match options.range {
383            Some(GetRange::Bounded(r)) => {
384                if r.start >= r.end || r.start >= meta.size {
385                    0..0
386                } else {
387                    let end = r.end.min(meta.size);
388                    r.start..end
389                }
390            }
391            Some(GetRange::Offset(r)) => {
392                if r < meta.size {
393                    r..meta.size
394                } else {
395                    0..0
396                }
397            }
398            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
399            _ => 0..meta.size,
400        };
401
402        let stream = reader
403            .into_bytes_stream(read_range.start..read_range.end)
404            .into_send()
405            .await
406            .map_err(|err| format_object_store_error(err, location.as_ref()))?
407            .into_send()
408            .map_err(|err: io::Error| object_store::Error::Generic {
409                store: "IoError",
410                source: Box::new(err),
411            });
412
413        Ok(GetResult {
414            payload: GetResultPayload::Stream(Box::pin(stream)),
415            range: read_range.start..read_range.end,
416            meta,
417            attributes,
418        })
419    }
420
421    async fn get_ranges(
422        &self,
423        location: &Path,
424        ranges: &[Range<u64>],
425    ) -> object_store::Result<Vec<Bytes>> {
426        let raw_location = percent_decode_path(location.as_ref());
427        let reader = self
428            .inner
429            .reader_with(&raw_location)
430            .into_send()
431            .await
432            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
433
434        let mut results = Vec::with_capacity(ranges.len());
435        for range in ranges {
436            let data = reader
437                .read(range.start..range.end)
438                .into_send()
439                .await
440                .map(|buf| buf.to_bytes())
441                .map_err(|err| format_object_store_error(err, location.as_ref()))?;
442            results.push(data);
443        }
444        Ok(results)
445    }
446
447    fn delete_stream(
448        &self,
449        locations: BoxStream<'static, object_store::Result<Path>>,
450    ) -> BoxStream<'static, object_store::Result<Path>> {
451        let this = self.clone();
452        locations
453            .and_then(move |location| {
454                let this = this.clone();
455                async move {
456                    let decoded = percent_decode_path(location.as_ref());
457                    this.inner
458                        .delete(&decoded)
459                        .into_send()
460                        .await
461                        .map_err(|err| format_object_store_error(err, location.as_ref()))?;
462                    Ok(location)
463                }
464                .into_send()
465            })
466            .boxed()
467    }
468
469    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
470        // object_store `Path` always removes trailing slash
471        // need to add it back
472        let path = prefix.map_or("".into(), |x| {
473            format!("{}/", percent_decode_path(x.as_ref()))
474        });
475
476        let this = self.clone();
477        let fut = async move {
478            let stream = this
479                .inner
480                .lister_with(&path)
481                .recursive(true)
482                .await
483                .map_err(|err| format_object_store_error(err, &path))?;
484
485            let stream = stream.then(|res| async {
486                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
487                let meta = entry.metadata();
488
489                Ok(format_object_meta(entry.path(), meta))
490            });
491            Ok::<_, object_store::Error>(stream)
492        };
493
494        fut.into_stream().try_flatten().into_send().boxed()
495    }
496
497    fn list_with_offset(
498        &self,
499        prefix: Option<&Path>,
500        offset: &Path,
501    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
502        let path = prefix.map_or("".into(), |x| {
503            format!("{}/", percent_decode_path(x.as_ref()))
504        });
505        let offset = offset.clone();
506
507        // clone self for 'static lifetime
508        // clone self is cheap
509        let this = self.clone();
510
511        let fut = async move {
512            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
513            let mut fut = this.inner.lister_with(&path).recursive(true);
514
515            // Use native start_after support if possible.
516            if list_with_start_after {
517                fut = fut.start_after(offset.as_ref());
518            }
519
520            let lister = fut
521                .await
522                .map_err(|err| format_object_store_error(err, &path))?
523                .then(move |entry| {
524                    let path = path.clone();
525                    let this = this.clone();
526                    async move {
527                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
528                        let (path, metadata) = entry.into_parts();
529
530                        // If it's a dir or last_modified is present, we can use it directly.
531                        if metadata.is_dir() || metadata.last_modified().is_some() {
532                            let object_meta = format_object_meta(&path, &metadata);
533                            return Ok(object_meta);
534                        }
535
536                        let metadata = this
537                            .inner
538                            .stat(&path)
539                            .await
540                            .map_err(|err| format_object_store_error(err, &path))?;
541                        let object_meta = format_object_meta(&path, &metadata);
542                        Ok::<_, object_store::Error>(object_meta)
543                    }
544                })
545                .into_send()
546                .boxed();
547
548            let stream = if list_with_start_after {
549                lister
550            } else {
551                lister
552                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
553                    .into_send()
554                    .boxed()
555            };
556
557            Ok::<_, object_store::Error>(stream)
558        };
559
560        fut.into_stream().into_send().try_flatten().boxed()
561    }
562
563    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
564        let path = prefix.map_or("".into(), |x| {
565            format!("{}/", percent_decode_path(x.as_ref()))
566        });
567        let mut stream = self
568            .inner
569            .lister_with(&path)
570            .into_future()
571            .into_send()
572            .await
573            .map_err(|err| format_object_store_error(err, &path))?
574            .into_send();
575
576        let mut common_prefixes = Vec::new();
577        let mut objects = Vec::new();
578
579        while let Some(res) = stream.next().into_send().await {
580            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
581            let meta = entry.metadata();
582
583            if meta.is_dir() {
584                common_prefixes.push(entry.path().into());
585            } else if meta.last_modified().is_some() {
586                objects.push(format_object_meta(entry.path(), meta));
587            } else {
588                let meta = self
589                    .inner
590                    .stat(entry.path())
591                    .into_send()
592                    .await
593                    .map_err(|err| format_object_store_error(err, entry.path()))?;
594                objects.push(format_object_meta(entry.path(), &meta));
595            }
596        }
597
598        Ok(ListResult {
599            common_prefixes,
600            objects,
601        })
602    }
603
604    async fn copy_opts(
605        &self,
606        from: &Path,
607        to: &Path,
608        options: ObjectStoreCopyOptions,
609    ) -> object_store::Result<()> {
610        let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
611        self.copy_request(from, to, if_not_exists).await
612    }
613}
614
615/// `MultipartUpload`'s impl based on `Writer` in opendal
616///
617/// # Notes
618///
619/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
620/// implementation do. Instead, we just write the part and notify the next task to be written.
621///
622/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
623struct OpendalMultipartUpload {
624    writer: Arc<Mutex<Writer>>,
625    location: Path,
626    next_notify: oneshot::Receiver<()>,
627}
628
629impl OpendalMultipartUpload {
630    fn new(writer: Writer, location: Path) -> Self {
631        // an immediately dropped sender for the first part to write without waiting
632        let (_, rx) = oneshot::channel();
633
634        Self {
635            writer: Arc::new(Mutex::new(writer)),
636            location,
637            next_notify: rx,
638        }
639    }
640}
641
642#[async_trait]
643impl MultipartUpload for OpendalMultipartUpload {
644    fn put_part(&mut self, data: PutPayload) -> UploadPart {
645        let writer = self.writer.clone();
646        let location = self.location.clone();
647
648        // Generate next notify which will be notified after the current part is written.
649        let (tx, rx) = oneshot::channel();
650        // Fetch the notify for current part to wait for it to be written.
651        let last_rx = std::mem::replace(&mut self.next_notify, rx);
652
653        async move {
654            // Wait for the previous part to be written
655            let _ = last_rx.await;
656
657            let mut writer = writer.lock().await;
658            let result = writer
659                .write(Buffer::from_iter(data.into_iter()))
660                .await
661                .map_err(|err| format_object_store_error(err, location.as_ref()));
662
663            // Notify the next part to be written
664            drop(tx);
665
666            result
667        }
668        .into_send()
669        .boxed()
670    }
671
672    async fn complete(&mut self) -> object_store::Result<PutResult> {
673        let mut writer = self.writer.lock().await;
674        let metadata = writer
675            .close()
676            .into_send()
677            .await
678            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
679
680        let e_tag = metadata.etag().map(|s| s.to_string());
681        let version = metadata.version().map(|s| s.to_string());
682
683        Ok(PutResult { e_tag, version })
684    }
685
686    async fn abort(&mut self) -> object_store::Result<()> {
687        let mut writer = self.writer.lock().await;
688        writer
689            .abort()
690            .into_send()
691            .await
692            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
693    }
694}
695
696impl Debug for OpendalMultipartUpload {
697    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
698        f.debug_struct("OpendalMultipartUpload")
699            .field("location", &self.location)
700            .finish()
701    }
702}
703
704#[cfg(test)]
705mod tests {
706    use bytes::Bytes;
707    use object_store::path::Path;
708    use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
709    use opendal::services;
710    use rand::prelude::*;
711    use std::sync::Arc;
712
713    use super::*;
714
715    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
716        let op = Operator::new(services::Memory::default()).unwrap().finish();
717        let object_store = Arc::new(OpendalStore::new(op));
718
719        let path: Path = "data/test.txt".into();
720        let bytes = Bytes::from_static(b"hello, world!");
721        object_store.put(&path, bytes.into()).await.unwrap();
722
723        let path: Path = "data/nested/test.txt".into();
724        let bytes = Bytes::from_static(b"hello, world! I am nested.");
725        object_store.put(&path, bytes.into()).await.unwrap();
726
727        object_store
728    }
729
730    #[tokio::test]
731    async fn test_basic() {
732        let op = Operator::new(services::Memory::default()).unwrap().finish();
733        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
734
735        // Retrieve a specific file
736        let path: Path = "data/test.txt".into();
737
738        let bytes = Bytes::from_static(b"hello, world!");
739        object_store.put(&path, bytes.clone().into()).await.unwrap();
740
741        let meta = object_store.head(&path).await.unwrap();
742
743        assert_eq!(meta.size, 13);
744
745        assert_eq!(
746            object_store
747                .get(&path)
748                .await
749                .unwrap()
750                .bytes()
751                .await
752                .unwrap(),
753            bytes
754        );
755    }
756
757    #[tokio::test]
758    async fn test_put_multipart() {
759        let op = Operator::new(services::Memory::default()).unwrap().finish();
760        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
761
762        let mut rng = thread_rng();
763
764        // Case complete
765        let path: Path = "data/test_complete.txt".into();
766        let upload = object_store.put_multipart(&path).await.unwrap();
767
768        let mut write = WriteMultipart::new(upload);
769
770        let mut all_bytes = vec![];
771        let round = rng.gen_range(1..=1024);
772        for _ in 0..round {
773            let size = rng.gen_range(1..=1024);
774            let mut bytes = vec![0; size];
775            rng.fill_bytes(&mut bytes);
776
777            all_bytes.extend_from_slice(&bytes);
778            write.put(bytes.into());
779        }
780
781        let _ = write.finish().await.unwrap();
782
783        let meta = object_store.head(&path).await.unwrap();
784
785        assert_eq!(meta.size, all_bytes.len() as u64);
786
787        assert_eq!(
788            object_store
789                .get(&path)
790                .await
791                .unwrap()
792                .bytes()
793                .await
794                .unwrap(),
795            Bytes::from(all_bytes)
796        );
797
798        // Case abort
799        let path: Path = "data/test_abort.txt".into();
800        let mut upload = object_store.put_multipart(&path).await.unwrap();
801        upload.put_part(vec![1; 1024].into()).await.unwrap();
802        upload.abort().await.unwrap();
803
804        let res = object_store.head(&path).await;
805        let err = res.unwrap_err();
806
807        assert!(matches!(err, object_store::Error::NotFound { .. }))
808    }
809
810    #[tokio::test]
811    async fn test_list() {
812        let object_store = create_test_object_store().await;
813        let path: Path = "data/".into();
814        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
815        assert_eq!(results.len(), 2);
816        let mut locations = results
817            .iter()
818            .map(|x| x.as_ref().unwrap().location.as_ref())
819            .collect::<Vec<_>>();
820
821        let expected_files = vec![
822            (
823                "data/nested/test.txt",
824                Bytes::from_static(b"hello, world! I am nested."),
825            ),
826            ("data/test.txt", Bytes::from_static(b"hello, world!")),
827        ];
828
829        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
830
831        locations.sort();
832        assert_eq!(locations, expected_locations);
833
834        for (location, bytes) in expected_files {
835            let path: Path = location.into();
836            assert_eq!(
837                object_store
838                    .get(&path)
839                    .await
840                    .unwrap()
841                    .bytes()
842                    .await
843                    .unwrap(),
844                bytes
845            );
846        }
847    }
848
849    #[tokio::test]
850    async fn test_list_with_delimiter() {
851        let object_store = create_test_object_store().await;
852        let path: Path = "data/".into();
853        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
854        assert_eq!(result.objects.len(), 1);
855        assert_eq!(result.common_prefixes.len(), 1);
856        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
857        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
858    }
859
860    #[tokio::test]
861    async fn test_list_with_offset() {
862        let object_store = create_test_object_store().await;
863        let path: Path = "data/".into();
864        let offset: Path = "data/nested/test.txt".into();
865        let result = object_store
866            .list_with_offset(Some(&path), &offset)
867            .collect::<Vec<_>>()
868            .await;
869        assert_eq!(result.len(), 1);
870        assert_eq!(
871            result[0].as_ref().unwrap().location.as_ref(),
872            "data/test.txt"
873        );
874    }
875
876    /// Custom layer that counts stat operations for testing
877    mod stat_counter {
878        use super::*;
879        use std::sync::atomic::{AtomicUsize, Ordering};
880
881        #[derive(Debug, Clone)]
882        pub struct StatCounterLayer {
883            count: Arc<AtomicUsize>,
884        }
885
886        impl StatCounterLayer {
887            pub fn new(count: Arc<AtomicUsize>) -> Self {
888                Self { count }
889            }
890        }
891
892        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
893            type LayeredAccess = StatCounterAccessor<A>;
894
895            fn layer(&self, inner: A) -> Self::LayeredAccess {
896                StatCounterAccessor {
897                    inner,
898                    count: self.count.clone(),
899                }
900            }
901        }
902
903        #[derive(Debug, Clone)]
904        pub struct StatCounterAccessor<A> {
905            inner: A,
906            count: Arc<AtomicUsize>,
907        }
908
909        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
910            type Inner = A;
911            type Reader = A::Reader;
912            type Writer = A::Writer;
913            type Lister = A::Lister;
914            type Deleter = A::Deleter;
915
916            fn inner(&self) -> &Self::Inner {
917                &self.inner
918            }
919
920            async fn stat(
921                &self,
922                path: &str,
923                args: opendal::raw::OpStat,
924            ) -> opendal::Result<opendal::raw::RpStat> {
925                self.count.fetch_add(1, Ordering::SeqCst);
926                self.inner.stat(path, args).await
927            }
928
929            async fn read(
930                &self,
931                path: &str,
932                args: opendal::raw::OpRead,
933            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
934                self.inner.read(path, args).await
935            }
936
937            async fn write(
938                &self,
939                path: &str,
940                args: opendal::raw::OpWrite,
941            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
942                self.inner.write(path, args).await
943            }
944
945            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
946                self.inner.delete().await
947            }
948
949            async fn list(
950                &self,
951                path: &str,
952                args: opendal::raw::OpList,
953            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
954                self.inner.list(path, args).await
955            }
956
957            async fn copy(
958                &self,
959                from: &str,
960                to: &str,
961                args: opendal::raw::OpCopy,
962            ) -> opendal::Result<opendal::raw::RpCopy> {
963                self.inner.copy(from, to, args).await
964            }
965
966            async fn rename(
967                &self,
968                from: &str,
969                to: &str,
970                args: opendal::raw::OpRename,
971            ) -> opendal::Result<opendal::raw::RpRename> {
972                self.inner.rename(from, to, args).await
973            }
974        }
975    }
976
977    #[tokio::test]
978    async fn test_get_range_no_stat() {
979        use std::sync::atomic::{AtomicUsize, Ordering};
980
981        // Create a stat counter and operator with tracking layer
982        let stat_count = Arc::new(AtomicUsize::new(0));
983        let op = Operator::new(opendal::services::Memory::default())
984            .unwrap()
985            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
986            .finish();
987        let store = OpendalStore::new(op);
988
989        // Create a test file
990        let location = "test_get_range.txt".into();
991        let value = Bytes::from_static(b"Hello, world!");
992        store.put(&location, value.clone().into()).await.unwrap();
993
994        // Reset counter after put
995        stat_count.store(0, Ordering::SeqCst);
996
997        // Test 1: get_ranges should NOT call stat()
998        #[allow(clippy::single_range_in_vec_init)]
999        let ranges = [0u64..5];
1000        let ret = store.get_ranges(&location, &ranges).await.unwrap();
1001        assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1002        assert_eq!(
1003            stat_count.load(Ordering::SeqCst),
1004            0,
1005            "get_ranges should not call stat()"
1006        );
1007
1008        // Reset counter
1009        stat_count.store(0, Ordering::SeqCst);
1010
1011        // Test 2: get_opts SHOULD call stat() to get metadata
1012        let opts = object_store::GetOptions {
1013            range: Some(object_store::GetRange::Bounded(0..5)),
1014            ..Default::default()
1015        };
1016        let ret = store.get_opts(&location, opts).await.unwrap();
1017        let data = ret.bytes().await.unwrap();
1018        assert_eq!(Bytes::from_static(b"Hello"), data);
1019        assert!(
1020            stat_count.load(Ordering::SeqCst) > 0,
1021            "get_opts should call stat() to get metadata"
1022        );
1023
1024        // Cleanup
1025        store.delete(&location).await.unwrap();
1026    }
1027}