opendal/layers/
throttle.rs1use std::num::NonZeroU32;
19use std::sync::Arc;
20
21use governor::Quota;
22use governor::RateLimiter;
23use governor::clock::DefaultClock;
24use governor::middleware::NoOpMiddleware;
25use governor::state::InMemoryState;
26use governor::state::NotKeyed;
27
28use crate::raw::*;
29use crate::*;
30
/// Layer configuration for rate-limiting writes on a wrapped accessor.
///
/// Both values feed the per-second quota that is built when the layer is
/// applied: `bandwidth` is the replenish rate per second and `burst` is the
/// largest amount that may be consumed in one go. The write path charges one
/// unit per byte of the buffer being written, so both values are effectively
/// byte counts.
#[derive(Debug, Clone)]
pub struct ThrottleLayer {
    /// Sustained rate, in units (bytes) per second. Never zero.
    bandwidth: NonZeroU32,
    /// Maximum amount a single request may consume at once. Never zero.
    burst: NonZeroU32,
}

impl ThrottleLayer {
    /// Creates a throttle layer with the given `bandwidth` and `burst`.
    ///
    /// # Panics
    ///
    /// Panics if `bandwidth` or `burst` is zero; `bandwidth` is checked first.
    pub fn new(bandwidth: u32, burst: u32) -> Self {
        // `NonZeroU32::new` already rejects zero, so a single `expect` per
        // argument both validates the input and names the violated invariant.
        Self {
            bandwidth: NonZeroU32::new(bandwidth).expect("bandwidth must be greater than 0"),
            burst: NonZeroU32::new(burst).expect("burst must be greater than 0"),
        }
    }
}
86
87impl<A: Access> Layer<A> for ThrottleLayer {
88 type LayeredAccess = ThrottleAccessor<A>;
89
90 fn layer(&self, accessor: A) -> Self::LayeredAccess {
91 let rate_limiter = Arc::new(RateLimiter::direct(
92 Quota::per_second(self.bandwidth).allow_burst(self.burst),
93 ));
94 ThrottleAccessor {
95 inner: accessor,
96 rate_limiter,
97 }
98 }
99}
100
101type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
105
106#[derive(Debug, Clone)]
107pub struct ThrottleAccessor<A: Access> {
108 inner: A,
109 rate_limiter: SharedRateLimiter,
110}
111
112impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
113 type Inner = A;
114 type Reader = ThrottleWrapper<A::Reader>;
115 type Writer = ThrottleWrapper<A::Writer>;
116 type Lister = A::Lister;
117 type Deleter = A::Deleter;
118
119 fn inner(&self) -> &Self::Inner {
120 &self.inner
121 }
122
123 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
124 let limiter = self.rate_limiter.clone();
125
126 self.inner
127 .read(path, args)
128 .await
129 .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
130 }
131
132 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
133 let limiter = self.rate_limiter.clone();
134
135 self.inner
136 .write(path, args)
137 .await
138 .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
139 }
140
141 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
142 self.inner.delete().await
143 }
144
145 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
146 self.inner.list(path, args).await
147 }
148}
149
150pub struct ThrottleWrapper<R> {
151 inner: R,
152 limiter: SharedRateLimiter,
153}
154
155impl<R> ThrottleWrapper<R> {
156 pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
157 Self {
158 inner,
159 limiter: rate_limiter,
160 }
161 }
162}
163
164impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
165 async fn read(&mut self) -> Result<Buffer> {
166 self.inner.read().await
167 }
168}
169
170impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
171 async fn write(&mut self, bs: Buffer) -> Result<()> {
172 let len = bs.len();
173 if len == 0 {
174 return self.inner.write(bs).await;
175 }
176
177 if len > u32::MAX as usize {
178 return Err(Error::new(
179 ErrorKind::RateLimited,
180 "request size exceeds throttle quota capacity",
181 ));
182 }
183
184 let buf_length =
185 NonZeroU32::new(len as u32).expect("len is non-zero so NonZeroU32 must exist");
186
187 self.limiter.until_n_ready(buf_length).await.map_err(|_| {
188 Error::new(
189 ErrorKind::RateLimited,
190 "burst size is smaller than the request size",
191 )
192 })?;
193
194 self.inner.write(bs).await
195 }
196
197 async fn abort(&mut self) -> Result<()> {
198 self.inner.abort().await
199 }
200
201 async fn close(&mut self) -> Result<Metadata> {
202 self.inner.close().await
203 }
204}