opendal_core/raw/http_util/
client.rs1use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::LazyLock;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Bytes;
32use futures::Future;
33use futures::TryStreamExt;
34use http::Request;
35use http::Response;
36use http_body::Frame;
37use http_body::SizeHint;
38use raw::oio::Read;
39
40use super::HttpBody;
41use super::parse_content_encoding;
42use super::parse_content_length;
43use crate::raw::*;
44use crate::*;
45
46#[allow(dead_code)]
50pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
51
52pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
54
55#[derive(Clone)]
61pub struct HttpClient {
62 fetcher: HttpFetcher,
63}
64
65impl Debug for HttpClient {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("HttpClient").finish()
69 }
70}
71
72impl Default for HttpClient {
73 fn default() -> Self {
74 Self {
75 fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
76 }
77 }
78}
79
80impl HttpClient {
81 pub fn new() -> Result<Self> {
83 Ok(Self::default())
84 }
85
86 pub fn with(client: impl HttpFetch) -> Self {
88 let fetcher = Arc::new(client);
89 Self { fetcher }
90 }
91
92 pub fn into_inner(self) -> HttpFetcher {
94 self.fetcher
95 }
96
97 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
99 let (parts, mut body) = self.fetch(req).await?.into_parts();
100 let buffer = body.read_all().await?;
101 Ok(Response::from_parts(parts, buffer))
102 }
103
104 pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
108 self.fetcher.fetch(req).await
109 }
110}
111
112pub trait HttpFetch: Send + Sync + Unpin + 'static {
115 fn fetch(
117 &self,
118 req: Request<Buffer>,
119 ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
120}
121
122pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
126 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
130}
131
132impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
133 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
134 Box::pin(self.fetch(req))
135 }
136}
137
138impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
139 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
140 self.deref().fetch_dyn(req).await
141 }
142}
143
144impl HttpFetch for reqwest::Client {
145 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
146 let uri = req.uri().clone();
149 let is_head = req.method() == http::Method::HEAD;
150
151 let (parts, body) = req.into_parts();
152
153 let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
154 Error::new(ErrorKind::Unexpected, "request url is invalid")
155 .with_operation("http_util::Client::send::fetch")
156 .with_context("url", uri.to_string())
157 .set_source(err)
158 })?;
159
160 let mut req_builder = self.request(parts.method, url).headers(parts.headers);
161
162 #[cfg(not(target_arch = "wasm32"))]
164 {
165 req_builder = req_builder.version(parts.version);
166 }
167
168 if !body.is_empty() {
170 #[cfg(not(target_arch = "wasm32"))]
171 {
172 req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
173 }
174 #[cfg(target_arch = "wasm32")]
175 {
176 req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
177 }
178 }
179
180 let mut resp = req_builder.send().await.map_err(|err| {
181 Error::new(ErrorKind::Unexpected, "send http request")
182 .with_operation("http_util::Client::send")
183 .with_context("url", uri.to_string())
184 .with_temporary(is_temporary_error(&err))
185 .set_source(err)
186 })?;
187
188 let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
193 None
194 } else {
195 parse_content_length(resp.headers())?
196 };
197
198 let mut hr = Response::builder()
199 .status(resp.status())
200 .extension(uri.clone());
203
204 #[cfg(not(target_arch = "wasm32"))]
206 {
207 hr = hr.version(resp.version());
208 }
209
210 mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
212
213 let bs = HttpBody::new(
214 resp.bytes_stream()
215 .try_filter(|v| future::ready(!v.is_empty()))
216 .map_ok(Buffer::from)
217 .map_err(move |err| {
218 Error::new(ErrorKind::Unexpected, "read data from http response")
219 .with_operation("http_util::Client::send")
220 .with_context("url", uri.to_string())
221 .with_temporary(is_temporary_error(&err))
222 .set_source(err)
223 }),
224 content_length,
225 );
226
227 let resp = hr.body(bs).expect("response must build succeed");
228 Ok(resp)
229 }
230}
231
232#[inline]
233fn is_temporary_error(err: &reqwest::Error) -> bool {
234 err.is_request()||
236 err.is_body() ||
238 err.is_decode()
240}
241
242struct HttpBufferBody(Buffer);
243
244impl http_body::Body for HttpBufferBody {
245 type Data = Bytes;
246 type Error = Infallible;
247
248 fn poll_frame(
249 mut self: Pin<&mut Self>,
250 _: &mut Context<'_>,
251 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
252 match self.0.next() {
253 Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
254 None => Poll::Ready(None),
255 }
256 }
257
258 fn is_end_stream(&self) -> bool {
259 self.0.is_empty()
260 }
261
262 fn size_hint(&self) -> SizeHint {
263 SizeHint::with_exact(self.0.len() as u64)
264 }
265}