1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::raw::oio::Delete;
23use crate::raw::oio::List;
24use crate::raw::oio::PrefixLister;
25use crate::raw::*;
26use crate::*;
27
/// Configuration for a layer that simulates operations on top of a wrapped
/// service.
///
/// Each flag switches one simulation on or off. The `Default` implementation
/// enables all of them.
#[derive(Debug, Clone)]
pub struct SimulateLayer {
    /// Simulate suffix-range reads when the wrapped service can read but does
    /// not report native suffix-range support.
    read_with_suffix: bool,
    /// Simulate recursive listing when the wrapped service does not report
    /// native recursive listing.
    list_recursive: bool,
    /// Simulate `stat` on directory paths by probing with a recursive list.
    stat_dir: bool,
    /// Simulate `create_dir` by opening a writer at the path and closing it
    /// without writing data.
    create_dir: bool,
    /// Simulate recursive delete by listing entries and deleting them one by
    /// one.
    delete_recursive: bool,
}
37
38impl Default for SimulateLayer {
39 fn default() -> Self {
40 Self {
41 read_with_suffix: true,
42 list_recursive: true,
43 stat_dir: true,
44 create_dir: true,
45 delete_recursive: true,
46 }
47 }
48}
49
50impl SimulateLayer {
51 pub fn with_read_with_suffix(mut self, enabled: bool) -> Self {
53 self.read_with_suffix = enabled;
54 self
55 }
56
57 pub fn with_list_recursive(mut self, enabled: bool) -> Self {
59 self.list_recursive = enabled;
60 self
61 }
62
63 pub fn with_stat_dir(mut self, enabled: bool) -> Self {
65 self.stat_dir = enabled;
66 self
67 }
68
69 pub fn with_create_dir(mut self, enabled: bool) -> Self {
71 self.create_dir = enabled;
72 self
73 }
74
75 pub fn with_delete_recursive(mut self, enabled: bool) -> Self {
77 self.delete_recursive = enabled;
78 self
79 }
80}
81
82impl Layer for SimulateLayer {
83 fn apply_service(&self, srv: Servicer) -> Servicer {
84 Arc::new(self.layer(srv))
85 }
86}
87
88impl SimulateLayer {
89 fn layer(&self, srv: Servicer) -> SimulateService {
90 SimulateService {
91 srv,
92 config: self.clone(),
93 }
94 }
95}
96
/// Service wrapper that applies the simulations selected by a
/// `SimulateLayer` to a wrapped service.
pub struct SimulateService {
    /// The wrapped service that operations are forwarded to.
    srv: Servicer,
    /// Simulation switches copied from the layer that built this service.
    config: SimulateLayer,
}
102
103impl Debug for SimulateService {
104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105 self.srv.fmt(f)
106 }
107}
108
109impl SimulateService {
110 fn simulate_capability(&self, mut cap: Capability) -> Capability {
111 if self.config.read_with_suffix && cap.read {
112 cap.read_with_suffix = true;
113 }
114 if self.config.create_dir && cap.list && cap.write_can_empty {
115 cap.create_dir = true;
116 }
117 if self.config.delete_recursive && cap.list && cap.delete {
118 cap.delete_with_recursive = true;
119 }
120 cap
121 }
122
123 async fn simulate_create_dir(
124 &self,
125 ctx: &OperationContext,
126 path: &str,
127 args: OpCreateDir,
128 ) -> Result<RpCreateDir> {
129 let capability = self.srv.capability();
130
131 if capability.create_dir || !self.config.create_dir {
132 return self.srv.create_dir(ctx, path, args).await;
133 }
134
135 if capability.write_can_empty && capability.list {
136 let mut w = self.srv.write(ctx, path, OpWrite::default())?;
137 oio::Write::close(&mut w).await?;
138 return Ok(RpCreateDir::default());
139 }
140
141 self.srv.create_dir(ctx, path, args).await
142 }
143
144 async fn simulate_stat(
145 &self,
146 ctx: &OperationContext,
147 path: &str,
148 args: OpStat,
149 ) -> Result<RpStat> {
150 let capability = self.srv.capability();
151
152 if path == "/" {
153 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
154 }
155
156 if path.ends_with('/') {
157 if capability.create_dir {
158 let meta = self
159 .srv
160 .stat(ctx, path, args.clone())
161 .await?
162 .into_metadata();
163
164 if meta.is_file() {
165 return Err(Error::new(
166 ErrorKind::NotFound,
167 "stat expected a directory, but found a file",
168 ));
169 }
170
171 return Ok(RpStat::new(meta));
172 }
173
174 if self.config.stat_dir && capability.list_with_recursive {
175 let mut l = self.srv.list(
176 ctx,
177 path,
178 OpList::default().with_recursive(true).with_limit(1),
179 )?;
180
181 return if l.next().await?.is_some() {
182 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
183 } else {
184 Err(Error::new(
185 ErrorKind::NotFound,
186 "the directory is not found",
187 ))
188 };
189 }
190 }
191
192 self.srv.stat(ctx, path, args).await
193 }
194
195 fn simulate_list(
196 &self,
197 ctx: &OperationContext,
198 path: &str,
199 args: OpList,
200 ) -> Result<SimulateLister> {
201 let cap = self.srv.capability();
202
203 let recursive = args.recursive();
204 let forward = args;
205
206 let lister = match (
207 recursive,
208 cap.list_with_recursive,
209 self.config.list_recursive,
210 ) {
211 (_, true, _) => {
213 let p = self.srv.list(ctx, path, forward)?;
214 SimulateLister::One(p)
215 }
216 (true, false, true) => {
218 if path.ends_with('/') {
219 let p = ServicerFlatLister::new(ctx.clone(), self.srv.clone(), path);
220 SimulateLister::Two(p)
221 } else {
222 let parent = get_parent(path);
223 let p = ServicerFlatLister::new(ctx.clone(), self.srv.clone(), parent);
224 let p = PrefixLister::new(p, path);
225 SimulateLister::Four(p)
226 }
227 }
228 (true, false, false) => {
230 let p = self.srv.list(ctx, path, forward)?;
231 SimulateLister::One(p)
232 }
233 (false, false, _) => {
235 if path.ends_with('/') {
236 let p = self.srv.list(ctx, path, forward)?;
237 SimulateLister::One(p)
238 } else {
239 let parent = get_parent(path);
240 let p = self.srv.list(ctx, parent, forward)?;
241 let p = PrefixLister::new(p, path);
242 SimulateLister::Three(p)
243 }
244 }
245 };
246
247 Ok(lister)
248 }
249
250 async fn simulate_delete_with_recursive(
251 &self,
252 ctx: &OperationContext,
253 path: &str,
254 args: OpDelete,
255 deleter: &mut oio::Deleter,
256 ) -> Result<()> {
257 if !self.capability().delete_with_recursive {
258 return Err(Error::new(
259 ErrorKind::Unsupported,
260 "recursive delete is not supported",
261 ));
262 }
263
264 let non_recursive = args.clone().with_recursive(false);
265
266 let mut lister = self.simulate_list(ctx, path, OpList::new().with_recursive(true))?;
267
268 while let Some(entry) = lister.next().await? {
269 let entry = entry.into_entry();
270 let mut entry_args = non_recursive.clone();
271 if let Some(version) = entry.metadata().version() {
272 entry_args = entry_args.with_version(version);
273 }
274 deleter.delete(entry.path(), entry_args).await?;
275 }
276
277 Ok(())
278 }
279}
280
/// `Service` implementation that forwards to the wrapped service, routing
/// `create_dir`, `read`, `stat`, `delete` and `list` through the simulation
/// helpers.
impl Service for SimulateService {
    type Reader = SimulateReader;
    type Writer = oio::Writer;
    type Lister = SimulateLister;
    type Deleter = SimulateDeleter;
    type Copier = oio::Copier;

