opendal/layers/
logging.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::Level;
23use log::log;
24
25use crate::raw::*;
26use crate::*;
27
28/// Add [log](https://docs.rs/log/) for every operation.
29///
30/// # Logging
31///
/// - OpenDAL will log in a structured way.
33/// - Every operation will start with a `started` log entry.
34/// - Every operation will finish with the following status:
35///   - `succeeded`: the operation is successful, but might have more to take.
36///   - `finished`: the whole operation is finished.
37///   - `failed`: the operation returns an unexpected error.
/// - The default log level when an expected error happens is `Warn`.
/// - The default log level when an unexpected failure happens is `Error`.
40///
41/// # Examples
42///
43/// ```no_run
44/// # use opendal::layers::LoggingLayer;
45/// # use opendal::services;
46/// # use opendal::Operator;
47/// # use opendal::Result;
48///
49/// # fn main() -> Result<()> {
50/// let _ = Operator::new(services::Memory::default())?
51///     .layer(LoggingLayer::default())
52///     .finish();
53/// Ok(())
54/// # }
55/// ```
56///
57/// # Output
58///
59/// OpenDAL is using [`log`](https://docs.rs/log/latest/log/) for logging internally.
60///
61/// To enable logging output, please set `RUST_LOG`:
62///
63/// ```shell
64/// RUST_LOG=debug ./app
65/// ```
66///
67/// To config logging output, please refer to [Configure Logging](https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html):
68///
69/// ```shell
70/// RUST_LOG="info,opendal::services=debug" ./app
71/// ```
72///
73/// # Logging Interceptor
74///
75/// You can implement your own logging interceptor to customize the logging behavior.
76///
77/// ```no_run
78/// # use opendal::layers::LoggingInterceptor;
79/// # use opendal::layers::LoggingLayer;
80/// # use opendal::raw;
81/// # use opendal::services;
82/// # use opendal::Error;
83/// # use opendal::Operator;
84/// # use opendal::Result;
85///
86/// #[derive(Debug, Clone)]
87/// struct MyLoggingInterceptor;
88///
89/// impl LoggingInterceptor for MyLoggingInterceptor {
90///     fn log(
91///         &self,
92///         info: &raw::AccessorInfo,
93///         operation: raw::Operation,
94///         context: &[(&str, &str)],
95///         message: &str,
96///         err: Option<&Error>,
97///     ) {
98///         // log something
99///     }
100/// }
101///
102/// # fn main() -> Result<()> {
103/// let _ = Operator::new(services::Memory::default())?
104///     .layer(LoggingLayer::new(MyLoggingInterceptor))
105///     .finish();
106/// Ok(())
107/// # }
108/// ```
#[derive(Debug)]
pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
    /// Interceptor that is cloned into every accessor produced by this layer.
    logger: I,
}
113
114impl Default for LoggingLayer {
115    fn default() -> Self {
116        Self {
117            logger: DefaultLoggingInterceptor,
118        }
119    }
120}
121
122impl LoggingLayer {
123    /// Create the layer with specific logging interceptor.
124    pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
125        LoggingLayer { logger }
126    }
127}
128
129impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
130    type LayeredAccess = LoggingAccessor<A, I>;
131
132    fn layer(&self, inner: A) -> Self::LayeredAccess {
133        let info = inner.info();
134        LoggingAccessor {
135            inner,
136
137            info,
138            logger: self.logger.clone(),
139        }
140    }
141}
142
/// LoggingInterceptor is used to intercept the log.
///
/// Implementations receive every event produced by the logging layer and
/// decide how (or whether) to record it.
pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
    /// Every time there is a log, this function will be called.
    ///
    /// # Inputs
    ///
    /// - info: The service's access info.
    /// - operation: The operation to log.
    /// - context: Additional context of the log like path, etc., as key/value pairs.
    /// - message: The log message.
    /// - err: The error to log, or `None` when the event is not a failure.
    ///
    /// # Note
    ///
    /// Users should avoid calling resource-intensive operations such as I/O or network
    /// functions here, especially anything that takes longer than 10ms. Otherwise, OpenDAL
    /// could perform unexpectedly slow.
    fn log(
        &self,
        info: &AccessorInfo,
        operation: Operation,
        context: &[(&str, &str)],
        message: &str,
        err: Option<&Error>,
    );
}
169
/// The DefaultLoggingInterceptor will log the message by the standard logging macro.
///
/// Events carrying an error are emitted at `Error` level when the error kind is
/// `ErrorKind::Unexpected` and at `Warn` level otherwise; all records use the
/// `opendal::services` target.
#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;
173
174impl LoggingInterceptor for DefaultLoggingInterceptor {
175    #[inline]
176    fn log(
177        &self,
178        info: &AccessorInfo,
179        operation: Operation,
180        context: &[(&str, &str)],
181        message: &str,
182        err: Option<&Error>,
183    ) {
184        if let Some(err) = err {
185            // Print error if it's unexpected, otherwise in warn.
186            let lvl = if err.kind() == ErrorKind::Unexpected {
187                Level::Error
188            } else {
189                Level::Warn
190            };
191
192            log!(
193                target: LOGGING_TARGET,
194                lvl,
195                "service={} name={}{}: {operation} {message} {}",
196                info.scheme(),
197                info.name(),
198                LoggingContext(context),
199                // Print error message with debug output while unexpected happened.
200                //
201                // It's super sad that we can't bind `format_args!()` here.
202                // See: https://github.com/rust-lang/rust/issues/92698
203                if err.kind() != ErrorKind::Unexpected {
204                   format!("{err}")
205                } else {
206                   format!("{err:?}")
207                }
208            );
209        }
210
211        log!(
212            target: LOGGING_TARGET,
213            Level::Debug,
214            "service={} name={}{}: {operation} {message}",
215            info.scheme(),
216            info.name(),
217            LoggingContext(context),
218        );
219    }
220}
221
/// Borrowed key/value pairs rendered as ` key=value` segments in a log line.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Write every pair as ` {key}={value}` (note the leading space), in order.
    ///
    /// An empty context writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let LoggingContext(pairs) = self;
        pairs
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
232
/// Accessor wrapper that reports operations on `inner` to a [`LoggingInterceptor`].
#[derive(Clone, Debug)]
pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
    /// The wrapped accessor that performs the actual operations.
    inner: A,

    /// Accessor info passed to the interceptor with every event.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives the log events.
    logger: I,
}
240
/// `log` target used for the records emitted by [`DefaultLoggingInterceptor`].
static LOGGING_TARGET: &str = "opendal::services";
242
243impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
244    type Inner = A;
245    type Reader = LoggingReader<A::Reader, I>;
246    type Writer = LoggingWriter<A::Writer, I>;
247    type Lister = LoggingLister<A::Lister, I>;
248    type Deleter = LoggingDeleter<A::Deleter, I>;
249
250    fn inner(&self) -> &Self::Inner {
251        &self.inner
252    }
253
254    fn info(&self) -> Arc<AccessorInfo> {
255        self.info.clone()
256    }
257
258    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
259        self.logger.log(
260            &self.info,
261            Operation::CreateDir,
262            &[("path", path)],
263            "started",
264            None,
265        );
266
267        self.inner
268            .create_dir(path, args)
269            .await
270            .inspect(|_| {
271                self.logger.log(
272                    &self.info,
273                    Operation::CreateDir,
274                    &[("path", path)],
275                    "finished",
276                    None,
277                );
278            })
279            .inspect_err(|err| {
280                self.logger.log(
281                    &self.info,
282                    Operation::CreateDir,
283                    &[("path", path)],
284                    "failed",
285                    Some(err),
286                );
287            })
288    }
289
290    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
291        self.logger.log(
292            &self.info,
293            Operation::Read,
294            &[("path", path)],
295            "started",
296            None,
297        );
298
299        self.inner
300            .read(path, args)
301            .await
302            .map(|(rp, r)| {
303                self.logger.log(
304                    &self.info,
305                    Operation::Read,
306                    &[("path", path)],
307                    "created reader",
308                    None,
309                );
310                (
311                    rp,
312                    LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
313                )
314            })
315            .inspect_err(|err| {
316                self.logger.log(
317                    &self.info,
318                    Operation::Read,
319                    &[("path", path)],
320                    "failed",
321                    Some(err),
322                );
323            })
324    }
325
326    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327        self.logger.log(
328            &self.info,
329            Operation::Write,
330            &[("path", path)],
331            "started",
332            None,
333        );
334
335        self.inner
336            .write(path, args)
337            .await
338            .map(|(rp, w)| {
339                self.logger.log(
340                    &self.info,
341                    Operation::Write,
342                    &[("path", path)],
343                    "created writer",
344                    None,
345                );
346                let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
347                (rp, w)
348            })
349            .inspect_err(|err| {
350                self.logger.log(
351                    &self.info,
352                    Operation::Write,
353                    &[("path", path)],
354                    "failed",
355                    Some(err),
356                );
357            })
358    }
359
360    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
361        self.logger.log(
362            &self.info,
363            Operation::Copy,
364            &[("from", from), ("to", to)],
365            "started",
366            None,
367        );
368
369        self.inner
370            .copy(from, to, args)
371            .await
372            .inspect(|_| {
373                self.logger.log(
374                    &self.info,
375                    Operation::Copy,
376                    &[("from", from), ("to", to)],
377                    "finished",
378                    None,
379                );
380            })
381            .inspect_err(|err| {
382                self.logger.log(
383                    &self.info,
384                    Operation::Copy,
385                    &[("from", from), ("to", to)],
386                    "failed",
387                    Some(err),
388                );
389            })
390    }
391
392    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
393        self.logger.log(
394            &self.info,
395            Operation::Rename,
396            &[("from", from), ("to", to)],
397            "started",
398            None,
399        );
400
401        self.inner
402            .rename(from, to, args)
403            .await
404            .inspect(|_| {
405                self.logger.log(
406                    &self.info,
407                    Operation::Rename,
408                    &[("from", from), ("to", to)],
409                    "finished",
410                    None,
411                );
412            })
413            .inspect_err(|err| {
414                self.logger.log(
415                    &self.info,
416                    Operation::Rename,
417                    &[("from", from), ("to", to)],
418                    "failed",
419                    Some(err),
420                );
421            })
422    }
423
424    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
425        self.logger.log(
426            &self.info,
427            Operation::Stat,
428            &[("path", path)],
429            "started",
430            None,
431        );
432
433        self.inner
434            .stat(path, args)
435            .await
436            .inspect(|_| {
437                self.logger.log(
438                    &self.info,
439                    Operation::Stat,
440                    &[("path", path)],
441                    "finished",
442                    None,
443                );
444            })
445            .inspect_err(|err| {
446                self.logger.log(
447                    &self.info,
448                    Operation::Stat,
449                    &[("path", path)],
450                    "failed",
451                    Some(err),
452                );
453            })
454    }
455
456    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
457        self.logger
458            .log(&self.info, Operation::Delete, &[], "started", None);
459
460        self.inner
461            .delete()
462            .await
463            .map(|(rp, d)| {
464                self.logger
465                    .log(&self.info, Operation::Delete, &[], "finished", None);
466                let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
467                (rp, d)
468            })
469            .inspect_err(|err| {
470                self.logger
471                    .log(&self.info, Operation::Delete, &[], "failed", Some(err));
472            })
473    }
474
475    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
476        self.logger.log(
477            &self.info,
478            Operation::List,
479            &[("path", path)],
480            "started",
481            None,
482        );
483
484        self.inner
485            .list(path, args)
486            .await
487            .map(|(rp, v)| {
488                self.logger.log(
489                    &self.info,
490                    Operation::List,
491                    &[("path", path)],
492                    "created lister",
493                    None,
494                );
495                let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
496                (rp, streamer)
497            })
498            .inspect_err(|err| {
499                self.logger.log(
500                    &self.info,
501                    Operation::List,
502                    &[("path", path)],
503                    "failed",
504                    Some(err),
505                );
506            })
507    }
508
509    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
510        self.logger.log(
511            &self.info,
512            Operation::Presign,
513            &[("path", path)],
514            "started",
515            None,
516        );
517
518        self.inner
519            .presign(path, args)
520            .await
521            .inspect(|_| {
522                self.logger.log(
523                    &self.info,
524                    Operation::Presign,
525                    &[("path", path)],
526                    "finished",
527                    None,
528                );
529            })
530            .inspect_err(|err| {
531                self.logger.log(
532                    &self.info,
533                    Operation::Presign,
534                    &[("path", path)],
535                    "failed",
536                    Some(err),
537                );
538            })
539    }
540}
541
/// Reader wrapper that reports read completion and failures to a [`LoggingInterceptor`].
pub struct LoggingReader<R, I: LoggingInterceptor> {
    /// Accessor info passed to the interceptor with every event.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives the log events.
    logger: I,
    /// Path being read; reported as the `path` context entry.
    path: String,

