1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::PathBuf;
21use std::time::Duration;
22
23use http::Uri;
24use redis::Client;
25use redis::ConnectionAddr;
26use redis::ConnectionInfo;
27use redis::ProtocolVersion;
28use redis::RedisConnectionInfo;
29use redis::cluster::ClusterClientBuilder;
30use tokio::sync::OnceCell;
31
32use super::REDIS_SCHEME;
33use super::core::*;
34use super::delete::RedisDeleter;
35use super::writer::RedisWriter;
36use crate::raw::oio;
37use crate::raw::*;
38use crate::services::RedisConfig;
39use crate::*;
40const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
41const DEFAULT_REDIS_PORT: u16 = 6379;
42
43#[doc = include_str!("docs.md")]
45#[derive(Clone, Default)]
46pub struct RedisBuilder {
47 pub(super) config: RedisConfig,
48}
49
50impl Debug for RedisBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 let mut d = f.debug_struct("RedisBuilder");
53
54 d.field("config", &self.config);
55 d.finish_non_exhaustive()
56 }
57}
58
59impl RedisBuilder {
60 pub fn endpoint(mut self, endpoint: &str) -> Self {
68 if !endpoint.is_empty() {
69 self.config.endpoint = Some(endpoint.to_owned());
70 }
71 self
72 }
73
74 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
83 if !cluster_endpoints.is_empty() {
84 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
85 }
86 self
87 }
88
89 pub fn username(mut self, username: &str) -> Self {
93 if !username.is_empty() {
94 self.config.username = Some(username.to_owned());
95 }
96 self
97 }
98
99 pub fn password(mut self, password: &str) -> Self {
103 if !password.is_empty() {
104 self.config.password = Some(password.to_owned());
105 }
106 self
107 }
108
109 pub fn db(mut self, db: i64) -> Self {
113 self.config.db = db;
114 self
115 }
116
117 pub fn default_ttl(mut self, ttl: Duration) -> Self {
121 self.config.default_ttl = Some(ttl);
122 self
123 }
124
125 pub fn root(mut self, root: &str) -> Self {
129 self.config.root = if root.is_empty() {
130 None
131 } else {
132 Some(root.to_string())
133 };
134
135 self
136 }
137}
138
139impl Builder for RedisBuilder {
140 type Config = RedisConfig;
141
142 fn build(self) -> Result<impl Access> {
143 let root = normalize_root(
144 self.config
145 .root
146 .clone()
147 .unwrap_or_else(|| "/".to_string())
148 .as_str(),
149 );
150
151 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
152 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
153 for endpoint in endpoints.split(',') {
154 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
155 }
156 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
157 if let Some(username) = &self.config.username {
158 client_builder = client_builder.username(username.clone());
159 }
160 if let Some(password) = &self.config.password {
161 client_builder = client_builder.password(password.clone());
162 }
163 let client = client_builder.build().map_err(format_redis_error)?;
164
165 let conn = OnceCell::new();
166
167 Ok(RedisAccessor::new(RedisCore {
168 addr: endpoints,
169 client: None,
170 cluster_client: Some(client),
171 conn,
172 default_ttl: self.config.default_ttl,
173 })
174 .with_normalized_root(root))
175 } else {
176 let endpoint = self
177 .config
178 .endpoint
179 .clone()
180 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
181
182 let client =
183 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
184 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
185 .with_context("service", Scheme::Redis)
186 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
187 .with_context("db", self.config.db.to_string())
188 .set_source(e)
189 })?;
190
191 let conn = OnceCell::new();
192 Ok(RedisAccessor::new(RedisCore {
193 addr: endpoint,
194 client: Some(client),
195 cluster_client: None,
196 conn,
197 default_ttl: self.config.default_ttl,
198 })
199 .with_normalized_root(root))
200 }
201 }
202}
203
204impl RedisBuilder {
205 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
206 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
207 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
208 .with_context("service", Scheme::Redis)
209 .with_context("endpoint", endpoint)
210 .set_source(e)
211 })?;
212
213 let con_addr = match ep_url.scheme_str() {
214 Some("tcp") | Some("redis") | None => {
215 let host = ep_url
216 .host()
217 .map(|h| h.to_string())
218 .unwrap_or_else(|| "127.0.0.1".to_string());
219 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
220 ConnectionAddr::Tcp(host, port)
221 }
222 Some("rediss") => {
223 let host = ep_url
224 .host()
225 .map(|h| h.to_string())
226 .unwrap_or_else(|| "127.0.0.1".to_string());
227 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228 ConnectionAddr::TcpTls {
229 host,
230 port,
231 insecure: false,
232 tls_params: None,
233 }
234 }
235 Some("unix") | Some("redis+unix") => {
236 let path = PathBuf::from(ep_url.path());
237 ConnectionAddr::Unix(path)
238 }
239 Some(s) => {
240 return Err(
241 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
242 .with_context("service", Scheme::Redis)
243 .with_context("scheme", s),
244 );
245 }
246 };
247
248 let redis_info = RedisConnectionInfo {
249 db: self.config.db,
250 username: self.config.username.clone(),
251 password: self.config.password.clone(),
252 protocol: ProtocolVersion::RESP2,
253 };
254
255 Ok(ConnectionInfo {
256 addr: con_addr,
257 redis: redis_info,
258 })
259 }
260}
261
262#[derive(Debug, Clone)]
264pub struct RedisAccessor {
265 core: std::sync::Arc<RedisCore>,
266 root: String,
267 info: std::sync::Arc<AccessorInfo>,
268}
269
270impl RedisAccessor {
271 fn new(core: RedisCore) -> Self {
272 let info = AccessorInfo::default();
273 info.set_scheme(REDIS_SCHEME);
274 info.set_name(&core.addr);
275 info.set_root("/");
276 info.set_native_capability(Capability {
277 read: true,
278 write: true,
279 delete: true,
280 stat: true,
281 write_can_empty: true,
282 shared: true,
283 ..Default::default()
284 });
285
286 Self {
287 core: std::sync::Arc::new(core),
288 root: "/".to_string(),
289 info: std::sync::Arc::new(info),
290 }
291 }
292
293 fn with_normalized_root(mut self, root: String) -> Self {
294 self.info.set_root(&root);
295 self.root = root;
296 self
297 }
298}
299
300impl Access for RedisAccessor {
301 type Reader = Buffer;
302 type Writer = RedisWriter;
303 type Lister = ();
304 type Deleter = oio::OneShotDeleter<RedisDeleter>;
305
306 fn info(&self) -> std::sync::Arc<AccessorInfo> {
307 self.info.clone()
308 }
309
310 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
311 let p = build_abs_path(&self.root, path);
312
313 if p == build_abs_path(&self.root, "") {
314 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
315 } else {
316 let bs = self.core.get(&p).await?;
317 match bs {
318 Some(bs) => Ok(RpStat::new(
319 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
320 )),
321 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
322 }
323 }
324 }
325
326 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
327 let p = build_abs_path(&self.root, path);
328
329 let range = args.range();
330 let buffer = if range.is_full() {
331 match self.core.get(&p).await? {
333 Some(bs) => bs,
334 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
335 }
336 } else {
337 let start = range.offset() as isize;
339 let end = match range.size() {
340 Some(size) => (range.offset() + size - 1) as isize,
341 None => -1, };
343
344 match self.core.get_range(&p, start, end).await? {
345 Some(bs) => bs,
346 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
347 }
348 };
349
350 Ok((RpRead::new(), buffer))
351 }
352
353 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
354 let p = build_abs_path(&self.root, path);
355 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
356 }
357
358 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
359 Ok((
360 RpDelete::default(),
361 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
362 ))
363 }
364
365 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
366 let _ = build_abs_path(&self.root, path);
367 Ok((RpList::default(), ()))
369 }
370}