opendal/types/read/
futures_bytes_stream.rs1use std::io;
19use std::ops::RangeBounds;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25
26use bytes::Bytes;
27use futures::Stream;
28use futures::StreamExt;
29
30use crate::raw::*;
31use crate::*;
32
33pub struct FuturesBytesStream {
40 stream: BufferStream,
41 buf: Buffer,
42}
43
44unsafe impl Sync for FuturesBytesStream {}
46
47impl FuturesBytesStream {
48 pub(crate) async fn new(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Result<Self> {
50 let stream = BufferStream::create(ctx, range).await?;
51
52 Ok(FuturesBytesStream {
53 stream,
54 buf: Buffer::new(),
55 })
56 }
57}
58
59impl Stream for FuturesBytesStream {
60 type Item = io::Result<Bytes>;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 let this = self.get_mut();
64
65 loop {
66 if let Some(bs) = Iterator::next(&mut this.buf) {
68 return Poll::Ready(Some(Ok(bs)));
69 }
70
71 this.buf = match ready!(this.stream.poll_next_unpin(cx)) {
72 Some(Ok(buf)) => buf,
73 Some(Err(err)) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
74 None => return Poll::Ready(None),
75 };
76 }
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use std::sync::Arc;
83
84 use bytes::Bytes;
85 use futures::TryStreamExt;
86 use pretty_assertions::assert_eq;
87
88 use super::*;
89
90 #[tokio::test]
91 async fn test_trait() -> Result<()> {
92 let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner();
93 let ctx = Arc::new(ReadContext::new(
94 acc,
95 "test".to_string(),
96 OpRead::new(),
97 OpReader::new(),
98 ));
99 let v = FuturesBytesStream::new(ctx, 4..8).await?;
100
101 let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(v);
102
103 Ok(())
104 }
105
106 #[tokio::test]
107 async fn test_futures_bytes_stream() -> Result<()> {
108 let op = Operator::via_iter(Scheme::Memory, [])?;
109 op.write(
110 "test",
111 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
112 )
113 .await?;
114
115 let acc = op.into_inner();
116 let ctx = Arc::new(ReadContext::new(
117 acc,
118 "test".to_string(),
119 OpRead::new(),
120 OpReader::new(),
121 ));
122
123 let s = FuturesBytesStream::new(ctx, 4..8).await?;
124 let bufs: Vec<Bytes> = s.try_collect().await.unwrap();
125 assert_eq!(&bufs[0], "o".as_bytes());
126 assert_eq!(&bufs[1], "Wor".as_bytes());
127
128 Ok(())
129 }
130
131 #[tokio::test]
132 async fn test_futures_bytes_stream_with_concurrent() -> Result<()> {
133 let op = Operator::via_iter(Scheme::Memory, [])?;
134 op.write(
135 "test",
136 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
137 )
138 .await?;
139
140 let acc = op.into_inner();
141 let ctx = Arc::new(ReadContext::new(
142 acc,
143 "test".to_string(),
144 OpRead::new(),
145 OpReader::new().with_concurrent(3).with_chunk(1),
146 ));
147
148 let s = FuturesBytesStream::new(ctx, 4..8).await?;
149 let bufs: Vec<Bytes> = s.try_collect().await.unwrap();
150 assert_eq!(&bufs[0], "o".as_bytes());
151 assert_eq!(&bufs[1], "W".as_bytes());
152 assert_eq!(&bufs[2], "o".as_bytes());
153 assert_eq!(&bufs[3], "r".as_bytes());
154
155 Ok(())
156 }
157}