opendal_core/raw/http_util/
client.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::LazyLock;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Bytes;
32use futures::Future;
33use futures::TryStreamExt;
34use http::Request;
35use http::Response;
36use http_body::Frame;
37use http_body::SizeHint;
38use raw::oio::Read;
39
40use super::HttpBody;
41use super::parse_content_encoding;
42use super::parse_content_length;
43use crate::raw::*;
44use crate::*;
45
/// Process-wide [`reqwest::Client`], constructed lazily on first access.
///
/// It is used across opendal for loading credentials.
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to
/// provide their own client.
///
/// [`HttpClient::default`] also wraps a clone of this client as its fetcher.
// NOTE(review): the static is `pub` and read by `HttpClient::default`, so this `allow`
// looks redundant — confirm before removing it.
#[allow(dead_code)]
pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
51
/// HttpFetcher is a type-erased [`HttpFetch`].
///
/// It is a shared [`Arc`] handle to a [`HttpFetchDyn`] trait object.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
54
/// An HTTP client instance for OpenDAL's services.
///
/// This is a thin wrapper around an [`HttpFetcher`]; cloning it clones the inner [`Arc`].
///
/// # Notes
///
/// * An http client must support redirections, i.e. it must follow 3xx responses.
#[derive(Clone)]
pub struct HttpClient {
    /// Type-erased fetcher that requests are dispatched to.
    fetcher: HttpFetcher,
}
64
/// A reqsign `HttpSend` implementation that always forwards requests to the
/// current http client stored inside [`AccessorInfo`].
#[derive(Clone)]
pub struct AccessorInfoHttpSend {
    /// Accessor info that is asked for its http client each time a request is sent.
    info: Arc<AccessorInfo>,
}
71
72impl AccessorInfoHttpSend {
73    /// Create a new [`AccessorInfoHttpSend`].
74    pub fn new(info: Arc<AccessorInfo>) -> Self {
75        Self { info }
76    }
77}
78
79/// We don't want users to know details about our clients.
80impl Debug for HttpClient {
81    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("HttpClient").finish()
83    }
84}
85
86impl Debug for AccessorInfoHttpSend {
87    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("AccessorInfoHttpSend").finish()
89    }
90}
91
92impl Default for HttpClient {
93    fn default() -> Self {
94        Self {
95            fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
96        }
97    }
98}
99
100impl HttpClient {
101    /// Create a new http client in async context.
102    pub fn new() -> Result<Self> {
103        Ok(Self::default())
104    }
105
106    /// Construct `Self` with given [`reqwest::Client`]
107    pub fn with(client: impl HttpFetch) -> Self {
108        let fetcher = Arc::new(client);
109        Self { fetcher }
110    }
111
112    /// Get the inner http client.
113    pub fn into_inner(self) -> HttpFetcher {
114        self.fetcher
115    }
116
117    /// Send a request and consume response.
118    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
119        let (parts, mut body) = self.fetch(req).await?.into_parts();
120        let buffer = body.read_all().await?;
121        Ok(Response::from_parts(parts, buffer))
122    }
123
124    /// Fetch a request and return a streamable [`HttpBody`].
125    ///
126    /// Services can use [`HttpBody`] as [`Access::Read`].
127    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
128        self.fetcher.fetch(req).await
129    }
130}
131
132impl reqsign_core::HttpSend for HttpClient {
133    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
134        let req = req.map(Buffer::from);
135        let resp = self.send(req).await.map_err(|err| {
136            let retryable = err.is_temporary();
137            reqsign_core::Error::unexpected("send request via OpenDAL HttpClient")
138                .with_source(err)
139                .set_retryable(retryable)
140        })?;
141
142        let (parts, body) = resp.into_parts();
143        Ok(Response::from_parts(parts, body.to_bytes()))
144    }
145}
146
147impl reqsign_core::HttpSend for AccessorInfoHttpSend {
148    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
149        let client = self.info.http_client();
150        reqsign_core::HttpSend::http_send(&client, req).await
151    }
152}
153
/// HttpFetch is the trait to fetch a request in an async way.
/// Users should implement this trait to provide their own http client.
pub trait HttpFetch: Send + Sync + Unpin + 'static {
    /// Fetch a request in an async way.
    ///
    /// The returned future resolves to a response whose body is an [`HttpBody`].
    fn fetch(
        &self,
        req: Request<Buffer>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}
