opendal_core/raw/oio/copy/
block_copy.rs1use std::future::Future;
19use std::sync::Arc;
20
21use futures::FutureExt;
22use futures::select;
23use uuid::Uuid;
24
25use crate::raw::*;
26use crate::*;
27
/// Backend operations required to copy an object either in one shot or block
/// by block.
///
/// [`BlockCopier`] drives an implementation of this trait: it determines the
/// source size, then either issues a single `copy_once` call or schedules
/// `copy_block` calls and finishes with `complete_block`.
pub trait BlockCopy: Send + Sync + Unpin + 'static {
    /// Fetch the metadata of the copy source.
    ///
    /// [`BlockCopier`] calls this only when it was constructed without a
    /// content length hint, and uses the returned content length as the
    /// source size.
    fn source_metadata(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// Perform the whole copy as a single operation and return the resulting
    /// metadata.
    ///
    /// [`BlockCopier`] takes this path when the source size does not exceed
    /// its copy-once threshold.
    fn copy_once(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// Copy one block.
    ///
    /// `block_id` is the identifier the caller generated for this block and
    /// `range` is the byte range of the source that the caller assigned to it.
    fn copy_block(
        &self,
        block_id: Uuid,
        range: BytesRange,
    ) -> impl Future<Output = Result<()>> + MaybeSend;

    /// Finish a block-wise copy and return the resulting metadata.
    ///
    /// [`BlockCopier`] passes `block_ids` sorted by ascending block number,
    /// i.e. in source order.
    fn complete_block(
        &self,
        block_ids: Vec<Uuid>,
    ) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// Abort a block-wise copy.
    ///
    /// [`BlockCopier::abort`] passes every block id it has scheduled so far,
    /// whether or not that block finished copying. Implementations presumably
    /// discard those blocks — the exact cleanup is backend-defined.
    fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
}
62
/// Input of one block-copy task: everything the task needs to copy a single
/// block without touching the owning `BlockCopier`.
struct CopyInput<C: BlockCopy> {
    /// Shared backend that performs the copy.
    copier: Arc<C>,
    /// Executor whose `timeout()` is raced against the block copy.
    executor: Executor,
    /// Identifier passed to `copy_block` for this block.
    block_id: Uuid,
    /// Zero-based position of this block in source order.
    block_number: usize,
    /// Byte range of the source covered by this block; always built with an
    /// explicit size.
    range: BytesRange,
}
70
71impl<C: BlockCopy> Clone for CopyInput<C> {
72 fn clone(&self) -> Self {
73 Self {
74 copier: self.copier.clone(),
75 executor: self.executor.clone(),
76 block_id: self.block_id,
77 block_number: self.block_number,
78 range: self.range,
79 }
80 }
81}
82
/// Result of one successful block-copy task.
struct CopiedBlock {
    /// Identifier of the block that was copied.
    block_id: Uuid,
    /// Zero-based position of the block in source order.
    block_number: usize,
    /// Number of bytes covered by the block's range.
    size: u64,
}
88
/// Drives a [`BlockCopy`] backend: copies small sources with a single
/// `copy_once` call and larger ones as a sequence of concurrently scheduled
/// blocks that are finally committed with `complete_block`.
pub struct BlockCopier<C: BlockCopy> {
    /// Shared backend that performs the actual copy operations.
    copier: Arc<C>,
    /// Executor handed to every block task (used for its timeout).
    executor: Executor,

    /// `(block_number, block_id)` of every block whose copy has completed.
    block_ids: Vec<(usize, Uuid)>,
    /// Id of every block that has been submitted to `tasks`; passed to
    /// `abort_block` on abort.
    scheduled_block_ids: Vec<Uuid>,
    /// Number assigned to the next block to schedule; equals the count of
    /// blocks scheduled so far.
    next_block_number: usize,
    /// Source offset at which the next block to schedule starts.
    next_offset: u64,
    /// Source size: seeded from the constructor hint, otherwise filled in
    /// from `source_metadata` on first use.
    source_size: Option<u64>,
    /// Sources of at most this many bytes are copied with `copy_once`.
    copy_once_threshold: u64,
    /// Maximum number of bytes per block.
    block_size: u64,
    /// Upper bound on blocks scheduled per scheduling pass; never below 1.
    concurrent: usize,
    /// Set once the copy has finished or has been aborted.
    completed: bool,
    /// Metadata returned by `copy_once` / `complete_block`, if any.
    metadata: Option<Metadata>,

    /// Task queue that runs the block copies.
    tasks: ConcurrentTasks<CopyInput<C>, CopiedBlock>,
}
107
108impl<C: BlockCopy> BlockCopier<C> {
109 pub fn new(
111 info: Arc<AccessorInfo>,
112 inner: C,
113 source_content_length_hint: Option<u64>,
114 copy_once_threshold: u64,
115 block_size: u64,
116 concurrent: usize,
117 ) -> Self {
118 let copier = Arc::new(inner);
119 let executor = info.executor();
120 let concurrent = concurrent.max(1);
121
122 Self {
123 copier,
124 executor: executor.clone(),
125 block_ids: Vec::new(),
126 scheduled_block_ids: Vec::new(),
127 next_block_number: 0,
128 next_offset: 0,
129 source_size: source_content_length_hint,
130 copy_once_threshold,
131 block_size,
132 concurrent,
133 completed: false,
134 metadata: None,
135
136 tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
137 Box::pin(async move {
138 let size = input.range.size().expect("block copy range must be sized");
139 let fut = input.copier.copy_block(input.block_id, input.range);
140
141 let result = match input.executor.timeout() {
142 None => fut.await.map(|_| CopiedBlock {
143 block_id: input.block_id,
144 block_number: input.block_number,
145 size,
146 }),
147 Some(timeout) => {
148 select! {
149 result = fut.fuse() => {
150 result.map(|_| CopiedBlock {
151 block_id: input.block_id,
152 block_number: input.block_number,
153 size,
154 })
155 }
156 _ = timeout.fuse() => {
157 Err(Error::new(
158 ErrorKind::Unexpected, "copy block timeout")
159 .with_context("block_id", input.block_id.to_string())
160 .set_temporary())
161 }
162 }
163 }
164 };
165
166 (input, result)
167 })
168 }),
169 }
170 }
171
172 async fn source_size(&mut self) -> Result<u64> {
173 match self.source_size {
174 Some(size) => Ok(size),
175 None => {
176 let size = self.copier.source_metadata().await?.content_length();
177 self.source_size = Some(size);
178 Ok(size)
179 }
180 }
181 }
182
183 async fn fill_tasks(&mut self, source_size: u64) -> Result<()> {
184 let mut scheduled = 0;
185
186 while self.next_offset < source_size
187 && self.tasks.has_remaining()
188 && scheduled < self.concurrent
189 {
190 let size = self.block_size.min(source_size - self.next_offset);
191 let range = BytesRange::new(self.next_offset, Some(size));
192
193 let input = CopyInput {
194 copier: self.copier.clone(),
195 executor: self.executor.clone(),
196 block_id: Uuid::new_v4(),
197 block_number: self.next_block_number,
198 range,
199 };
200
201 loop {
202 match self.tasks.execute(input.clone()).await {
203 Ok(()) => break,
204 Err(err) if err.is_temporary() => continue,
205 Err(err) => return Err(err),
206 }
207 }
208
209 self.scheduled_block_ids.push(input.block_id);
210 self.next_offset += size;
211 self.next_block_number += 1;
212 scheduled += 1;
213
214 if self.tasks.has_result() {
215 break;
216 }
217 }
218
219 Ok(())
220 }
221}
222
223impl<C> oio::Copy for BlockCopier<C>
224where
225 C: BlockCopy,
226{
227 async fn next(&mut self) -> Result<Option<usize>> {
228 if self.completed {
229 return Ok(None);
230 }
231
232 let source_size = self.source_size().await?;
233
234 if self.block_ids.is_empty() && source_size <= self.copy_once_threshold {
235 self.metadata = Some(self.copier.copy_once().await?);
236 self.completed = true;
237 return Ok(None);
238 }
239
240 self.fill_tasks(source_size).await?;
241
242 loop {
243 match self.tasks.next().await {
244 Some(Ok(result)) => {
245 let size = result.size.try_into().map_err(|_| {
246 Error::new(ErrorKind::Unexpected, "block copy size exceeds usize")
247 })?;
248 self.block_ids.push((result.block_number, result.block_id));
249 return Ok(Some(size));
250 }
251 Some(Err(err)) if err.is_temporary() => continue,
252 Some(Err(err)) => return Err(err),
253 None => break,
254 }
255 }
256
257 if self.block_ids.len() != self.next_block_number {
258 return Err(Error::new(
259 ErrorKind::Unexpected,
260 "block copy numbers mismatch, please report bug to opendal",
261 )
262 .with_context("expected", self.next_block_number)
263 .with_context("actual", self.block_ids.len()));
264 }
265
266 self.block_ids
267 .sort_by_key(|(block_number, _)| *block_number);
268 let block_ids = self
269 .block_ids
270 .iter()
271 .map(|(_, block_id)| *block_id)
272 .collect();
273 self.metadata = Some(self.copier.complete_block(block_ids).await?);
274 self.completed = true;
275 Ok(None)
276 }
277
278 async fn close(&mut self) -> Result<Metadata> {
279 while !self.completed {
280 self.next().await?;
281 }
282
283 Ok(self.metadata.clone().unwrap_or_default())
284 }
285
286 async fn abort(&mut self) -> Result<()> {
287 self.tasks.clear();
288 if self.scheduled_block_ids.is_empty() {
289 self.completed = true;
290 self.metadata = None;
291 return Ok(());
292 }
293
294 self.copier
295 .abort_block(self.scheduled_block_ids.clone())
296 .await?;
297 self.completed = true;
298 self.metadata = None;
299 Ok(())
300 }
301}
302
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use tokio::time::Duration;
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Copy;

    /// Observations recorded by [`TestCopy`].
    #[derive(Default)]
    struct TestState {
        /// Range received by `copy_block`, keyed by block id.
        ranges: HashMap<Uuid, BytesRange>,
        /// Ranges of the blocks passed to `complete_block`, in the order the
        /// ids were given.
        completed_ranges: Vec<BytesRange>,
    }

    /// In-memory `BlockCopy` backend with a fixed 4-byte source.
    struct TestCopy {
        state: Arc<Mutex<TestState>>,
    }

    impl BlockCopy for TestCopy {
        // The source is always reported as 4 bytes long.
        async fn source_metadata(&self) -> Result<Metadata> {
            Ok(Metadata::default().with_content_length(4))
        }

        async fn copy_once(&self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn copy_block(&self, block_id: Uuid, range: BytesRange) -> Result<()> {
            // Delay the block at offset 0 so that it records its range later
            // than it otherwise would.
            if range.offset() == 0 {
                sleep(Duration::from_millis(50)).await;
            }

            self.state
                .lock()
                .expect("test state mutex poisoned")
                .ranges
                .insert(block_id, range);

            Ok(())
        }

        // Translate the committed ids back into the ranges they were copied
        // with, preserving the order of `block_ids`.
        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
            let mut state = self.state.lock().expect("test state mutex poisoned");
            state.completed_ranges = block_ids
                .into_iter()
                .map(|block_id| state.ranges[&block_id])
                .collect();
            Ok(Metadata::default())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    /// With a 4-byte source, a copy-once threshold of 0, a block size of 2
    /// and a concurrency of 2, the blocks handed to `complete_block` must
    /// cover `0..2` and then `2..4`, even though the first block is delayed.
    #[tokio::test]
    async fn test_block_copier_completes_blocks_in_source_order() -> Result<()> {
        let state = Arc::new(Mutex::new(TestState::default()));
        let inner = TestCopy {
            state: state.clone(),
        };
        let mut copier = BlockCopier::new(Arc::default(), inner, None, 0, 2, 2);

        // Drive the copier until it reports completion.
        while copier.next().await?.is_some() {}

        let completed_ranges = state
            .lock()
            .expect("test state mutex poisoned")
            .completed_ranges
            .clone();
        assert_eq!(
            completed_ranges,
            vec![BytesRange::new(0, Some(2)), BytesRange::new(2, Some(2))]
        );

        Ok(())
    }
}