    /// Total number of bytes returned by non-empty reads so far.
    read: u64,
    /// The wrapped reader.
    inner: R,
}
550
551impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
552    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
553        Self {
554            info,
555            logger,
556            path: path.to_string(),
557
558            read: 0,
559            inner: reader,
560        }
561    }
562}
563
564impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
565    async fn read(&mut self) -> Result<Buffer> {
566        match self.inner.read().await {
567            Ok(bs) if bs.is_empty() => {
568                self.logger.log(
569                    &self.info,
570                    Operation::Read,
571                    &[
572                        ("path", &self.path),
573                        ("read", &self.read.to_string()),
574                        ("size", &bs.len().to_string()),
575                    ],
576                    "finished",
577                    None,
578                );
579                Ok(bs)
580            }
581            Ok(bs) => {
582                self.read += bs.len() as u64;
583                Ok(bs)
584            }
585            Err(err) => {
586                self.logger.log(
587                    &self.info,
588                    Operation::Read,
589                    &[("path", &self.path), ("read", &self.read.to_string())],
590                    "failed",
591                    Some(&err),
592                );
593                Err(err)
594            }
595        }
596    }
597}
598
/// Writer wrapper that reports write failures and abort/close results to an interceptor.
pub struct LoggingWriter<W, I> {
    /// Accessor info passed to the interceptor with every event.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives the log events.
    logger: I,
    /// Path being written; reported as the `path` context entry.
    path: String,

