opendal_core/raw/oio/copy/
api.rs1use std::future::Future;
19use std::ops::DerefMut;
20
21use crate::raw::*;
22use crate::*;
23
/// Type-erased copier: a boxed [`CopyDyn`] trait object.
pub type Copier = Box<dyn CopyDyn>;
26
/// Asynchronous interface for driving a copy operation.
///
/// Implementors must be `Unpin + Send + Sync`, and every method returns a
/// future bounded by `MaybeSend`.
pub trait Copy: Unpin + Send + Sync {
    /// Drives the copy one step forward.
    ///
    /// The implementations in this file resolve to `Ok(None)` when nothing
    /// further remains to be driven.
    ///
    /// NOTE(review): a `Some(n)` value presumably reports progress made by
    /// this step (e.g. a byte count) — confirm against the implementors.
    fn next(&mut self) -> impl Future<Output = Result<Option<usize>>> + MaybeSend;

    /// Completes the copy and resolves to the resulting [`Metadata`].
    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// Abandons the copy.
    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
}
41
42impl Copy for () {
43 async fn next(&mut self) -> Result<Option<usize>> {
44 Ok(None)
45 }
46
47 async fn close(&mut self) -> Result<Metadata> {
48 Ok(Metadata::default())
49 }
50
51 async fn abort(&mut self) -> Result<()> {
52 Ok(())
53 }
54}
55
/// A [`Copy`] implementation whose whole copy is performed by awaiting a
/// single future.
pub struct OneShotCopier {
    /// Optional builder for a fresh copy future. When present, `close` uses
    /// it to (re)create the future whenever none is pending, which allows a
    /// failed attempt to be retried.
    factory: Option<Box<dyn FnMut() -> BoxedStaticFuture<Result<Metadata>>>>,
    /// The future that the next `close` call will await, if one has been
    /// supplied or built.
    fut: Option<BoxedStaticFuture<Result<Metadata>>>,
    /// Metadata cached from a successful run; later `close` calls return a
    /// clone of it.
    meta: Option<Metadata>,
    /// Set once the copier can no longer run a copy: after success, after a
    /// failure with no factory to retry with, after `abort`, or when built
    /// via `completed`.
    consumed: bool,
}
63
// SAFETY: NOTE(review) — every method visible in this file reaches the fields
// through `&mut self`, which is presumably why handing out `&OneShotCopier`
// across threads is considered harmless. Confirm no `&self` accessor exists
// elsewhere.
unsafe impl Sync for OneShotCopier {}

// SAFETY: NOTE(review) — `factory` is stored as a `dyn FnMut` without a `Send`
// bound, so this impl is only sound if every constructor restricts the closure
// and the futures it produces to values that may cross threads. Verify the
// bounds on `new` and `new_with`.
unsafe impl Send for OneShotCopier {}
73
74impl OneShotCopier {
75 pub fn new(fut: impl Future<Output = Result<Metadata>> + MaybeSend + 'static) -> Self {
77 Self {
78 factory: None,
79 fut: Some(Box::pin(fut)),
80 meta: None,
81 consumed: false,
82 }
83 }
84
85 pub fn new_with<F, Fut>(mut factory: F) -> Self
90 where
91 F: FnMut() -> Fut + 'static,
92 Fut: Future<Output = Result<Metadata>> + MaybeSend + 'static,
93 {
94 Self {
95 factory: Some(Box::new(move || {
96 Box::pin(factory()) as BoxedStaticFuture<Result<Metadata>>
97 })),
98 fut: None,
99 meta: None,
100 consumed: false,
101 }
102 }
103
104 pub fn completed() -> Self {
106 Self {
107 factory: None,
108 fut: None,
109 meta: Some(Metadata::default()),
110 consumed: true,
111 }
112 }
113}
114
115impl Copy for OneShotCopier {
116 async fn next(&mut self) -> Result<Option<usize>> {
117 if self.meta.is_none() {
118 self.close().await?;
119 }
120
121 Ok(None)
122 }
123
124 async fn close(&mut self) -> Result<Metadata> {
125 if let Some(meta) = self.meta.clone() {
126 return Ok(meta);
127 }
128
129 if self.consumed {
130 return Err(Error::new(
131 ErrorKind::Unexpected,
132 "one-shot copier has already been consumed",
133 )
134 .set_persistent());
135 }
136
137 if self.fut.is_none() {
138 if let Some(factory) = self.factory.as_mut() {
139 self.fut = Some(factory());
140 }
141 }
142
143 if let Some(fut) = self.fut.take() {
144 match fut.await {
145 Ok(meta) => {
146 self.consumed = true;
147 self.meta = Some(meta.clone());
148 return Ok(meta);
149 }
150 Err(err) => {
151 if self.factory.is_none() {
152 self.consumed = true;
153 return Err(err.set_persistent());
154 }
155
156 return Err(err);
157 }
158 }
159 }
160
161 self.consumed = true;
162 Err(Error::new(
163 ErrorKind::Unexpected,
164 "one-shot copier has no future to drive",
165 )
166 .set_persistent())
167 }
168
169 async fn abort(&mut self) -> Result<()> {
170 self.factory = None;
171 self.fut = None;
172 self.meta = None;
173 self.consumed = true;
174 Ok(())
175 }
176}
177
/// Variant of [`Copy`] whose methods return boxed futures, so it can be used
/// as a trait object (see [`Copier`]).
pub trait CopyDyn: Unpin + Send + Sync {
    /// Boxed-future counterpart of [`Copy::next`].
    fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<usize>>>;

