Skip to main content

opendal_core/layers/
simulate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::raw::oio::Delete;
23use crate::raw::oio::List;
24use crate::raw::oio::PrefixLister;
25use crate::raw::*;
26use crate::*;
27
28/// Simulate missing capabilities for backends in a configurable way.
29#[derive(Debug, Clone)]
30pub struct SimulateLayer {
31    read_with_suffix: bool,
32    list_recursive: bool,
33    stat_dir: bool,
34    create_dir: bool,
35    delete_recursive: bool,
36}
37
38impl Default for SimulateLayer {
39    fn default() -> Self {
40        Self {
41            read_with_suffix: true,
42            list_recursive: true,
43            stat_dir: true,
44            create_dir: true,
45            delete_recursive: true,
46        }
47    }
48}
49
50impl SimulateLayer {
51    /// Enable or disable suffix read simulation. Default: true.
52    pub fn with_read_with_suffix(mut self, enabled: bool) -> Self {
53        self.read_with_suffix = enabled;
54        self
55    }
56
57    /// Enable or disable recursive list simulation. Default: true.
58    pub fn with_list_recursive(mut self, enabled: bool) -> Self {
59        self.list_recursive = enabled;
60        self
61    }
62
63    /// Enable or disable stat dir simulation. Default: true.
64    pub fn with_stat_dir(mut self, enabled: bool) -> Self {
65        self.stat_dir = enabled;
66        self
67    }
68
69    /// Enable or disable create_dir simulation. Default: true.
70    pub fn with_create_dir(mut self, enabled: bool) -> Self {
71        self.create_dir = enabled;
72        self
73    }
74
75    /// Enable or disable recursive delete simulation. Default: true.
76    pub fn with_delete_recursive(mut self, enabled: bool) -> Self {
77        self.delete_recursive = enabled;
78        self
79    }
80}
81
82impl Layer for SimulateLayer {
83    fn apply_service(&self, srv: Servicer) -> Servicer {
84        Arc::new(self.layer(srv))
85    }
86}
87
88impl SimulateLayer {
89    fn layer(&self, srv: Servicer) -> SimulateService {
90        SimulateService {
91            srv,
92            config: self.clone(),
93        }
94    }
95}
96
97/// Service that applies capability simulation.
98pub struct SimulateService {
99    srv: Servicer,
100    config: SimulateLayer,
101}
102
103impl Debug for SimulateService {
104    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105        self.srv.fmt(f)
106    }
107}
108
109impl SimulateService {
110    fn simulate_capability(&self, mut cap: Capability) -> Capability {
111        if self.config.read_with_suffix && cap.read {
112            cap.read_with_suffix = true;
113        }
114        if self.config.create_dir && cap.list && cap.write_can_empty {
115            cap.create_dir = true;
116        }
117        if self.config.delete_recursive && cap.list && cap.delete {
118            cap.delete_with_recursive = true;
119        }
120        cap
121    }
122
123    async fn simulate_create_dir(
124        &self,
125        ctx: &OperationContext,
126        path: &str,
127        args: OpCreateDir,
128    ) -> Result<RpCreateDir> {
129        let capability = self.srv.capability();
130
131        if capability.create_dir || !self.config.create_dir {
132            return self.srv.create_dir(ctx, path, args).await;
133        }
134
135        if capability.write_can_empty && capability.list {
136            let mut w = self.srv.write(ctx, path, OpWrite::default())?;
137            oio::Write::close(&mut w).await?;
138            return Ok(RpCreateDir::default());
139        }
140
141        self.srv.create_dir(ctx, path, args).await
142    }
143
144    async fn simulate_stat(
145        &self,
146        ctx: &OperationContext,
147        path: &str,
148        args: OpStat,
149    ) -> Result<RpStat> {
150        let capability = self.srv.capability();
151
152        if path == "/" {
153            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
154        }
155
156        if path.ends_with('/') {
157            if capability.create_dir {
158                let meta = self
159                    .srv
160                    .stat(ctx, path, args.clone())
161                    .await?
162                    .into_metadata();
163
164                if meta.is_file() {
165                    return Err(Error::new(
166                        ErrorKind::NotFound,
167                        "stat expected a directory, but found a file",
168                    ));
169                }
170
171                return Ok(RpStat::new(meta));
172            }
173
174            if self.config.stat_dir && capability.list_with_recursive {
175                let mut l = self.srv.list(
176                    ctx,
177                    path,
178                    OpList::default().with_recursive(true).with_limit(1),
179                )?;
180
181                return if l.next().await?.is_some() {
182                    Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
183                } else {
184                    Err(Error::new(
185                        ErrorKind::NotFound,
186                        "the directory is not found",
187                    ))
188                };
189            }
190        }
191
192        self.srv.stat(ctx, path, args).await
193    }
194
195    fn simulate_list(
196        &self,
197        ctx: &OperationContext,
198        path: &str,
199        args: OpList,
200    ) -> Result<SimulateLister> {
201        let cap = self.srv.capability();
202
203        let recursive = args.recursive();
204        let forward = args;
205
206        let lister = match (
207            recursive,
208            cap.list_with_recursive,
209            self.config.list_recursive,
210        ) {
211            // Backend supports recursive list, forward directly.
212            (_, true, _) => {
213                let p = self.srv.list(ctx, path, forward)?;
214                SimulateLister::One(p)
215            }
216            // Simulate recursive via flat list when enabled.
217            (true, false, true) => {
218                if path.ends_with('/') {
219                    let p = ServicerFlatLister::new(ctx.clone(), self.srv.clone(), path);
220                    SimulateLister::Two(p)
221                } else {
222                    let parent = get_parent(path);
223                    let p = ServicerFlatLister::new(ctx.clone(), self.srv.clone(), parent);
224                    let p = PrefixLister::new(p, path);
225                    SimulateLister::Four(p)
226                }
227            }
228            // Recursive requested but simulation disabled; rely on backend and propagate errors.
229            (true, false, false) => {
230                let p = self.srv.list(ctx, path, forward)?;
231                SimulateLister::One(p)
232            }
233            // Non-recursive list: keep existing prefix handling semantics.
234            (false, false, _) => {
235                if path.ends_with('/') {
236                    let p = self.srv.list(ctx, path, forward)?;
237                    SimulateLister::One(p)
238                } else {
239                    let parent = get_parent(path);
240                    let p = self.srv.list(ctx, parent, forward)?;
241                    let p = PrefixLister::new(p, path);
242                    SimulateLister::Three(p)
243                }
244            }
245        };
246
247        Ok(lister)
248    }
249
250    async fn simulate_delete_with_recursive(
251        &self,
252        ctx: &OperationContext,
253        path: &str,
254        args: OpDelete,
255        deleter: &mut oio::Deleter,
256    ) -> Result<()> {
257        if !self.capability().delete_with_recursive {
258            return Err(Error::new(
259                ErrorKind::Unsupported,
260                "recursive delete is not supported",
261            ));
262        }
263
264        let non_recursive = args.clone().with_recursive(false);
265
266        let mut lister = self.simulate_list(ctx, path, OpList::new().with_recursive(true))?;
267
268        while let Some(entry) = lister.next().await? {
269            let entry = entry.into_entry();
270            let mut entry_args = non_recursive.clone();
271            if let Some(version) = entry.metadata().version() {
272                entry_args = entry_args.with_version(version);
273            }
274            deleter.delete(entry.path(), entry_args).await?;
275        }
276
277        Ok(())
278    }
279}
280
281impl Service for SimulateService {
282    type Reader = SimulateReader;
283    type Writer = oio::Writer;
284    type Lister = SimulateLister;
285    type Deleter = SimulateDeleter;
286    type Copier = oio::Copier;
287
288    fn info(&self) -> ServiceInfo {
289        self.srv.info()
290    }
291
292    fn capability(&self) -> Capability {
293        self.simulate_capability(self.srv.capability())
294    }
295
296    async fn create_dir(
297        &self,
298        ctx: &OperationContext,
299        path: &str,
300        args: OpCreateDir,
301    ) -> Result<RpCreateDir> {
302        self.simulate_create_dir(ctx, path, args).await
303    }
304
305    fn read(&self, ctx: &OperationContext, path: &str, args: OpRead) -> Result<Self::Reader> {
306        let capability = self.srv.capability();
307        let simulate_read_with_suffix =
308            self.config.read_with_suffix && capability.read && !capability.read_with_suffix;
309        let reader = self.srv.read(ctx, path, args.clone())?;
310        let reader = SimulateReader::new(
311            ctx.clone(),
312            self.srv.clone(),
313            path.to_string(),
314            args,
315            reader,
316            simulate_read_with_suffix,
317        );
318        Ok(reader)
319    }
320
321    fn write(&self, ctx: &OperationContext, path: &str, args: OpWrite) -> Result<Self::Writer> {
322        self.srv.write(ctx, path, args)
323    }
324
325    fn copy(
326        &self,
327        ctx: &OperationContext,
328        from: &str,
329        to: &str,
330        args: OpCopy,
331        opts: OpCopier,
332    ) -> Result<Self::Copier> {
333        self.srv.copy(ctx, from, to, args, opts)
334    }
335
336    async fn rename(
337        &self,
338        ctx: &OperationContext,
339        from: &str,
340        to: &str,
341        args: OpRename,
342    ) -> Result<RpRename> {
343        self.srv.rename(ctx, from, to, args).await
344    }
345
346    async fn stat(&self, ctx: &OperationContext, path: &str, args: OpStat) -> Result<RpStat> {
347        self.simulate_stat(ctx, path, args).await
348    }
349
350    fn delete(&self, ctx: &OperationContext) -> Result<Self::Deleter> {
351        let deleter = self.srv.delete(ctx)?;
352        Ok(SimulateDeleter::new(
353            ctx.clone(),
354            self.srv.clone(),
355            self.config.clone(),
356            deleter,
357        ))
358    }
359
360    fn list(&self, ctx: &OperationContext, path: &str, args: OpList) -> Result<Self::Lister> {
361        self.simulate_list(ctx, path, args)
362    }
363
364    async fn presign(
365        &self,
366        ctx: &OperationContext,
367        path: &str,
368        args: OpPresign,
369    ) -> Result<RpPresign> {
370        self.srv.presign(ctx, path, args).await
371    }
372}
373
374pub type SimulateLister = FourWays<
375    oio::Lister,
376    ServicerFlatLister,
377    PrefixLister<oio::Lister>,
378    PrefixLister<ServicerFlatLister>,
379>;
380
381pub struct SimulateReader {
382    ctx: OperationContext,
383    srv: Servicer,
384    path: String,
385    args: OpRead,
386    inner: oio::Reader,
387    simulate_read_with_suffix: bool,
388}
389
390impl SimulateReader {
391    fn new(
392        ctx: OperationContext,
393        srv: Servicer,
394        path: String,
395        args: OpRead,
396        inner: oio::Reader,
397        simulate_read_with_suffix: bool,
398    ) -> Self {
399        Self {
400            ctx,
401            srv,
402            path,
403            args,
404            inner,
405            simulate_read_with_suffix,
406        }
407    }
408
409    async fn content_length(&self) -> Result<u64> {
410        if let Some(v) = self.args.content_length_hint() {
411            return Ok(v);
412        }
413
414        let mut op = OpStat::new();
415        if let Some(version) = self.args.version() {
416            op = op.with_version(version);
417        }
418
419        Ok(self
420            .srv
421            .stat(&self.ctx, &self.path, op)
422            .await?
423            .into_metadata()
424            .content_length())
425    }
426
427    async fn resolve_range(&self, range: BytesRange) -> Result<BytesRange> {
428        if !self.simulate_read_with_suffix || !range.is_suffix() {
429            return Ok(range);
430        }
431
432        let BytesRange::Suffix { size } = range else {
433            unreachable!("checked by BytesRange::is_suffix")
434        };
435
436        let content_length = self.content_length().await?;
437        let start = content_length.saturating_sub(size);
438        Ok(BytesRange::new(start, Some(content_length - start)))
439    }
440}
441
442impl oio::Read for SimulateReader {
443    async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
444        let range = self.resolve_range(range).await?;
445        self.inner.open(range).await
446    }
447
448    async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
449        let range = self.resolve_range(range).await?;
450        self.inner.read(range).await
451    }
452}
453
454pub struct ServicerFlatLister {
455    ctx: OperationContext,
456    srv: Servicer,
457    next_dir: Option<oio::Entry>,
458    active_lister: Vec<(Option<oio::Entry>, oio::Lister)>,
459}
460
461impl ServicerFlatLister {
462    fn new(ctx: OperationContext, srv: Servicer, path: &str) -> Self {
463        Self {
464            ctx,
465            srv,
466            next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))),
467            active_lister: vec![],
468        }
469    }
470}
471
472impl oio::List for ServicerFlatLister {
473    async fn next(&mut self) -> Result<Option<oio::Entry>> {
474        loop {
475            if let Some(de) = self.next_dir.take() {
476                let mut l = match self.srv.list(&self.ctx, de.path(), OpList::new()) {
477                    Ok(v) => v,
478                    Err(e) if e.kind() == ErrorKind::PermissionDenied => {
479                        log::warn!(
480                            "ServicerFlatLister skipping directory due to permission denied: {}",
481                            de.path()
482                        );
483                        continue;
484                    }
485                    Err(e) if e.kind() == ErrorKind::NotFound => {
486                        log::warn!(
487                            "ServicerFlatLister skipping directory due to not found during listing: {}",
488                            de.path()
489                        );
490                        continue;
491                    }
492                    Err(e) => return Err(e),
493                };
494                let first = loop {
495                    match l.next().await {
496                        Ok(v) => break v,
497                        Err(e) if e.kind() == ErrorKind::NotFound => {
498                            log::warn!(
499                                "ServicerFlatLister skipping entry due to not found during listing: {}",
500                                de.path()
501                            );
502                            continue;
503                        }
504                        Err(e) => return Err(e),
505                    }
506                };
507                if let Some(v) = first {
508                    self.active_lister.push((Some(de.clone()), l));
509
510                    if v.mode().is_dir() {
511                        if v.path() != de.path() {
512                            self.next_dir = Some(v);
513                            continue;
514                        }
515                    } else {
516                        return Ok(Some(v));
517                    }
518                }
519            }
520
521            if matches!(self.active_lister.last(), Some((None, _))) {
522                let _ = self.active_lister.pop();
523                continue;
524            }
525
526            let (de, lister) = match self.active_lister.last_mut() {
527                Some((de, lister)) => (de, lister),
528                None => return Ok(None),
529            };
530
531            match lister.next().await {
532                Err(e) if e.kind() == ErrorKind::NotFound => {
533                    let path = de.as_ref().map(|entry| entry.path()).unwrap_or("<unknown>");
534                    log::warn!(
535                        "ServicerFlatLister skipping entry due to not found during recursive listing: {}",
536                        path
537                    );
538                    continue;
539                }
540                Err(e) => return Err(e),
541                Ok(Some(v)) if v.mode().is_dir() => {
542                    if v.path()
543                        != de
544                            .as_ref()
545                            .expect("de must be present before listing")
546                            .path()
547                    {
548                        self.next_dir = Some(v);
549                        continue;
550                    }
551                }
552                Ok(Some(v)) => return Ok(Some(v)),
553                Ok(None) => match de.take() {
554                    Some(de) => return Ok(Some(de)),
555                    None => {
556                        let _ = self.active_lister.pop();
557                        continue;
558                    }
559                },
560            }
561        }
562    }
563}
564
565/// Deleter wrapper that simulates recursive deletion.
566pub struct SimulateDeleter {
567    ctx: OperationContext,
568    srv: Servicer,
569    config: SimulateLayer,
570    inner: oio::Deleter,
571}
572
573impl SimulateDeleter {
574    pub fn new(
575        ctx: OperationContext,
576        srv: Servicer,
577        config: SimulateLayer,
578        inner: oio::Deleter,
579    ) -> Self {
580        Self {
581            ctx,
582            srv,
583            config,
584            inner,
585        }
586    }
587}
588
589impl oio::Delete for SimulateDeleter {
590    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
591        if args.recursive() {
592            let cap = self.srv.capability();
593
594            if cap.delete_with_recursive {
595                return self.inner.delete(path, args).await;
596            }
597
598            if self.config.delete_recursive {
599                let service = SimulateService {
600                    srv: self.srv.clone(),
601                    config: self.config.clone(),
602                };
603                return service
604                    .simulate_delete_with_recursive(&self.ctx, path, args, &mut self.inner)
605                    .await;
606            }
607        }
608
609        self.inner.delete(path, args).await
610    }
611
612    async fn close(&mut self) -> Result<()> {
613        self.inner.close().await
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use std::sync::Mutex;
620
621    use super::*;
622
623    #[derive(Debug)]
624    struct MockService {
625        capability: Capability,
626    }
627
628    impl Service for MockService {
629        type Reader = ();
630        type Writer = ();
631        type Lister = ();
632        type Deleter = ();
633        type Copier = ();
634
635        fn info(&self) -> ServiceInfo {
636            ServiceInfo::with_scheme("mock")
637        }
638
639        fn capability(&self) -> Capability {
640            self.capability
641        }
642
643        async fn create_dir(
644            &self,
645            _: &OperationContext,
646            _: &str,
647            _: OpCreateDir,
648        ) -> Result<RpCreateDir> {
649            Err(Error::new(
650                ErrorKind::Unsupported,
651                "operation is not supported",
652            ))
653        }
654
655        async fn stat(&self, _: &OperationContext, _: &str, _: OpStat) -> Result<RpStat> {
656            Err(Error::new(
657                ErrorKind::Unsupported,
658                "operation is not supported",
659            ))
660        }
661
662        fn read(&self, _ctx: &OperationContext, _: &str, _: OpRead) -> Result<Self::Reader> {
663            Err(Error::new(
664                ErrorKind::Unsupported,
665                "operation is not supported",
666            ))
667        }
668
669        fn write(&self, _ctx: &OperationContext, _: &str, _: OpWrite) -> Result<Self::Writer> {
670            Err(Error::new(
671                ErrorKind::Unsupported,
672                "operation is not supported",
673            ))
674        }
675
676        fn delete(&self, _ctx: &OperationContext) -> Result<Self::Deleter> {
677            Err(Error::new(
678                ErrorKind::Unsupported,
679                "operation is not supported",
680            ))
681        }
682
683        fn list(&self, _ctx: &OperationContext, _: &str, _: OpList) -> Result<Self::Lister> {
684            Err(Error::new(
685                ErrorKind::Unsupported,
686                "operation is not supported",
687            ))
688        }
689
690        fn copy(
691            &self,
692            _: &OperationContext,
693            _: &str,
694            _: &str,
695            _: OpCopy,
696            _: OpCopier,
697        ) -> Result<Self::Copier> {
698            Err(Error::new(
699                ErrorKind::Unsupported,
700                "operation is not supported",
701            ))
702        }
703
704        async fn rename(
705            &self,
706            _: &OperationContext,
707            _: &str,
708            _: &str,
709            _: OpRename,
710        ) -> Result<RpRename> {
711            Err(Error::new(
712                ErrorKind::Unsupported,
713                "operation is not supported",
714            ))
715        }
716
717        async fn presign(&self, _: &OperationContext, _: &str, _: OpPresign) -> Result<RpPresign> {
718            Err(Error::new(
719                ErrorKind::Unsupported,
720                "operation is not supported",
721            ))
722        }
723    }
724
725    struct MockReader {
726        observed_range: Arc<Mutex<Option<BytesRange>>>,
727    }
728
729    impl oio::Read for MockReader {
730        async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
731            *self.observed_range.lock().expect("mutex must not poison") = Some(range);
732            Ok((
733                RpRead::new(Metadata::new(EntryMode::FILE).with_content_length(0)),
734                Box::new(Buffer::new()),
735            ))
736        }
737
738        async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
739            *self.observed_range.lock().expect("mutex must not poison") = Some(range);
740            Ok((
741                RpRead::new(Metadata::new(EntryMode::FILE).with_content_length(0)),
742                Buffer::new(),
743            ))
744        }
745    }
746
747    #[test]
748    fn simulate_layer_exposes_read_with_suffix() {
749        let capability = Capability {
750            read: true,
751            read_with_suffix: false,
752            ..Default::default()
753        };
754        let srv = Arc::new(MockService { capability }) as Servicer;
755
756        let srv = SimulateLayer::default().apply_service(srv);
757
758        assert!(srv.capability().read_with_suffix);
759    }
760
761    #[test]
762    fn simulate_layer_can_disable_read_with_suffix() {
763        let capability = Capability {
764            read: true,
765            read_with_suffix: false,
766            ..Default::default()
767        };
768        let srv = Arc::new(MockService { capability }) as Servicer;
769
770        let srv = SimulateLayer::default()
771            .with_read_with_suffix(false)
772            .apply_service(srv);
773
774        assert!(!srv.capability().read_with_suffix);
775    }
776
777    #[tokio::test]
778    async fn simulate_reader_uses_content_length_hint_for_suffix() -> Result<()> {
779        let observed_range = Arc::new(Mutex::new(None));
780        let (_, args, _) = options::ReadOptions {
781            content_length_hint: Some(42),
782            ..Default::default()
783        }
784        .into();
785        let reader = SimulateReader::new(
786            OperationContext::new(),
787            Arc::new(MockService {
788                capability: Capability::default(),
789            }),
790            "test".to_string(),
791            args,
792            Box::new(MockReader {
793                observed_range: observed_range.clone(),
794            }),
795            true,
796        );
797
798        oio::Read::read(&reader, BytesRange::suffix(10)).await?;
799
800        assert_eq!(
801            *observed_range.lock().expect("mutex must not poison"),
802            Some(BytesRange::new(32, Some(10)))
803        );
804
805        Ok(())
806    }
807}