From b60feeb7ed9d9e8da7c25059d076f48410c8216f Mon Sep 17 00:00:00 2001 From: BeGild Date: Wed, 7 Aug 2024 08:37:14 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=B7=BB=E5=8A=A0cache=20life=20time?= =?UTF-8?q?=E7=9A=84=E6=94=AF=E6=8C=81=E3=80=82=E9=BB=98=E8=AE=A410s=202.?= =?UTF-8?q?=20=E4=BC=98=E5=8C=96server:port=E7=9A=84=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=EF=BC=8C=E4=BD=BF=E5=85=B6=E5=8F=AF=E4=BB=A5=E5=8D=95=E7=8B=AC?= =?UTF-8?q?=E5=AE=9A=E4=B9=89ip=E7=AB=AF=E5=8F=A3=E6=88=96=E8=80=85?= =?UTF-8?q?=E4=B8=80=E8=B5=B7=E5=AE=9A=E4=B9=89=203.=20=E4=BD=BF=E7=94=A8f?= =?UTF-8?q?mt=E6=A0=BC=E5=BC=8F=E5=8C=96=E6=95=B4=E4=B8=AA=E5=B7=A5?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.rs | 8 +-- src/client.rs | 118 +++++++++++++++++++++++++++----------------- src/lib.rs | 133 ++++++++++++++++++++++++++------------------------ 3 files changed, 148 insertions(+), 111 deletions(-) diff --git a/build.rs b/build.rs index a38129c..b972aa7 100755 --- a/build.rs +++ b/build.rs @@ -1,4 +1,4 @@ -fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/lws.proto")?; - Ok(()) -} \ No newline at end of file +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/lws.proto")?; + Ok(()) +} diff --git a/src/client.rs b/src/client.rs index 7577872..46f3181 100755 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,12 @@ +use clap::{App, Arg}; use lws_client::LwsVfsIns; use std::thread; -use clap::{Arg, App}; extern crate log; use std::env; const DEFAULT_PORT: u16 = 33444; - +const DEFAULT_CACHE_LIFE: u32 = 10_000; // use std::process::{Command,Stdio}; // fn get_ssh_clinet() -> String { // // Run `who` command @@ -19,7 +19,7 @@ const DEFAULT_PORT: u16 = 33444; // .stdout(Stdio::piped()) // .output().unwrap(); // let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string(); - + // let grep = Command::new("grep") // .arg(&user) // .stdin(Stdio::from(who.stdout.unwrap())) @@ -51,50 +51,83 @@ fn get_ssh_clinet() -> String { client.to_string() } -fn param_parser() -> (String, String) { +fn param_parser() -> (String, String, u32) { let matches = App::new("lws_client") - .version("1.0") - .author("Ekko.bao") - .about("linux&windows shared filesystem") - .arg(Arg::with_name("s") - .short('s') - .long("server") - .takes_value(true) - .value_name("server") - .help(format!("server addr: [ip[:port]], default is [sshclient:{}]", DEFAULT_PORT).as_str())) - .arg(Arg::with_name("m") - .short('m') - .long("mount") - .takes_value(true) - .value_name("mount point") - .required(true) - .help("mount point, eg: ~/mnt")) - .get_matches(); - let server = match matches.value_of("s") { + .version("1.0") + .author("Ekko.bao") + .about("linux&windows shared filesystem") + .arg( + Arg::with_name("s") + .short('s') + .long("server") + .takes_value(true) + .min_values(1) + .max_values(1) + .value_name("server") + .help( + format!( + "server addr: [ip[:port]], default is [sshclient:{}]", + DEFAULT_PORT + ) + .as_str(), + ), + ) + .arg( + Arg::with_name("m") + .short('m') + .long("mount") + .takes_value(true) + .min_values(1) + .max_values(1) + .value_name("mount point") + .required(true) + .help("mount point, eg: ~/mnt"), + ) + .arg( + Arg::with_name("c") + .short('c') + .long("cache") + .takes_value(true) + .min_values(1) + .max_values(1) + .value_name("cache invalid time") + .help("file info cache invalid time, unit: ms, defualt is 10_000ms"), + ) + .get_matches(); + let (ip, port) = match matches.value_of("s") { Some(server) => { let split = server.split(":").collect::>(); if split.len() == 2 { - server.to_string() + (split[0].to_string(), split[1].to_string()) } else { - format!("{}:{}", split[0], DEFAULT_PORT) + (split[0].to_string(), DEFAULT_PORT.to_string()) } - }, - None => { - let ip = get_ssh_clinet(); - format!("{}:{}", ip, DEFAULT_PORT) } + None => (String::new(), String::new()), }; + let ip = if ip.len() == 0 { get_ssh_clinet() } else { ip }; + let port = if port.len() == 0 { + DEFAULT_PORT.to_string() + } else { + port + }; + let server = format!("{}:{}", ip, port); log::info!("args server: [{}]", server); let mount_point = matches.value_of("m").unwrap().to_string(); log::info!("args mount_point: [{}]", mount_point); - (server, mount_point) + let cache_life = match matches.value_of("c") { + Some(cache_life) => cache_life.parse().unwrap(), + None => DEFAULT_CACHE_LIFE, + }; + log::info!("args cache invalid time: [{}ms]", cache_life); + (server, mount_point, cache_life) } #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); - let (server, mount_point) = param_parser(); - let lws_ins = match LwsVfsIns::new(&server).await { + let (server, mount_point, cache_life) = param_parser(); + let lws_ins = match LwsVfsIns::new(&server, cache_life).await { Ok(ins) => ins, Err(e) => { log::error!("Error creating lws server instance: {:?}", e); @@ -110,24 +143,21 @@ async fn main() -> Result<(), Box> { _ => {} } log::info!("start mount process"); - let handle = thread::spawn(move ||{ - match LwsVfsIns::mount(&mount_point, lws_ins) { - Ok(_) => { - Ok::(0) - }, - Err(e) => { - log::error!("mount err {:?}", e); - Ok::(-1) - } + let handle = thread::spawn(move || match LwsVfsIns::mount(&mount_point, lws_ins) { + Ok(_) => Ok::(0), + Err(e) => { + log::error!("mount err {:?}", e); + Ok::(-1) } }); match handle.join() { - Ok(_) => { - Ok(()) - }, + Ok(_) => Ok(()), Err(e) => { log::error!("mount thread start err {:?}", e); - Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "mount fail"))) + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "mount fail", + ))) } } } diff --git a/src/lib.rs b/src/lib.rs index 81e08b0..ce59e94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,18 @@ +use hashbrown::HashMap; use lws_client::lws_vfs_client::LwsVfsClient; use lws_client::{ - Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, - Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, GetConfig, + Access, Create, FileInfo, Flush, Fstat, GetConfig, Getattr, HelloRequest, Mkdir, Open, Opendir, + Read, Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, }; use serde_json::{self, Value}; -use hashbrown::HashMap; use std::error::Error; -use std::ffi::{OsStr}; +use std::ffi::OsStr; use std::fs::File; -use std::{env, path, vec}; use std::io::{Error as IoError, ErrorKind, Read as IoRead}; -use tonic::transport::{Channel as RpcChannel}; -use tokio::runtime::{Builder,Runtime}; use std::thread::{self, JoinHandle}; +use std::{env, path, vec}; +use tokio::runtime::{Builder, Runtime}; +use tonic::transport::Channel as RpcChannel; extern crate log; pub mod lws_client { @@ -44,9 +44,7 @@ impl Config { ) }) .collect(); - Ok(Config { - mount_map, - }) + Ok(Config { mount_map }) } pub fn get_mount(&self) -> Vec<&String> { @@ -80,11 +78,16 @@ impl RpcPool { let channel = RpcChannel::from_shared(uri)?.connect().await?; for _ in 0..size { let client = LwsVfsClient::new(channel.clone()); - let (tx,rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel(); clients.push(Arc::new(RwLock::new(Rpc { client, in_use: false, - rt: Arc::new(Builder::new_multi_thread().worker_threads(1).build().unwrap()), + rt: Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(), + ), tx: Arc::new(tx), rx: Arc::new(Mutex::new(rx)), }))); @@ -109,7 +112,8 @@ impl RpcPool { let entry = clients[idx].read().unwrap(); if !entry.in_use { drop(entry); - let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = clients[idx].write().unwrap(); + let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = + clients[idx].write().unwrap(); entry.in_use = true; return Ok(clients[idx].clone()); } @@ -265,22 +269,23 @@ impl VirFs { } type FileAttrInfo = Result; -pub struct FileAttrCache{ +pub struct FileAttrCache { attr: FileAttrInfo, exp_time: u32, } use std::time::UNIX_EPOCH; fn current_time_in_ms() -> u32 { let start = SystemTime::now(); - start.duration_since(UNIX_EPOCH) + start + .duration_since(UNIX_EPOCH) .expect("Time went backwards") .as_millis() as u32 } impl FileAttrCache { - pub fn new(attr: FileAttrInfo) -> FileAttrCache{ + pub fn new(attr: FileAttrInfo, cache_life: u32) -> FileAttrCache { FileAttrCache { attr, - exp_time: current_time_in_ms() + 10_000, + exp_time: current_time_in_ms() + cache_life, } } } @@ -291,7 +296,7 @@ pub enum FileAttrCacheMsg { Set(String, FileAttrInfo), Clr(Vec), } -pub struct FileAttrCacheCtx{ +pub struct FileAttrCacheCtx { pub cache: HashMap, pub cache_size: u32, rx: Arc>>, @@ -299,7 +304,7 @@ pub struct FileAttrCacheCtx{ impl FileAttrCacheCtx { pub fn new(rx: mpsc::Receiver) -> FileAttrCacheCtx { - FileAttrCacheCtx{ + FileAttrCacheCtx { cache: HashMap::new(), cache_size: 0, rx: Arc::new(Mutex::new(rx)), @@ -307,34 +312,31 @@ impl FileAttrCacheCtx { } } -pub struct FileAttrCacheManager{ +pub struct FileAttrCacheManager { handle: thread::JoinHandle<()>, tx: mpsc::Sender, } impl FileAttrCacheManager { - pub fn new() -> FileAttrCacheManager{ + pub fn new(cache_life: u32) -> FileAttrCacheManager { let (tx, rx) = mpsc::channel(); let cache_ctx = FileAttrCacheCtx::new(rx); - let handle = thread::spawn(|| { - Self::cache_manager(cache_ctx); + let handle = thread::spawn(move || { + Self::cache_manager(cache_ctx, cache_life); }); - FileAttrCacheManager{ - handle, - tx, - } + FileAttrCacheManager { handle, tx } } - fn cache_manager(mut ctx: FileAttrCacheCtx) { + fn cache_manager(mut ctx: FileAttrCacheCtx, cache_life: u32) { let rx = ctx.rx.lock().unwrap(); - fn invalid_cache(cache: &mut HashMap) -> u32 { let mut cache_size = 0; let curr = current_time_in_ms(); - let del:Vec = cache.iter() - .filter(|&(_, val)| val.exp_time < curr) - .map(|(key, _)| key.clone()) - .collect(); + let del: Vec = cache + .iter() + .filter(|&(_, val)| val.exp_time < curr) + .map(|(key, _)| key.clone()) + .collect(); for key in del { cache.remove(&key); log::debug!("invalid cache: {}", key); @@ -350,52 +352,57 @@ impl FileAttrCacheManager { } match req { FileAttrCacheMsg::Set(path, attr) => { - ctx.cache.insert(path, FileAttrCache::new(attr)); + ctx.cache.insert(path, FileAttrCache::new(attr, cache_life)); ctx.cache_size += 1; - }, - FileAttrCacheMsg::Get(path, chn) => { - match ctx.cache.get(&path) { - Some(attr) => { - let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr))); - }, - None => { - let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); - } - } + } + FileAttrCacheMsg::Get(path, chn) => match ctx.cache.get(&path) { + Some(attr) => { + let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr))); + } + None => { + let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); + } }, FileAttrCacheMsg::Clr(paths) => { for path in paths.into_iter() { ctx.cache.remove(&path); ctx.cache_size -= 1; } - }, + } _ => {} } - }, - Err(_) =>{ + } + Err(_) => { ctx.cache_size -= invalid_cache(&mut ctx.cache); - }, + } } } } - pub fn set(&self, path:&String, attr: FileAttrInfo){ + pub fn set(&self, path: &String, attr: FileAttrInfo) { let _ = self.tx.send(FileAttrCacheMsg::Set(path.to_string(), attr)); } - pub fn get(&self, path:&String, tx: &Arc>, rx: &Arc>>) -> Option { + pub fn get( + &self, + path: &String, + tx: &Arc>, + rx: &Arc>>, + ) -> Option { //send get gile attribute request - let _ = self.tx.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone())); + let _ = self + .tx + .send(FileAttrCacheMsg::Get(path.to_string(), tx.clone())); let total_timeout = Duration::from_millis(100); let start = Instant::now(); loop { let elapsed = start.elapsed(); if elapsed >= total_timeout { - return None + return None; } let timeout = total_timeout - elapsed; let resp = { - match rx.lock().unwrap().recv_timeout(timeout){ + match rx.lock().unwrap().recv_timeout(timeout) { Ok(resp) => resp, Err(_e) => return None, } @@ -406,7 +413,7 @@ impl FileAttrCacheManager { //message is not match return resp; } - }, + } _ => break, } } @@ -459,8 +466,7 @@ impl FilesystemMT for LwsVfsIns { } /// Called on filesystem unmount. - fn destroy(&self) { - } + fn destroy(&self) {} /// Get the attributes of a filesystem entry. /// @@ -471,7 +477,7 @@ impl FilesystemMT for LwsVfsIns { let nanos = timens % (1000 * 1000); SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32) }; - let file_attr = |fst: &Fstat, req:&RequestInfo| FileAttr { + let file_attr = |fst: &Fstat, req: &RequestInfo| FileAttr { size: fst.fst_size, blocks: fst.fst_blocks, atime: time(fst.fst_atime), @@ -1507,10 +1513,8 @@ impl FilesystemMT for LwsVfsIns { } } - - impl LwsVfsIns { - pub async fn new(server: &String) -> Result> { + pub async fn new(server: &String, cache_life: u32) -> Result> { let addr = format!("http://{}", server); log::info!("lws server is {}", addr); let rpcs = RpcPool::new(10, addr).await?; @@ -1523,7 +1527,7 @@ impl LwsVfsIns { config, rpcs, vir_root, - cache:FileAttrCacheManager::new(), + cache: FileAttrCacheManager::new(cache_life), }) } @@ -1543,7 +1547,10 @@ impl LwsVfsIns { Ok(resp) => resp.into_inner(), Err(e) => { log::error!("get resp err: {:?}", e); - return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "get config err"))); + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "get config err", + ))); } }; let config = Config::new(&resp.config).expect("parse config err"); @@ -1583,7 +1590,7 @@ impl LwsVfsIns { Ok(()) } - pub fn mount(mount_point:&str, file_system: F) -> Result<(), Box> + pub fn mount(mount_point: &str, file_system: F) -> Result<(), Box> where F: FilesystemMT + Sync + Send + 'static, {