1. 添加cache life time的支持。默认10s

2. 优化server:port的解析,使其可以单独定义ip端口或者一起定义
3. 使用fmt格式化整个工程
This commit is contained in:
Ekko.bao 2024-08-07 08:37:14 +08:00
parent 1d49f19301
commit b60feeb7ed
3 changed files with 148 additions and 111 deletions

View File

@ -1,4 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/lws.proto")?; tonic_build::compile_protos("proto/lws.proto")?;
Ok(()) Ok(())
} }

View File

@ -1,12 +1,12 @@
use clap::{App, Arg};
use lws_client::LwsVfsIns; use lws_client::LwsVfsIns;
use std::thread; use std::thread;
use clap::{Arg, App};
extern crate log; extern crate log;
use std::env; use std::env;
const DEFAULT_PORT: u16 = 33444; const DEFAULT_PORT: u16 = 33444;
const DEFAULT_CACHE_LIFE: u32 = 10_000;
// use std::process::{Command,Stdio}; // use std::process::{Command,Stdio};
// fn get_ssh_clinet() -> String { // fn get_ssh_clinet() -> String {
// // Run `who` command // // Run `who` command
@ -19,7 +19,7 @@ const DEFAULT_PORT: u16 = 33444;
// .stdout(Stdio::piped()) // .stdout(Stdio::piped())
// .output().unwrap(); // .output().unwrap();
// let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string(); // let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string();
// let grep = Command::new("grep") // let grep = Command::new("grep")
// .arg(&user) // .arg(&user)
// .stdin(Stdio::from(who.stdout.unwrap())) // .stdin(Stdio::from(who.stdout.unwrap()))
@ -51,50 +51,83 @@ fn get_ssh_clinet() -> String {
client.to_string() client.to_string()
} }
fn param_parser() -> (String, String) { fn param_parser() -> (String, String, u32) {
let matches = App::new("lws_client") let matches = App::new("lws_client")
.version("1.0") .version("1.0")
.author("Ekko.bao") .author("Ekko.bao")
.about("linux&windows shared filesystem") .about("linux&windows shared filesystem")
.arg(Arg::with_name("s") .arg(
.short('s') Arg::with_name("s")
.long("server") .short('s')
.takes_value(true) .long("server")
.value_name("server") .takes_value(true)
.help(format!("server addr: [ip[:port]], default is [sshclient:{}]", DEFAULT_PORT).as_str())) .min_values(1)
.arg(Arg::with_name("m") .max_values(1)
.short('m') .value_name("server")
.long("mount") .help(
.takes_value(true) format!(
.value_name("mount point") "server addr: [ip[:port]], default is [sshclient:{}]",
.required(true) DEFAULT_PORT
.help("mount point, eg: ~/mnt")) )
.get_matches(); .as_str(),
let server = match matches.value_of("s") { ),
)
.arg(
Arg::with_name("m")
.short('m')
.long("mount")
.takes_value(true)
.min_values(1)
.max_values(1)
.value_name("mount point")
.required(true)
.help("mount point, eg: ~/mnt"),
)
.arg(
Arg::with_name("c")
.short('c')
.long("cache")
.takes_value(true)
.min_values(1)
.max_values(1)
.value_name("cache invalid time")
.help("file info cache invalid time, unit: ms, defualt is 10_000ms"),
)
.get_matches();
let (ip, port) = match matches.value_of("s") {
Some(server) => { Some(server) => {
let split = server.split(":").collect::<Vec<&str>>(); let split = server.split(":").collect::<Vec<&str>>();
if split.len() == 2 { if split.len() == 2 {
server.to_string() (split[0].to_string(), split[1].to_string())
} else { } else {
format!("{}:{}", split[0], DEFAULT_PORT) (split[0].to_string(), DEFAULT_PORT.to_string())
} }
},
None => {
let ip = get_ssh_clinet();
format!("{}:{}", ip, DEFAULT_PORT)
} }
None => (String::new(), String::new()),
}; };
let ip = if ip.len() == 0 { get_ssh_clinet() } else { ip };
let port = if port.len() == 0 {
DEFAULT_PORT.to_string()
} else {
port
};
let server = format!("{}:{}", ip, port);
log::info!("args server: [{}]", server); log::info!("args server: [{}]", server);
let mount_point = matches.value_of("m").unwrap().to_string(); let mount_point = matches.value_of("m").unwrap().to_string();
log::info!("args mount_point: [{}]", mount_point); log::info!("args mount_point: [{}]", mount_point);
(server, mount_point) let cache_life = match matches.value_of("c") {
Some(cache_life) => cache_life.parse().unwrap(),
None => DEFAULT_CACHE_LIFE,
};
log::info!("args cache invalid time: [{}ms]", cache_life);
(server, mount_point, cache_life)
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init(); env_logger::init();
let (server, mount_point) = param_parser(); let (server, mount_point, cache_life) = param_parser();
let lws_ins = match LwsVfsIns::new(&server).await { let lws_ins = match LwsVfsIns::new(&server, cache_life).await {
Ok(ins) => ins, Ok(ins) => ins,
Err(e) => { Err(e) => {
log::error!("Error creating lws server instance: {:?}", e); log::error!("Error creating lws server instance: {:?}", e);
@ -110,24 +143,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
_ => {} _ => {}
} }
log::info!("start mount process"); log::info!("start mount process");
let handle = thread::spawn(move ||{ let handle = thread::spawn(move || match LwsVfsIns::mount(&mount_point, lws_ins) {
match LwsVfsIns::mount(&mount_point, lws_ins) { Ok(_) => Ok::<i32, String>(0),
Ok(_) => { Err(e) => {
Ok::<i32, String>(0) log::error!("mount err {:?}", e);
}, Ok::<i32, String>(-1)
Err(e) => {
log::error!("mount err {:?}", e);
Ok::<i32, String>(-1)
}
} }
}); });
match handle.join() { match handle.join() {
Ok(_) => { Ok(_) => Ok(()),
Ok(())
},
Err(e) => { Err(e) => {
log::error!("mount thread start err {:?}", e); log::error!("mount thread start err {:?}", e);
Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "mount fail"))) Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"mount fail",
)))
} }
} }
} }

