1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::io;
use bytes::Buf;
use bytes::Bytes;
use crate::raw::*;
/// StdIterator is the adapter of [`Iterator`] for [`BlockingReader`][crate::BlockingReader].
///
/// Users can use this adapter in cases where they need to use [`Iterator`] trait.
///
/// StdIterator also implements [`Send`] and [`Sync`].
pub struct StdBytesIterator {
inner: oio::BlockingReader,
offset: u64,
size: u64,
cap: usize,
cur: u64,
}
impl StdBytesIterator {
/// NOTE: don't allow users to create StdIterator directly.
#[inline]
pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> Self {
StdBytesIterator {
inner: r,
offset: range.start,
size: range.end - range.start,
// TODO: should use services preferred io size.
cap: 4 * 1024 * 1024,
cur: 0,
}
}
/// Set the capacity of this reader to control the IO size.
pub fn with_capacity(mut self, cap: usize) -> Self {
self.cap = cap;
self
}
}
impl Iterator for StdBytesIterator {
type Item = io::Result<Bytes>;
fn next(&mut self) -> Option<Self::Item> {
if self.cur >= self.size {
return None;
}
let next_offset = self.offset + self.cur;
let next_size = (self.size - self.cur).min(self.cap as u64) as usize;
match self.inner.read_at(next_offset, next_size) {
Ok(buf) if !buf.has_remaining() => None,
Ok(mut buf) => {
self.cur += buf.remaining() as u64;
Some(Ok(buf.copy_to_bytes(buf.remaining())))
}
Err(err) => Some(Err(format_std_io_error(err))),
}
}
}