Skip to main content

opendal_core/raw/oio/copy/
api.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::ops::DerefMut;
20
21use crate::raw::*;
22use crate::*;
23
24/// Copier is the type-erased [`Copy`].
25pub type Copier = Box<dyn CopyDyn>;
26
27/// Copy is the trait that OpenDAL returns for stateful copy operations.
28pub trait Copy: Unpin + Send + Sync {
29    /// Drive the copy operation forward.
30    ///
31    /// `Ok(Some(n))` means the copy operation made progress by `n` bytes.
32    /// `Ok(None)` means the copy operation has completed.
33    fn next(&mut self) -> impl Future<Output = Result<Option<usize>>> + MaybeSend;
34
35    /// Close the copier and return metadata from the server-side completion response.
36    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend;
37
38    /// Abort the pending copy operation.
39    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
40}
41
42impl Copy for () {
43    async fn next(&mut self) -> Result<Option<usize>> {
44        Ok(None)
45    }
46
47    async fn close(&mut self) -> Result<Metadata> {
48        Ok(Metadata::default())
49    }
50
51    async fn abort(&mut self) -> Result<()> {
52        Ok(())
53    }
54}
55
56/// OneShotCopier drives a single asynchronous copy step.
57pub struct OneShotCopier {
58    factory: Option<Box<dyn FnMut() -> BoxedStaticFuture<Result<Metadata>>>>,
59    fut: Option<BoxedStaticFuture<Result<Metadata>>>,
60    meta: Option<Metadata>,
61    consumed: bool,
62}
63
64/// # Safety
65///
66/// OneShotCopier is only accessed by `&mut self`.
67unsafe impl Sync for OneShotCopier {}
68
69/// # Safety
70///
71/// On wasm targets, futures are local but still only polled through `&mut self`.
72unsafe impl Send for OneShotCopier {}
73
74impl OneShotCopier {
75    /// Create a new one-shot copier.
76    pub fn new(fut: impl Future<Output = Result<Metadata>> + MaybeSend + 'static) -> Self {
77        Self {
78            factory: None,
79            fut: Some(Box::pin(fut)),
80            meta: None,
81            consumed: false,
82        }
83    }
84
85    /// Create a new one-shot copier with a future factory.
86    ///
87    /// The factory will be called again if the previous future fails, allowing
88    /// upper layers like retry to rerun the one-shot copy body.
89    pub fn new_with<F, Fut>(mut factory: F) -> Self
90    where
91        F: FnMut() -> Fut + 'static,
92        Fut: Future<Output = Result<Metadata>> + MaybeSend + 'static,
93    {
94        Self {
95            factory: Some(Box::new(move || {
96                Box::pin(factory()) as BoxedStaticFuture<Result<Metadata>>
97            })),
98            fut: None,
99            meta: None,
100            consumed: false,
101        }
102    }
103
104    /// Create a one-shot copier that has already completed.
105    pub fn completed() -> Self {
106        Self {
107            factory: None,
108            fut: None,
109            meta: Some(Metadata::default()),
110            consumed: true,
111        }
112    }
113}
114
115impl Copy for OneShotCopier {
116    async fn next(&mut self) -> Result<Option<usize>> {
117        if self.meta.is_none() {
118            self.close().await?;
119        }
120
121        Ok(None)
122    }
123
124    async fn close(&mut self) -> Result<Metadata> {
125        if let Some(meta) = self.meta.clone() {
126            return Ok(meta);
127        }
128
129        if self.consumed {
130            return Err(Error::new(
131                ErrorKind::Unexpected,
132                "one-shot copier has already been consumed",
133            )
134            .set_persistent());
135        }
136
137        if self.fut.is_none() {
138            if let Some(factory) = self.factory.as_mut() {
139                self.fut = Some(factory());
140            }
141        }
142
143        if let Some(fut) = self.fut.take() {
144            match fut.await {
145                Ok(meta) => {
146                    self.consumed = true;
147                    self.meta = Some(meta.clone());
148                    return Ok(meta);
149                }
150                Err(err) => {
151                    if self.factory.is_none() {
152                        self.consumed = true;
153                        return Err(err.set_persistent());
154                    }
155
156                    return Err(err);
157                }
158            }
159        }
160
161        self.consumed = true;
162        Err(Error::new(
163            ErrorKind::Unexpected,
164            "one-shot copier has no future to drive",
165        )
166        .set_persistent())
167    }
168
169    async fn abort(&mut self) -> Result<()> {
170        self.factory = None;
171        self.fut = None;
172        self.meta = None;
173        self.consumed = true;
174        Ok(())
175    }
176}
177
178/// CopyDyn is the dyn version of [`Copy`].
179pub trait CopyDyn: Unpin + Send + Sync {
180    /// The dyn version of [`Copy::next`].
181    fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<usize>>>;
182
183    /// The dyn version of [`Copy::close`].
184    fn close_dyn(&mut self) -> BoxedFuture<'_, Result<Metadata>>;
185
186    /// The dyn version of [`Copy::abort`].
187    fn abort_dyn(&mut self) -> BoxedFuture<'_, Result<()>>;
188}
189
190impl<T: Copy + ?Sized> CopyDyn for T {
191    fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<usize>>> {
192        Box::pin(self.next())
193    }
194
195    fn close_dyn(&mut self) -> BoxedFuture<'_, Result<Metadata>> {
196        Box::pin(self.close())
197    }
198
199    fn abort_dyn(&mut self) -> BoxedFuture<'_, Result<()>> {
200        Box::pin(self.abort())
201    }
202}
203
204impl<T: CopyDyn + ?Sized> Copy for Box<T> {
205    async fn next(&mut self) -> Result<Option<usize>> {
206        self.deref_mut().next_dyn().await
207    }
208
209    async fn close(&mut self) -> Result<Metadata> {
210        self.deref_mut().close_dyn().await
211    }
212
213    async fn abort(&mut self) -> Result<()> {
214        self.deref_mut().abort_dyn().await
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use std::sync::Arc;
221    use std::sync::atomic::AtomicUsize;
222    use std::sync::atomic::Ordering;
223
224    use super::*;
225
226    #[tokio::test]
227    async fn one_shot_copier_rebuilds_future_after_error() {
228        let attempts = Arc::new(AtomicUsize::new(0));
229        let retry_attempts = attempts.clone();
230
231        let mut copier = OneShotCopier::new_with(move || {
232            let attempts = retry_attempts.clone();
233
234            async move {
235                if attempts.fetch_add(1, Ordering::Relaxed) == 0 {
236                    return Err(
237                        Error::new(ErrorKind::Unexpected, "temporary copy failure").set_temporary()
238                    );
239                }
240
241                Ok(Metadata::new(EntryMode::FILE))
242            }
243        });
244
245        assert!(copier.close().await.unwrap_err().is_temporary());
246        assert_eq!(copier.close().await.unwrap().mode(), EntryMode::FILE);
247        assert_eq!(attempts.load(Ordering::Relaxed), 2);
248    }
249
250    #[tokio::test]
251    async fn one_shot_copier_does_not_succeed_after_consumed_error() {
252        let mut copier = OneShotCopier::new(async {
253            Err(Error::new(ErrorKind::Unexpected, "copy failure").set_temporary())
254        });
255
256        let err = copier.close().await.unwrap_err();
257        assert_eq!(err.kind(), ErrorKind::Unexpected);
258        assert!(err.is_persistent());
259
260        let err = copier.close().await.unwrap_err();
261        assert_eq!(err.kind(), ErrorKind::Unexpected);
262        assert!(err.is_persistent());
263    }
264}