View File

@ -1,18 +1,18 @@
use hashbrown::HashMap;
use lws_client::lws_vfs_client::LwsVfsClient; use lws_client::lws_vfs_client::LwsVfsClient;
use lws_client::{ use lws_client::{
Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, Access, Create, FileInfo, Flush, Fstat, GetConfig, Getattr, HelloRequest, Mkdir, Open, Opendir,
Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, GetConfig, Read, Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write,
}; };
use serde_json::{self, Value}; use serde_json::{self, Value};
use hashbrown::HashMap;
use std::error::Error; use std::error::Error;
use std::ffi::{OsStr}; use std::ffi::OsStr;
use std::fs::File; use std::fs::File;
use std::{env, path, vec};
use std::io::{Error as IoError, ErrorKind, Read as IoRead}; use std::io::{Error as IoError, ErrorKind, Read as IoRead};
use tonic::transport::{Channel as RpcChannel};
use tokio::runtime::{Builder,Runtime};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::{env, path, vec};
use tokio::runtime::{Builder, Runtime};
use tonic::transport::Channel as RpcChannel;
extern crate log; extern crate log;
pub mod lws_client { pub mod lws_client {
@ -44,9 +44,7 @@ impl Config {
) )
}) })
.collect(); .collect();
Ok(Config { Ok(Config { mount_map })
mount_map,
})
} }
pub fn get_mount(&self) -> Vec<&String> { pub fn get_mount(&self) -> Vec<&String> {
@ -80,11 +78,16 @@ impl RpcPool {
let channel = RpcChannel::from_shared(uri)?.connect().await?; let channel = RpcChannel::from_shared(uri)?.connect().await?;
for _ in 0..size { for _ in 0..size {
let client = LwsVfsClient::new(channel.clone()); let client = LwsVfsClient::new(channel.clone());
let (tx,rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
clients.push(Arc::new(RwLock::new(Rpc { clients.push(Arc::new(RwLock::new(Rpc {
client, client,
in_use: false, in_use: false,
rt: Arc::new(Builder::new_multi_thread().worker_threads(1).build().unwrap()), rt: Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap(),
),
tx: Arc::new(tx), tx: Arc::new(tx),
rx: Arc::new(Mutex::new(rx)), rx: Arc::new(Mutex::new(rx)),
}))); })));
@ -109,7 +112,8 @@ impl RpcPool {
let entry = clients[idx].read().unwrap(); let entry = clients[idx].read().unwrap();
if !entry.in_use { if !entry.in_use {
drop(entry); drop(entry);
let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = clients[idx].write().unwrap(); let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> =
clients[idx].write().unwrap();
entry.in_use = true; entry.in_use = true;
return Ok(clients[idx].clone()); return Ok(clients[idx].clone());
} }
@ -265,22 +269,23 @@ impl VirFs {
} }
type FileAttrInfo = Result<FileAttr, i32>; type FileAttrInfo = Result<FileAttr, i32>;
pub struct FileAttrCache{ pub struct FileAttrCache {
attr: FileAttrInfo, attr: FileAttrInfo,
exp_time: u32, exp_time: u32,
} }
use std::time::UNIX_EPOCH; use std::time::UNIX_EPOCH;
fn current_time_in_ms() -> u32 { fn current_time_in_ms() -> u32 {
let start = SystemTime::now(); let start = SystemTime::now();
start.duration_since(UNIX_EPOCH) start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards") .expect("Time went backwards")
.as_millis() as u32 .as_millis() as u32
} }
impl FileAttrCache { impl FileAttrCache {
pub fn new(attr: FileAttrInfo) -> FileAttrCache{ pub fn new(attr: FileAttrInfo, cache_life: u32) -> FileAttrCache {
FileAttrCache { FileAttrCache {
attr, attr,
exp_time: current_time_in_ms() + 10_000, exp_time: current_time_in_ms() + cache_life,
} }
} }
} }
@ -291,7 +296,7 @@ pub enum FileAttrCacheMsg {
Set(String, FileAttrInfo), Set(String, FileAttrInfo),
Clr(Vec<String>), Clr(Vec<String>),
} }
pub struct FileAttrCacheCtx{ pub struct FileAttrCacheCtx {
pub cache: HashMap<String, FileAttrCache>, pub cache: HashMap<String, FileAttrCache>,
pub cache_size: u32, pub cache_size: u32,
rx: Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>, rx: Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>,
@ -299,7 +304,7 @@ pub struct FileAttrCacheCtx{
impl FileAttrCacheCtx { impl FileAttrCacheCtx {
pub fn new(rx: mpsc::Receiver<FileAttrCacheMsg>) -> FileAttrCacheCtx { pub fn new(rx: mpsc::Receiver<FileAttrCacheMsg>) -> FileAttrCacheCtx {
FileAttrCacheCtx{ FileAttrCacheCtx {
cache: HashMap::new(), cache: HashMap::new(),
cache_size: 0, cache_size: 0,
rx: Arc::new(Mutex::new(rx)), rx: Arc::new(Mutex::new(rx)),
@ -307,34 +312,31 @@ impl FileAttrCacheCtx {
} }
} }
pub struct FileAttrCacheManager{ pub struct FileAttrCacheManager {
handle: thread::JoinHandle<()>, handle: thread::JoinHandle<()>,
tx: mpsc::Sender<FileAttrCacheMsg>, tx: mpsc::Sender<FileAttrCacheMsg>,
} }
impl FileAttrCacheManager { impl FileAttrCacheManager {
pub fn new() -> FileAttrCacheManager{ pub fn new(cache_life: u32) -> FileAttrCacheManager {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let cache_ctx = FileAttrCacheCtx::new(rx); let cache_ctx = FileAttrCacheCtx::new(rx);
let handle = thread::spawn(|| { let handle = thread::spawn(move || {
Self::cache_manager(cache_ctx); Self::cache_manager(cache_ctx, cache_life);
}); });
FileAttrCacheManager{ FileAttrCacheManager { handle, tx }
handle,
tx,
}
} }
fn cache_manager(mut ctx: FileAttrCacheCtx) { fn cache_manager(mut ctx: FileAttrCacheCtx, cache_life: u32) {
let rx = ctx.rx.lock().unwrap(); let rx = ctx.rx.lock().unwrap();
fn invalid_cache(cache: &mut HashMap<String, FileAttrCache>) -> u32 { fn invalid_cache(cache: &mut HashMap<String, FileAttrCache>) -> u32 {
let mut cache_size = 0; let mut cache_size = 0;
let curr = current_time_in_ms(); let curr = current_time_in_ms();
let del:Vec<String> = cache.iter() let del: Vec<String> = cache
.filter(|&(_, val)| val.exp_time < curr) .iter()
.map(|(key, _)| key.clone()) .filter(|&(_, val)| val.exp_time < curr)
.collect(); .map(|(key, _)| key.clone())
.collect();
for key in del { for key in del {
cache.remove(&key); cache.remove(&key);
log::debug!("invalid cache: {}", key); log::debug!("invalid cache: {}", key);
@ -350,52 +352,57 @@ impl FileAttrCacheManager {
} }
match req { match req {
FileAttrCacheMsg::Set(path, attr) => { FileAttrCacheMsg::Set(path, attr) => {
ctx.cache.insert(path, FileAttrCache::new(attr)); ctx.cache.insert(path, FileAttrCache::new(attr, cache_life));
ctx.cache_size += 1; ctx.cache_size += 1;
}, }
FileAttrCacheMsg::Get(path, chn) => { FileAttrCacheMsg::Get(path, chn) => match ctx.cache.get(&path) {
match ctx.cache.get(&path) { Some(attr) => {
Some(attr) => { let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr)));
let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr))); }
}, None => {
None => { let _ = chn.send(FileAttrCacheMsg::GetReply(path, None));
let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); }
}
}
}, },
FileAttrCacheMsg::Clr(paths) => { FileAttrCacheMsg::Clr(paths) => {
for path in paths.into_iter() { for path in paths.into_iter() {
ctx.cache.remove(&path); ctx.cache.remove(&path);
ctx.cache_size -= 1; ctx.cache_size -= 1;
} }
}, }
_ => {} _ => {}
} }
}, }
Err(_) =>{ Err(_) => {
ctx.cache_size -= invalid_cache(&mut ctx.cache); ctx.cache_size -= invalid_cache(&mut ctx.cache);
}, }
} }
} }
} }
pub fn set(&self, path:&String, attr: FileAttrInfo){ pub fn set(&self, path: &String, attr: FileAttrInfo) {
let _ = self.tx.send(FileAttrCacheMsg::Set(path.to_string(), attr)); let _ = self.tx.send(FileAttrCacheMsg::Set(path.to_string(), attr));
} }
pub fn get(&self, path:&String, tx: &Arc<mpsc::Sender<FileAttrCacheMsg>>, rx: &Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>) -> Option<FileAttrInfo> { pub fn get(
&self,
path: &String,
tx: &Arc<mpsc::Sender<FileAttrCacheMsg>>,
rx: &Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>,
) -> Option<FileAttrInfo> {
//send get gile attribute request //send get gile attribute request
let _ = self.tx.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone())); let _ = self
.tx
.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone()));
let total_timeout = Duration::from_millis(100); let total_timeout = Duration::from_millis(100);
let start = Instant::now(); let start = Instant::now();
loop { loop {
let elapsed = start.elapsed(); let elapsed = start.elapsed();
if elapsed >= total_timeout { if elapsed >= total_timeout {
return None return None;
} }
let timeout = total_timeout - elapsed; let timeout = total_timeout - elapsed;
let resp = { let resp = {
match rx.lock().unwrap().recv_timeout(timeout){ match rx.lock().unwrap().recv_timeout(timeout) {
Ok(resp) => resp, Ok(resp) => resp,
Err(_e) => return None, Err(_e) => return None,
} }
@ -406,7 +413,7 @@ impl FileAttrCacheManager {
//message is not match //message is not match
return resp; return resp;
} }
}, }
_ => break, _ => break,
} }
} }
@ -459,8 +466,7 @@ impl FilesystemMT for LwsVfsIns {
} }
/// Called on filesystem unmount. /// Called on filesystem unmount.
fn destroy(&self) { fn destroy(&self) {}
}
/// Get the attributes of a filesystem entry. /// Get the attributes of a filesystem entry.
/// ///
@ -471,7 +477,7 @@ impl FilesystemMT for LwsVfsIns {
let nanos = timens % (1000 * 1000); let nanos = timens % (1000 * 1000);
SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32) SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32)
}; };
let file_attr = |fst: &Fstat, req:&RequestInfo| FileAttr { let file_attr = |fst: &Fstat, req: &RequestInfo| FileAttr {
size: fst.fst_size, size: fst.fst_size,
blocks: fst.fst_blocks, blocks: fst.fst_blocks,
atime: time(fst.fst_atime), atime: time(fst.fst_atime),
@ -1507,10 +1513,8 @@ impl FilesystemMT for LwsVfsIns {
} }
} }
impl LwsVfsIns { impl LwsVfsIns {
pub async fn new(server: &String) -> Result<LwsVfsIns, Box<dyn Error>> { pub async fn new(server: &String, cache_life: u32) -> Result<LwsVfsIns, Box<dyn Error>> {
let addr = format!("http://{}", server); let addr = format!("http://{}", server);
log::info!("lws server is {}", addr); log::info!("lws server is {}", addr);
let rpcs = RpcPool::new(10, addr).await?; let rpcs = RpcPool::new(10, addr).await?;
@ -1523,7 +1527,7 @@ impl LwsVfsIns {
config, config,
rpcs, rpcs,
vir_root, vir_root,
cache:FileAttrCacheManager::new(), cache: FileAttrCacheManager::new(cache_life),
}) })
} }
@ -1543,7 +1547,10 @@ impl LwsVfsIns {
Ok(resp) => resp.into_inner(), Ok(resp) => resp.into_inner(),
Err(e) => { Err(e) => {
log::error!("get resp err: {:?}", e); log::error!("get resp err: {:?}", e);
return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "get config err"))); return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"get config err",
)));
} }
}; };
let config = Config::new(&resp.config).expect("parse config err"); let config = Config::new(&resp.config).expect("parse config err");
@ -1583,7 +1590,7 @@ impl LwsVfsIns {
Ok(()) Ok(())
} }
pub fn mount<F>(mount_point:&str, file_system: F) -> Result<(), Box<dyn Error>> pub fn mount<F>(mount_point: &str, file_system: F) -> Result<(), Box<dyn Error>>
where where
F: FilesystemMT + Sync + Send + 'static, F: FilesystemMT + Sync + Send + 'static,
{ {