    /// Total number of bytes accepted by successful `write` calls so far.
    written: u64,
    /// The wrapped writer.
    inner: W,
}
607
608impl<W, I> LoggingWriter<W, I> {
609    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
610        Self {
611            info,
612            logger,
613            path: path.to_string(),
614
615            written: 0,
616            inner: writer,
617        }
618    }
619}
620
621impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
622    async fn write(&mut self, bs: Buffer) -> Result<()> {
623        let size = bs.len();
624
625        match self.inner.write(bs).await {
626            Ok(_) => {
627                self.written += size as u64;
628                Ok(())
629            }
630            Err(err) => {
631                self.logger.log(
632                    &self.info,
633                    Operation::Write,
634                    &[
635                        ("path", &self.path),
636                        ("written", &self.written.to_string()),
637                        ("size", &size.to_string()),
638                    ],
639                    "failed",
640                    Some(&err),
641                );
642                Err(err)
643            }
644        }
645    }
646
647    async fn abort(&mut self) -> Result<()> {
648        match self.inner.abort().await {
649            Ok(_) => {
650                self.logger.log(
651                    &self.info,
652                    Operation::Write,
653                    &[("path", &self.path), ("written", &self.written.to_string())],
654                    "abort succeeded",
655                    None,
656                );
657                Ok(())
658            }
659            Err(err) => {
660                self.logger.log(
661                    &self.info,
662                    Operation::Write,
663                    &[("path", &self.path), ("written", &self.written.to_string())],
664                    "abort failed",
665                    Some(&err),
666                );
667                Err(err)
668            }
669        }
670    }
671
672    async fn close(&mut self) -> Result<Metadata> {
673        match self.inner.close().await {
674            Ok(meta) => {
675                self.logger.log(
676                    &self.info,
677                    Operation::Write,
678                    &[("path", &self.path), ("written", &self.written.to_string())],
679                    "close succeeded",
680                    None,
681                );
682                Ok(meta)
683            }
684            Err(err) => {
685                self.logger.log(
686                    &self.info,
687                    Operation::Write,
688                    &[("path", &self.path), ("written", &self.written.to_string())],
689                    "close failed",
690                    Some(&err),
691                );
692                Err(err)
693            }
694        }
695    }
696}
697
/// Lister wrapper that reports list completion and failures to a [`LoggingInterceptor`].
pub struct LoggingLister<P, I: LoggingInterceptor> {
    /// Accessor info passed to the interceptor with every event.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives the log events.
    logger: I,
    /// Path being listed; reported as the `path` context entry.
    path: String,