163
/// HttpFetchDyn is the dyn version of [`HttpFetch`],
/// which makes it possible to use it as `Arc<dyn HttpFetchDyn>`.
/// Users should never implement this trait, but use `HttpFetch` instead.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
    /// The dyn version of [`HttpFetch::fetch`].
    ///
    /// This function returns a boxed future to make it object safe.
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
}
173
174impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
175    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
176        Box::pin(self.fetch(req))
177    }
178}
179
180impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
181    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
182        self.deref().fetch_dyn(req).await
183    }
184}
185
186impl HttpFetch for reqwest::Client {
187    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
188        // Uri stores all string alike data in `Bytes` which means
189        // the clone here is cheap.
190        let uri = req.uri().clone();
191        let is_head = req.method() == http::Method::HEAD;
192
193        let (parts, body) = req.into_parts();
194
195        let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
196            Error::new(ErrorKind::Unexpected, "request url is invalid")
197                .with_operation("http_util::Client::send::fetch")
198                .with_context("url", uri.to_string())
199                .set_source(err)
200        })?;
201
202        let mut req_builder = self.request(parts.method, url).headers(parts.headers);
203
204        // Client under wasm doesn't support set version.
205        #[cfg(not(target_arch = "wasm32"))]
206        {
207            req_builder = req_builder.version(parts.version);
208        }
209
210        // Don't set body if body is empty.
211        if !body.is_empty() {
212            #[cfg(not(target_arch = "wasm32"))]
213            {
214                req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
215            }
216            #[cfg(target_arch = "wasm32")]
217            {
218                req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
219            }
220        }
221
222        let mut resp = req_builder.send().await.map_err(|err| {
223            Error::new(ErrorKind::Unexpected, "send http request")
224                .with_operation("http_util::Client::send")
225                .with_context("url", uri.to_string())
226                .with_temporary(is_temporary_error(&err))
227                .set_source(err)
228        })?;
229
230        // Get content length from header so that we can check it.
231        //
232        // - If the request method is HEAD, we will ignore content length.
233        // - If response contains content_encoding, we should omit its content length.
234        let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
235            None
236        } else {
237            parse_content_length(resp.headers())?
238        };
239
240        let mut hr = Response::builder()
241            .status(resp.status())
242            // Insert uri into response extension so that we can fetch
243            // it later.
244            .extension(uri.clone());
245
246        // Response builder under wasm doesn't support set version.
247        #[cfg(not(target_arch = "wasm32"))]
248        {
249            hr = hr.version(resp.version());
250        }
251
252        // Swap headers directly instead of copy the entire map.
253        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
254
255        let bs = HttpBody::new(
256            resp.bytes_stream()
257                .try_filter(|v| future::ready(!v.is_empty()))
258                .map_ok(Buffer::from)
259                .map_err(move |err| {
260                    Error::new(ErrorKind::Unexpected, "read data from http response")
261                        .with_operation("http_util::Client::send")
262                        .with_context("url", uri.to_string())
263                        .with_temporary(is_temporary_error(&err))
264                        .set_source(err)
265                }),
266            content_length,
267        );
268
269        let resp = hr.body(bs).expect("response must build succeed");
270        Ok(resp)
271    }
272}
273
274#[inline]
275fn is_temporary_error(err: &reqwest::Error) -> bool {
276    // error sending request
277    err.is_request()||
278    // request or response body error
279    err.is_body() ||
280    // error decoding response body, for example, connection reset.
281    err.is_decode()
282}
283
/// Newtype over [`Buffer`] that implements [`http_body::Body`], so a buffer can be handed
/// to reqwest as a request body.
struct HttpBufferBody(Buffer);
285
286impl http_body::Body for HttpBufferBody {
287    type Data = Bytes;
288    type Error = Infallible;
289
290    fn poll_frame(
291        mut self: Pin<&mut Self>,
292        _: &mut Context<'_>,
293    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
294        match self.0.next() {
295            Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
296            None => Poll::Ready(None),
297        }
298    }
299
300    fn is_end_stream(&self) -> bool {
301        self.0.is_empty()
302    }
303
304    fn size_hint(&self) -> SizeHint {
305        SizeHint::with_exact(self.0.len() as u64)
306    }
307}