1use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::Level;
23use log::log;
24
25use crate::raw::*;
26use crate::*;
27
28#[derive(Debug)]
110pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
111 logger: I,
112}
113
114impl Default for LoggingLayer {
115 fn default() -> Self {
116 Self {
117 logger: DefaultLoggingInterceptor,
118 }
119 }
120}
121
122impl LoggingLayer {
123 pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
125 LoggingLayer { logger }
126 }
127}
128
129impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
130 type LayeredAccess = LoggingAccessor<A, I>;
131
132 fn layer(&self, inner: A) -> Self::LayeredAccess {
133 let info = inner.info();
134 LoggingAccessor {
135 inner,
136
137 info,
138 logger: self.logger.clone(),
139 }
140 }
141}
142
/// Receiver for the events emitted by the logging layer and its wrappers.
pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
    /// Records a single event.
    ///
    /// - `info`: metadata of the accessor the event belongs to.
    /// - `operation`: the operation being reported.
    /// - `context`: ordered key/value pairs describing the event (for
    ///   example `path`, `from`/`to`, `read`, `written`).
    /// - `message`: short status text such as `started`, `finished` or
    ///   `failed`.
    /// - `err`: the error for failure events, `None` otherwise.
    fn log(
        &self,
        info: &AccessorInfo,
        operation: Operation,
        context: &[(&str, &str)],
        message: &str,
        err: Option<&Error>,
    );
}
169
170#[derive(Debug, Copy, Clone, Default)]
172pub struct DefaultLoggingInterceptor;
173
174impl LoggingInterceptor for DefaultLoggingInterceptor {
175 #[inline]
176 fn log(
177 &self,
178 info: &AccessorInfo,
179 operation: Operation,
180 context: &[(&str, &str)],
181 message: &str,
182 err: Option<&Error>,
183 ) {
184 if let Some(err) = err {
185 let lvl = if err.kind() == ErrorKind::Unexpected {
187 Level::Error
188 } else {
189 Level::Warn
190 };
191
192 log!(
193 target: LOGGING_TARGET,
194 lvl,
195 "service={} name={}{}: {operation} {message} {}",
196 info.scheme(),
197 info.name(),
198 LoggingContext(context),
199 if err.kind() != ErrorKind::Unexpected {
204 format!("{err}")
205 } else {
206 format!("{err:?}")
207 }
208 );
209 }
210
211 log!(
212 target: LOGGING_TARGET,
213 Level::Debug,
214 "service={} name={}{}: {operation} {message}",
215 info.scheme(),
216 info.name(),
217 LoggingContext(context),
218 );
219 }
220}
221
/// Display adapter that renders context pairs as ` key=value` fragments.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes every pair as ` {key}={value}` (each preceded by one space),
    /// in slice order; an empty slice writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
