1use std::time::Duration;
19
20use prometheus::core::AtomicI64;
21use prometheus::core::AtomicU64;
22use prometheus::core::GenericCounterVec;
23use prometheus::core::GenericGaugeVec;
24use prometheus::register_histogram_vec_with_registry;
25use prometheus::register_int_counter_vec_with_registry;
26use prometheus::register_int_gauge_vec_with_registry;
27use prometheus::HistogramVec;
28use prometheus::Registry;
29
30use crate::layers::observe;
31use crate::raw::Access;
32use crate::raw::*;
33use crate::*;
34
35#[derive(Clone, Debug)]
141pub struct PrometheusLayer {
142 interceptor: PrometheusInterceptor,
143}
144
145impl PrometheusLayer {
146 pub fn builder() -> PrometheusLayerBuilder {
177 PrometheusLayerBuilder::default()
178 }
179}
180
181impl<A: Access> Layer<A> for PrometheusLayer {
182 type LayeredAccess = observe::MetricsAccessor<A, PrometheusInterceptor>;
183
184 fn layer(&self, inner: A) -> Self::LayeredAccess {
185 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
186 }
187}
188
/// Configuration builder for [`PrometheusLayer`].
///
/// Each field holds the bucket boundaries handed to one family of histograms
/// when the builder is registered. The `Default` implementation fills every
/// field from the matching `observe::DEFAULT_*_BUCKETS` constant.
///
/// `Clone` and `Debug` are derived so a configured builder can be reused for
/// several registries and inspected in logs or test failures.
#[derive(Clone, Debug)]
pub struct PrometheusLayerBuilder {
    /// Buckets for byte-size histograms (operation bytes, HTTP request bytes
    /// and HTTP response bytes).
    bytes_buckets: Vec<f64>,
    /// Buckets for byte-rate histograms (operation, HTTP request and HTTP
    /// response byte rates).
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the operation entries-rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for duration histograms (operation, HTTP request and HTTP
    /// response durations).
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the operation TTFB histogram.
    ttfb_buckets: Vec<f64>,
}
198
199impl Default for PrometheusLayerBuilder {
200 fn default() -> Self {
201 Self {
202 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
203 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
204 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
205 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
206 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
207 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
208 }
209 }
210}
211
212impl PrometheusLayerBuilder {
213 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
215 if !buckets.is_empty() {
216 self.bytes_buckets = buckets;
217 }
218 self
219 }
220
221 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
223 if !buckets.is_empty() {
224 self.bytes_rate_buckets = buckets;
225 }
226 self
227 }
228
229 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
231 if !buckets.is_empty() {
232 self.entries_buckets = buckets;
233 }
234 self
235 }
236
237 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
239 if !buckets.is_empty() {
240 self.entries_rate_buckets = buckets;
241 }
242 self
243 }
244
245 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
247 if !buckets.is_empty() {
248 self.duration_seconds_buckets = buckets;
249 }
250 self
251 }
252
253 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
255 if !buckets.is_empty() {
256 self.ttfb_buckets = buckets;
257 }
258 self
259 }
260
261 pub fn register(self, registry: &Registry) -> Result<PrometheusLayer> {
286 let labels = OperationLabels::names();
287 let operation_bytes = {
288 let metric = observe::MetricValue::OperationBytes(0);
289 register_histogram_vec_with_registry!(
290 metric.name(),
291 metric.help(),
292 labels.as_ref(),
293 self.bytes_buckets.clone(),
294 registry
295 )
296 .map_err(parse_prometheus_error)?
297 };
298 let operation_bytes_rate = {
299 let metric = observe::MetricValue::OperationBytesRate(0.0);
300 register_histogram_vec_with_registry!(
301 metric.name(),
302 metric.help(),
303 labels.as_ref(),
304 self.bytes_rate_buckets.clone(),
305 registry
306 )
307 .map_err(parse_prometheus_error)?
308 };
309 let operation_entries = {
310 let metric = observe::MetricValue::OperationEntries(0);
311 register_histogram_vec_with_registry!(
312 metric.name(),
313 metric.help(),
314 labels.as_ref(),
315 self.entries_buckets,
316 registry
317 )
318 .map_err(parse_prometheus_error)?
319 };
320 let operation_entries_rate = {
321 let metric = observe::MetricValue::OperationEntriesRate(0.0);
322 register_histogram_vec_with_registry!(
323 metric.name(),
324 metric.help(),
325 labels.as_ref(),
326 self.entries_rate_buckets,
327 registry
328 )
329 .map_err(parse_prometheus_error)?
330 };
331 let operation_duration_seconds = {
332 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
333 register_histogram_vec_with_registry!(
334 metric.name(),
335 metric.help(),
336 labels.as_ref(),
337 self.duration_seconds_buckets.clone(),
338 registry
339 )
340 .map_err(parse_prometheus_error)?
341 };
342 let operation_executing = {
343 let metric = observe::MetricValue::OperationExecuting(0);
344 register_int_gauge_vec_with_registry!(
345 metric.name(),
346 metric.help(),
347 labels.as_ref(),
348 registry
349 )
350 .map_err(parse_prometheus_error)?
351 };
352 let operation_ttfb_seconds = {
353 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
354 register_histogram_vec_with_registry!(
355 metric.name(),
356 metric.help(),
357 labels.as_ref(),
358 self.ttfb_buckets.clone(),
359 registry
360 )
361 .map_err(parse_prometheus_error)?
362 };
363
364 let labels_with_error = OperationLabels::names().with_error();
365 let operation_errors_total = {
366 let metric = observe::MetricValue::OperationErrorsTotal;
367 register_int_counter_vec_with_registry!(
368 metric.name(),
369 metric.help(),
370 labels_with_error.as_ref(),
371 registry
372 )
373 .map_err(parse_prometheus_error)?
374 };
375
376 let http_executing = {
377 let metric = observe::MetricValue::HttpExecuting(0);
378 register_int_gauge_vec_with_registry!(
379 metric.name(),
380 metric.help(),
381 labels.as_ref(),
382 registry
383 )
384 .map_err(parse_prometheus_error)?
385 };
386 let http_request_bytes = {
387 let metric = observe::MetricValue::HttpRequestBytes(0);
388 register_histogram_vec_with_registry!(
389 metric.name(),
390 metric.help(),
391 labels.as_ref(),
392 self.bytes_buckets.clone(),
393 registry
394 )
395 .map_err(parse_prometheus_error)?
396 };
397 let http_request_bytes_rate = {
398 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
399 register_histogram_vec_with_registry!(
400 metric.name(),
401 metric.help(),
402 labels.as_ref(),
403 self.bytes_rate_buckets.clone(),
404 registry
405 )
406 .map_err(parse_prometheus_error)?
407 };
408 let http_request_duration_seconds = {
409 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
410 register_histogram_vec_with_registry!(
411 metric.name(),
412 metric.help(),
413 labels.as_ref(),
414 self.duration_seconds_buckets.clone(),
415 registry
416 )
417 .map_err(parse_prometheus_error)?
418 };
419 let http_response_bytes = {
420 let metric = observe::MetricValue::HttpResponseBytes(0);
421 register_histogram_vec_with_registry!(
422 metric.name(),
423 metric.help(),
424 labels.as_ref(),
425 self.bytes_buckets,
426 registry
427 )
428 .map_err(parse_prometheus_error)?
429 };
430 let http_response_bytes_rate = {
431 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
432 register_histogram_vec_with_registry!(
433 metric.name(),
434 metric.help(),
435 labels.as_ref(),
436 self.bytes_rate_buckets,
437 registry
438 )
439 .map_err(parse_prometheus_error)?
440 };
441 let http_response_duration_seconds = {
442 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
443 register_histogram_vec_with_registry!(
444 metric.name(),
445 metric.help(),
446 labels.as_ref(),
447 self.duration_seconds_buckets,
448 registry
449 )
450 .map_err(parse_prometheus_error)?
451 };
452 let http_connection_errors_total = {
453 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
454 register_int_counter_vec_with_registry!(
455 metric.name(),
456 metric.help(),
457 labels.as_ref(),
458 registry
459 )
460 .map_err(parse_prometheus_error)?
461 };
462
463 let labels_with_status_code = OperationLabels::names().with_status_code();
464 let http_status_errors_total = {
465 let metric = observe::MetricValue::HttpStatusErrorsTotal;
466 register_int_counter_vec_with_registry!(
467 metric.name(),
468 metric.help(),
469 labels_with_status_code.as_ref(),
470 registry
471 )
472 .map_err(parse_prometheus_error)?
473 };
474
475 Ok(PrometheusLayer {
476 interceptor: PrometheusInterceptor {
477 operation_bytes,
478 operation_bytes_rate,
479 operation_entries,
480 operation_entries_rate,
481 operation_duration_seconds,
482 operation_errors_total,
483 operation_executing,
484 operation_ttfb_seconds,
485
486 http_executing,
487 http_request_bytes,
488 http_request_bytes_rate,
489 http_request_duration_seconds,
490 http_response_bytes,
491 http_response_bytes_rate,
492 http_response_duration_seconds,
493 http_connection_errors_total,
494 http_status_errors_total,
495 },
496 })
497 }
498
499 pub fn register_default(self) -> Result<PrometheusLayer> {
524 let registry = prometheus::default_registry();
525 self.register(registry)
526 }
527}
528
529fn parse_prometheus_error(err: prometheus::Error) -> Error {
531 Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err)
532}
533
534#[derive(Clone, Debug)]
535pub struct PrometheusInterceptor {
536 operation_bytes: HistogramVec,
537 operation_bytes_rate: HistogramVec,
538 operation_entries: HistogramVec,
539 operation_entries_rate: HistogramVec,
540 operation_duration_seconds: HistogramVec,
541 operation_errors_total: GenericCounterVec<AtomicU64>,
542 operation_executing: GenericGaugeVec<AtomicI64>,
543 operation_ttfb_seconds: HistogramVec,
544
545 http_executing: GenericGaugeVec<AtomicI64>,
546 http_request_bytes: HistogramVec,
547 http_request_bytes_rate: HistogramVec,
548 http_request_duration_seconds: HistogramVec,
549 http_response_bytes: HistogramVec,
550 http_response_bytes_rate: HistogramVec,
551 http_response_duration_seconds: HistogramVec,
552 http_connection_errors_total: GenericCounterVec<AtomicU64>,
553 http_status_errors_total: GenericCounterVec<AtomicU64>,
554}
555
556impl observe::MetricsIntercept for PrometheusInterceptor {
557 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
558 let labels = OperationLabels(labels);
559 match value {
560 observe::MetricValue::OperationBytes(v) => self
561 .operation_bytes
562 .with_label_values(&labels.values())
563 .observe(v as f64),
564 observe::MetricValue::OperationBytesRate(v) => self
565 .operation_bytes_rate
566 .with_label_values(&labels.values())
567 .observe(v),
568 observe::MetricValue::OperationEntries(v) => self
569 .operation_entries
570 .with_label_values(&labels.values())
571 .observe(v as f64),
572 observe::MetricValue::OperationEntriesRate(v) => self
573 .operation_entries_rate
574 .with_label_values(&labels.values())
575 .observe(v),
576 observe::MetricValue::OperationDurationSeconds(v) => self
577 .operation_duration_seconds
578 .with_label_values(&labels.values())
579 .observe(v.as_secs_f64()),
580 observe::MetricValue::OperationErrorsTotal => self
581 .operation_errors_total
582 .with_label_values(&labels.values())
583 .inc(),
584 observe::MetricValue::OperationExecuting(v) => self
585 .operation_executing
586 .with_label_values(&labels.values())
587 .add(v as i64),
588 observe::MetricValue::OperationTtfbSeconds(v) => self
589 .operation_ttfb_seconds
590 .with_label_values(&labels.values())
591 .observe(v.as_secs_f64()),
592
593 observe::MetricValue::HttpExecuting(v) => self
594 .http_executing
595 .with_label_values(&labels.values())
596 .add(v as i64),
597 observe::MetricValue::HttpRequestBytes(v) => self
598 .http_request_bytes
599 .with_label_values(&labels.values())
600 .observe(v as f64),
601 observe::MetricValue::HttpRequestBytesRate(v) => self
602 .http_request_bytes_rate
603 .with_label_values(&labels.values())
604 .observe(v),
605 observe::MetricValue::HttpRequestDurationSeconds(v) => self
606 .http_request_duration_seconds
607 .with_label_values(&labels.values())
608 .observe(v.as_secs_f64()),
609 observe::MetricValue::HttpResponseBytes(v) => self
610 .http_response_bytes
611 .with_label_values(&labels.values())
612 .observe(v as f64),
613 observe::MetricValue::HttpResponseBytesRate(v) => self
614 .http_response_bytes_rate
615 .with_label_values(&labels.values())
616 .observe(v),
617 observe::MetricValue::HttpResponseDurationSeconds(v) => self
618 .http_response_duration_seconds
619 .with_label_values(&labels.values())
620 .observe(v.as_secs_f64()),
621 observe::MetricValue::HttpConnectionErrorsTotal => self
622 .http_connection_errors_total
623 .with_label_values(&labels.values())
624 .inc(),
625 observe::MetricValue::HttpStatusErrorsTotal => self
626 .http_status_errors_total
627 .with_label_values(&labels.values())
628 .inc(),
629 }
630 }
631}
632
/// Ordered list of label names used when registering a metric family.
struct OperationLabelNames(Vec<&'static str>);

impl AsRef<[&'static str]> for OperationLabelNames {
    /// Borrows the label names as a slice, in insertion order.
    fn as_ref(&self) -> &[&'static str] {
        self.0.as_slice()
    }
}
640
641impl OperationLabelNames {
642 fn with_error(mut self) -> Self {
643 self.0.push(observe::LABEL_ERROR);
644 self
645 }
646
647 fn with_status_code(mut self) -> Self {
648 self.0.push(observe::LABEL_STATUS_CODE);
649 self
650 }
651}
652
653#[derive(Clone, Debug, PartialEq, Eq, Hash)]
654struct OperationLabels(observe::MetricLabels);
655
656impl OperationLabels {
657 fn names() -> OperationLabelNames {
658 OperationLabelNames(vec![
659 observe::LABEL_SCHEME,
660 observe::LABEL_NAMESPACE,
661 observe::LABEL_ROOT,
662 observe::LABEL_OPERATION,
663 ])
664 }
665
666 fn values(&self) -> Vec<&str> {
667 let mut labels = Vec::with_capacity(6);
668
669 labels.extend([
670 self.0.scheme.into_static(),
671 self.0.namespace.as_ref(),
672 self.0.root.as_ref(),
673 self.0.operation,
674 ]);
675
676 if let Some(error) = self.0.error {
677 labels.push(error.into_static());
678 }
679
680 if let Some(status_code) = &self.0.status_code {
681 labels.push(status_code.as_str());
682 }
683
684 labels
685 }
686}