opendal/types/read/
futures_bytes_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io;
19use std::ops::RangeBounds;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25
26use bytes::Bytes;
27use futures::Stream;
28use futures::StreamExt;
29
30use crate::raw::*;
31use crate::*;
32
/// FuturesBytesStream is the adapter of [`Stream`] generated by [`Reader::into_bytes_stream`].
///
/// Users can use this adapter in cases where they need to use [`Stream`] trait. FuturesBytesStream
/// reuses the same concurrent and chunk settings from [`Reader`].
///
/// FuturesBytesStream also implements [`Unpin`], [`Send`] and [`Sync`].
pub struct FuturesBytesStream {
    /// Upstream source that is polled for the next [`Buffer`] once `buf` has
    /// no more chunks to hand out.
    stream: BufferStream,
    /// Buffer most recently obtained from `stream`; its chunks are yielded
    /// one at a time as the items of this stream.
    buf: Buffer,
}
43
/// Safety: FuturesBytesStream only exposes `&mut self` to the outside world,
/// so a shared `&FuturesBytesStream` offers no way to reach the inner stream
/// or buffer: the fields are private and the only polling entry point in this
/// file, `poll_next`, takes `Pin<&mut Self>`.
///
/// NOTE(review): this argument relies on no `&self` API touching the fields
/// being added elsewhere in the crate — re-check when extending this type.
unsafe impl Sync for FuturesBytesStream {}
46
47impl FuturesBytesStream {
48    /// NOTE: don't allow users to create FuturesStream directly.
49    pub(crate) async fn new(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Result<Self> {
50        let stream = BufferStream::create(ctx, range).await?;
51
52        Ok(FuturesBytesStream {
53            stream,
54            buf: Buffer::new(),
55        })
56    }
57}
58
59impl Stream for FuturesBytesStream {
60    type Item = io::Result<Bytes>;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        let this = self.get_mut();
64
65        loop {
66            // Consume current buffer
67            if let Some(bs) = Iterator::next(&mut this.buf) {
68                return Poll::Ready(Some(Ok(bs)));
69            }
70
71            this.buf = match ready!(this.stream.poll_next_unpin(cx)) {
72                Some(Ok(buf)) => buf,
73                Some(Err(err)) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
74                None => return Poll::Ready(None),
75            };
76        }
77    }
78}
79
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::TryStreamExt;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Path of the object every test in this module reads.
    const PATH: &str = "test";

    /// Wraps `op` into a [`ReadContext`] for `PATH`, using `OpRead::new()`
    /// and the given reader options.
    fn context_for(op: Operator, options: OpReader) -> Arc<ReadContext> {
        Arc::new(ReadContext::new(
            op.into_inner(),
            PATH.to_string(),
            OpRead::new(),
            options,
        ))
    }

    /// Creates a `Scheme::Memory` operator and writes the two-chunk buffer
    /// `"Hello"`, `"World"` to `PATH`.
    async fn hello_world_operator() -> Result<Operator> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let payload = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        op.write(PATH, payload).await?;
        Ok(op)
    }

    /// Collects `stream` and compares its leading chunks with `expected`, in
    /// order. Indexing is intentional: a stream that yields fewer chunks than
    /// expected panics instead of passing silently.
    async fn assert_leading_chunks(stream: FuturesBytesStream, expected: &[&str]) {
        let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(&chunks[idx], want.as_bytes());
        }
    }

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let stream = FuturesBytesStream::new(context_for(op, OpReader::new()), 4..8).await?;

        // Compile-time check: the coercion only type-checks when the stream
        // satisfies every listed bound.
        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(stream);

        Ok(())
    }

    #[tokio::test]
    async fn test_futures_bytes_stream() -> Result<()> {
        let op = hello_world_operator().await?;
        let ctx = context_for(op, OpReader::new());

        // Bytes 4..8 of "HelloWorld" straddle the two written chunks.
        let stream = FuturesBytesStream::new(ctx, 4..8).await?;
        assert_leading_chunks(stream, &["o", "Wor"]).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_futures_bytes_stream_with_concurrent() -> Result<()> {
        let op = hello_world_operator().await?;
        let ctx = context_for(op, OpReader::new().with_concurrent(3).with_chunk(1));

        // With a chunk size of 1 the same range is expected byte by byte.
        let stream = FuturesBytesStream::new(ctx, 4..8).await?;
        assert_leading_chunks(stream, &["o", "W", "o", "r"]).await;

        Ok(())
    }
}