232
/// Accessor produced by the logging layer: wraps `inner` and reports each
/// operation to `logger`.
#[derive(Clone, Debug)]
pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
    /// The wrapped accessor that performs the actual operations.
    inner: A,

    /// Accessor metadata obtained from `inner` when the layer was applied.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives every event.
    logger: I,
}
240
/// Target passed to the `log!` macro for records emitted by the default interceptor.
static LOGGING_TARGET: &str = "opendal::services";
242
243impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
244 type Inner = A;
245 type Reader = LoggingReader<A::Reader, I>;
246 type Writer = LoggingWriter<A::Writer, I>;
247 type Lister = LoggingLister<A::Lister, I>;
248 type Deleter = LoggingDeleter<A::Deleter, I>;
249
250 fn inner(&self) -> &Self::Inner {
251 &self.inner
252 }
253
254 fn info(&self) -> Arc<AccessorInfo> {
255 self.info.clone()
256 }
257
258 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
259 self.logger.log(
260 &self.info,
261 Operation::CreateDir,
262 &[("path", path)],
263 "started",
264 None,
265 );
266
267 self.inner
268 .create_dir(path, args)
269 .await
270 .inspect(|_| {
271 self.logger.log(
272 &self.info,
273 Operation::CreateDir,
274 &[("path", path)],
275 "finished",
276 None,
277 );
278 })
279 .inspect_err(|err| {
280 self.logger.log(
281 &self.info,
282 Operation::CreateDir,
283 &[("path", path)],
284 "failed",
285 Some(err),
286 );
287 })
288 }
289
290 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
291 self.logger.log(
292 &self.info,
293 Operation::Read,
294 &[("path", path)],
295 "started",
296 None,
297 );
298
299 self.inner
300 .read(path, args)
301 .await
302 .map(|(rp, r)| {
303 self.logger.log(
304 &self.info,
305 Operation::Read,
306 &[("path", path)],
307 "created reader",
308 None,
309 );
310 (
311 rp,
312 LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
313 )
314 })
315 .inspect_err(|err| {
316 self.logger.log(
317 &self.info,
318 Operation::Read,
319 &[("path", path)],
320 "failed",
321 Some(err),
322 );
323 })
324 }
325
326 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327 self.logger.log(
328 &self.info,
329 Operation::Write,
330 &[("path", path)],
331 "started",
332 None,
333 );
334
335 self.inner
336 .write(path, args)
337 .await
338 .map(|(rp, w)| {
339 self.logger.log(
340 &self.info,
341 Operation::Write,
342 &[("path", path)],
343 "created writer",
344 None,
345 );
346 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
347 (rp, w)
348 })
349 .inspect_err(|err| {
350 self.logger.log(
351 &self.info,
352 Operation::Write,
353 &[("path", path)],
354 "failed",
355 Some(err),
356 );
357 })
358 }
359
360 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
361 self.logger.log(
362 &self.info,
363 Operation::Copy,
364 &[("from", from), ("to", to)],
365 "started",
366 None,
367 );
368
369 self.inner
370 .copy(from, to, args)
371 .await
372 .inspect(|_| {
373 self.logger.log(
374 &self.info,
375 Operation::Copy,
376 &[("from", from), ("to", to)],
377 "finished",
378 None,
379 );
380 })
381 .inspect_err(|err| {
382 self.logger.log(
383 &self.info,
384 Operation::Copy,
385 &[("from", from), ("to", to)],
386 "failed",
387 Some(err),
388 );
389 })
390 }
391
392 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
393 self.logger.log(
394 &self.info,
395 Operation::Rename,
396 &[("from", from), ("to", to)],
397 "started",
398 None,
399 );
400
401 self.inner
402 .rename(from, to, args)
403 .await
404 .inspect(|_| {
405 self.logger.log(
406 &self.info,
407 Operation::Rename,
408 &[("from", from), ("to", to)],
409 "finished",
410 None,
411 );
412 })
413 .inspect_err(|err| {
414 self.logger.log(
415 &self.info,
416 Operation::Rename,
417 &[("from", from), ("to", to)],
418 "failed",
419 Some(err),
420 );
421 })
422 }
423
424 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
425 self.logger.log(
426 &self.info,
427 Operation::Stat,
428 &[("path", path)],
429 "started",
430 None,
431 );
432
433 self.inner
434 .stat(path, args)
435 .await
436 .inspect(|_| {
437 self.logger.log(
438 &self.info,
439 Operation::Stat,
440 &[("path", path)],
441 "finished",
442 None,
443 );
444 })
445 .inspect_err(|err| {
446 self.logger.log(
447 &self.info,
448 Operation::Stat,
449 &[("path", path)],
450 "failed",
451 Some(err),
452 );
453 })
454 }
455
456 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
457 self.logger
458 .log(&self.info, Operation::Delete, &[], "started", None);
459
460 self.inner
461 .delete()
462 .await
463 .map(|(rp, d)| {
464 self.logger
465 .log(&self.info, Operation::Delete, &[], "finished", None);
466 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
467 (rp, d)
468 })
469 .inspect_err(|err| {
470 self.logger
471 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
472 })
473 }
474
475 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
476 self.logger.log(
477 &self.info,
478 Operation::List,
479 &[("path", path)],
480 "started",
481 None,
482 );
483
484 self.inner
485 .list(path, args)
486 .await
487 .map(|(rp, v)| {
488 self.logger.log(
489 &self.info,
490 Operation::List,
491 &[("path", path)],
492 "created lister",
493 None,
494 );
495 let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
496 (rp, streamer)
497 })
498 .inspect_err(|err| {
499 self.logger.log(
500 &self.info,
501 Operation::List,
502 &[("path", path)],
503 "failed",
504 Some(err),
505 );
506 })
507 }
508
509 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
510 self.logger.log(
511 &self.info,
512 Operation::Presign,
513 &[("path", path)],
514 "started",
515 None,
516 );
517
518 self.inner
519 .presign(path, args)
520 .await
521 .inspect(|_| {
522 self.logger.log(
523 &self.info,
524 Operation::Presign,
525 &[("path", path)],
526 "finished",
527 None,
528 );
529 })
530 .inspect_err(|err| {
531 self.logger.log(
532 &self.info,
533 Operation::Presign,
534 &[("path", path)],
535 "failed",
536 Some(err),
537 );
538 })
539 }
540}
541
542pub struct LoggingReader<R, I: LoggingInterceptor> {
543 info: Arc<AccessorInfo>,
544 logger: I,
545 path: String,
546
547 read: u64,
548 inner: R,
549}
550
551impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
552 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
553 Self {
554 info,
555 logger,
556 path: path.to_string(),
557
558 read: 0,
559 inner: reader,
560 }
561 }
562}
563
564impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
565 async fn read(&mut self) -> Result<Buffer> {
566 match self.inner.read().await {
567 Ok(bs) if bs.is_empty() => {
568 self.logger.log(
569 &self.info,
570 Operation::Read,
571 &[
572 ("path", &self.path),
573 ("read", &self.read.to_string()),
574 ("size", &bs.len().to_string()),
575 ],
576 "finished",
577 None,
578 );
579 Ok(bs)
580 }
581 Ok(bs) => {
582 self.read += bs.len() as u64;
583 Ok(bs)
584 }
585 Err(err) => {
586 self.logger.log(
587 &self.info,
588 Operation::Read,
589 &[("path", &self.path), ("read", &self.read.to_string())],
590 "failed",
591 Some(&err),
592 );
593 Err(err)
594 }
595 }
596 }
597}
598
599pub struct LoggingWriter<W, I> {
600 info: Arc<AccessorInfo>,
601 logger: I,
602 path: String,
603
604 written: u64,
605 inner: W,
606}
607
608impl<W, I> LoggingWriter<W, I> {
609 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
610 Self {
611 info,
612 logger,
613 path: path.to_string(),
614
615 written: 0,
616 inner: writer,
617 }
618 }
619}
620
621impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
622 async fn write(&mut self, bs: Buffer) -> Result<()> {
623 let size = bs.len();
624
625 match self.inner.write(bs).await {
626 Ok(_) => {
627 self.written += size as u64;
628 Ok(())
629 }
630 Err(err) => {
631 self.logger.log(
632 &self.info,
633 Operation::Write,
634 &[
635 ("path", &self.path),
636 ("written", &self.written.to_string()),
637 ("size", &size.to_string()),
638 ],
639 "failed",
640 Some(&err),
641 );
642 Err(err)
643 }
644 }
645 }
646
647 async fn abort(&mut self) -> Result<()> {
648 match self.inner.abort().await {
649 Ok(_) => {
650 self.logger.log(
651 &self.info,
652 Operation::Write,
653 &[("path", &self.path), ("written", &self.written.to_string())],
654 "abort succeeded",
655 None,
656 );
657 Ok(())
658 }
659 Err(err) => {
660 self.logger.log(
661 &self.info,
662 Operation::Write,
663 &[("path", &self.path), ("written", &self.written.to_string())],
664 "abort failed",
665 Some(&err),
666 );
667 Err(err)
668 }
669 }
670 }
671
672 async fn close(&mut self) -> Result<Metadata> {
673 match self.inner.close().await {
674 Ok(meta) => {
675 self.logger.log(
676 &self.info,
677 Operation::Write,
678 &[("path", &self.path), ("written", &self.written.to_string())],
679 "close succeeded",
680 None,
681 );
682 Ok(meta)
683 }
684 Err(err) => {
685 self.logger.log(
686 &self.info,
687 Operation::Write,
688 &[("path", &self.path), ("written", &self.written.to_string())],
689 "close failed",
690 Some(&err),
691 );
692 Err(err)
693 }
694 }
695 }
696}
697
698pub struct LoggingLister<P, I: LoggingInterceptor> {
699 info: Arc<AccessorInfo>,
700 logger: I,
701 path: String,
702
703 listed: usize,
704 inner: P,
705}
706
707impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
708 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
709 Self {
710 info,
711 logger,
712 path: path.to_string(),
713
714 listed: 0,
715 inner,
716 }
717 }
718}
719
720impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
721 async fn next(&mut self) -> Result<Option<oio::Entry>> {
722 let res = self.inner.next().await;
723
724 match &res {
725 Ok(Some(_)) => {
726 self.listed += 1;
727 }
728 Ok(None) => {
729 self.logger.log(
730 &self.info,
731 Operation::List,
732 &[("path", &self.path), ("listed", &self.listed.to_string())],
733 "finished",
734 None,
735 );
736 }
737 Err(err) => {
738 self.logger.log(
739 &self.info,
740 Operation::List,
741 &[("path", &self.path), ("listed", &self.listed.to_string())],
742 "failed",
743 Some(err),
744 );
745 }
746 };
747
748 res
749 }
750}
751
752pub struct LoggingDeleter<D, I: LoggingInterceptor> {
753 info: Arc<AccessorInfo>,
754 logger: I,
755
756 deleted: usize,
757 inner: D,
758}
759
760impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
761 fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
762 Self {
763 info,
764 logger,
765
766 deleted: 0,
767 inner,
768 }
769 }
770}
771
772impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
773 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
774 let version = args
775 .version()
776 .map(|v| v.to_string())
777 .unwrap_or_else(|| "<latest>".to_string());
778
779 let res = self.inner.delete(path, args).await;
780
781 match &res {
782 Ok(_) => {
783 self.deleted += 1;
784 }
785 Err(err) => {
786 self.logger.log(
787 &self.info,
788 Operation::Delete,
789 &[
790 ("path", path),
791 ("version", &version),
792 ("deleted", &self.deleted.to_string()),
793 ],
794 "failed",
795 Some(err),
796 );
797 }
798 };
799
800 res
801 }
802
803 async fn close(&mut self) -> Result<()> {
804 let res = self.inner.close().await;
805
806 match &res {
807 Ok(_) => {
808 self.logger.log(
809 &self.info,
810 Operation::Delete,
811 &[("deleted", &self.deleted.to_string())],
812 "succeeded",
813 None,
814 );
815 }
816 Err(err) => {
817 self.logger.log(
818 &self.info,
819 Operation::Delete,
820 &[("deleted", &self.deleted.to_string())],
821 "failed",
822 Some(err),
823 );
824 }
825 };
826
827 res
828 }
829}