    /// Return the wrapped service's info unchanged.
    fn info(&self) -> ServiceInfo {
        self.srv.info()
    }

    /// Return the wrapped service's capability with the simulated features
    /// switched on.
    fn capability(&self) -> Capability {
        self.simulate_capability(self.srv.capability())
    }

    /// Create a directory, possibly simulated; see `simulate_create_dir`.
    async fn create_dir(
        &self,
        ctx: &OperationContext,
        path: &str,
        args: OpCreateDir,
    ) -> Result<RpCreateDir> {
        self.simulate_create_dir(ctx, path, args).await
    }

    /// Open a reader on the wrapped service and wrap it in a `SimulateReader`.
    ///
    /// Suffix-range simulation is turned on for the reader only when it is
    /// enabled in the config, the wrapped service can read, and the wrapped
    /// service does not report native suffix-range support.
    fn read(&self, ctx: &OperationContext, path: &str, args: OpRead) -> Result<Self::Reader> {
        let capability = self.srv.capability();
        let simulate_read_with_suffix =
            self.config.read_with_suffix && capability.read && !capability.read_with_suffix;
        // The reader keeps its own copy of `args` for later use, so the
        // wrapped service receives a clone.
        let reader = self.srv.read(ctx, path, args.clone())?;
        let reader = SimulateReader::new(
            ctx.clone(),
            self.srv.clone(),
            path.to_string(),
            args,
            reader,
            simulate_read_with_suffix,
        );
        Ok(reader)
    }