    /// Number of entries yielded by the inner lister so far.
    listed: usize,
    /// The wrapped lister.
    inner: P,
}
706
707impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
708    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
709        Self {
710            info,
711            logger,
712            path: path.to_string(),
713
714            listed: 0,
715            inner,
716        }
717    }
718}
719
720impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
721    async fn next(&mut self) -> Result<Option<oio::Entry>> {
722        let res = self.inner.next().await;
723
724        match &res {
725            Ok(Some(_)) => {
726                self.listed += 1;
727            }
728            Ok(None) => {
729                self.logger.log(
730                    &self.info,
731                    Operation::List,
732                    &[("path", &self.path), ("listed", &self.listed.to_string())],
733                    "finished",
734                    None,
735                );
736            }
737            Err(err) => {
738                self.logger.log(
739                    &self.info,
740                    Operation::List,
741                    &[("path", &self.path), ("listed", &self.listed.to_string())],
742                    "failed",
743                    Some(err),
744                );
745            }
746        };
747
748        res
749    }
750}
751
/// Deleter wrapper that reports delete failures and the close result to an interceptor.
pub struct LoggingDeleter<D, I: LoggingInterceptor> {
    /// Accessor info passed to the interceptor with every event.
    info: Arc<AccessorInfo>,
    /// Interceptor that receives the log events.
    logger: I,

