opendal/types/
buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::convert::Infallible;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::io::BufRead;
23use std::io::IoSlice;
24use std::io::Read;
25use std::io::Seek;
26use std::io::SeekFrom;
27use std::io::{self};
28use std::mem;
29use std::ops::Bound;
30use std::ops::RangeBounds;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::Context;
34use std::task::Poll;
35
36use bytes::Buf;
37use bytes::BufMut;
38use bytes::Bytes;
39use bytes::BytesMut;
40use futures::Stream;
41
42use crate::*;
43
/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
///
/// We designed buffer to allow underlying storage to return non-contiguous bytes. For example,
/// http based storage like s3 could generate non-contiguous bytes by stream.
///
/// ## Features
///
/// - [`Buffer`] can be used as [`Buf`], [`Iterator`], [`Stream`] directly.
/// - [`Buffer`] is cheap to clone like [`Bytes`], only update reference count, no allocation.
/// - [`Buffer`] is vectorized write friendly, you can convert it to [`IoSlice`] for vectored write.
///
/// ## Examples
///
/// ### As `Buf`
///
/// `Buffer` implements `Buf` trait:
///
/// ```rust
/// use bytes::Buf;
/// use opendal::Buffer;
/// use serde_json;
///
/// fn test(mut buf: Buffer) -> Vec<String> {
///     serde_json::from_reader(buf.reader()).unwrap()
/// }
/// ```
///
/// ### As Bytes `Iterator`
///
/// `Buffer` implements `Iterator<Item=Bytes>` trait:
///
/// ```rust
/// use bytes::Bytes;
/// use opendal::Buffer;
///
/// fn test(mut buf: Buffer) -> Vec<Bytes> {
///     buf.into_iter().collect()
/// }
/// ```
///
/// ### As Bytes `Stream`
///
/// `Buffer` implements `Stream<Item=Result<Bytes, Infallible>>` trait, so it can be
/// consumed as a stream directly:
///
/// ```rust
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Buffer;
///
/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
///     buf.try_collect().await.unwrap()
/// }
/// ```
///
/// ### As one contiguous Bytes
///
/// `Buffer` can be made contiguous by converting it into `Bytes` or `Vec<u8>`.
/// Please keep in mind that, for a non-contiguous buffer, this operation involves a new
/// allocation and a copy of the bytes, so the result no longer shares the original memory
/// region. `to_bytes` on a contiguous buffer only clones the underlying `Bytes`.
///
/// ```rust
/// use bytes::Bytes;
/// use opendal::Buffer;
///
/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
///     buf.to_vec()
/// }
///
/// fn test_to_bytes(buf: Buffer) -> Bytes {
///     buf.to_bytes()
/// }
/// ```
#[derive(Clone)]
pub struct Buffer(Inner);
118
/// Internal representation of a [`Buffer`].
#[derive(Clone)]
enum Inner {
    /// All bytes live in one contiguous [`Bytes`].
    Contiguous(Bytes),
    /// Bytes are spread over several parts; `size`, `idx` and `offset` describe the
    /// still-unread window over `parts`.
    NonContiguous {
        /// The shared list of parts. It is never modified after construction; consuming
        /// the buffer only moves `idx` / `offset` / `size`.
        parts: Arc<[Bytes]>,
        /// Number of unread bytes; this is what `Buffer::len` reports.
        size: usize,
        /// Index in `parts` of the part that holds the next unread byte.
        idx: usize,
        /// Offset of the next unread byte inside `parts[idx]`.
        offset: usize,
    },
}
129
130impl Debug for Buffer {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        let mut b = f.debug_struct("Buffer");
133
134        match &self.0 {
135            Inner::Contiguous(bs) => {
136                b.field("type", &"contiguous");
137                b.field("size", &bs.len());
138            }
139            Inner::NonContiguous {
140                parts,
141                size,
142                idx,
143                offset,
144            } => {
145                b.field("type", &"non_contiguous");
146                b.field("parts", &parts);
147                b.field("size", &size);
148                b.field("idx", &idx);
149                b.field("offset", &offset);
150            }
151        }
152        b.finish_non_exhaustive()
153    }
154}
155
156impl Default for Buffer {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl Buffer {
163    /// Create a new empty buffer.
164    ///
165    /// This operation is const and no allocation will be performed.
166    #[inline]
167    pub const fn new() -> Self {
168        Self(Inner::Contiguous(Bytes::new()))
169    }
170
171    /// Get the length of the buffer.
172    #[inline]
173    pub fn len(&self) -> usize {
174        match &self.0 {
175            Inner::Contiguous(b) => b.remaining(),
176            Inner::NonContiguous { size, .. } => *size,
177        }
178    }
179
180    /// Check if buffer is empty.
181    #[inline]
182    pub fn is_empty(&self) -> bool {
183        self.len() == 0
184    }
185
186    /// Number of [`Bytes`] in [`Buffer`].
187    ///
188    /// For contiguous buffer, it's always 1. For non-contiguous buffer, it's number of bytes
189    /// available for use.
190    pub fn count(&self) -> usize {
191        match &self.0 {
192            Inner::Contiguous(_) => 1,
193            Inner::NonContiguous {
194                parts,
195                idx,
196                size,
197                offset,
198            } => {
199                parts
200                    .iter()
201                    .skip(*idx)
202                    .fold((0, size + offset), |(count, size), bytes| {
203                        if size == 0 {
204                            (count, 0)
205                        } else {
206                            (count + 1, size.saturating_sub(bytes.len()))
207                        }
208                    })
209                    .0
210            }
211        }
212    }
213
214    /// Get current [`Bytes`].
215    pub fn current(&self) -> Bytes {
216        match &self.0 {
217            Inner::Contiguous(inner) => inner.clone(),
218            Inner::NonContiguous {
219                parts,
220                idx,
221                offset,
222                size,
223            } => {
224                let chunk = &parts[*idx];
225                let n = (chunk.len() - *offset).min(*size);
226                chunk.slice(*offset..*offset + n)
227            }
228        }
229    }
230
231    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
232    ///
233    /// If `len` is greater than the buffer’s current length, this has no effect.
234    #[inline]
235    pub fn truncate(&mut self, len: usize) {
236        match &mut self.0 {
237            Inner::Contiguous(bs) => bs.truncate(len),
238            Inner::NonContiguous { size, .. } => {
239                *size = (*size).min(len);
240            }
241        }
242    }
243
244    /// Returns a slice of self for the provided range.
245    ///
246    /// This will increment the reference count for the underlying memory and return a new Buffer handle set to the slice.
247    ///
248    /// This operation is O(1).
249    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
250        let len = self.len();
251
252        let begin = match range.start_bound() {
253            Bound::Included(&n) => n,
254            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
255            Bound::Unbounded => 0,
256        };
257
258        let end = match range.end_bound() {
259            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
260            Bound::Excluded(&n) => n,
261            Bound::Unbounded => len,
262        };
263
264        assert!(
265            begin <= end,
266            "range start must not be greater than end: {begin:?} <= {end:?}",
267        );
268        assert!(end <= len, "range end out of bounds: {end:?} <= {len:?}",);
269
270        if end == begin {
271            return Buffer::new();
272        }
273
274        let mut ret = self.clone();
275        ret.truncate(end);
276        ret.advance(begin);
277        ret
278    }
279
280    /// Combine all bytes together into one single [`Bytes`].
281    ///
282    /// This operation is zero copy if the underlying bytes are contiguous.
283    /// Otherwise, it will copy all bytes into one single [`Bytes`].
284    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
285    #[inline]
286    pub fn to_bytes(&self) -> Bytes {
287        match &self.0 {
288            Inner::Contiguous(bytes) => bytes.clone(),
289            Inner::NonContiguous {
290                parts,
291                size,
292                idx: _,
293                offset,
294            } => {
295                if parts.len() == 1 {
296                    parts[0].slice(*offset..(*offset + *size))
297                } else {
298                    let mut ret = BytesMut::with_capacity(self.len());
299                    ret.put(self.clone());
300                    ret.freeze()
301                }
302            }
303        }
304    }
305
306    /// Combine all bytes together into one single [`Vec<u8>`].
307    ///
308    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
309    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
310    #[inline]
311    pub fn to_vec(&self) -> Vec<u8> {
312        let mut ret = Vec::with_capacity(self.len());
313        ret.put(self.clone());
314        ret
315    }
316
317    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
318    #[inline]
319    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
320        match &self.0 {
321            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
322            Inner::NonContiguous {
323                parts, idx, offset, ..
324            } => {
325                let mut ret = Vec::with_capacity(parts.len() - *idx);
326                let mut new_offset = *offset;
327                for part in parts.iter().skip(*idx) {
328                    ret.push(IoSlice::new(&part[new_offset..]));
329                    new_offset = 0;
330                }
331                ret
332            }
333        }
334    }
335}
336
337impl From<Vec<u8>> for Buffer {
338    #[inline]
339    fn from(bs: Vec<u8>) -> Self {
340        Self(Inner::Contiguous(bs.into()))
341    }
342}
343
344impl From<Bytes> for Buffer {
345    #[inline]
346    fn from(bs: Bytes) -> Self {
347        Self(Inner::Contiguous(bs))
348    }
349}
350
351impl From<String> for Buffer {
352    #[inline]
353    fn from(s: String) -> Self {
354        Self(Inner::Contiguous(Bytes::from(s)))
355    }
356}
357
358impl From<&'static [u8]> for Buffer {
359    #[inline]
360    fn from(s: &'static [u8]) -> Self {
361        Self(Inner::Contiguous(Bytes::from_static(s)))
362    }
363}
364
365impl From<&'static str> for Buffer {
366    #[inline]
367    fn from(s: &'static str) -> Self {
368        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
369    }
370}
371
372impl FromIterator<u8> for Buffer {
373    #[inline]
374    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
375        Self(Inner::Contiguous(Bytes::from_iter(iter)))
376    }
377}
378
379impl From<VecDeque<Bytes>> for Buffer {
380    #[inline]
381    fn from(bs: VecDeque<Bytes>) -> Self {
382        let size = bs.iter().map(Bytes::len).sum();
383        Self(Inner::NonContiguous {
384            parts: Vec::from(bs).into(),
385            size,
386            idx: 0,
387            offset: 0,
388        })
389    }
390}
391
392impl From<Vec<Bytes>> for Buffer {
393    #[inline]
394    fn from(bs: Vec<Bytes>) -> Self {
395        let size = bs.iter().map(Bytes::len).sum();
396        Self(Inner::NonContiguous {
397            parts: bs.into(),
398            size,
399            idx: 0,
400            offset: 0,
401        })
402    }
403}
404
405impl From<Arc<[Bytes]>> for Buffer {
406    #[inline]
407    fn from(bs: Arc<[Bytes]>) -> Self {
408        let size = bs.iter().map(Bytes::len).sum();
409        Self(Inner::NonContiguous {
410            parts: bs,
411            size,
412            idx: 0,
413            offset: 0,
414        })
415    }
416}
417
418impl FromIterator<Bytes> for Buffer {
419    #[inline]
420    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
421        let mut size = 0;
422        let bs = iter.into_iter().inspect(|v| size += v.len());
423        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
424        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
425        let parts = Arc::from_iter(bs);
426        Self(Inner::NonContiguous {
427            parts,
428            size,
429            idx: 0,
430            offset: 0,
431        })
432    }
433}
434
435impl Buf for Buffer {
436    #[inline]
437    fn remaining(&self) -> usize {
438        self.len()
439    }
440
441    #[inline]
442    fn chunk(&self) -> &[u8] {
443        match &self.0 {
444            Inner::Contiguous(b) => b.chunk(),
445            Inner::NonContiguous {
446                parts,
447                size,
448                idx,
449                offset,
450            } => {
451                if *size == 0 {
452                    return &[];
453                }
454
455                let chunk = &parts[*idx];
456                let n = (chunk.len() - *offset).min(*size);
457                &parts[*idx][*offset..*offset + n]
458            }
459        }
460    }
461
462    #[inline]
463    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
464        match &self.0 {
465            Inner::Contiguous(b) => {
466                if dst.is_empty() {
467                    return 0;
468                }
469
470                dst[0] = IoSlice::new(b.chunk());
471                1
472            }
473            Inner::NonContiguous {
474                parts, idx, offset, ..
475            } => {
476                if dst.is_empty() {
477                    return 0;
478                }
479
480                let mut new_offset = *offset;
481                parts
482                    .iter()
483                    .skip(*idx)
484                    .zip(dst.iter_mut())
485                    .map(|(part, dst)| {
486                        *dst = IoSlice::new(&part[new_offset..]);
487                        new_offset = 0;
488                    })
489                    .count()
490            }
491        }
492    }
493
494    #[inline]
495    fn advance(&mut self, cnt: usize) {
496        match &mut self.0 {
497            Inner::Contiguous(b) => b.advance(cnt),
498            Inner::NonContiguous {
499                parts,
500                size,
501                idx,
502                offset,
503            } => {
504                assert!(
505                    cnt <= *size,
506                    "cannot advance past {cnt} bytes, only {size} bytes left"
507                );
508
509                let mut new_idx = *idx;
510                let mut new_offset = *offset;
511                let mut remaining_cnt = cnt;
512                while remaining_cnt > 0 {
513                    let part_len = parts[new_idx].len();
514                    let remaining_in_part = part_len - new_offset;
515
516                    if remaining_cnt < remaining_in_part {
517                        new_offset += remaining_cnt;
518                        break;
519                    }
520
521                    remaining_cnt -= remaining_in_part;
522                    new_idx += 1;
523                    new_offset = 0;
524                }
525
526                *idx = new_idx;
527                *offset = new_offset;
528                *size -= cnt;
529            }
530        }
531    }
532}
533
534impl Iterator for Buffer {
535    type Item = Bytes;
536
537    fn next(&mut self) -> Option<Self::Item> {
538        match &mut self.0 {
539            Inner::Contiguous(bs) => {
540                if bs.is_empty() {
541                    None
542                } else {
543                    Some(mem::take(bs))
544                }
545            }
546            Inner::NonContiguous {
547                parts,
548                size,
549                idx,
550                offset,
551            } => {
552                if *size == 0 {
553                    return None;
554                }
555
556                let chunk = &parts[*idx];
557                let n = (chunk.len() - *offset).min(*size);
558                let buf = chunk.slice(*offset..*offset + n);
559                *size -= n;
560                *offset += n;
561
562                if *offset == chunk.len() {
563                    *idx += 1;
564                    *offset = 0;
565                }
566
567                Some(buf)
568            }
569        }
570    }
571
572    fn size_hint(&self) -> (usize, Option<usize>) {
573        match &self.0 {
574            Inner::Contiguous(bs) => {
575                if bs.is_empty() {
576                    (0, Some(0))
577                } else {
578                    (1, Some(1))
579                }
580            }
581            Inner::NonContiguous { parts, idx, .. } => {
582                let remaining = parts.len().saturating_sub(*idx);
583                (remaining, Some(remaining))
584            }
585        }
586    }
587}
588
589impl Stream for Buffer {
590    type Item = Result<Bytes, Infallible>;
591
592    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
593        Poll::Ready(self.get_mut().next().map(Ok))
594    }
595
596    fn size_hint(&self) -> (usize, Option<usize>) {
597        Iterator::size_hint(self)
598    }
599}
600
601impl Read for Buffer {
602    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
603        let chunk = self.chunk();
604        let len = chunk.len().min(buf.len());
605        buf[..len].copy_from_slice(&chunk[..len]);
606        self.advance(len);
607        Ok(len)
608    }
609}
610
611impl Seek for Buffer {
612    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
613        let len = self.len() as u64;
614        let new_pos = match pos {
615            SeekFrom::Start(offset) => offset,
616            SeekFrom::End(offset) => {
617                if offset < 0 {
618                    len.checked_sub(offset.unsigned_abs())
619                        .ok_or(io::Error::new(
620                            io::ErrorKind::InvalidInput,
621                            "invalid seek to a negative position",
622                        ))?
623                } else {
624                    len.checked_add(offset as u64).ok_or(io::Error::new(
625                        io::ErrorKind::InvalidInput,
626                        "seek out of bounds",
627                    ))?
628                }
629            }
630            SeekFrom::Current(offset) => {
631                let current_pos = (len - self.remaining() as u64) as i64;
632                let new_pos = current_pos.checked_add(offset).ok_or(io::Error::new(
633                    io::ErrorKind::InvalidInput,
634                    "seek out of bounds",
635                ))?;
636                if new_pos < 0 {
637                    return Err(io::Error::new(
638                        io::ErrorKind::InvalidInput,
639                        "invalid seek to a negative position",
640                    ));
641                }
642                new_pos as u64
643            }
644        };
645
646        if new_pos > len {
647            return Err(io::Error::new(
648                io::ErrorKind::InvalidInput,
649                "seek out of bounds",
650            ));
651        }
652
653        self.advance((new_pos - (len - self.remaining() as u64)) as usize);
654        Ok(new_pos)
655    }
656}
657
658impl BufRead for Buffer {
659    fn fill_buf(&mut self) -> io::Result<&[u8]> {
660        let chunk = match &self.0 {
661            Inner::Contiguous(b) => b.chunk(),
662            Inner::NonContiguous {
663                parts,
664                size,
665                idx,
666                offset,
667            } => {
668                if *size == 0 {
669                    return Ok(&[]);
670                }
671
672                let chunk = &parts[*idx];
673                let n = (chunk.len() - *offset).min(*size);
674                &parts[*idx][*offset..*offset + n]
675            }
676        };
677        Ok(chunk)
678    }
679
680    fn consume(&mut self, amt: usize) {
681        self.advance(amt);
682    }
683}
684
#[cfg(test)]
mod tests {
    use std::io::BufRead;
    use std::io::Read;
    use std::io::Seek;
    use std::io::SeekFrom;