    /// Forward `write` to the wrapped service unchanged.
    fn write(&self, ctx: &OperationContext, path: &str, args: OpWrite) -> Result<Self::Writer> {
        self.srv.write(ctx, path, args)
    }

    /// Forward `copy` to the wrapped service unchanged.
    fn copy(
        &self,
        ctx: &OperationContext,
        from: &str,
        to: &str,
        args: OpCopy,
        opts: OpCopier,
    ) -> Result<Self::Copier> {
        self.srv.copy(ctx, from, to, args, opts)
    }

    /// Forward `rename` to the wrapped service unchanged.
    async fn rename(
        &self,
        ctx: &OperationContext,
        from: &str,
        to: &str,
        args: OpRename,
    ) -> Result<RpRename> {
        self.srv.rename(ctx, from, to, args).await
    }

    /// Stat a path, possibly simulated; see `simulate_stat`.
    async fn stat(&self, ctx: &OperationContext, path: &str, args: OpStat) -> Result<RpStat> {
        self.simulate_stat(ctx, path, args).await
    }

    /// Create the wrapped service's deleter and wrap it in a
    /// `SimulateDeleter` that shares this service's handle and config.
    fn delete(&self, ctx: &OperationContext) -> Result<Self::Deleter> {
        let deleter = self.srv.delete(ctx)?;
        Ok(SimulateDeleter::new(
            ctx.clone(),
            self.srv.clone(),
            self.config.clone(),
            deleter,
        ))
    }

    /// List a path, possibly simulated; see `simulate_list`.
    fn list(&self, ctx: &OperationContext, path: &str, args: OpList) -> Result<Self::Lister> {
        self.simulate_list(ctx, path, args)
    }

    /// Forward `presign` to the wrapped service unchanged.
    async fn presign(
        &self,
        ctx: &OperationContext,
        path: &str,
        args: OpPresign,
    ) -> Result<RpPresign> {
        self.srv.presign(ctx, path, args).await
    }
}
373
/// Lister returned by `SimulateService`, with one variant per listing
/// strategy:
/// - `One`: the wrapped service's own lister;
/// - `Two`: a `ServicerFlatLister` that walks directories to simulate
///   recursive listing;
/// - `Three`: the wrapped service's lister filtered by a path prefix;
/// - `Four`: a `ServicerFlatLister` filtered by a path prefix.
pub type SimulateLister = FourWays<
    oio::Lister,
    ServicerFlatLister,
    PrefixLister<oio::Lister>,
    PrefixLister<ServicerFlatLister>,
