use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use bytes::Buf;
use futures::FutureExt;
use futures::TryFutureExt;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;
use crate::raw::Access;
use crate::raw::*;
use crate::*;
/// Layer that records metrics for every wrapped accessor through the
/// `prometheus_client` crate.
///
/// The metric families are created and registered once, in [`PrometheusClientLayer::new`];
/// each accessor produced by this layer receives a clone of them.
#[derive(Debug, Clone)]
pub struct PrometheusClientLayer {
    /// Metric families handed to every accessor built by this layer.
    metrics: PrometheusClientMetrics,
}

impl PrometheusClientLayer {
    /// Creates the layer and registers all of its metric families in `registry`.
    pub fn new(registry: &mut Registry) -> Self {
        Self {
            metrics: PrometheusClientMetrics::register(registry),
        }
    }
}
impl<A: Access> Layer<A> for PrometheusClientLayer {
    type LayeredAccess = PrometheusAccessor<A>;

    /// Wraps `inner` in a [`PrometheusAccessor`].
    ///
    /// The scheme reported by `inner.info()` is read once here and reused as
    /// the `scheme` label of every metric the accessor records.
    fn layer(&self, inner: A) -> Self::LayeredAccess {
        let scheme = inner.info().scheme();

        PrometheusAccessor {
            inner,
            metrics: Arc::new(self.metrics.clone()),
            scheme,
        }
    }
}
/// Label set for per-operation metrics: two `(name, value)` pairs, filled in
/// this file as `scheme` and `op`.
type OperationLabels = [(&'static str, &'static str); 2];
/// Label set for error metrics: three `(name, value)` pairs, filled in this
/// file as `scheme`, `op` and `err`.
type ErrorLabels = [(&'static str, &'static str); 3];
#[derive(Debug, Clone)]
struct PrometheusClientMetrics {
requests_total: Family<OperationLabels, Counter>,
errors_total: Family<ErrorLabels, Counter>,
request_duration_seconds: Family<OperationLabels, Histogram>,
bytes_histogram: Family<OperationLabels, Histogram>,
bytes_total: Family<OperationLabels, Counter>,
}
impl PrometheusClientMetrics {
pub fn register(registry: &mut Registry) -> Self {
let requests_total = Family::default();
let errors_total = Family::default();
let bytes_total = Family::default();
let request_duration_seconds = Family::<OperationLabels, _>::new_with_constructor(|| {
let buckets = histogram::exponential_buckets(0.01, 2.0, 16);
Histogram::new(buckets)
});
let bytes_histogram = Family::<OperationLabels, _>::new_with_constructor(|| {
let buckets = histogram::exponential_buckets(1.0, 2.0, 16);
Histogram::new(buckets)
});
registry.register("opendal_requests", "", requests_total.clone());
registry.register("opendal_errors", "", errors_total.clone());
registry.register(
"opendal_request_duration_seconds",
"",
request_duration_seconds.clone(),
);
registry.register("opendal_bytes_histogram", "", bytes_histogram.clone());
registry.register("opendal_bytes", "", bytes_total.clone());
Self {
requests_total,
errors_total,
request_duration_seconds,
bytes_histogram,
bytes_total,
}
}
fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) {
let labels = [
("scheme", scheme.into_static()),
("op", op.into_static()),
("err", err.into_static()),
];
self.errors_total.get_or_create(&labels).inc();
}
fn increment_request_total(&self, scheme: Scheme, op: Operation) {
let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
self.requests_total.get_or_create(&labels).inc();
}
fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) {
let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
self.bytes_histogram
.get_or_create(&labels)
.observe(bytes as f64);
self.bytes_total.get_or_create(&labels).inc_by(bytes as u64);
}
fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) {
let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
self.request_duration_seconds
.get_or_create(&labels)
.observe(duration.as_secs_f64());
}
}
/// Accessor wrapper that records metrics around calls to the wrapped accessor.
#[derive(Clone)]
pub struct PrometheusAccessor<A: Access> {
// The wrapped accessor that actually performs each operation.
inner: A,
// Metric families, shared with the readers and writers this accessor creates.
metrics: Arc<PrometheusClientMetrics>,
// Scheme used as the `scheme` label on every recorded metric.
scheme: Scheme,
}
impl<A: Access> Debug for PrometheusAccessor<A> {
    /// Formats only the wrapped accessor; the remaining fields are elided and
    /// the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PrometheusAccessor");
        builder.field("inner", &self.inner);
        builder.finish_non_exhaustive()
    }
}
// Metrics-recording implementation of `LayeredAccess`: each operation below
// counts the request, delegates to the wrapped accessor, and counts any error.
impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
type Inner = A;
// Readers and writers are wrapped so their I/O calls are recorded as well.
type Reader = PrometheusMetricWrapper<A::Reader>;
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
type Writer = PrometheusMetricWrapper<A::Writer>;
type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
// Listers are the wrapped accessor's own types, passed through unwrapped.
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
/// Returns a reference to the wrapped accessor.
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.metrics
.increment_request_total(self.scheme, Operation::CreateDir);
let start_time = Instant::now();
let create_res = self.inner.create_dir(path, args).await;
self.metrics.observe_request_duration(
self.scheme,
Operation::CreateDir,
start_time.elapsed(),
);
create_res.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::CreateDir, e.kind());
e
})
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.metrics
.increment_request_total(self.scheme, Operation::Read);
let read_res = self
.inner
.read(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::Read,
self.metrics.clone(),
self.scheme,
),
)
})
})
.await;
read_res.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::Read, e.kind());
e
})
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.metrics
.increment_request_total(self.scheme, Operation::Write);
let write_res = self
.inner
.write(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::Write,
self.metrics.clone(),
self.scheme,
),
)
})
})
.await;
write_res.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::Write, e.kind());
e
})
}
/// Stats `path` through the wrapped accessor, recording the request, its
/// duration and any error under `Operation::Stat`.
///
/// Each failed call increments the error counter exactly once.
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
    self.metrics
        .increment_request_total(self.scheme, Operation::Stat);
    let start_time = Instant::now();

    // `inspect_err` is the single place where the failure is counted. A
    // second `map_err` on the awaited result would count the same error
    // twice, so the result is returned untouched below.
    let stat_res = self
        .inner
        .stat(path, args)
        .inspect_err(|e| {
            self.metrics
                .increment_errors_total(self.scheme, Operation::Stat, e.kind());
        })
        .await;

    self.metrics
        .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed());
    stat_res
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.metrics
.increment_request_total(self.scheme, Operation::Delete);
let start_time = Instant::now();
let delete_res = self.inner.delete(path, args).await;
self.metrics
.observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed());
delete_res.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::Delete, e.kind());
e
})
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.metrics
.increment_request_total(self.scheme, Operation::List);
let start_time = Instant::now();
let list_res = self.inner.list(path, args).await;
self.metrics
.observe_request_duration(self.scheme, Operation::List, start_time.elapsed());
list_res.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::List, e.kind());
e
})
}
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.metrics
.increment_request_total(self.scheme, Operation::Batch);
let start_time = Instant::now();
let result = self.inner.batch(args).await;
self.metrics
.observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed());
result.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::Batch, e.kind());
e
})
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.metrics
.increment_request_total(self.scheme, Operation::Presign);
let start_time = Instant::now();
let result = self.inner.presign(path, args).await;
self.metrics.observe_request_duration(
self.scheme,
Operation::Presign,
start_time.elapsed(),
);
result.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::Presign, e.kind());
e
})
}
/// Blocking directory creation through the wrapped accessor, recording the
/// request, its duration and any error under `Operation::BlockingCreateDir`.
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
    let op = Operation::BlockingCreateDir;
    self.metrics.increment_request_total(self.scheme, op);

    let started = Instant::now();
    let outcome = self.inner.blocking_create_dir(path, args);
    self.metrics
        .observe_request_duration(self.scheme, op, started.elapsed());

    if let Err(err) = &outcome {
        self.metrics
            .increment_errors_total(self.scheme, op, err.kind());
    }
    outcome
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingRead);
let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingRead,
self.metrics.clone(),
self.scheme,
),
)
});
result.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::BlockingRead, e.kind());
e
})
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingWrite);
let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingWrite,
self.metrics.clone(),
self.scheme,
),
)
});
result.map_err(|e| {
self.metrics
.increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind());
e
})
}
/// Blocking stat through the wrapped accessor, recording the request, its
/// duration and any error under `Operation::BlockingStat`.
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
    let op = Operation::BlockingStat;
    self.metrics.increment_request_total(self.scheme, op);

    let started = Instant::now();
    let outcome = self.inner.blocking_stat(path, args);
    self.metrics
        .observe_request_duration(self.scheme, op, started.elapsed());

    if let Err(err) = &outcome {
        self.metrics
            .increment_errors_total(self.scheme, op, err.kind());
    }
    outcome
}
/// Blocking delete through the wrapped accessor, recording the request, its
/// duration and any error under `Operation::BlockingDelete`.
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
    let op = Operation::BlockingDelete;
    self.metrics.increment_request_total(self.scheme, op);

    let started = Instant::now();
    let outcome = self.inner.blocking_delete(path, args);
    self.metrics
        .observe_request_duration(self.scheme, op, started.elapsed());

    if let Err(err) = &outcome {
        self.metrics
            .increment_errors_total(self.scheme, op, err.kind());
    }
    outcome
}
/// Starts a blocking listing through the wrapped accessor, recording the
/// request, the duration of the call and any error under
/// `Operation::BlockingList`.
///
/// The returned lister is the wrapped accessor's own and is not instrumented.
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
    let op = Operation::BlockingList;
    self.metrics.increment_request_total(self.scheme, op);

    let started = Instant::now();
    let outcome = self.inner.blocking_list(path, args);
    self.metrics
        .observe_request_duration(self.scheme, op, started.elapsed());

    if let Err(err) = &outcome {
        self.metrics
            .increment_errors_total(self.scheme, op, err.kind());
    }
    outcome
}
}
/// Wrapper around a reader or writer that records metrics for its I/O calls.
pub struct PrometheusMetricWrapper<R> {
    /// The wrapped reader or writer.
    inner: R,
    /// Operation used as the `op` label for everything this wrapper records.
    op: Operation,
    /// Metric families shared with the accessor that created this wrapper.
    metrics: Arc<PrometheusClientMetrics>,
    /// Scheme used as the `scheme` label.
    scheme: Scheme,
    /// Running byte count, reported when the wrapper is dropped.
    bytes_total: usize,
    /// Creation time of the wrapper, used for the duration reported on drop.
    start_time: Instant,
}

