opendal/types/delete/deleter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::pin::pin;
19
20use futures::Stream;
21use futures::StreamExt;
22
23use crate::raw::oio::DeleteDyn;
24use crate::raw::*;
25use crate::*;
26
27/// Deleter is designed to continuously remove content from storage.
28///
29/// It leverages batch deletion capabilities provided by storage services for efficient removal.
30///
31/// # Usage
32///
33/// [`Deleter`] provides several ways to delete files:
34///
35/// ## Direct Deletion
36///
37/// Use the `delete` method to remove a single file:
38///
39/// ```rust
40/// use opendal::Operator;
41/// use opendal::Result;
42///
43/// async fn example(op: Operator) -> Result<()> {
44/// let mut d = op.deleter().await?;
45/// d.delete("path/to/file").await?;
46/// d.close().await?;
47/// Ok(())
48/// }
49/// ```
50///
51/// Delete multiple files via a stream:
52///
53/// ```rust
54/// use futures::stream;
55/// use opendal::Operator;
56/// use opendal::Result;
57///
58/// async fn example(op: Operator) -> Result<()> {
59/// let mut d = op.deleter().await?;
60/// d.delete_stream(stream::iter(vec!["path/to/file"])).await?;
61/// d.close().await?;
62/// Ok(())
63/// }
64/// ```
65///
66/// ## Using as a Sink
67///
68/// Deleter can be used as a Sink for file deletion:
69///
70/// ```rust
71/// use futures::stream;
72/// use futures::Sink;
73/// use futures::SinkExt;
74/// use opendal::Operator;
75/// use opendal::Result;
76///
77/// async fn example(op: Operator) -> Result<()> {
78/// let mut sink = op.deleter().await?.into_sink();
79/// sink.send("path/to/file").await?;
80/// sink.close().await?;
81/// Ok(())
82/// }
83/// ```
84pub struct Deleter {
85 deleter: oio::Deleter,
86}
87
88impl Deleter {
89 pub(crate) async fn create(acc: Accessor) -> Result<Self> {
90 let (_, deleter) = acc.delete().await?;
91
92 Ok(Self { deleter })
93 }
94
95 /// Delete a path.
96 pub async fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
97 let input = input.into_delete_input();
98 let mut op = OpDelete::default();
99 if let Some(version) = &input.version {
100 op = op.with_version(version);
101 }
102 if input.recursive {
103 op = op.with_recursive(true);
104 }
105
106 self.deleter.delete_dyn(&input.path, op).await?;
107 Ok(())
108 }
109
110 /// Delete an infallible iterator of paths.
111 ///
112 /// Also see:
113 ///
114 /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
115 /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
116 /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
117 pub async fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
118 where
119 I: IntoIterator<Item = D>,
120 D: IntoDeleteInput,
121 {
122 for entry in iter {
123 self.delete(entry).await?;
124 }
125 Ok(())
126 }
127
128 /// Delete a fallible iterator of paths.
129 ///
130 /// Also see:
131 ///
132 /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
133 /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
134 /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
135 pub async fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
136 where
137 I: IntoIterator<Item = Result<D>>,
138 D: IntoDeleteInput,
139 {
140 for entry in try_iter {
141 self.delete(entry?).await?;
142 }
143
144 Ok(())
145 }
146
147 /// Delete an infallible stream of paths.
148 ///
149 /// Also see:
150 ///
151 /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
152 /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
153 /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
154 pub async fn delete_stream<S, D>(&mut self, stream: S) -> Result<()>
155 where
156 S: Stream<Item = D>,
157 D: IntoDeleteInput,
158 {
159 let mut stream = pin!(stream);
160 while let Some(entry) = stream.next().await {
161 self.delete(entry).await?;
162 }
163
164 Ok(())
165 }
166
167 /// Delete a fallible stream of paths.
168 ///
169 /// Also see:
170 ///
171 /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
172 /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
173 /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
174 pub async fn delete_try_stream<S, D>(&mut self, try_stream: S) -> Result<()>
175 where
176 S: Stream<Item = Result<D>>,
177 D: IntoDeleteInput,
178 {
179 let mut stream = pin!(try_stream);
180 while let Some(entry) = stream.next().await.transpose()? {
181 self.delete(entry).await?;
182 }
183
184 Ok(())
185 }
186
187 /// Close the deleter, this will flush the deleter and wait until all paths are deleted.
188 pub async fn close(&mut self) -> Result<()> {
189 self.deleter.close_dyn().await
190 }
191
192 /// Convert the deleter into a sink.
193 pub fn into_sink<T: IntoDeleteInput>(self) -> FuturesDeleteSink<T> {
194 FuturesDeleteSink::new(self)
195 }
196}