>;
380
/// Reader wrapper that can rewrite suffix ranges into absolute ranges before
/// handing them to the wrapped reader.
pub struct SimulateReader {
    /// Operation context reused for the `stat` call that looks up the content
    /// length.
    ctx: OperationContext,
    /// Wrapped service, used to `stat` the object when no length hint exists.
    srv: Servicer,
    /// Path of the object being read.
    path: String,
    /// Read arguments; consulted for the content-length hint and the version.
    args: OpRead,
    /// The wrapped service's reader that performs the actual reads.
    inner: oio::Reader,
    /// Whether suffix ranges must be resolved before reaching `inner`.
    simulate_read_with_suffix: bool,
}
389
390impl SimulateReader {
391 fn new(
392 ctx: OperationContext,
393 srv: Servicer,
394 path: String,
395 args: OpRead,
396 inner: oio::Reader,
397 simulate_read_with_suffix: bool,
398 ) -> Self {
399 Self {
400 ctx,
401 srv,
402 path,
403 args,
404 inner,
405 simulate_read_with_suffix,
406 }
407 }
408
409 async fn content_length(&self) -> Result<u64> {
410 if let Some(v) = self.args.content_length_hint() {
411 return Ok(v);
412 }
413
414 let mut op = OpStat::new();
415 if let Some(version) = self.args.version() {
416 op = op.with_version(version);
417 }
418
419 Ok(self
420 .srv
421 .stat(&self.ctx, &self.path, op)
422 .await?
423 .into_metadata()
424 .content_length())
425 }
426
427 async fn resolve_range(&self, range: BytesRange) -> Result<BytesRange> {
428 if !self.simulate_read_with_suffix || !range.is_suffix() {
429 return Ok(range);
430 }
431
432 let BytesRange::Suffix { size } = range else {
433 unreachable!("checked by BytesRange::is_suffix")
434 };
435
436 let content_length = self.content_length().await?;
437 let start = content_length.saturating_sub(size);
438 Ok(BytesRange::new(start, Some(content_length - start)))
439 }
440}
441
442impl oio::Read for SimulateReader {
443 async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
444 let range = self.resolve_range(range).await?;
445 self.inner.open(range).await
446 }
447
448 async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
449 let range = self.resolve_range(range).await?;
450 self.inner.read(range).await
451 }
452}
453
/// Lister that simulates recursive listing by walking directories with
/// non-recursive `list` calls on the wrapped service.
pub struct ServicerFlatLister {
    /// Operation context passed to every `list` call.
    ctx: OperationContext,
    /// Wrapped service whose `list` is used to open each directory.
    srv: Servicer,
    /// Directory queued to be opened on the next loop iteration, if any.
    next_dir: Option<oio::Entry>,
    /// Stack of directories currently being listed. Each element pairs the
    /// directory's own entry with its lister; the entry becomes `None` once it
    /// has been handed to the caller.
    active_lister: Vec<(Option<oio::Entry>, oio::Lister)>,
}
460
461impl ServicerFlatLister {
462 fn new(ctx: OperationContext, srv: Servicer, path: &str) -> Self {
463 Self {
464 ctx,
465 srv,
466 next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))),
467 active_lister: vec![],
468 }
469 }
470}
471
impl oio::List for ServicerFlatLister {
    /// Yield the next entry of the simulated recursive listing, or `None`
    /// when the walk is finished.
    ///
    /// Non-directory entries are returned as they are encountered. A
    /// directory's own entry is returned only after its lister is exhausted,
    /// i.e. after everything found beneath it. Directories that fail to open
    /// with `PermissionDenied` or `NotFound` are logged and skipped; any other
    /// error aborts the walk.
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        loop {
            // Phase 1: open the directory queued by the previous iteration.
            if let Some(de) = self.next_dir.take() {
                let mut l = match self.srv.list(&self.ctx, de.path(), OpList::new()) {
                    Ok(v) => v,
                    Err(e) if e.kind() == ErrorKind::PermissionDenied => {
                        log::warn!(
                            "ServicerFlatLister skipping directory due to permission denied: {}",
                            de.path()
                        );
                        continue;
                    }
                    Err(e) if e.kind() == ErrorKind::NotFound => {
                        log::warn!(
                            "ServicerFlatLister skipping directory due to not found during listing: {}",
                            de.path()
                        );
                        continue;
                    }
                    Err(e) => return Err(e),
                };
                // Fetch the first entry, polling the same lister again after a
                // `NotFound` error.
                // NOTE(review): this retries for as long as the lister keeps
                // answering `NotFound`; confirm listers cannot get stuck
                // returning that error.
                let first = loop {
                    match l.next().await {
                        Ok(v) => break v,
                        Err(e) if e.kind() == ErrorKind::NotFound => {
                            log::warn!(
                                "ServicerFlatLister skipping entry due to not found during listing: {}",
                                de.path()
                            );
                            continue;
                        }
                        Err(e) => return Err(e),
                    }
                };
                // A directory whose lister yields nothing at all is not pushed
                // onto the stack, so its own entry is never returned.
                if let Some(v) = first {
                    self.active_lister.push((Some(de.clone()), l));

                    if v.mode().is_dir() {
                        // The listing may contain the directory itself; only
                        // descend into entries with a different path.
                        if v.path() != de.path() {
                            self.next_dir = Some(v);
                            continue;
                        }
                    } else {
                        return Ok(Some(v));
                    }
                }
            }

            // Phase 2: a stack top whose entry is already `None` has returned
            // its directory entry and is finished; drop it without polling
            // its lister again.
            if matches!(self.active_lister.last(), Some((None, _))) {
                let _ = self.active_lister.pop();
                continue;
            }

            // Phase 3: keep draining the directory on top of the stack. An
            // empty stack means the walk is complete.
            let (de, lister) = match self.active_lister.last_mut() {
                Some((de, lister)) => (de, lister),
                None => return Ok(None),
            };

            match lister.next().await {
                // NOTE(review): as above, this polls the same lister again
                // after `NotFound`; confirm it cannot repeat indefinitely.
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    let path = de.as_ref().map(|entry| entry.path()).unwrap_or("<unknown>");
                    log::warn!(
                        "ServicerFlatLister skipping entry due to not found during recursive listing: {}",
                        path
                    );
                    continue;
                }
                Err(e) => return Err(e),
                Ok(Some(v)) if v.mode().is_dir() => {
                    // `de` is still `Some` here because finished stack tops
                    // were popped in phase 2. A sub-directory is queued for
                    // phase 1; the directory's own entry is skipped for now.
                    if v.path()
                        != de
                            .as_ref()
                            .expect("de must be present before listing")
                            .path()
                    {
                        self.next_dir = Some(v);
                        continue;
                    }
                }
                Ok(Some(v)) => return Ok(Some(v)),
                // Lister exhausted: return the directory entry itself once and
                // leave `None` behind so phase 2 pops the frame next time.
                Ok(None) => match de.take() {
                    Some(de) => return Ok(Some(de)),
                    None => {
                        let _ = self.active_lister.pop();
                        continue;
                    }
                },
            }
        }
    }
}
564
/// Deleter wrapper that can expand a recursive delete into per-entry deletes
/// when the wrapped service does not report native recursive delete.
pub struct SimulateDeleter {
    /// Operation context used for the listing done during simulation.
    ctx: OperationContext,
    /// Wrapped service, consulted for its capability and used for listing.
    srv: Servicer,
    /// Simulation switches; `delete_recursive` gates the simulation.
    config: SimulateLayer,
    /// The wrapped service's deleter that receives the actual deletes.
    inner: oio::Deleter,
}
572
573impl SimulateDeleter {
574 pub fn new(
575 ctx: OperationContext,
576 srv: Servicer,
577 config: SimulateLayer,
578 inner: oio::Deleter,
579 ) -> Self {
580 Self {
581 ctx,
582 srv,
583 config,
584 inner,
585 }
586 }
587}
588
589impl oio::Delete for SimulateDeleter {
590 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
591 if args.recursive() {
592 let cap = self.srv.capability();
593
594 if cap.delete_with_recursive {
595 return self.inner.delete(path, args).await;
596 }
597
598 if self.config.delete_recursive {
599 let service = SimulateService {
600 srv: self.srv.clone(),
601 config: self.config.clone(),
602 };
603 return service
604 .simulate_delete_with_recursive(&self.ctx, path, args, &mut self.inner)
605 .await;
606 }
607 }
608
609 self.inner.delete(path, args).await
610 }
611
612 async fn close(&mut self) -> Result<()> {
613 self.inner.close().await
614 }
615}
616
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Error returned by every `MockService` operation.
    fn unsupported<T>() -> Result<T> {
        Err(Error::new(
            ErrorKind::Unsupported,
            "operation is not supported",
        ))
    }

    /// Service stub that only reports a capability; every operation fails
    /// with `Unsupported`.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl MockService {
        /// A service that can read but has no native suffix-range support.
        fn readable() -> Servicer {
            let capability = Capability {
                read: true,
                read_with_suffix: false,
                ..Default::default()
            };
            Arc::new(MockService { capability })
        }
    }

    impl Service for MockService {
        type Reader = ();
        type Writer = ();
        type Lister = ();
        type Deleter = ();
        type Copier = ();

        fn info(&self) -> ServiceInfo {
            ServiceInfo::with_scheme("mock")
        }

        fn capability(&self) -> Capability {
            self.capability
        }

        async fn create_dir(
            &self,
            _: &OperationContext,
            _: &str,
            _: OpCreateDir,
        ) -> Result<RpCreateDir> {
            unsupported()
        }

        async fn stat(&self, _: &OperationContext, _: &str, _: OpStat) -> Result<RpStat> {
            unsupported()
        }

        fn read(&self, _ctx: &OperationContext, _: &str, _: OpRead) -> Result<Self::Reader> {
            unsupported()
        }

        fn write(&self, _ctx: &OperationContext, _: &str, _: OpWrite) -> Result<Self::Writer> {
            unsupported()
        }

        fn delete(&self, _ctx: &OperationContext) -> Result<Self::Deleter> {
            unsupported()
        }

        fn list(&self, _ctx: &OperationContext, _: &str, _: OpList) -> Result<Self::Lister> {
            unsupported()
        }

        fn copy(
            &self,
            _: &OperationContext,
            _: &str,
            _: &str,
            _: OpCopy,
            _: OpCopier,
        ) -> Result<Self::Copier> {
            unsupported()
        }

        async fn rename(
            &self,
            _: &OperationContext,
            _: &str,
            _: &str,
            _: OpRename,
        ) -> Result<RpRename> {
            unsupported()
        }

        async fn presign(&self, _: &OperationContext, _: &str, _: OpPresign) -> Result<RpPresign> {
            unsupported()
        }
    }

    /// Reader stub that records the last range it was asked for.
    struct MockReader {
        observed_range: Arc<Mutex<Option<BytesRange>>>,
    }

    impl MockReader {
        /// Remember `range` so the test can inspect it afterwards.
        fn record(&self, range: BytesRange) {
            *self.observed_range.lock().expect("mutex must not poison") = Some(range);
        }

        /// Response describing an empty file.
        fn empty_file() -> RpRead {
            RpRead::new(Metadata::new(EntryMode::FILE).with_content_length(0))
        }
    }

    impl oio::Read for MockReader {
        async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
            self.record(range);
            Ok((Self::empty_file(), Box::new(Buffer::new())))
        }

        async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
            self.record(range);
            Ok((Self::empty_file(), Buffer::new()))
        }
    }

    #[test]
    fn simulate_layer_exposes_read_with_suffix() {
        let layered = SimulateLayer::default().apply_service(MockService::readable());

        assert!(layered.capability().read_with_suffix);
    }

    #[test]
    fn simulate_layer_can_disable_read_with_suffix() {
        let layer = SimulateLayer::default().with_read_with_suffix(false);
        let layered = layer.apply_service(MockService::readable());

        assert!(!layered.capability().read_with_suffix);
    }

    #[tokio::test]
    async fn simulate_reader_uses_content_length_hint_for_suffix() -> Result<()> {
        let seen = Arc::new(Mutex::new(None));
        let (_, args, _) = options::ReadOptions {
            content_length_hint: Some(42),
            ..Default::default()
        }
        .into();
        let reader = SimulateReader::new(
            OperationContext::new(),
            Arc::new(MockService {
                capability: Capability::default(),
            }),
            "test".to_string(),
            args,
            Box::new(MockReader {
                observed_range: seen.clone(),
            }),
            true,
        );

        oio::Read::read(&reader, BytesRange::suffix(10)).await?;

        // The last 10 bytes of a 42-byte object start at offset 32.
        let recorded = *seen.lock().expect("mutex must not poison");
        assert_eq!(recorded, Some(BytesRange::new(32, Some(10))));

        Ok(())
    }
}