opendal/raw/oio/delete/
batch_delete.rs1use std::collections::HashSet;
19use std::future::Future;
20
21use crate::raw::*;
22use crate::*;
23
24pub trait BatchDelete: Send + Sync + Unpin + 'static {
28 fn delete_once(
34 &self,
35 path: String,
36 args: OpDelete,
37 ) -> impl Future<Output = Result<()>> + MaybeSend;
38
39 fn delete_batch(
44 &self,
45 batch: Vec<(String, OpDelete)>,
46 ) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
47}
48
49#[derive(Default)]
51pub struct BatchDeleteResult {
52 pub succeeded: Vec<(String, OpDelete)>,
54 pub failed: Vec<(String, OpDelete, Error)>,
56}
57
58pub struct BatchDeleter<D: BatchDelete> {
60 inner: D,
61 buffer: HashSet<(String, OpDelete)>,
62 max_batch_size: usize,
63}
64
65impl<D: BatchDelete> BatchDeleter<D> {
66 pub fn new(inner: D, max_batch_size: Option<usize>) -> Self {
68 debug_assert!(
69 max_batch_size.is_some(),
70 "BatchDeleter requires delete_max_size to be configured"
71 );
72 let max_batch_size = max_batch_size.unwrap_or(1);
73
74 Self {
75 inner,
76 buffer: HashSet::default(),
77 max_batch_size,
78 }
79 }
80
81 async fn flush_buffer(&mut self) -> Result<usize> {
82 if self.buffer.is_empty() {
83 return Ok(0);
84 }
85
86 if self.buffer.len() == 1 {
87 let (path, args) = self
88 .buffer
89 .iter()
90 .next()
91 .expect("the delete buffer size must be 1")
92 .clone();
93 self.inner.delete_once(path, args).await?;
94 self.buffer.clear();
95 return Ok(1);
96 }
97
98 let batch = self.buffer.iter().cloned().collect();
99 let result = self.inner.delete_batch(batch).await?;
100
101 if result.succeeded.is_empty() {
102 return Err(Error::new(
103 ErrorKind::Unexpected,
104 "batch delete returned zero successes",
105 ));
106 }
107 if result.succeeded.len() + result.failed.len() != self.buffer.len() {
108 return Err(Error::new(
109 ErrorKind::Unexpected,
110 "batch delete result size mismatch",
111 ));
112 }
113
114 let mut deleted = 0;
115 for i in result.succeeded {
116 self.buffer.remove(&i);
117 deleted += 1;
118 }
119
120 for (path, op, err) in result.failed {
121 if !err.is_temporary() {
122 return Err(err
123 .with_context("path", path)
124 .with_context("version", op.version().unwrap_or("<latest>")));
125 }
126 }
127
128 Ok(deleted)
129 }
130}
131
132impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
133 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
134 self.buffer.insert((path.to_string(), args));
135 if self.buffer.len() >= self.max_batch_size {
136 let _ = self.flush_buffer().await?;
137 return Ok(());
138 }
139
140 Ok(())
141 }
142
143 async fn close(&mut self) -> Result<()> {
144 loop {
145 let deleted = self.flush_buffer().await?;
146
147 if self.buffer.is_empty() {
148 break;
149 }
150
151 if deleted == 0 {
152 return Err(Error::new(
153 ErrorKind::Unexpected,
154 "batch delete made no progress while buffer remains",
155 ));
156 }
157 }
158
159 Ok(())
160 }
161}