opendal_core/raw/oio/copy/
block_copy.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::Arc;
20
21use futures::FutureExt;
22use futures::select;
23use uuid::Uuid;
24
25use crate::raw::*;
26use crate::*;
27
/// BlockCopy is used to implement [`oio::Copy`] based on block copy.
///
/// By implementing BlockCopy, services only need to provide service-specific
/// block copy operations. [`BlockCopier`] will drive source metadata loading,
/// block id generation, block queue, completion, and abort state.
pub trait BlockCopy: Send + Sync + Unpin + 'static {
    /// source_metadata returns source metadata for planning block copy.
    ///
    /// BlockCopier will call this API when source content length hint is not
    /// provided. BlockCopier only reads the content length from the returned
    /// metadata and caches it after the first successful call.
    fn source_metadata(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// copy_once is used to copy the source object at once.
    ///
    /// BlockCopier will call this API when the source object can be copied
    /// without starting block copy, that is when no block has completed yet
    /// and the source size is not larger than its copy once threshold.
    ///
    /// The returned metadata is what BlockCopier hands back from `close`.
    fn copy_once(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// copy_block copies one source range into one block.
    ///
    /// `block_id` is a random v4 UUID generated by BlockCopier and `range` is
    /// a sized source range covering at most one block size.
    ///
    /// NOTE(review): BlockCopier retries on temporary errors, so the same
    /// `block_id` and `range` may presumably be passed more than once —
    /// confirm against the task queue's retry behavior.
    fn copy_block(
        &self,
        block_id: Uuid,
        range: BytesRange,
    ) -> impl Future<Output = Result<()>> + MaybeSend;

    /// complete_block completes the block copy with the ordered block id list.
    ///
    /// BlockCopier sorts `block_ids` by ascending block number before calling
    /// this API, so they follow source offset order. The returned metadata is
    /// what BlockCopier hands back from `close`.
    fn complete_block(
        &self,
        block_ids: Vec<Uuid>,
    ) -> impl Future<Output = Result<Metadata>> + MaybeSend;

    /// abort_block cancels the pending block copy and purges intermediate state.
    ///
    /// `block_ids` holds every block id BlockCopier has scheduled so far,
    /// whether or not the block finished copying. BlockCopier does not call
    /// this API when no block has been scheduled.
    fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
}
62
63struct CopyInput<C: BlockCopy> {
64    copier: Arc<C>,
65    executor: Executor,
66    block_id: Uuid,
67    block_number: usize,
68    range: BytesRange,
69}
70
71impl<C: BlockCopy> Clone for CopyInput<C> {
72    fn clone(&self) -> Self {
73        Self {
74            copier: self.copier.clone(),
75            executor: self.executor.clone(),
76            block_id: self.block_id,
77            block_number: self.block_number,
78            range: self.range,
79        }
80    }
81}
82
/// Output of a block copy task that finished successfully.
struct CopiedBlock {
    /// Identifier of the copied block.
    block_id: Uuid,
    /// Position of the block within the source; used to order block ids
    /// before completion.
    block_number: usize,
    /// Size in bytes of the copied source range.
    size: u64,
}
88
/// BlockCopier implements [`oio::Copy`] based on block copy.
///
/// It plans the source into blocks of at most `block_size` bytes, schedules
/// them on a concurrent task queue, and completes the copy once every
/// scheduled block has finished.
pub struct BlockCopier<C: BlockCopy> {
    /// Service-specific block copy implementation, shared with every task.
    copier: Arc<C>,
    /// Executor handed to every task input for its timeout.
    executor: Executor,

    /// `(block number, block id)` of blocks that finished copying, in the
    /// order their results were consumed.
    block_ids: Vec<(usize, Uuid)>,
    /// Ids of all blocks handed to the task queue; passed to `abort_block`.
    scheduled_block_ids: Vec<Uuid>,
    /// Number assigned to the next block; equals the count scheduled so far.
    next_block_number: usize,
    /// Source offset at which the next block starts.
    next_offset: u64,
    /// Source size, from the hint or loaded lazily from source metadata.
    source_size: Option<u64>,
    /// Sources not larger than this are copied with `copy_once`.
    copy_once_threshold: u64,
    /// Maximum size in bytes of one block.
    block_size: u64,
    /// Task concurrency; also caps the blocks scheduled per `fill_tasks` call.
    concurrent: usize,
    /// Set once the copy has finished or has been aborted.
    completed: bool,
    /// Metadata returned by `copy_once` or `complete_block`, if any.
    metadata: Option<Metadata>,

    /// Queue running the block copy tasks.
    tasks: ConcurrentTasks<CopyInput<C>, CopiedBlock>,
}
107
108impl<C: BlockCopy> BlockCopier<C> {
109    /// Create a new BlockCopier.
110    pub fn new(
111        info: Arc<AccessorInfo>,
112        inner: C,
113        source_content_length_hint: Option<u64>,
114        copy_once_threshold: u64,
115        block_size: u64,
116        concurrent: usize,
117    ) -> Self {
118        let copier = Arc::new(inner);
119        let executor = info.executor();
120        let concurrent = concurrent.max(1);
121
122        Self {
123            copier,
124            executor: executor.clone(),
125            block_ids: Vec::new(),
126            scheduled_block_ids: Vec::new(),
127            next_block_number: 0,
128            next_offset: 0,
129            source_size: source_content_length_hint,
130            copy_once_threshold,
131            block_size,
132            concurrent,
133            completed: false,
134            metadata: None,
135
136            tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
137                Box::pin(async move {
138                    let size = input.range.size().expect("block copy range must be sized");
139                    let fut = input.copier.copy_block(input.block_id, input.range);
140
141                    let result = match input.executor.timeout() {
142                        None => fut.await.map(|_| CopiedBlock {
143                            block_id: input.block_id,
144                            block_number: input.block_number,
145                            size,
146                        }),
147                        Some(timeout) => {
148                            select! {
149                                result = fut.fuse() => {
150                                    result.map(|_| CopiedBlock {
151                                        block_id: input.block_id,
152                                        block_number: input.block_number,
153                                        size,
154                                    })
155                                }
156                                _ = timeout.fuse() => {
157                                    Err(Error::new(
158                                        ErrorKind::Unexpected, "copy block timeout")
159                                            .with_context("block_id", input.block_id.to_string())
160                                            .set_temporary())
161                                }
162                            }
163                        }
164                    };
165
166                    (input, result)
167                })
168            }),
169        }
170    }
171
172    async fn source_size(&mut self) -> Result<u64> {
173        match self.source_size {
174            Some(size) => Ok(size),
175            None => {
176                let size = self.copier.source_metadata().await?.content_length();
177                self.source_size = Some(size);
178                Ok(size)
179            }
180        }
181    }
182
183    async fn fill_tasks(&mut self, source_size: u64) -> Result<()> {
184        let mut scheduled = 0;
185
186        while self.next_offset < source_size
187            && self.tasks.has_remaining()
188            && scheduled < self.concurrent
189        {
190            let size = self.block_size.min(source_size - self.next_offset);
191            let range = BytesRange::new(self.next_offset, Some(size));
192
193            let input = CopyInput {
194                copier: self.copier.clone(),
195                executor: self.executor.clone(),
196                block_id: Uuid::new_v4(),
197                block_number: self.next_block_number,
198                range,
199            };
200
201            loop {
202                match self.tasks.execute(input.clone()).await {
203                    Ok(()) => break,
204                    Err(err) if err.is_temporary() => continue,
205                    Err(err) => return Err(err),
206                }
207            }
208
209            self.scheduled_block_ids.push(input.block_id);
210            self.next_offset += size;
211            self.next_block_number += 1;
212            scheduled += 1;
213
214            if self.tasks.has_result() {
215                break;
216            }
217        }
218
219        Ok(())
220    }
221}
222
223impl<C> oio::Copy for BlockCopier<C>
224where
225    C: BlockCopy,
226{
227    async fn next(&mut self) -> Result<Option<usize>> {
228        if self.completed {
229            return Ok(None);
230        }
231
232        let source_size = self.source_size().await?;
233
234        if self.block_ids.is_empty() && source_size <= self.copy_once_threshold {
235            self.metadata = Some(self.copier.copy_once().await?);
236            self.completed = true;
237            return Ok(None);
238        }
239
240        self.fill_tasks(source_size).await?;
241
242        loop {
243            match self.tasks.next().await {
244                Some(Ok(result)) => {
245                    let size = result.size.try_into().map_err(|_| {
246                        Error::new(ErrorKind::Unexpected, "block copy size exceeds usize")
247                    })?;
248                    self.block_ids.push((result.block_number, result.block_id));
249                    return Ok(Some(size));
250                }
251                Some(Err(err)) if err.is_temporary() => continue,
252                Some(Err(err)) => return Err(err),
253                None => break,
254            }
255        }
256
257        if self.block_ids.len() != self.next_block_number {
258            return Err(Error::new(
259                ErrorKind::Unexpected,
260                "block copy numbers mismatch, please report bug to opendal",
261            )
262            .with_context("expected", self.next_block_number)
263            .with_context("actual", self.block_ids.len()));
264        }
265
266        self.block_ids
267            .sort_by_key(|(block_number, _)| *block_number);
268        let block_ids = self
269            .block_ids
270            .iter()
271            .map(|(_, block_id)| *block_id)
272            .collect();
273        self.metadata = Some(self.copier.complete_block(block_ids).await?);
274        self.completed = true;
275        Ok(None)
276    }
277
278    async fn close(&mut self) -> Result<Metadata> {
279        while !self.completed {
280            self.next().await?;
281        }
282
283        Ok(self.metadata.clone().unwrap_or_default())
284    }
285
286    async fn abort(&mut self) -> Result<()> {
287        self.tasks.clear();
288        if self.scheduled_block_ids.is_empty() {
289            self.completed = true;
290            self.metadata = None;
291            return Ok(());
292        }
293
294        self.copier
295            .abort_block(self.scheduled_block_ids.clone())
296            .await?;
297        self.completed = true;
298        self.metadata = None;
299        Ok(())
300    }
301}
302
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use tokio::time::Duration;
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Copy;

    /// Observations recorded by [`TestCopy`].
    #[derive(Default)]
    struct TestState {
        /// Source range copied for each block id.
        ranges: HashMap<Uuid, BytesRange>,
        /// Ranges in the order their block ids were passed to `complete_block`.
        completed_ranges: Vec<BytesRange>,
    }

    /// In-memory [`BlockCopy`] that reports a 4-byte source and records calls.
    struct TestCopy {
        state: Arc<Mutex<TestState>>,
    }

    impl BlockCopy for TestCopy {
        async fn source_metadata(&self) -> Result<Metadata> {
            let metadata = Metadata::default().with_content_length(4);
            Ok(metadata)
        }

        async fn copy_once(&self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn copy_block(&self, block_id: Uuid, range: BytesRange) -> Result<()> {
            // Delay the block at offset 0 so that a later block can finish first.
            let is_first_block = range.offset() == 0;
            if is_first_block {
                sleep(Duration::from_millis(50)).await;
            }

            let mut state = self.state.lock().expect("test state mutex poisoned");
            state.ranges.insert(block_id, range);

            Ok(())
        }

        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
            let mut state = self.state.lock().expect("test state mutex poisoned");
            let ordered: Vec<BytesRange> = block_ids
                .iter()
                .map(|block_id| state.ranges[block_id])
                .collect();
            state.completed_ranges = ordered;
            Ok(Metadata::default())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_block_copier_completes_blocks_in_source_order() -> Result<()> {
        let state = Arc::new(Mutex::new(TestState::default()));
        let inner = TestCopy {
            state: Arc::clone(&state),
        };
        // No size hint, copy-once disabled, 2-byte blocks, 2 concurrent tasks.
        let mut copier = BlockCopier::new(Arc::default(), inner, None, 0, 2, 2);

        while copier.next().await?.is_some() {}

        let expected = vec![BytesRange::new(0, Some(2)), BytesRange::new(2, Some(2))];
        let actual = state
            .lock()
            .expect("test state mutex poisoned")
            .completed_ranges
            .clone();
        assert_eq!(actual, expected);

        Ok(())
    }
}
383}