opendal/layers/
retry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
31/// Add retry for temporary failed operations.
32///
33/// # Notes
34///
35/// This layer will retry failed operations when [`Error::is_temporary`]
36/// returns true. If operation still failed, this layer will set error to
37/// `Persistent` which means error has been retried.
38///
39/// # Panics
40///
41/// While retrying `Reader` or `Writer` operations, please make sure either:
42///
43/// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`.
44/// - Or, won't call any `Reader` or `Writer` methods after retry returns final error.
45///
46/// Otherwise, `RetryLayer` could panic while hitting in bad states.
47///
48/// For example, while composing `RetryLayer` with `TimeoutLayer`. The order of layer is sensitive.
49///
50/// ```no_run
51/// # use std::time::Duration;
52///
53/// # use opendal::layers::RetryLayer;
54/// # use opendal::layers::TimeoutLayer;
55/// # use opendal::services;
56/// # use opendal::Operator;
57/// # use opendal::Result;
58///
59/// # fn main() -> Result<()> {
60/// let op = Operator::new(services::Memory::default())?
61///     // This is fine, since timeout happen during retry.
62///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
63///     .layer(RetryLayer::new())
64///     // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
65///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
66///     .finish();
67/// Ok(())
68/// # }
69/// ```
70///
71/// # Examples
72///
73/// ```no_run
74/// # use opendal::layers::RetryLayer;
75/// # use opendal::services;
76/// # use opendal::Operator;
77/// # use opendal::Result;
78///
79/// # fn main() -> Result<()> {
80/// let _ = Operator::new(services::Memory::default())?
81///     .layer(RetryLayer::new())
82///     .finish();
83/// Ok(())
84/// # }
85/// ```
86///
87/// ## Customize retry interceptor
88///
89/// RetryLayer accepts [`RetryInterceptor`] to allow users to customize
90/// their own retry interceptor logic.
91///
92/// ```no_run
93/// # use std::time::Duration;
94///
95/// # use opendal::layers::RetryInterceptor;
96/// # use opendal::layers::RetryLayer;
97/// # use opendal::services;
98/// # use opendal::Error;
99/// # use opendal::Operator;
100/// # use opendal::Result;
101///
102/// struct MyRetryInterceptor;
103///
104/// impl RetryInterceptor for MyRetryInterceptor {
105///     fn intercept(&self, err: &Error, dur: Duration) {
106///         // do something
107///     }
108/// }
109///
110/// # fn main() -> Result<()> {
111/// let _ = Operator::new(services::Memory::default())?
112///     .layer(RetryLayer::new().with_notify(MyRetryInterceptor))
113///     .finish();
114/// Ok(())
115/// # }
116/// ```
117pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
118    builder: ExponentialBuilder,
119    notify: Arc<I>,
120}
121
122impl<I: RetryInterceptor> Clone for RetryLayer<I> {
123    fn clone(&self) -> Self {
124        Self {
125            builder: self.builder,
126            notify: self.notify.clone(),
127        }
128    }
129}
130
131impl Default for RetryLayer {
132    fn default() -> Self {
133        Self {
134            builder: ExponentialBuilder::default(),
135            notify: Arc::new(DefaultRetryInterceptor),
136        }
137    }
138}
139
140impl RetryLayer {
141    /// Create a new retry layer.
142    /// # Examples
143    ///
144    /// ```no_run
145    /// use anyhow::Result;
146    /// use opendal::layers::RetryLayer;
147    /// use opendal::services;
148    /// use opendal::Operator;
149    ///
150    /// let _ = Operator::new(services::Memory::default())
151    ///     .expect("must init")
152    ///     .layer(RetryLayer::new());
153    /// ```
154    pub fn new() -> RetryLayer {
155        Self::default()
156    }
157}
158
159impl<I: RetryInterceptor> RetryLayer<I> {
160    /// Set the retry interceptor as new notify.
161    ///
162    /// ```no_run
163    /// use opendal::layers::RetryLayer;
164    /// use opendal::services;
165    /// use opendal::Operator;
166    ///
167    /// fn notify(_err: &opendal::Error, _dur: std::time::Duration) {}
168    ///
169    /// let _ = Operator::new(services::Memory::default())
170    ///     .expect("must init")
171    ///     .layer(RetryLayer::new().with_notify(notify))
172    ///     .finish();
173    /// ```
174    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
175        RetryLayer {
176            builder: self.builder,
177            notify: Arc::new(notify),
178        }
179    }
180
181    /// Set jitter of current backoff.
182    ///
183    /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)
184    /// to current delay.
185    pub fn with_jitter(mut self) -> Self {
186        self.builder = self.builder.with_jitter();
187        self
188    }
189
190    /// Set factor of current backoff.
191    ///
192    /// # Panics
193    ///
194    /// This function will panic if input factor smaller than `1.0`.
195    pub fn with_factor(mut self, factor: f32) -> Self {
196        self.builder = self.builder.with_factor(factor);
197        self
198    }
199
200    /// Set min_delay of current backoff.
201    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
202        self.builder = self.builder.with_min_delay(min_delay);
203        self
204    }
205
206    /// Set max_delay of current backoff.
207    ///
208    /// Delay will not increase if current delay is larger than max_delay.
209    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
210        self.builder = self.builder.with_max_delay(max_delay);
211        self
212    }
213
214    /// Set max_times of current backoff.
215    ///
216    /// Backoff will return `None` if max times is reaching.
217    pub fn with_max_times(mut self, max_times: usize) -> Self {
218        self.builder = self.builder.with_max_times(max_times);
219        self
220    }
221}
222
223impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
224    type LayeredAccess = RetryAccessor<A, I>;
225
226    fn layer(&self, inner: A) -> Self::LayeredAccess {
227        RetryAccessor {
228            inner: Arc::new(inner),
229            builder: self.builder,
230            notify: self.notify.clone(),
231        }
232    }
233}
234
235/// RetryInterceptor is used to intercept while retry happened.
236pub trait RetryInterceptor: Send + Sync + 'static {
237    /// Everytime RetryLayer is retrying, this function will be called.
238    ///
239    /// # Timing
240    ///
241    /// just before the retry sleep.
242    ///
243    /// # Inputs
244    ///
245    /// - err: The error that caused the current retry.
246    /// - dur: The duration that will sleep before next retry.
247    ///
248    /// # Notes
249    ///
250    /// The intercept must be quick and non-blocking. No heavy IO is
251    /// allowed. Otherwise, the retry will be blocked.
252    fn intercept(&self, err: &Error, dur: Duration);
253}
254
255impl<F> RetryInterceptor for F
256where
257    F: Fn(&Error, Duration) + Send + Sync + 'static,
258{
259    fn intercept(&self, err: &Error, dur: Duration) {
260        self(err, dur);
261    }
262}
263
264/// The DefaultRetryInterceptor will log the retry error in warning level.
265pub struct DefaultRetryInterceptor;
266
267impl RetryInterceptor for DefaultRetryInterceptor {
268    fn intercept(&self, err: &Error, dur: Duration) {
269        warn!(
270            target: "opendal::layers::retry",
271            "will retry after {}s because: {}",
272            dur.as_secs_f64(), err)
273    }
274}
275
276pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
277    inner: Arc<A>,
278    builder: ExponentialBuilder,
279    notify: Arc<I>,
280}
281
282impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
283    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
284        f.debug_struct("RetryAccessor")
285            .field("inner", &self.inner)
286            .finish_non_exhaustive()
287    }
288}
289
290impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
291    type Inner = A;
292    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
293    type Writer = RetryWrapper<A::Writer, I>;
294    type Lister = RetryWrapper<A::Lister, I>;
295    type Deleter = RetryWrapper<A::Deleter, I>;
296
297    fn inner(&self) -> &Self::Inner {
298        &self.inner
299    }
300
301    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
302        { || self.inner.create_dir(path, args.clone()) }
303            .retry(self.builder)
304            .when(|e| e.is_temporary())
305            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
306            .await
307            .map_err(|e| e.set_persistent())
308    }
309
310    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
311        let (rp, reader) = { || self.inner.read(path, args.clone()) }
312            .retry(self.builder)
313            .when(|e| e.is_temporary())
314            .notify(|err, dur| self.notify.intercept(err, dur))
315            .await
316            .map_err(|e| e.set_persistent())?;
317
318        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
319        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
320
321        Ok((rp, retry_wrapper))
322    }
323
324    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
325        { || self.inner.write(path, args.clone()) }
326            .retry(self.builder)
327            .when(|e| e.is_temporary())
328            .notify(|err, dur| self.notify.intercept(err, dur))
329            .await
330            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
331            .map_err(|e| e.set_persistent())
332    }
333
334    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
335        { || self.inner.stat(path, args.clone()) }
336            .retry(self.builder)
337            .when(|e| e.is_temporary())
338            .notify(|err, dur| self.notify.intercept(err, dur))
339            .await
340            .map_err(|e| e.set_persistent())
341    }
342
343    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
344        { || self.inner.delete() }
345            .retry(self.builder)
346            .when(|e| e.is_temporary())
347            .notify(|err, dur| self.notify.intercept(err, dur))
348            .await
349            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
350            .map_err(|e| e.set_persistent())
351    }
352
353    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
354        { || self.inner.copy(from, to, args.clone()) }
355            .retry(self.builder)
356            .when(|e| e.is_temporary())
357            .notify(|err, dur| self.notify.intercept(err, dur))
358            .await
359            .map_err(|e| e.set_persistent())
360    }
361
362    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
363        { || self.inner.rename(from, to, args.clone()) }
364            .retry(self.builder)
365            .when(|e| e.is_temporary())
366            .notify(|err, dur| self.notify.intercept(err, dur))
367            .await
368            .map_err(|e| e.set_persistent())
369    }
370
371    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
372        { || self.inner.list(path, args.clone()) }
373            .retry(self.builder)
374            .when(|e| e.is_temporary())
375            .notify(|err, dur| self.notify.intercept(err, dur))
376            .await
377            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
378            .map_err(|e| e.set_persistent())
379    }
380}
381
382pub struct RetryReader<A, R> {
383    inner: Arc<A>,
384    reader: Option<R>,
385
386    path: String,
387    args: OpRead,
388}
389
390impl<A, R> RetryReader<A, R> {
391    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
392        Self {
393            inner,
394            reader: Some(r),
395
396            path,
397            args,
398        }
399    }
400}
401
402impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
403    async fn read(&mut self) -> Result<Buffer> {
404        loop {
405            match self.reader.take() {
406                None => {
407                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
408                    self.reader = Some(r);
409                    continue;
410                }
411                Some(mut reader) => {
412                    let buf = reader.read().await?;
413                    self.reader = Some(reader);
414                    self.args.range_mut().advance(buf.len() as u64);
415                    return Ok(buf);
416                }
417            }
418        }
419    }
420}
421
422pub struct RetryWrapper<R, I> {
423    inner: Option<R>,
424    notify: Arc<I>,
425
426    builder: ExponentialBuilder,
427}
428
429impl<R, I> RetryWrapper<R, I> {
430    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
431        Self {
432            inner: Some(inner),
433            notify,
434            builder: backoff,
435        }
436    }
437
438    fn take_inner(&mut self) -> Result<R> {
439        self.inner.take().ok_or_else(|| {
440            Error::new(
441                ErrorKind::Unexpected,
442                "retry layer is in bad state, please make sure future not dropped before ready",
443            )
444        })
445    }
446}
447
448impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
449    async fn read(&mut self) -> Result<Buffer> {
450        use backon::RetryableWithContext;
451
452        let inner = self.take_inner()?;
453
454        let (inner, res) = {
455            |mut r: R| async move {
456                let res = r.read().await;
457
458                (r, res)
459            }
460        }
461        .retry(self.builder)
462        .when(|e| e.is_temporary())
463        .context(inner)
464        .notify(|err, dur| self.notify.intercept(err, dur))
465        .await;
466
467        self.inner = Some(inner);
468        res.map_err(|err| err.set_persistent())
469    }
470}
471
472impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
473    async fn write(&mut self, bs: Buffer) -> Result<()> {
474        use backon::RetryableWithContext;
475
476        let inner = self.take_inner()?;
477
478        let ((inner, _), res) = {
479            |(mut r, bs): (R, Buffer)| async move {
480                let res = r.write(bs.clone()).await;
481
482                ((r, bs), res)
483            }
484        }
485        .retry(self.builder)
486        .when(|e| e.is_temporary())
487        .context((inner, bs))
488        .notify(|err, dur| self.notify.intercept(err, dur))
489        .await;
490
491        self.inner = Some(inner);
492        res.map_err(|err| err.set_persistent())
493    }
494
495    async fn abort(&mut self) -> Result<()> {
496        use backon::RetryableWithContext;
497
498        let inner = self.take_inner()?;
499
500        let (inner, res) = {
501            |mut r: R| async move {
502                let res = r.abort().await;
503
504                (r, res)
505            }
506        }
507        .retry(self.builder)
508        .when(|e| e.is_temporary())
509        .context(inner)
510        .notify(|err, dur| self.notify.intercept(err, dur))
511        .await;
512
513        self.inner = Some(inner);
514        res.map_err(|err| err.set_persistent())
515    }
516
517    async fn close(&mut self) -> Result<Metadata> {
518        use backon::RetryableWithContext;
519
520        let inner = self.take_inner()?;
521
522        let (inner, res) = {
523            |mut r: R| async move {
524                let res = r.close().await;
525
526                (r, res)
527            }
528        }
529        .retry(self.builder)
530        .when(|e| e.is_temporary())
531        .context(inner)
532        .notify(|err, dur| self.notify.intercept(err, dur))
533        .await;
534
535        self.inner = Some(inner);
536        res.map_err(|err| err.set_persistent())
537    }
538}
539
540impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
541    async fn next(&mut self) -> Result<Option<oio::Entry>> {
542        use backon::RetryableWithContext;
543
544        let inner = self.take_inner()?;
545
546        let (inner, res) = {
547            |mut p: P| async move {
548                let res = p.next().await;
549
550                (p, res)
551            }
552        }
553        .retry(self.builder)
554        .when(|e| e.is_temporary())
555        .context(inner)
556        .notify(|err, dur| self.notify.intercept(err, dur))
557        .await;
558
559        self.inner = Some(inner);
560        res.map_err(|err| err.set_persistent())
561    }
562}
563
564impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
565    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
566        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
567            .retry(self.builder)
568            .when(|e| e.is_temporary())
569            .notify(|err, dur| {
570                self.notify.intercept(err, dur);
571            })
572            .call()
573            .map_err(|e| e.set_persistent())
574    }
575
576    async fn flush(&mut self) -> Result<usize> {
577        use backon::RetryableWithContext;
578
579        let inner = self.take_inner()?;
580
581        let (inner, res) = {
582            |mut p: P| async move {
583                let res = p.flush().await;
584
585                (p, res)
586            }
587        }
588        .retry(self.builder)
589        .when(|e| e.is_temporary())
590        .context(inner)
591        .notify(|err, dur| self.notify.intercept(err, dur))
592        .await;
593
594        self.inner = Some(inner);
595        res.map_err(|err| err.set_persistent())
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use std::mem;
602    use std::sync::Arc;
603    use std::sync::Mutex;
604
605    use bytes::Bytes;
606    use futures::TryStreamExt;
607    use futures::stream;
608    use tracing_subscriber::filter::LevelFilter;
609
610    use super::*;
611    use crate::layers::LoggingLayer;
612
613    #[derive(Default, Clone)]
614    struct MockBuilder {
615        attempt: Arc<Mutex<usize>>,
616    }
617
618    impl Builder for MockBuilder {
619        type Config = ();
620
621        fn build(self) -> Result<impl Access> {
622            Ok(MockService {
623                attempt: self.attempt.clone(),
624            })
625        }
626    }
627
628    #[derive(Debug, Clone, Default)]
629    struct MockService {
630        attempt: Arc<Mutex<usize>>,
631    }
632
633    impl Access for MockService {
634        type Reader = MockReader;
635        type Writer = MockWriter;
636        type Lister = MockLister;
637        type Deleter = MockDeleter;
638
639        fn info(&self) -> Arc<AccessorInfo> {
640            let am = AccessorInfo::default();
641            am.set_scheme("mock").set_native_capability(Capability {
642                read: true,
643                write: true,
644                write_can_multi: true,
645                delete: true,
646                delete_max_size: Some(10),
647                stat: true,
648                list: true,
649                list_with_recursive: true,
650                ..Default::default()
651            });
652
653            am.into()
654        }
655
656        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
657            Ok(RpStat::new(
658                Metadata::new(EntryMode::FILE).with_content_length(13),
659            ))
660        }
661
662        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
663            Ok((
664                RpRead::new(),
665                MockReader {
666                    buf: Bytes::from("Hello, World!").into(),
667                    range: args.range(),
668                    attempt: self.attempt.clone(),
669                },
670            ))
671        }
672
673        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
674            Ok((
675                RpDelete::default(),
676                MockDeleter {
677                    size: 0,
678                    attempt: self.attempt.clone(),
679                },
680            ))
681        }
682
683        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
684            Ok((RpWrite::new(), MockWriter {}))
685        }
686
687        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
688            let lister = MockLister::default();
689            Ok((RpList::default(), lister))
690        }
691    }
692
693    #[derive(Debug, Clone, Default)]
694    struct MockReader {
695        buf: Buffer,
696        range: BytesRange,
697        attempt: Arc<Mutex<usize>>,
698    }
699
700    impl oio::Read for MockReader {
701        async fn read(&mut self) -> Result<Buffer> {
702            let mut attempt = self.attempt.lock().unwrap();
703            *attempt += 1;
704
705            match *attempt {
706                1 => Err(
707                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
708                        .set_temporary(),
709                ),
710                2 => Err(
711                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
712                        .set_temporary(),
713                ),
714                // Should read out all data.
715                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
716                4 => Err(
717                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
718                        .set_temporary(),
719                ),
720                // Should be empty.
721                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
722                _ => unreachable!(),
723            }
724        }
725    }
726
727    #[derive(Debug, Clone, Default)]
728    struct MockWriter {}
729
730    impl oio::Write for MockWriter {
731        async fn write(&mut self, _: Buffer) -> Result<()> {
732            Ok(())
733        }
734
735        async fn close(&mut self) -> Result<Metadata> {
736            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
737        }
738
739        async fn abort(&mut self) -> Result<()> {
740            Ok(())
741        }
742    }
743
744    #[derive(Debug, Clone, Default)]
745    struct MockLister {
746        attempt: usize,
747    }
748
749    impl oio::List for MockLister {
750        async fn next(&mut self) -> Result<Option<oio::Entry>> {
751            self.attempt += 1;
752            match self.attempt {
753                1 => Err(Error::new(
754                    ErrorKind::RateLimited,
755                    "retryable rate limited error from lister",
756                )
757                .set_temporary()),
758                2 => Ok(Some(oio::Entry::new(
759                    "hello",
760                    Metadata::new(EntryMode::FILE),
761                ))),
762                3 => Ok(Some(oio::Entry::new(
763                    "world",
764                    Metadata::new(EntryMode::FILE),
765                ))),
766                4 => Err(
767                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
768                        .set_temporary(),
769                ),
770                5 => Ok(Some(oio::Entry::new(
771                    "2023/",
772                    Metadata::new(EntryMode::DIR),
773                ))),
774                6 => Ok(Some(oio::Entry::new(
775                    "0208/",
776                    Metadata::new(EntryMode::DIR),
777                ))),
778                7 => Ok(None),
779                _ => {
780                    unreachable!()
781                }
782            }
783        }
784    }
785
786    #[derive(Debug, Clone, Default)]
787    struct MockDeleter {
788        size: usize,
789        attempt: Arc<Mutex<usize>>,
790    }
791
792    impl oio::Delete for MockDeleter {
793        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
794            self.size += 1;
795            Ok(())
796        }
797
798        async fn flush(&mut self) -> Result<usize> {
799            let mut attempt = self.attempt.lock().unwrap();
800            *attempt += 1;
801
802            match *attempt {
803                1 => Err(
804                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
805                        .set_temporary(),
806                ),
807                2 => {
808                    self.size -= 1;
809                    Ok(1)
810                }
811                3 => Err(
812                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
813                        .set_temporary(),
814                ),
815                4 => Err(
816                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
817                        .set_temporary(),
818                ),
819                5 => {
820                    let s = mem::take(&mut self.size);
821                    Ok(s)
822                }
823                _ => unreachable!(),
824            }
825        }
826    }
827
828    #[tokio::test]
829    async fn test_retry_read() {
830        let _ = tracing_subscriber::fmt()
831            .with_max_level(LevelFilter::TRACE)
832            .with_test_writer()
833            .try_init();
834
835        let builder = MockBuilder::default();
836        let op = Operator::new(builder.clone())
837            .unwrap()
838            .layer(LoggingLayer::default())
839            .layer(RetryLayer::new())
840            .finish();
841
842        let r = op.reader("retryable_error").await.unwrap();
843        let mut content = Vec::new();
844        let size = r
845            .read_into(&mut content, ..)
846            .await
847            .expect("read must succeed");
848        assert_eq!(size, 13);
849        assert_eq!(content, "Hello, World!".as_bytes());
850        // The error is retryable, we should request it 3 times.
851        assert_eq!(*builder.attempt.lock().unwrap(), 5);
852    }
853
854    /// This test is used to reproduce the panic issue while composing retry layer with timeout layer.
855    #[tokio::test]
856    async fn test_retry_write_fail_on_close() {
857        let _ = tracing_subscriber::fmt()
858            .with_max_level(LevelFilter::TRACE)
859            .with_test_writer()
860            .try_init();
861
862        let builder = MockBuilder::default();
863        let op = Operator::new(builder.clone())
864            .unwrap()
865            .layer(
866                RetryLayer::new()
867                    .with_min_delay(Duration::from_millis(1))
868                    .with_max_delay(Duration::from_millis(1))
869                    .with_jitter(),
870            )
871            // Uncomment this to reproduce timeout layer panic.
872            // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
873            .layer(LoggingLayer::default())
874            .finish();
875
876        let mut w = op.writer("test_write").await.unwrap();
877        w.write("aaa").await.unwrap();
878        w.write("bbb").await.unwrap();
879        match w.close().await {
880            Ok(_) => (),
881            Err(_) => {
882                w.abort().await.unwrap();
883            }
884        };
885    }
886
887    #[tokio::test]
888    async fn test_retry_list() {
889        let _ = tracing_subscriber::fmt().with_test_writer().try_init();
890
891        let builder = MockBuilder::default();
892        let op = Operator::new(builder.clone())
893            .unwrap()
894            .layer(RetryLayer::new())
895            .finish();
896
897        let expected = vec!["hello", "world", "2023/", "0208/"];
898
899        let mut lister = op
900            .lister("retryable_error/")
901            .await
902            .expect("service must support list");
903        let mut actual = Vec::new();
904        while let Some(obj) = lister.try_next().await.expect("must success") {
905            actual.push(obj.name().to_owned());
906        }
907
908        assert_eq!(actual, expected);
909    }
910
911    #[tokio::test]
912    async fn test_retry_batch() {
913        let _ = tracing_subscriber::fmt().with_test_writer().try_init();
914
915        let builder = MockBuilder::default();
916        // set to a lower delay to make it run faster
917        let op = Operator::new(builder.clone())
918            .unwrap()
919            .layer(
920                RetryLayer::new()
921                    .with_min_delay(Duration::from_secs_f32(0.1))
922                    .with_max_times(5),
923            )
924            .finish();
925
926        let paths = vec!["hello", "world", "test", "batch"];
927        op.delete_stream(stream::iter(paths)).await.unwrap();
928        assert_eq!(*builder.attempt.lock().unwrap(), 5);
929    }
930}