opendal/raw/http_util/
client.rs1use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::task::Context;
28use std::task::Poll;
29
30use bytes::Bytes;
31use futures::Future;
32use futures::TryStreamExt;
33use http::Request;
34use http::Response;
35use http_body::Frame;
36use http_body::SizeHint;
37use raw::oio::Read;
38use std::sync::LazyLock;
39
40use super::parse_content_encoding;
41use super::parse_content_length;
42use super::HttpBody;
43use crate::raw::*;
44use crate::*;
45
/// Process-wide [`reqwest::Client`], constructed lazily on first access with
/// `reqwest::Client::new`.
///
/// [`HttpClient::default`] clones this client to back the default fetcher.
// NOTE(review): `dead_code` is presumably allowed because some build
// configurations never reference this static — confirm before removing.
#[allow(dead_code)]
pub(crate) static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> =
    LazyLock::new(reqwest::Client::new);
52
/// Type-erased, reference-counted handle to an [`HttpFetchDyn`] implementation.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
55
/// HTTP client that forwards every request to an [`HttpFetcher`].
///
/// The fetcher is held behind an `Arc`, so cloning an `HttpClient` yields a
/// handle that shares the same underlying fetcher.
#[derive(Clone)]
pub struct HttpClient {
    /// Transport that actually performs the requests.
    fetcher: HttpFetcher,
}
65
66impl Debug for HttpClient {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("HttpClient").finish()
70 }
71}
72
73impl Default for HttpClient {
74 fn default() -> Self {
75 Self {
76 fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
77 }
78 }
79}
80
81impl HttpClient {
82 pub fn new() -> Result<Self> {
84 Ok(Self::default())
85 }
86
87 pub fn with(client: impl HttpFetch) -> Self {
89 let fetcher = Arc::new(client);
90 Self { fetcher }
91 }
92
93 pub(crate) fn into_inner(self) -> HttpFetcher {
95 self.fetcher
96 }
97
98 #[deprecated]
100 pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
101 let client = builder.build().map_err(|err| {
102 Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
103 })?;
104 let fetcher = Arc::new(client);
105 Ok(Self { fetcher })
106 }
107
108 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110 let (parts, mut body) = self.fetch(req).await?.into_parts();
111 let buffer = body.read_all().await?;
112 Ok(Response::from_parts(parts, buffer))
113 }
114
115 pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
119 self.fetcher.fetch(req).await
120 }
121}
122
/// Asynchronous HTTP transport used by [`HttpClient`].
///
/// Implementors must be `Send + Sync + Unpin + 'static`. Any type implementing
/// this trait also receives [`HttpFetchDyn`] through the blanket impl in this
/// file, which is what allows it to be stored as an [`HttpFetcher`].
pub trait HttpFetch: Send + Sync + Unpin + 'static {
    /// Sends `req` and resolves to the response (body exposed as an
    /// [`HttpBody`]) or to an error.
    fn fetch(
        &self,
        req: Request<Buffer>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}
132
/// Boxed-future counterpart of [`HttpFetch`].
///
/// `fetch_dyn` returns a [`BoxedFuture`] rather than `impl Future`; this is
/// the trait used behind `dyn` in [`HttpFetcher`].
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
    /// Sends `req` and returns a boxed future resolving to the response or to
    /// an error.
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
}
142
143impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
144 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
145 Box::pin(self.fetch(req))
146 }
147}
148
149impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
150 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
151 self.deref().fetch_dyn(req).await
152 }
153}
154
155impl HttpFetch for reqwest::Client {
156 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
157 let uri = req.uri().clone();
160 let is_head = req.method() == http::Method::HEAD;
161
162 let (parts, body) = req.into_parts();
163
164 let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
165 Error::new(ErrorKind::Unexpected, "request url is invalid")
166 .with_operation("http_util::Client::send::fetch")
167 .with_context("url", uri.to_string())
168 .set_source(err)
169 })?;
170
171 let mut req_builder = self.request(parts.method, url).headers(parts.headers);
172
173 #[cfg(not(target_arch = "wasm32"))]
175 {
176 req_builder = req_builder.version(parts.version);
177 }
178
179 if !body.is_empty() {
181 #[cfg(not(target_arch = "wasm32"))]
182 {
183 req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
184 }
185 #[cfg(target_arch = "wasm32")]
186 {
187 req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
188 }
189 }
190
191 let mut resp = req_builder.send().await.map_err(|err| {
192 Error::new(ErrorKind::Unexpected, "send http request")
193 .with_operation("http_util::Client::send")
194 .with_context("url", uri.to_string())
195 .with_temporary(is_temporary_error(&err))
196 .set_source(err)
197 })?;
198
199 let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
204 None
205 } else {
206 parse_content_length(resp.headers())?
207 };
208
209 let mut hr = Response::builder()
210 .status(resp.status())
211 .extension(uri.clone());
214
215 #[cfg(not(target_arch = "wasm32"))]
217 {
218 hr = hr.version(resp.version());
219 }
220
221 mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
223
224 let bs = HttpBody::new(
225 resp.bytes_stream()
226 .try_filter(|v| future::ready(!v.is_empty()))
227 .map_ok(Buffer::from)
228 .map_err(move |err| {
229 Error::new(ErrorKind::Unexpected, "read data from http response")
230 .with_operation("http_util::Client::send")
231 .with_context("url", uri.to_string())
232 .with_temporary(is_temporary_error(&err))
233 .set_source(err)
234 }),
235 content_length,
236 );
237
238 let resp = hr.body(bs).expect("response must build succeed");
239 Ok(resp)
240 }
241}
242
243#[inline]
244fn is_temporary_error(err: &reqwest::Error) -> bool {
245 err.is_request()||
247 err.is_body() ||
249 err.is_decode()
251}
252
253struct HttpBufferBody(Buffer);
254
255impl http_body::Body for HttpBufferBody {
256 type Data = Bytes;
257 type Error = Infallible;
258
259 fn poll_frame(
260 mut self: Pin<&mut Self>,
261 _: &mut Context<'_>,
262 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
263 match self.0.next() {
264 Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
265 None => Poll::Ready(None),
266 }
267 }
268
269 fn is_end_stream(&self) -> bool {
270 self.0.is_empty()
271 }
272
273 fn size_hint(&self) -> SizeHint {
274 SizeHint::with_exact(self.0.len() as u64)
275 }
276}