opendal/types/write/
futures_bytes_sink.rs1use std::pin::Pin;
19use std::task::Context;
20use std::task::Poll;
21
22use bytes::Bytes;
23use futures::SinkExt;
24
25use crate::raw::*;
26use crate::*;
27
28pub struct FuturesBytesSink {
35 sink: BufferSink,
36}
37
38impl FuturesBytesSink {
39 #[inline]
41 pub(crate) fn new(w: WriteGenerator<oio::Writer>) -> Self {
42 FuturesBytesSink {
43 sink: BufferSink::new(w),
44 }
45 }
46}
47
48impl futures::Sink<Bytes> for FuturesBytesSink {
49 type Error = std::io::Error;
50
51 fn poll_ready(
52 mut self: Pin<&mut Self>,
53 cx: &mut Context<'_>,
54 ) -> Poll<Result<(), std::io::Error>> {
55 self.sink.poll_ready_unpin(cx).map_err(format_std_io_error)
56 }
57
58 fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> Result<(), std::io::Error> {
59 self.sink
60 .start_send_unpin(Buffer::from(item))
61 .map_err(format_std_io_error)
62 }
63
64 fn poll_flush(
65 mut self: Pin<&mut Self>,
66 cx: &mut Context<'_>,
67 ) -> Poll<Result<(), std::io::Error>> {
68 self.sink.poll_flush_unpin(cx).map_err(format_std_io_error)
69 }
70
71 fn poll_close(
72 mut self: Pin<&mut Self>,
73 cx: &mut Context<'_>,
74 ) -> Poll<Result<(), std::io::Error>> {
75 self.sink.poll_close_unpin(cx).map_err(format_std_io_error)
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use std::sync::Arc;
82
83 use super::*;
84 use crate::raw::MaybeSend;
85
86 #[tokio::test]
87 async fn test_trait() {
88 let op = Operator::via_iter(Scheme::Memory, []).unwrap();
89
90 let acc = op.into_inner();
91 let ctx = Arc::new(WriteContext::new(
92 acc,
93 "test".to_string(),
94 OpWrite::new(),
95 OpWriter::new().with_chunk(1),
96 ));
97 let write_gen = WriteGenerator::create(ctx).await.unwrap();
98
99 let v = FuturesBytesSink::new(write_gen);
100
101 let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(v);
102 }
103}