opendal_core/raw/oio/read/
api.rs1use std::mem;
19use std::ops::Deref;
20use std::ops::DerefMut;
21
22use bytes::Bytes;
23use futures::Future;
24
25use crate::raw::*;
26use crate::*;
27
28pub type Reader = Box<dyn ReadDyn>;
30
31pub trait Read: Unpin + Send + Sync {
38 fn open(
40 &self,
41 range: BytesRange,
42 ) -> impl Future<Output = Result<(RpRead, Box<dyn ReadStreamDyn>)>> + MaybeSend;
43
44 fn read(&self, range: BytesRange)
46 -> impl Future<Output = Result<(RpRead, Buffer)>> + MaybeSend;
47}
48
49impl Read for () {
50 async fn open(&self, _: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
51 Err(Error::new(
52 ErrorKind::Unsupported,
53 "output reader doesn't support open",
54 ))
55 }
56
57 async fn read(&self, _: BytesRange) -> Result<(RpRead, Buffer)> {
58 Err(Error::new(
59 ErrorKind::Unsupported,
60 "output reader doesn't support read",
61 ))
62 }
63}
64
65pub trait ReadDyn: Unpin + Send + Sync {
70 fn open_dyn(
72 &self,
73 range: BytesRange,
74 ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>>;
75
76 fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>>;
78}
79
80impl<T: Read + ?Sized> ReadDyn for T {
81 fn open_dyn(
82 &self,
83 range: BytesRange,
84 ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>> {
85 Box::pin(self.open(range))
86 }
87
88 fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>> {
89 Box::pin(self.read(range))
90 }
91}
92
93impl<T: ReadDyn + ?Sized> Read for Box<T> {
94 async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
95 self.deref().open_dyn(range).await
96 }
97
98 async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
99 self.deref().read_dyn(range).await
100 }
101}
102
103pub trait ReadStream: Unpin + Send + Sync {
110 fn read(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend;
112
113 fn read_all(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend {
115 async {
116 let mut bufs = vec![];
117 loop {
118 match self.read().await {
119 Ok(buf) if buf.is_empty() => break,
120 Ok(buf) => bufs.push(buf),
121 Err(err) => return Err(err),
122 }
123 }
124 Ok(bufs.into_iter().flatten().collect())
125 }
126 }
127}
128
129impl ReadStream for () {
130 async fn read(&mut self) -> Result<Buffer> {
131 Err(Error::new(
132 ErrorKind::Unsupported,
133 "output reader doesn't support read",
134 ))
135 }
136}
137
138impl ReadStream for Bytes {
139 async fn read(&mut self) -> Result<Buffer> {
140 Ok(Buffer::from(self.split_off(0)))
141 }
142}
143
144impl ReadStream for Buffer {
145 async fn read(&mut self) -> Result<Buffer> {
146 Ok(mem::take(self))
147 }
148}
149
150pub trait ReadStreamDyn: Unpin + Send + Sync {
155 fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
157
158 fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
160}
161
162impl<T: ReadStream + ?Sized> ReadStreamDyn for T {
163 fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
164 Box::pin(self.read())
165 }
166
167 fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
168 Box::pin(self.read_all())
169 }
170}
171
172impl<T: ReadStreamDyn + ?Sized> ReadStream for Box<T> {
177 async fn read(&mut self) -> Result<Buffer> {
178 self.deref_mut().read_dyn().await
179 }
180
181 async fn read_all(&mut self) -> Result<Buffer> {
182 self.deref_mut().read_all_dyn().await
183 }
184}