    /// Number of `delete` calls the inner deleter has accepted so far.
    deleted: usize,
    /// The wrapped deleter.
    inner: D,
}
759
760impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
761    fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
762        Self {
763            info,
764            logger,
765
766            deleted: 0,
767            inner,
768        }
769    }
770}
771
772impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
773    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
774        let version = args
775            .version()
776            .map(|v| v.to_string())
777            .unwrap_or_else(|| "<latest>".to_string());
778
779        let res = self.inner.delete(path, args).await;
780
781        match &res {
782            Ok(_) => {
783                self.deleted += 1;
784            }
785            Err(err) => {
786                self.logger.log(
787                    &self.info,
788                    Operation::Delete,
789                    &[
790                        ("path", path),
791                        ("version", &version),
792                        ("deleted", &self.deleted.to_string()),
793                    ],
794                    "failed",
795                    Some(err),
796                );
797            }
798        };
799
800        res
801    }
802
803    async fn close(&mut self) -> Result<()> {
804        let res = self.inner.close().await;
805
806        match &res {
807            Ok(_) => {
808                self.logger.log(
809                    &self.info,
810                    Operation::Delete,
811                    &[("deleted", &self.deleted.to_string())],
812                    "succeeded",
813                    None,
814                );
815            }
816            Err(err) => {
817                self.logger.log(
818                    &self.info,
819                    Operation::Delete,
820                    &[("deleted", &self.deleted.to_string())],
821                    "failed",
822                    Some(err),
823                );
824            }
825        };
826
827        res
828    }
829}