opendal_core/raw/http_util/
client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::LazyLock;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Bytes;
32use futures::Future;
33use futures::TryStreamExt;
34use http::Request;
35use http::Response;
36use http_body::Frame;
37use http_body::SizeHint;
38use raw::oio::Read;
39
40use super::HttpBody;
41use super::parse_content_encoding;
42use super::parse_content_length;
43use crate::raw::*;
44use crate::*;
45
/// Process-wide [`reqwest::Client`] used across opendal for loading credentials.
///
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to
/// provide their own client.
///
/// The client is constructed lazily on first access. [`HttpClient::default`] clones this
/// instance to use as its fetcher.
// NOTE(review): `dead_code` is presumably allowed because some build configurations never
// reference this static; confirm before removing the attribute.
#[allow(dead_code)]
pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
51
/// HttpFetcher is a type erased [`HttpFetch`].
///
/// It is a shared, reference-counted handle to any [`HttpFetchDyn`] implementation.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
54
/// An HTTP client instance for OpenDAL's services.
///
/// `HttpClient` is a thin wrapper around an [`HttpFetcher`]; cloning it only clones the
/// inner `Arc`.
///
/// # Notes
///
/// * An http client must support redirections, i.e. it must follow 3xx responses.
#[derive(Clone)]
pub struct HttpClient {
    /// The type-erased fetcher every request is delegated to.
    fetcher: HttpFetcher,
}
64
65/// We don't want users to know details about our clients.
66impl Debug for HttpClient {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("HttpClient").finish()
69    }
70}
71
72impl Default for HttpClient {
73    fn default() -> Self {
74        Self {
75            fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
76        }
77    }
78}
79
80impl HttpClient {
81    /// Create a new http client in async context.
82    pub fn new() -> Result<Self> {
83        Ok(Self::default())
84    }
85
86    /// Construct `Self` with given [`reqwest::Client`]
87    pub fn with(client: impl HttpFetch) -> Self {
88        let fetcher = Arc::new(client);
89        Self { fetcher }
90    }
91
92    /// Get the inner http client.
93    pub fn into_inner(self) -> HttpFetcher {
94        self.fetcher
95    }
96
97    /// Send a request and consume response.
98    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
99        let (parts, mut body) = self.fetch(req).await?.into_parts();
100        let buffer = body.read_all().await?;
101        Ok(Response::from_parts(parts, buffer))
102    }
103
104    /// Fetch a request and return a streamable [`HttpBody`].
105    ///
106    /// Services can use [`HttpBody`] as [`Access::Read`].
107    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
108        self.fetcher.fetch(req).await
109    }
110}
111
/// HttpFetch is the trait to fetch a request in async way.
/// User should implement this trait to provide their own http client.
///
/// Every implementor automatically gets [`HttpFetchDyn`] through a blanket impl, which is
/// what allows it to be stored as an [`HttpFetcher`].
pub trait HttpFetch: Send + Sync + Unpin + 'static {
    /// Fetch a request in async way.
    ///
    /// Takes a request whose body is a [`Buffer`] and resolves to a response whose body is
    /// a streamable [`HttpBody`], or to an error.
    fn fetch(
        &self,
        req: Request<Buffer>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}
121
/// HttpFetchDyn is the dyn version of [`HttpFetch`],
/// which makes it possible to use it as `Arc<dyn HttpFetchDyn>`.
/// Users should never implement this trait, but implement [`HttpFetch`] instead.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
    /// The dyn version of [`HttpFetch::fetch`].
    ///
    /// This function returns a boxed future to make it object safe.
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
}
131
132impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
133    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
134        Box::pin(self.fetch(req))
135    }
136}
137
138impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
139    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
140        self.deref().fetch_dyn(req).await
141    }
142}
143
144impl HttpFetch for reqwest::Client {
145    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
146        // Uri stores all string alike data in `Bytes` which means
147        // the clone here is cheap.
148        let uri = req.uri().clone();
149        let is_head = req.method() == http::Method::HEAD;
150
151        let (parts, body) = req.into_parts();
152
153        let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
154            Error::new(ErrorKind::Unexpected, "request url is invalid")
155                .with_operation("http_util::Client::send::fetch")
156                .with_context("url", uri.to_string())
157                .set_source(err)
158        })?;
159
160        let mut req_builder = self.request(parts.method, url).headers(parts.headers);
161
162        // Client under wasm doesn't support set version.
163        #[cfg(not(target_arch = "wasm32"))]
164        {
165            req_builder = req_builder.version(parts.version);
166        }
167
168        // Don't set body if body is empty.
169        if !body.is_empty() {
170            #[cfg(not(target_arch = "wasm32"))]
171            {
172                req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
173            }
174            #[cfg(target_arch = "wasm32")]
175            {
176                req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
177            }
178        }
179
180        let mut resp = req_builder.send().await.map_err(|err| {
181            Error::new(ErrorKind::Unexpected, "send http request")
182                .with_operation("http_util::Client::send")
183                .with_context("url", uri.to_string())
184                .with_temporary(is_temporary_error(&err))
185                .set_source(err)
186        })?;
187
188        // Get content length from header so that we can check it.
189        //
190        // - If the request method is HEAD, we will ignore content length.
191        // - If response contains content_encoding, we should omit its content length.
192        let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
193            None
194        } else {
195            parse_content_length(resp.headers())?
196        };
197
198        let mut hr = Response::builder()
199            .status(resp.status())
200            // Insert uri into response extension so that we can fetch
201            // it later.
202            .extension(uri.clone());
203
204        // Response builder under wasm doesn't support set version.
205        #[cfg(not(target_arch = "wasm32"))]
206        {
207            hr = hr.version(resp.version());
208        }
209
210        // Swap headers directly instead of copy the entire map.
211        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
212
213        let bs = HttpBody::new(
214            resp.bytes_stream()
215                .try_filter(|v| future::ready(!v.is_empty()))
216                .map_ok(Buffer::from)
217                .map_err(move |err| {
218                    Error::new(ErrorKind::Unexpected, "read data from http response")
219                        .with_operation("http_util::Client::send")
220                        .with_context("url", uri.to_string())
221                        .with_temporary(is_temporary_error(&err))
222                        .set_source(err)
223                }),
224            content_length,
225        );
226
227        let resp = hr.body(bs).expect("response must build succeed");
228        Ok(resp)
229    }
230}
231
232#[inline]
233fn is_temporary_error(err: &reqwest::Error) -> bool {
234    // error sending request
235    err.is_request()||
236    // request or response body error
237    err.is_body() ||
238    // error decoding response body, for example, connection reset.
239    err.is_decode()
240}
241
/// Adapter that lets a [`Buffer`] be used as a request body by implementing
/// [`http_body::Body`] for it.
struct HttpBufferBody(Buffer);
243
244impl http_body::Body for HttpBufferBody {
245    type Data = Bytes;
246    type Error = Infallible;
247
248    fn poll_frame(
249        mut self: Pin<&mut Self>,
250        _: &mut Context<'_>,
251    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
252        match self.0.next() {
253            Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
254            None => Poll::Ready(None),
255        }
256    }
257
258    fn is_end_stream(&self) -> bool {
259        self.0.is_empty()
260    }
261
262    fn size_hint(&self) -> SizeHint {
263        SizeHint::with_exact(self.0.len() as u64)
264    }
265}