opendal_core/raw/http_util/
client.rs1use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::LazyLock;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Bytes;
32use futures::Future;
33use futures::TryStreamExt;
34use http::Request;
35use http::Response;
36use http_body::Frame;
37use http_body::SizeHint;
38use raw::oio::Read;
39
40use super::HttpBody;
41use super::parse_content_encoding;
42use super::parse_content_length;
43use crate::raw::*;
44use crate::*;
45
46#[allow(dead_code)]
50pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
51
52pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
54
55#[derive(Clone)]
61pub struct HttpClient {
62 fetcher: HttpFetcher,
63}
64
65#[derive(Clone)]
68pub struct AccessorInfoHttpSend {
69 info: Arc<AccessorInfo>,
70}
71
72impl AccessorInfoHttpSend {
73 pub fn new(info: Arc<AccessorInfo>) -> Self {
75 Self { info }
76 }
77}
78
79impl Debug for HttpClient {
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("HttpClient").finish()
83 }
84}
85
86impl Debug for AccessorInfoHttpSend {
87 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("AccessorInfoHttpSend").finish()
89 }
90}
91
92impl Default for HttpClient {
93 fn default() -> Self {
94 Self {
95 fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
96 }
97 }
98}
99
100impl HttpClient {
101 pub fn new() -> Result<Self> {
103 Ok(Self::default())
104 }
105
106 pub fn with(client: impl HttpFetch) -> Self {
108 let fetcher = Arc::new(client);
109 Self { fetcher }
110 }
111
112 pub fn into_inner(self) -> HttpFetcher {
114 self.fetcher
115 }
116
117 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
119 let (parts, mut body) = self.fetch(req).await?.into_parts();
120 let buffer = body.read_all().await?;
121 Ok(Response::from_parts(parts, buffer))
122 }
123
124 pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
128 self.fetcher.fetch(req).await
129 }
130}
131
132impl reqsign_core::HttpSend for HttpClient {
133 async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
134 let req = req.map(Buffer::from);
135 let resp = self.send(req).await.map_err(|err| {
136 let retryable = err.is_temporary();
137 reqsign_core::Error::unexpected("send request via OpenDAL HttpClient")
138 .with_source(err)
139 .set_retryable(retryable)
140 })?;
141
142 let (parts, body) = resp.into_parts();
143 Ok(Response::from_parts(parts, body.to_bytes()))
144 }
145}
146
147impl reqsign_core::HttpSend for AccessorInfoHttpSend {
148 async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
149 let client = self.info.http_client();
150 reqsign_core::HttpSend::http_send(&client, req).await
151 }
152}
153
154pub trait HttpFetch: Send + Sync + Unpin + 'static {
157 fn fetch(
159 &self,
160 req: Request<Buffer>,
161 ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
162}
163
164pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
168 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
172}
173
174impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
175 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
176 Box::pin(self.fetch(req))
177 }
178}
179
180impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
181 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
182 self.deref().fetch_dyn(req).await
183 }
184}
185
186impl HttpFetch for reqwest::Client {
187 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
188 let uri = req.uri().clone();
191 let is_head = req.method() == http::Method::HEAD;
192
193 let (parts, body) = req.into_parts();
194
195 let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
196 Error::new(ErrorKind::Unexpected, "request url is invalid")
197 .with_operation("http_util::Client::send::fetch")
198 .with_context("url", uri.to_string())
199 .set_source(err)
200 })?;
201
202 let mut req_builder = self.request(parts.method, url).headers(parts.headers);
203
204 #[cfg(not(target_arch = "wasm32"))]
206 {
207 req_builder = req_builder.version(parts.version);
208 }
209
210 if !body.is_empty() {
212 #[cfg(not(target_arch = "wasm32"))]
213 {
214 req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
215 }
216 #[cfg(target_arch = "wasm32")]
217 {
218 req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
219 }
220 }
221
222 let mut resp = req_builder.send().await.map_err(|err| {
223 Error::new(ErrorKind::Unexpected, "send http request")
224 .with_operation("http_util::Client::send")
225 .with_context("url", uri.to_string())
226 .with_temporary(is_temporary_error(&err))
227 .set_source(err)
228 })?;
229
230 let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
235 None
236 } else {
237 parse_content_length(resp.headers())?
238 };
239
240 let mut hr = Response::builder()
241 .status(resp.status())
242 .extension(uri.clone());
245
246 #[cfg(not(target_arch = "wasm32"))]
248 {
249 hr = hr.version(resp.version());
250 }
251
252 mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
254
255 let bs = HttpBody::new(
256 resp.bytes_stream()
257 .try_filter(|v| future::ready(!v.is_empty()))
258 .map_ok(Buffer::from)
259 .map_err(move |err| {
260 Error::new(ErrorKind::Unexpected, "read data from http response")
261 .with_operation("http_util::Client::send")
262 .with_context("url", uri.to_string())
263 .with_temporary(is_temporary_error(&err))
264 .set_source(err)
265 }),
266 content_length,
267 );
268
269 let resp = hr.body(bs).expect("response must build succeed");
270 Ok(resp)
271 }
272}
273
274#[inline]
275fn is_temporary_error(err: &reqwest::Error) -> bool {
276 err.is_request()||
278 err.is_body() ||
280 err.is_decode()
282}
283
284struct HttpBufferBody(Buffer);
285
286impl http_body::Body for HttpBufferBody {
287 type Data = Bytes;
288 type Error = Infallible;
289
290 fn poll_frame(
291 mut self: Pin<&mut Self>,
292 _: &mut Context<'_>,
293 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
294 match self.0.next() {
295 Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
296 None => Poll::Ready(None),
297 }
298 }
299
300 fn is_end_stream(&self) -> bool {
301 self.0.is_empty()
302 }
303
304 fn size_hint(&self) -> SizeHint {
305 SizeHint::with_exact(self.0.len() as u64)
306 }
307}