opendal/layers/
await_tree.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use await_tree::InstrumentAwait;
19use futures::Future;
20
21use crate::raw::*;
22use crate::*;
23
24/// Add an Instrument await-tree for actor-based applications to the underlying services.
25///
26/// # AwaitTree
27///
28/// await-tree allows developers to dump this execution tree at runtime,
29/// with the span of each Future annotated by instrument_await.
30/// Read more about [await-tree](https://docs.rs/await-tree/latest/await_tree/)
31///
32/// # Examples
33///
34/// ```no_run
35/// # use opendal::layers::AwaitTreeLayer;
36/// # use opendal::services;
37/// # use opendal::Operator;
38/// # use opendal::Result;
39///
40/// # fn main() -> Result<()> {
41/// let _ = Operator::new(services::Memory::default())?
42///     .layer(AwaitTreeLayer::new())
43///     .finish();
44/// Ok(())
45/// # }
46/// ```
47#[derive(Clone, Default)]
48pub struct AwaitTreeLayer {}
49
50impl AwaitTreeLayer {
51    /// Create a new `AwaitTreeLayer`.
52    pub fn new() -> Self {
53        Self {}
54    }
55}
56
57impl<A: Access> Layer<A> for AwaitTreeLayer {
58    type LayeredAccess = AwaitTreeAccessor<A>;
59
60    fn layer(&self, accessor: A) -> Self::LayeredAccess {
61        AwaitTreeAccessor { inner: accessor }
62    }
63}
64
65#[derive(Debug, Clone)]
66pub struct AwaitTreeAccessor<A: Access> {
67    inner: A,
68}
69
70impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
71    type Inner = A;
72    type Reader = AwaitTreeWrapper<A::Reader>;
73    type Writer = AwaitTreeWrapper<A::Writer>;
74    type Lister = AwaitTreeWrapper<A::Lister>;
75    type Deleter = AwaitTreeWrapper<A::Deleter>;
76
77    fn inner(&self) -> &Self::Inner {
78        &self.inner
79    }
80
81    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
82        self.inner
83            .read(path, args)
84            .instrument_await(format!("opendal::{}", Operation::Read))
85            .await
86            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
87    }
88
89    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
90        self.inner
91            .write(path, args)
92            .instrument_await(format!("opendal::{}", Operation::Write))
93            .await
94            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
95    }
96
97    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
98        self.inner()
99            .copy(from, to, args)
100            .instrument_await(format!("opendal::{}", Operation::Copy))
101            .await
102    }
103
104    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
105        self.inner()
106            .rename(from, to, args)
107            .instrument_await(format!("opendal::{}", Operation::Rename))
108            .await
109    }
110
111    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
112        self.inner
113            .stat(path, args)
114            .instrument_await(format!("opendal::{}", Operation::Stat))
115            .await
116    }
117
118    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
119        self.inner
120            .delete()
121            .instrument_await(format!("opendal::{}", Operation::Delete))
122            .await
123            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
124    }
125
126    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
127        self.inner
128            .list(path, args)
129            .instrument_await(format!("opendal::{}", Operation::List))
130            .await
131            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
132    }
133
134    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
135        self.inner
136            .presign(path, args)
137            .instrument_await(format!("opendal::{}", Operation::Presign))
138            .await
139    }
140}
141
142pub struct AwaitTreeWrapper<R> {
143    inner: R,
144}
145
146impl<R> AwaitTreeWrapper<R> {
147    fn new(inner: R) -> Self {
148        Self { inner }
149    }
150}
151
152impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
153    async fn read(&mut self) -> Result<Buffer> {
154        self.inner
155            .read()
156            .instrument_await(format!("opendal::{}", Operation::Read))
157            .await
158    }
159}
160
161impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
162    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
163        self.inner
164            .write(bs)
165            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
166    }
167
168    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
169        self.inner
170            .abort()
171            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
172    }
173
174    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
175        self.inner
176            .close()
177            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
178    }
179}
180
181impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
182    async fn next(&mut self) -> Result<Option<oio::Entry>> {
183        self.inner
184            .next()
185            .instrument_await(format!("opendal::{}", Operation::List))
186            .await
187    }
188}
189
190impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
191    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
192        self.inner.delete(path, args).await
193    }
194
195    async fn close(&mut self) -> Result<()> {
196        self.inner
197            .close()
198            .instrument_await(format!("opendal::{}", Operation::Delete))
199            .await
200    }
201}