impl<R> PrometheusMetricWrapper<R> {
    /// Wraps `inner`, starting with a zero byte count and the current time as
    /// the start time.
    fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self {
        let start_time = Instant::now();
        let bytes_total = 0;

        Self {
            inner,
            op,
            metrics,
            scheme,
            bytes_total,
            start_time,
        }
    }
}
impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
    /// Reads from the wrapped reader.
    ///
    /// On success the returned buffer's remaining length and the call's
    /// duration are observed; on failure only the error counter is incremented.
    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
        let started = Instant::now();
        let outcome = self.inner.read_at(offset, limit).await;

        match &outcome {
            Ok(buf) => {
                self.metrics
                    .observe_bytes_total(self.scheme, self.op, buf.remaining());
                self.metrics
                    .observe_request_duration(self.scheme, self.op, started.elapsed());
            }
            Err(err) => {
                self.metrics
                    .increment_errors_total(self.scheme, self.op, err.kind());
            }
        }
        outcome
    }
}
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
    /// Blocking read from the wrapped reader.
    ///
    /// On success the returned buffer's remaining length and the call's
    /// duration are observed; on failure only the error counter is incremented.
    fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
        let started = Instant::now();

        match self.inner.read_at(offset, limit) {
            Ok(buf) => {
                self.metrics
                    .observe_bytes_total(self.scheme, self.op, buf.remaining());
                self.metrics
                    .observe_request_duration(self.scheme, self.op, started.elapsed());
                Ok(buf)
            }
            Err(err) => {
                self.metrics
                    .increment_errors_total(self.scheme, self.op, err.kind());
                Err(err)
            }
        }
    }
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
    /// Writes `bs` to the wrapped writer.
    ///
    /// On success the number of bytes written and the call's duration are
    /// observed; on failure only the error counter is incremented.
    async fn write(&mut self, bs: Buffer) -> Result<usize> {
        let started = Instant::now();
        let outcome = self.inner.write(bs).await;

        match &outcome {
            Ok(written) => {
                self.metrics
                    .observe_bytes_total(self.scheme, self.op, *written);
                self.metrics
                    .observe_request_duration(self.scheme, self.op, started.elapsed());
            }
            Err(err) => {
                self.metrics
                    .increment_errors_total(self.scheme, self.op, err.kind());
            }
        }
        outcome
    }

    /// Aborts the wrapped writer, counting an error if the abort fails.
    async fn abort(&mut self) -> Result<()> {
        let outcome = self.inner.abort().await;
        if let Err(err) = &outcome {
            self.metrics
                .increment_errors_total(self.scheme, self.op, err.kind());
        }
        outcome
    }

    /// Closes the wrapped writer, counting an error if the close fails.
    async fn close(&mut self) -> Result<()> {
        let outcome = self.inner.close().await;
        if let Err(err) = &outcome {
            self.metrics
                .increment_errors_total(self.scheme, self.op, err.kind());
        }
        outcome
    }
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
    /// Blocking write of `bs` to the wrapped writer.
    ///
    /// Successful writes only add to the running byte count, which is
    /// reported when the wrapper is dropped; failures increment the error
    /// counter immediately.
    fn write(&mut self, bs: Buffer) -> Result<usize> {
        match self.inner.write(bs) {
            Ok(written) => {
                self.bytes_total += written;
                Ok(written)
            }
            Err(err) => {
                self.metrics
                    .increment_errors_total(self.scheme, self.op, err.kind());
                Err(err)
            }
        }
    }

    /// Closes the wrapped writer, counting an error if the close fails.
    fn close(&mut self) -> Result<()> {
        let outcome = self.inner.close();
        if let Err(err) = &outcome {
            self.metrics
                .increment_errors_total(self.scheme, self.op, err.kind());
        }
        outcome
    }
}
impl<R> Drop for PrometheusMetricWrapper<R> {
    /// Reports the accumulated byte count and the time since the wrapper was
    /// created.
    ///
    /// NOTE(review): in this file only the blocking writer adds to
    /// `bytes_total`; the readers and the async writer observe bytes and
    /// duration per call instead. For those wrappers this drop therefore
    /// records a zero-byte sample plus an extra whole-lifetime duration.
    /// Confirm whether that is intended.
    fn drop(&mut self) {
        let (scheme, op) = (self.scheme, self.op);

        self.metrics
            .observe_bytes_total(scheme, op, self.bytes_total);

        let lifetime = self.start_time.elapsed();
        self.metrics.observe_request_duration(scheme, op, lifetime);
    }
}