    use pretty_assertions::assert_eq;
    use rand::prelude::*;

    use super::*;

    const EMPTY_SLICE: &[u8] = &[];

    #[test]
    fn test_contiguous_buffer() {
        let mut buf = Buffer::new();

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
        assert_eq!(buf.next(), None);
    }

    #[test]
    fn test_empty_non_contiguous_buffer() {
        let mut buf = Buffer::from(vec![Bytes::new()]);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
        assert_eq!(buf.next(), None);
    }

    #[test]
    fn test_non_contiguous_buffer_with_empty_chunks() {
        let mut buf = Buffer::from(vec![Bytes::from("a")]);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"a");

        buf.advance(1);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_non_contiguous_buffer_with_next() {
        let mut buf = Buffer::from(vec![Bytes::from("a")]);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"a");

        let bs = buf.next();

        assert_eq!(bs, Some(Bytes::from("a")));
        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_buffer_advance() {
        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.advance(1);

        assert_eq!(buf.remaining(), 2);
        assert_eq!(buf.chunk(), b"b");

        buf.advance(1);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"c");

        buf.advance(1);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);

        buf.advance(0);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_buffer_truncate() {
        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(100);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(2);

        assert_eq!(buf.remaining(), 2);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(0);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    /// This setup will return
    ///
    /// - A buffer
    /// - Total size of this buffer.
    /// - Total content of this buffer.
    fn setup_buffer() -> (Buffer, usize, Bytes) {
        let mut rng = thread_rng();

        let bs = (0..100)
            .map(|_| {
                let len = rng.gen_range(1..100);
                let mut buf = vec![0; len];
                rng.fill(&mut buf[..]);
                Bytes::from(buf)
            })
            .collect::<Vec<_>>();

        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
        let buf = Buffer::from(bs);

        (buf, total_size, total_content)
    }

    #[test]
    fn fuzz_buffer_advance() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        let mut cur = 0;
        // Loop at most 10000 times.
        let mut times = 10000;
        while !buf.is_empty() && times > 0 {
            times -= 1;

            let cnt = rng.gen_range(0..total_size - cur);
            cur += cnt;
            buf.advance(cnt);

            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
        }
    }

    #[test]
    fn fuzz_buffer_iter() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        let mut cur = 0;
        // Every iteration consumes at least one byte through `next`, so this terminates.
        while !buf.is_empty() {
            let cnt = rng.gen_range(0..total_size - cur);
            cur += cnt;
            buf.advance(cnt);

            // Before next
            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));

            if let Some(bs) = buf.next() {
                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
                cur += bs.len();
            }

            // After next
            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
        }
    }

    #[test]
    fn fuzz_buffer_truncate() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        let mut cur = 0;
        // Logical end of the buffer inside `total_content`; it shrinks on every truncate,
        // which also guarantees that the loop terminates.
        let mut end = total_size;
        while !buf.is_empty() {
            let cnt = rng.gen_range(0..end - cur);
            cur += cnt;
            buf.advance(cnt);

            // Before truncate
            assert_eq!(buf.remaining(), end - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..end));

            let truncate_size = rng.gen_range(0..end - cur);
            buf.truncate(truncate_size);
            end = cur + truncate_size;

            // After truncate
            assert_eq!(buf.remaining(), truncate_size);
            assert_eq!(
                buf.to_bytes(),
                total_content.slice(cur..cur + truncate_size)
            );

            // Try next after truncate
            if let Some(bs) = buf.next() {
                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
                cur += bs.len();
            }

            // After next
            assert_eq!(buf.remaining(), end - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..end));
        }
    }

    #[test]
    fn test_read_trait() {
        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        let mut output = vec![0; 5];
        let size = buffer.read(&mut output).unwrap();
        assert_eq!(size, 5);
        assert_eq!(&output, b"Hello");
    }

    #[test]
    fn test_seek_trait() {
        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        buffer.seek(SeekFrom::Start(5)).unwrap();
        let mut output = vec![0; 5];
        buffer.read_exact(&mut output).unwrap();
        assert_eq!(&output, b"World");
    }

    #[test]
    fn test_bufread_trait() {
        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        let mut output = String::new();
        buffer.read_to_string(&mut output).unwrap();
        assert_eq!(output, "HelloWorld");

        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        let buf = buffer.fill_buf().unwrap();
        assert_eq!(buf, b"Hello");
        buffer.consume(5);
        let buf = buffer.fill_buf().unwrap();
        assert_eq!(buf, b"World");
    }

    #[test]
    fn test_read_partial() {
        let mut buffer = Buffer::from(vec![Bytes::from("Partial"), Bytes::from("Read")]);
        let mut output = vec![0; 4];
        let size = buffer.read(&mut output).unwrap();
        assert_eq!(size, 4);
        assert_eq!(&output, b"Part");

        let size = buffer.read(&mut output).unwrap();
        assert_eq!(size, 3);
        assert_eq!(&output[..3], b"ial");
    }

    #[test]
    fn test_seek_and_read() {
        let mut buffer = Buffer::from(vec![Bytes::from("SeekAndRead")]);
        buffer.seek(SeekFrom::Start(4)).unwrap();
        let mut output = vec![0; 3];
        buffer.read_exact(&mut output).unwrap();
        assert_eq!(&output, b"And");
    }

    #[test]
    fn test_bufread_consume() {
        let mut buffer = Buffer::from(vec![Bytes::from("ConsumeTest")]);
        let buf = buffer.fill_buf().unwrap();
        assert_eq!(buf, b"ConsumeTest");
        buffer.consume(7);
        let buf = buffer.fill_buf().unwrap();
        assert_eq!(buf, b"Test");
    }

    #[test]
    fn test_empty_buffer() {
        let mut buffer = Buffer::new();
        let mut output = vec![0; 5];
        let size = buffer.read(&mut output).unwrap();
        assert_eq!(size, 0);
        assert_eq!(&output, &[0; 5]);
    }

    #[test]
    fn test_seek_out_of_bounds() {
        let mut buffer = Buffer::from(vec![Bytes::from("OutOfBounds")]);
        let result = buffer.seek(SeekFrom::Start(100));
        assert!(result.is_err());
    }
}