Skip to main content

opendal_core/raw/oio/read/
api.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem;
19use std::ops::Deref;
20use std::ops::DerefMut;
21
22use bytes::Bytes;
23use futures::Future;
24
25use crate::raw::*;
26use crate::*;
27
28/// Reader is a type erased [`Read`].
29pub type Reader = Box<dyn ReadDyn>;
30
31/// Read is the internal trait used by OpenDAL to read ranges from storage.
32///
33/// Users should not use or import this trait unless they are implementing a `Service`.
34///
35/// This trait returns `impl Future`, so it is not object safe. Use [`ReadDyn`] when
36/// type erasure is required.
37pub trait Read: Unpin + Send + Sync {
38    /// Open a range stream for the given range.
39    fn open(
40        &self,
41        range: BytesRange,
42    ) -> impl Future<Output = Result<(RpRead, Box<dyn ReadStreamDyn>)>> + MaybeSend;
43
44    /// Read an exact bounded range into [`Buffer`].
45    fn read(&self, range: BytesRange)
46    -> impl Future<Output = Result<(RpRead, Buffer)>> + MaybeSend;
47}
48
49impl Read for () {
50    async fn open(&self, _: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
51        Err(Error::new(
52            ErrorKind::Unsupported,
53            "output reader doesn't support open",
54        ))
55    }
56
57    async fn read(&self, _: BytesRange) -> Result<(RpRead, Buffer)> {
58        Err(Error::new(
59            ErrorKind::Unsupported,
60            "output reader doesn't support read",
61        ))
62    }
63}
64
65/// ReadDyn is the dyn-compatible adapter for [`Read`].
66///
67/// It boxes returned futures to support `Box<dyn ReadDyn>`, adding one allocation
68/// per call at the type-erasure boundary.
69pub trait ReadDyn: Unpin + Send + Sync {
70    /// The dyn version of [`Read::open`].
71    fn open_dyn(
72        &self,
73        range: BytesRange,
74    ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>>;
75
76    /// The dyn version of [`Read::read`].
77    fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>>;
78}
79
80impl<T: Read + ?Sized> ReadDyn for T {
81    fn open_dyn(
82        &self,
83        range: BytesRange,
84    ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>> {
85        Box::pin(self.open(range))
86    }
87
88    fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>> {
89        Box::pin(self.read(range))
90    }
91}
92
93impl<T: ReadDyn + ?Sized> Read for Box<T> {
94    async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
95        self.deref().open_dyn(range).await
96    }
97
98    async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
99        self.deref().read_dyn(range).await
100    }
101}
102
103/// ReadStream is the internal trait used by OpenDAL to stream data from storage.
104///
105/// Users should not use or import this trait unless they are implementing a `Service`.
106///
107/// This trait returns `impl Future`, so it is not object safe. Use [`ReadStreamDyn`]
108/// when type erasure is required.
109pub trait ReadStream: Unpin + Send + Sync {
110    /// Read the next data chunk from the stream.
111    fn read(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend;
112
113    /// Read all data from the reader.
114    fn read_all(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend {
115        async {
116            let mut bufs = vec![];
117            loop {
118                match self.read().await {
119                    Ok(buf) if buf.is_empty() => break,
120                    Ok(buf) => bufs.push(buf),
121                    Err(err) => return Err(err),
122                }
123            }
124            Ok(bufs.into_iter().flatten().collect())
125        }
126    }
127}
128
129impl ReadStream for () {
130    async fn read(&mut self) -> Result<Buffer> {
131        Err(Error::new(
132            ErrorKind::Unsupported,
133            "output reader doesn't support read",
134        ))
135    }
136}
137
138impl ReadStream for Bytes {
139    async fn read(&mut self) -> Result<Buffer> {
140        Ok(Buffer::from(self.split_off(0)))
141    }
142}
143
144impl ReadStream for Buffer {
145    async fn read(&mut self) -> Result<Buffer> {
146        Ok(mem::take(self))
147    }
148}
149
150/// ReadStreamDyn is the dyn-compatible adapter for [`ReadStream`].
151///
152/// It boxes returned futures to support `Box<dyn ReadStreamDyn>`, adding one
153/// allocation per call at the type-erasure boundary.
154pub trait ReadStreamDyn: Unpin + Send + Sync {
155    /// The dyn version of [`ReadStream::read`].
156    fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
157
158    /// The dyn version of [`ReadStream::read_all`].
159    fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
160}
161
162impl<T: ReadStream + ?Sized> ReadStreamDyn for T {
163    fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
164        Box::pin(self.read())
165    }
166
167    fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
168        Box::pin(self.read_all())
169    }
170}
171
172/// # NOTE
173///
174/// Take care about the `deref_mut()` here. This makes sure that we are calling functions
175/// upon `&mut T` instead of `&mut Box<T>`. The later could result in infinite recursion.
176impl<T: ReadStreamDyn + ?Sized> ReadStream for Box<T> {
177    async fn read(&mut self) -> Result<Buffer> {
178        self.deref_mut().read_dyn().await
179    }
180
181    async fn read_all(&mut self) -> Result<Buffer> {
182        self.deref_mut().read_all_dyn().await
183    }
184}