opendal/raw/oio/delete/
batch_delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashSet;
19use std::future::Future;
20
21use crate::raw::*;
22use crate::*;
23
/// BatchDelete is used to implement [`oio::Delete`] based on batch delete operation.
///
/// [`BatchDeleter`] buffers the paths passed to `delete` and performs the actual
/// delete operation once the buffer reaches its maximum batch size or when
/// `close` is called.
pub trait BatchDelete: Send + Sync + Unpin + 'static {
    /// delete_once deletes a single path.
    ///
    /// Implementations should make sure that the data is deleted correctly at once.
    ///
    /// BatchDeleter calls this method when there is exactly one path to delete.
    fn delete_once(
        &self,
        path: String,
        args: OpDelete,
    ) -> impl Future<Output = Result<()>> + MaybeSend;

    /// delete_batch deletes multiple paths at once.
    ///
    /// - Implementations should make sure that the number of entries in `batch`
    ///   equals the total number of entries (succeeded plus failed) in the
    ///   returned result.
    /// - Implementations should return an error if no path is deleted.
    fn delete_batch(
        &self,
        batch: Vec<(String, OpDelete)>,
    ) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
}
48
/// BatchDeleteResult is the result of batch delete operation.
///
/// Every entry passed to [`BatchDelete::delete_batch`] is expected to show up in
/// exactly one of the two collections below.
#[derive(Default)]
pub struct BatchDeleteResult {
    /// Collection of successful deletions, containing tuples of (path, args).
    ///
    /// [`BatchDeleter`] removes these entries from its pending buffer.
    pub succeeded: Vec<(String, OpDelete)>,
    /// Collection of failed deletions, containing tuples of (path, args, error).
    ///
    /// [`BatchDeleter`] keeps these entries buffered; a non-temporary error is
    /// returned to the caller.
    pub failed: Vec<(String, OpDelete, Error)>,
}
57
/// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
///
/// Paths handed to `delete` are collected in a buffer and sent to the backend
/// once the buffer reaches `max_batch_size`, or when `close` is called.
pub struct BatchDeleter<D: BatchDelete> {
    /// Backend that performs the actual single and batch delete requests.
    inner: D,
    /// Pending `(path, args)` entries that have not been deleted yet.
    /// A set is used, so inserting the same entry twice keeps only one copy.
    buffer: HashSet<(String, OpDelete)>,
    /// Buffer length at which `delete` triggers a flush.
    max_batch_size: usize,
}
64
65impl<D: BatchDelete> BatchDeleter<D> {
66    /// Create a new batch deleter.
67    pub fn new(inner: D, max_batch_size: Option<usize>) -> Self {
68        debug_assert!(
69            max_batch_size.is_some(),
70            "BatchDeleter requires delete_max_size to be configured"
71        );
72        let max_batch_size = max_batch_size.unwrap_or(1);
73
74        Self {
75            inner,
76            buffer: HashSet::default(),
77            max_batch_size,
78        }
79    }
80
81    async fn flush_buffer(&mut self) -> Result<usize> {
82        if self.buffer.is_empty() {
83            return Ok(0);
84        }
85
86        if self.buffer.len() == 1 {
87            let (path, args) = self
88                .buffer
89                .iter()
90                .next()
91                .expect("the delete buffer size must be 1")
92                .clone();
93            self.inner.delete_once(path, args).await?;
94            self.buffer.clear();
95            return Ok(1);
96        }
97
98        let batch = self.buffer.iter().cloned().collect();
99        let result = self.inner.delete_batch(batch).await?;
100
101        if result.succeeded.is_empty() {
102            return Err(Error::new(
103                ErrorKind::Unexpected,
104                "batch delete returned zero successes",
105            ));
106        }
107        if result.succeeded.len() + result.failed.len() != self.buffer.len() {
108            return Err(Error::new(
109                ErrorKind::Unexpected,
110                "batch delete result size mismatch",
111            ));
112        }
113
114        let mut deleted = 0;
115        for i in result.succeeded {
116            self.buffer.remove(&i);
117            deleted += 1;
118        }
119
120        for (path, op, err) in result.failed {
121            if !err.is_temporary() {
122                return Err(err
123                    .with_context("path", path)
124                    .with_context("version", op.version().unwrap_or("<latest>")));
125            }
126        }
127
128        Ok(deleted)
129    }
130}
131
132impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
133    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
134        self.buffer.insert((path.to_string(), args));
135        if self.buffer.len() >= self.max_batch_size {
136            let _ = self.flush_buffer().await?;
137            return Ok(());
138        }
139
140        Ok(())
141    }
142
143    async fn close(&mut self) -> Result<()> {
144        loop {
145            let deleted = self.flush_buffer().await?;
146
147            if self.buffer.is_empty() {
148                break;
149            }
150
151            if deleted == 0 {
152                return Err(Error::new(
153                    ErrorKind::Unexpected,
154                    "batch delete made no progress while buffer remains",
155                ));
156            }
157        }
158
159        Ok(())
160    }
161}