use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;

use backon::BlockingRetryable;
use backon::ExponentialBuilder;
use backon::Retryable;
use log::warn;

use crate::raw::*;
use crate::*;

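/// Add retry for temporary failed operations.
///
/// Operations that return a temporary error (see [`Error::is_temporary`]) are
/// retried with exponential backoff; once retries are exhausted, the error is
/// marked persistent and returned to the caller.
///
/// # Examples
///
/// A minimal sketch of wiring the layer into an operator (assuming the
/// default `Memory` service is enabled):
///
/// ```no_run
/// # use opendal::layers::RetryLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # fn main() -> Result<()> {
/// let op = Operator::new(services::Memory::default())?
///     .layer(RetryLayer::new().with_max_times(3).with_jitter())
///     .finish();
/// # Ok(())
/// # }
/// ```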
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    builder: ExponentialBuilder,
    notify: Arc<I>,
}

impl<I: RetryInterceptor> Clone for RetryLayer<I> {
    fn clone(&self) -> Self {
        Self {
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}

impl Default for RetryLayer {
    fn default() -> Self {
        Self {
            builder: ExponentialBuilder::default(),
            notify: Arc::new(DefaultRetryInterceptor),
        }
    }
}

impl RetryLayer {
    pub fn new() -> RetryLayer {
        Self::default()
    }
}

impl<I: RetryInterceptor> RetryLayer<I> {
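    /// Set the [`RetryInterceptor`] that will be notified before every retry.
    ///
    /// This replaces the default interceptor; the returned layer is
    /// parameterized over the new interceptor type.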
    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
        RetryLayer {
            builder: self.builder,
            notify: Arc::new(notify),
        }
    }

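    /// Enable jitter for the backoff delays.
    ///
    /// Jitter adds a random component to each delay, which helps avoid many
    /// clients retrying in lockstep.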
    pub fn with_jitter(mut self) -> Self {
        self.builder = self.builder.with_jitter();
        self
    }

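    /// Set the exponential backoff factor.
    ///
    /// Each delay is the previous delay multiplied by this factor, up to the
    /// configured maximum delay.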
    pub fn with_factor(mut self, factor: f32) -> Self {
        self.builder = self.builder.with_factor(factor);
        self
    }

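    /// Set the minimum (initial) delay between retries.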
    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
        self.builder = self.builder.with_min_delay(min_delay);
        self
    }

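    /// Set the maximum delay between retries.
    ///
    /// Delays stop growing once they reach this value.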
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.builder = self.builder.with_max_delay(max_delay);
        self
    }

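    /// Set the maximum number of retry attempts.
    ///
    /// Once the limit is reached, the last error is marked persistent and
    /// returned to the caller.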
    pub fn with_max_times(mut self, max_times: usize) -> Self {
        self.builder = self.builder.with_max_times(max_times);
        self
    }
}

impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
    type LayeredAccess = RetryAccessor<A, I>;

    fn layer(&self, inner: A) -> Self::LayeredAccess {
        RetryAccessor {
            inner: Arc::new(inner),
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}

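/// RetryInterceptor is notified before every retry attempt, receiving the
/// error that triggered the retry and the delay that will be slept before the
/// next attempt.
///
/// Any closure of type `Fn(&Error, Duration) + Send + Sync + 'static`
/// implements this trait (see the blanket impl below), so a custom
/// interceptor can be as simple as the sketch here; the `eprintln!` is just a
/// stand-in for whatever the caller wants to do:
///
/// ```no_run
/// # use std::time::Duration;
/// # use opendal::layers::RetryLayer;
/// # use opendal::Error;
/// let layer = RetryLayer::new().with_notify(|err: &Error, dur: Duration| {
///     eprintln!("retrying in {:.2}s: {err}", dur.as_secs_f64());
/// });
/// ```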
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Called before each retry with the error that triggered it and the
    /// delay that will be waited before the next attempt.
    fn intercept(&self, err: &Error, dur: Duration);
}

impl<F> RetryInterceptor for F
where
    F: Fn(&Error, Duration) + Send + Sync + 'static,
{
    fn intercept(&self, err: &Error, dur: Duration) {
        self(err, dur);
    }
}

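/// The default retry interceptor, which logs a warning (target
/// `opendal::layers::retry`) with the error and the upcoming backoff delay.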
pub struct DefaultRetryInterceptor;

impl RetryInterceptor for DefaultRetryInterceptor {
    fn intercept(&self, err: &Error, dur: Duration) {
        warn!(
            target: "opendal::layers::retry",
            "will retry after {}s because: {}",
            dur.as_secs_f64(),
            err
        )
    }
}

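/// Accessor that wraps every operation of the inner accessor with the
/// configured retry policy.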
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    inner: Arc<A>,
    builder: ExponentialBuilder,
    notify: Arc<I>,
}

impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RetryAccessor")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
    type Inner = A;
    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
    type Writer = RetryWrapper<A::Writer, I>;
    type Lister = RetryWrapper<A::Lister, I>;
    type Deleter = RetryWrapper<A::Deleter, I>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        { || self.inner.create_dir(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let (rp, reader) = { || self.inner.read(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())?;

        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);

        Ok((rp, retry_wrapper))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        { || self.inner.write(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        { || self.inner.stat(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        { || self.inner.delete() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        { || self.inner.copy(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        { || self.inner.rename(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        { || self.inner.list(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }
}

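/// Reader that can recreate the underlying reader when a read is retried.
///
/// After every successful read the stored range is advanced by the number of
/// bytes returned, so a recreated reader resumes from where the previous one
/// stopped instead of re-reading from the beginning.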
pub struct RetryReader<A, R> {
    inner: Arc<A>,
    reader: Option<R>,

    path: String,
    args: OpRead,
}

impl<A, R> RetryReader<A, R> {
    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
        Self {
            inner,
            reader: Some(r),

            path,
            args,
        }
    }
}

impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
    async fn read(&mut self) -> Result<Buffer> {
        loop {
            match self.reader.take() {
                None => {
                    // The previous reader was lost to a failed read: recreate it
                    // with the already-advanced range so we resume where we left off.
                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
                    self.reader = Some(r);
                    continue;
                }
                Some(mut reader) => {
                    let buf = reader.read().await?;
                    self.reader = Some(reader);
                    // Advance the range so a recreated reader won't re-read these bytes.
                    self.args.range_mut().advance(buf.len() as u64);
                    return Ok(buf);
                }
            }
        }
    }
}

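/// Wrapper that retries the IO operations of readers, writers, listers and
/// deleters.
///
/// The wrapped value is moved into the retried future as context (via
/// `backon::RetryableWithContext`) and put back once the future completes; if
/// the future is dropped before it is ready, the inner value is lost and
/// `take_inner` returns an unexpected error on the next call.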
pub struct RetryWrapper<R, I> {
    inner: Option<R>,
    notify: Arc<I>,

    builder: ExponentialBuilder,
}

impl<R, I> RetryWrapper<R, I> {
    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
        Self {
            inner: Some(inner),
            notify,
            builder: backoff,
        }
    }

    fn take_inner(&mut self) -> Result<R> {
        self.inner.take().ok_or_else(|| {
            Error::new(
                ErrorKind::Unexpected,
                "retry layer is in a bad state, please make sure the future is not dropped before it is ready",
            )
        })
    }
}

impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
    async fn read(&mut self) -> Result<Buffer> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.read().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let ((inner, _), res) = {
            |(mut r, bs): (R, Buffer)| async move {
                let res = r.write(bs.clone()).await;

                ((r, bs), res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context((inner, bs))
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn abort(&mut self) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.abort().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn close(&mut self) -> Result<Metadata> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.close().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.next().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }

    async fn flush(&mut self) -> Result<usize> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.flush().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::TryStreamExt;
    use futures::stream;
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme("mock").set_native_capability(Capability {
                read: true,
                write: true,
                write_can_multi: true,
                delete: true,
                delete_max_size: Some(10),
                stat: true,
                list: true,
                list_with_recursive: true,
                ..Default::default()
            });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                3 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                5 => {
                    let s = mem::take(&mut self.size);
                    Ok(s)
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                w.abort().await.unwrap();
            }
        };
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must succeed") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}