    /// Boxed-future counterpart of [`Copy::close`].
    fn close_dyn(&mut self) -> BoxedFuture<'_, Result<Metadata>>;

    /// Boxed-future counterpart of [`Copy::abort`].
    fn abort_dyn(&mut self) -> BoxedFuture<'_, Result<()>>;
}
189
190impl<T: Copy + ?Sized> CopyDyn for T {
191 fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<usize>>> {
192 Box::pin(self.next())
193 }
194
195 fn close_dyn(&mut self) -> BoxedFuture<'_, Result<Metadata>> {
196 Box::pin(self.close())
197 }
198
199 fn abort_dyn(&mut self) -> BoxedFuture<'_, Result<()>> {
200 Box::pin(self.abort())
201 }
202}
203
204impl<T: CopyDyn + ?Sized> Copy for Box<T> {
205 async fn next(&mut self) -> Result<Option<usize>> {
206 self.deref_mut().next_dyn().await
207 }
208
209 async fn close(&mut self) -> Result<Metadata> {
210 self.deref_mut().close_dyn().await
211 }
212
213 async fn abort(&mut self) -> Result<()> {
214 self.deref_mut().abort_dyn().await
215 }
216}
217
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use super::*;

    /// A copier built with `new_with` must call its factory again after a
    /// failed attempt: the first `close` fails with a temporary error, the
    /// second succeeds, and the factory has been invoked exactly twice.
    #[tokio::test]
    async fn one_shot_copier_rebuilds_future_after_error() {
        let attempts = Arc::new(AtomicUsize::new(0));
        let retry_attempts = attempts.clone();

        let mut copier = OneShotCopier::new_with(move || {
            let attempts = retry_attempts.clone();

            async move {
                // Only the very first attempt fails.
                if attempts.fetch_add(1, Ordering::Relaxed) == 0 {
                    return Err(
                        Error::new(ErrorKind::Unexpected, "temporary copy failure").set_temporary()
                    );
                }

                Ok(Metadata::new(EntryMode::FILE))
            }
        });

        assert!(copier.close().await.unwrap_err().is_temporary());
        assert_eq!(copier.close().await.unwrap().mode(), EntryMode::FILE);
        assert_eq!(attempts.load(Ordering::Relaxed), 2);
    }

    /// A copier built with `new` has no factory, so a failure is final: the
    /// error from the first `close` is reported as persistent even though the
    /// future marked it temporary, and a second `close` fails persistently too.
    #[tokio::test]
    async fn one_shot_copier_does_not_succeed_after_consumed_error() {
        let mut copier = OneShotCopier::new(async {
            Err(Error::new(ErrorKind::Unexpected, "copy failure").set_temporary())
        });

        let err = copier.close().await.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.is_persistent());

        // The copier is now consumed; the second call must not succeed.
        let err = copier.close().await.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.is_persistent());
    }
}