diff --git a/Cargo.lock b/Cargo.lock index c520eb8..ac730dd 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0453232ace82dee0dd0b4c87a59bd90f7b53b314f3e0f61fe2ee7c8a16482289" + [[package]] name = "aho-corasick" version = "1.1.3" @@ -65,6 +71,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -179,6 +196,19 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -216,8 +246,6 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "fuse_mt" version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e098b8dc4cd32e9ba31d9c8cdfef11271d8191233c64c2a671432ff19d354948" dependencies = [ "fuser", "libc", @@ -315,6 +343,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -333,6 +370,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.3.9" @@ -373,6 +419,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.30" @@ -466,8 +518,11 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" name = "lws_client" version = "0.1.0" dependencies = [ + "env_logger", "fuse_mt", + "hashbrown 0.9.1", "libc", + "log", "prost", "serde", "serde_json", @@ -526,7 +581,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -871,6 +926,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -1088,6 +1152,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index fa0b7c6..150aa4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,11 @@ prost = "0.12" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -fuse_mt = "0.6.1" +fuse_mt = { path = "third_party/fuse-mt" } libc = "0.2" +hashbrown = "0.9.0" +log = "0.4" +env_logger = "0.8" [build-dependencies] tonic-build = "0.11" diff --git a/src/client.rs b/src/client.rs index 222d76d..d11ab7b 100755 --- a/src/client.rs +++ b/src/client.rs @@ -1,30 +1,33 @@ use lws_client::LwsVfsIns; use std::thread; +extern crate log; + #[tokio::main] async fn main() -> Result<(), Box> { + env_logger::init(); let lws_ins = match LwsVfsIns::new("config.json").await { Ok(ins) => ins, Err(e) => { - println!("Error creating lws server instance: {:?}", e); + log::error!("Error creating lws server instance: {:?}", e); return Err(e); } }; - println!("start hello process"); + log::info!("lws client instance created"); match lws_ins.hello().await { Err(e) => { - println!("lws client instance hello err {:?}", e); + log::error!("lws client instance hello err {:?}", e); return Err(e); } _ => {} } - println!("start mount process"); + log::info!("start mount process"); let handle = thread::spawn(move ||{ match LwsVfsIns::mount(lws_ins) { Ok(_) => { Ok::(0) }, Err(e) => { - println!("mount err {:?}", e); + log::error!("mount err {:?}", e); Ok::(-1) } } @@ -34,7 +37,7 @@ async fn main() -> Result<(), Box> { Ok(()) }, Err(e) => { - eprintln!("mount thread start err {:?}", e); + log::error!("mount thread start err {:?}", e); Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "mount fail"))) } } diff --git a/src/lib.rs b/src/lib.rs index 2c20996..ebd23df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,12 +4,16 @@ use lws_client::{ Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, }; use serde_json::{self, Value}; -use std::collections::HashMap; +use hashbrown::HashMap; use std::error::Error; use std::ffi::{OsStr, OsString}; use std::fs::File; +use std::{env, path, vec}; use std::io::{Error as IoError, ErrorKind, Read as IoRead}; -use tonic::transport::Channel as RpcChannel; +use tonic::transport::{channel, Channel as RpcChannel}; +use tokio::runtime::{Builder,Runtime}; +use std::thread::{self, JoinHandle}; +extern crate log; pub mod lws_client { tonic::include_proto!("lws_vfs"); //导入lws vfs proto buffer @@ -87,20 +91,27 @@ struct RpcPool { struct Rpc { pub client: LwsVfsClient, + pub rt: Arc, pub in_use: bool, + pub tx: Arc>, + pub rx: Arc>>, } impl RpcPool { async fn new(size: usize, uri: String) -> Result> { let mut clients = Vec::with_capacity(size); let uri = uri.to_ascii_lowercase(); - println!("create rpc pool {}", uri); + log::info!("create rpc pool {}", uri); let channel = RpcChannel::from_shared(uri)?.connect().await?; for _ in 0..size { let client = LwsVfsClient::new(channel.clone()); + let (tx,rx) = mpsc::channel(); clients.push(Arc::new(RwLock::new(Rpc { client, in_use: false, + rt: Arc::new(Builder::new_multi_thread().worker_threads(1).build().unwrap()), + tx: Arc::new(tx), + rx: Arc::new(Mutex::new(rx)), }))); } let wait = (Mutex::new(()), Condvar::new()); @@ -112,7 +123,7 @@ impl RpcPool { } fn get(&self) -> Result>, Box> { - // 优化遍历的逻辑 + // TODO: 优化遍历的逻辑 let find_idle = |rpc_pool: &Self| -> Result>, Box> { let cs = self.clients.clone(); let clients = &(cs.lock().unwrap()); @@ -123,7 +134,7 @@ impl RpcPool { let entry = clients[idx].read().unwrap(); if !entry.in_use { drop(entry); - let mut entry = clients[idx].write().unwrap(); + let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = clients[idx].write().unwrap(); entry.in_use = true; return Ok(clients[idx].clone()); } @@ -144,7 +155,7 @@ impl RpcPool { break; } } - println!("get free rpc client time out"); + log::warn!("get free rpc client time out"); Err(Box::new(IoError::new( ErrorKind::Other, "get free rpc client time out", @@ -222,9 +233,13 @@ impl VirFs { T: ToString, { let name = name.to_string(); - let ret = name.split('/').filter(|x| x.len() > 0).map(|y| y).collect::>(); + let ret = name + .split('/') + .filter(|x| x.len() > 0) + .map(|y| y) + .collect::>(); let name = ret[0]; - println!("add fs {}", name); + log::debug!("add fs {}", name); let mut node = VirFs::create(name.to_string(), true); node.is_root = false; self.sub.insert(name.to_string(), node); @@ -236,7 +251,7 @@ impl VirFs { { let name = name.to_string(); let mut vname = name.split('/').filter(|x| x.len() > 0); - //println!("try match {}", name); + //log::info!("try match {}", name); if let Some(_curr) = vname.next() { return None; } @@ -258,7 +273,8 @@ impl VirFs { } None => None, } - } else { // find over + } else { + // find over Some(self) } } @@ -268,16 +284,170 @@ impl VirFs { for name in self.sub.keys() { ret.push(name.to_string()); } - println!("readdir: {:?}", ret); + log::info!("readdir: {:?}", ret); ret } } +type FileAttrInfo = Result; +pub struct FileAttrCache{ + attr: FileAttrInfo, + exp_time: u32, +} +use std::time::UNIX_EPOCH; +fn current_time_in_ms() -> u32 { + let start = SystemTime::now(); + start.duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u32 +} +impl FileAttrCache { + pub fn new(attr: FileAttrInfo) -> FileAttrCache{ + FileAttrCache { + attr, + exp_time: current_time_in_ms() + 10_000, + } + } +} +use std::sync::mpsc; +pub enum FileAttrCacheMsg { + Get(String, Arc>), + GetReply(String, Option), + Set(String, FileAttrInfo), + Clr(Vec), +} +pub struct FileAttrCacheCtx{ + pub cache: HashMap, + pub cache_size: u32, + rx: Arc>>, +} + +impl FileAttrCacheCtx { + pub fn new(rx: mpsc::Receiver) -> FileAttrCacheCtx { + FileAttrCacheCtx{ + cache: HashMap::new(), + cache_size: 0, + rx: Arc::new(Mutex::new(rx)), + } + } +} + +pub struct FileAttrCacheManager{ + handle: thread::JoinHandle<()>, + tx: mpsc::Sender, +} + +impl FileAttrCacheManager { + pub fn new() -> FileAttrCacheManager{ + let (tx, rx) = mpsc::channel(); + let cache_ctx = FileAttrCacheCtx::new(rx); + let handle = thread::spawn(|| { + Self::cache_manager(cache_ctx); + }); + FileAttrCacheManager{ + handle, + tx, + } + } + + fn cache_manager(mut ctx: FileAttrCacheCtx) { + let rx = ctx.rx.lock().unwrap(); + + fn invalid_cache(cache: &mut HashMap) -> u32 { + let mut cache_size = 0; + let curr = current_time_in_ms(); + let del:Vec = cache.iter() + .filter(|&(_, val)| val.exp_time < curr) + .map(|(key, _)| key.clone()) + .collect(); + for key in del { + cache.remove(&key); + log::debug!("invalid cache: {}", key); + cache_size += 1; + } + cache_size + } + loop { + match rx.recv_timeout(Duration::from_millis(200)) { + Ok(req) => { + if ctx.cache_size > 256 { + ctx.cache_size -= invalid_cache(&mut ctx.cache); + } + match req { + FileAttrCacheMsg::Set(path, attr) => { + ctx.cache.insert(path, FileAttrCache::new(attr)); + ctx.cache_size += 1; + }, + FileAttrCacheMsg::Get(path, chn) => { + match ctx.cache.get(&path) { + Some(attr) => { + let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr))); + }, + None => { + let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); + } + } + }, + FileAttrCacheMsg::Clr(paths) => { + for path in paths.into_iter() { + ctx.cache.remove(&path); + ctx.cache_size -= 1; + } + }, + _ => {} + } + }, + Err(_) =>{ + ctx.cache_size -= invalid_cache(&mut ctx.cache); + }, + } + } + } + + pub fn set(&self, path:&String, attr: FileAttrInfo){ + let _ = self.tx.send(FileAttrCacheMsg::Set(path.to_string(), attr)); + } + + pub fn get(&self, path:&String, tx: &Arc>, rx: &Arc>>) -> Option { + //send get gile attribute request + let _ = self.tx.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone())); + let total_timeout = Duration::from_millis(100); + let start = Instant::now(); + loop { + let elapsed = start.elapsed(); + if elapsed >= total_timeout { + return None + } + let timeout = total_timeout - elapsed; + let resp = { + match rx.lock().unwrap().recv_timeout(timeout){ + Ok(resp) => resp, + Err(_e) => return None, + } + }; + match resp { + FileAttrCacheMsg::GetReply(rpath, resp) => { + if *path == rpath { + //message is not match + return resp; + } + }, + _ => break, + } + } + None + } + + pub fn clr(&self, paths: Vec) { + let _ = self.tx.send(FileAttrCacheMsg::Clr(paths)); + } +} + pub struct LwsVfsIns { pub config: Config, rpcs: RpcPool, - async_rt: tokio::runtime::Runtime, vir_root: VirFs, + cache: FileAttrCacheManager, } fn mode_to_filetype(mode: libc::mode_t) -> FileType { @@ -297,18 +467,17 @@ fn mode_to_filetype(mode: libc::mode_t) -> FileType { use fuse_mt::*; use std::path::Path; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; impl FilesystemMT for LwsVfsIns { /// Called on mount, before any other function. fn init(&self, _req: RequestInfo) -> ResultEmpty { - let resp = self.async_rt.block_on(self.hello()); - match resp { + match self.init() { Ok(()) => { - println!("init vir fs success"); + log::info!("init vir fs success"); Ok(()) - }, + } Err(e) => { - println!("hello err: {:?}", e); + log::error!("hello err: {:?}", e); Err(libc::ENOMSG) } } @@ -316,19 +485,18 @@ impl FilesystemMT for LwsVfsIns { /// Called on filesystem unmount. fn destroy(&self) { - // Nothing. } /// Get the attributes of a filesystem entry. /// /// * `fh`: a file handle if this is called on an open file. - fn getattr(&self, _req: RequestInfo, path: &Path, fh: Option) -> ResultEntry { + fn getattr(&self, req: RequestInfo, path: &Path, fh: Option) -> ResultEntry { let time = |timens| { let secs = timens / (1000 * 1000); let nanos = timens % (1000 * 1000); SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32) }; - let file_attr = |fst: &Fstat| FileAttr { + let file_attr = |fst: &Fstat, req:&RequestInfo| FileAttr { size: fst.fst_size, blocks: fst.fst_blocks, atime: time(fst.fst_atime), @@ -338,52 +506,66 @@ impl FilesystemMT for LwsVfsIns { kind: mode_to_filetype(fst.fst_mode as u32), perm: fst.fst_mode as u16, nlink: fst.fst_nlink as u32, - uid: fst.fst_uid as u32, - gid: fst.fst_gid as u32, + uid: req.uid, + gid: req.gid, rdev: fst.fst_rdev as u32, flags: 0, }; - // println!("getattr {:?}", path); - - // let path = ; - if let Some(node) = path.to_str() { - if let Some(node) = self.vir_root.find(node) { - return Ok((Duration::new(0, 0), file_attr(&node.stat))); - } + + let path = path.to_string_lossy().into_owned(); + if let Some(node) = self.vir_root.find(&path) { + return Ok((Duration::new(0, 0), file_attr(&node.stat, &req))); } - let request = tonic::Request::new(Getattr { - path: path.to_string_lossy().into_owned(), - fi: Some(FileInfo { - fh: match fh { - Some(f) => f, - None => 0, - }, - ..FileInfo::default() - }), - ..Getattr::default() - }); + log::trace!("lws getattr {:?}", path); let (rpc, resp) = { let client = match self.rpcs.get() { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fgetattr(request)); + if let Some(attr) = self.cache.get(&path, &rpc.tx, &rpc.rx) { + let d = client.clone(); + drop(rpc); + self.rpcs.put(d); + match attr { + Ok(attr) => return Ok((Instant::now().elapsed(), attr)), + Err(_e) => { + return Err(libc::ENOENT); + } + } + } + let request = tonic::Request::new(Getattr { + path: path.to_string(), + fi: Some(FileInfo { + fh: match fh { + Some(f) => f, + None => 0, + }, + ..FileInfo::default() + }), + ..Getattr::default() + }); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fgetattr(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { - return Err(libc::ENOMSG); + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { + self.cache.set(&path, Err(libc::ENOENT)); + return Err(libc::ENOENT); } - Ok((Duration::new(0, 0), file_attr(&resp.stat.unwrap()))) + let attr = file_attr(&resp.stat.unwrap(), &req); + self.cache.set(&path, Ok(attr.clone())); + Ok((Instant::now().elapsed(), attr)) } // The following operations in the FUSE C API are all one kernel call: setattr @@ -429,19 +611,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.ftruncate(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.ftruncate(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -487,19 +671,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.futimens(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.futimens(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -537,7 +723,7 @@ if resp.ret != 0 { fn mkdir(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32) -> ResultEntry { let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); if let Some(_unused) = self.vir_root.find(&path) { - return Err(libc::ENOSYS) + return Err(libc::ENOSYS); } let request = tonic::Request::new(Mkdir { path: path.to_string(), @@ -550,19 +736,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fmkdir(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fmkdir(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } self.getattr(req, Path::new(&path), None) @@ -584,19 +772,22 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.funlink(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.funlink(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + self.cache.clr(vec![path]); + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -621,19 +812,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frmdir(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.frmdir(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -682,19 +875,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frename(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.frename(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -724,9 +919,8 @@ if resp.ret != 0 { /// calls that operate on the file, and can be any value you choose, though it should allow /// your filesystem to identify the file opened even without any path info. fn open(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen { - println!("open flags is {}", flags); let request = tonic::Request::new(Open { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), fi: Some(FileInfo { flags, ..FileInfo::default() @@ -739,19 +933,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fopen(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fopen(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } let fi = resp.fi.unwrap(); @@ -782,7 +978,7 @@ if resp.ret != 0 { callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult, ) -> CallbackResult { let request = tonic::Request::new(Read { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), fi: Some(FileInfo { fh, ..FileInfo::default() @@ -797,19 +993,21 @@ if resp.ret != 0 { Err(_) => return callback(Err(libc::ENOMSG)), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fread(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fread(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return callback(Err(libc::ENOMSG)); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get fread resp size is: {}", resp.size); + if resp.ret != 0 { return callback(Err(libc::ENOMSG)); } callback(Ok(&resp.buff)) @@ -834,7 +1032,7 @@ if resp.ret != 0 { _flags: u32, ) -> ResultWrite { let request = tonic::Request::new(Write { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), fi: Some(FileInfo { fh, ..FileInfo::default() @@ -850,18 +1048,20 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fwrite(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fwrite(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); + log::trace!("get resp: {:?}", resp); if resp.ret != 0 { return Err(libc::ENOMSG); } @@ -881,7 +1081,7 @@ if resp.ret != 0 { /// belonging to this lock owner. fn flush(&self, _req: RequestInfo, path: &Path, fh: u64, _lock_owner: u64) -> ResultEmpty { let request = tonic::Request::new(Flush { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), fi: Some(FileInfo { fh, ..FileInfo::default() @@ -894,19 +1094,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fflush(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fflush(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -932,8 +1134,9 @@ if resp.ret != 0 { _lock_owner: u64, flush: bool, ) -> ResultEmpty { + let path = path.to_string_lossy().into_owned(); let request = tonic::Request::new(Release { - path: path.to_str().unwrap().to_string(), + path: path.to_string(), fi: Some(FileInfo { fh, ..FileInfo::default() @@ -947,21 +1150,24 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frelease(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.frelease(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } + // self.cache.clr(vec![path]); Ok(()) } @@ -989,7 +1195,7 @@ if resp.ret != 0 { fn opendir(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen { let path = path.to_str().unwrap(); if let Some(_node) = self.vir_root.find(&path) { - return Ok((6666, flags)) + return Ok((6666, flags)); } let request = tonic::Request::new(Opendir { path: path.to_string(), @@ -1001,19 +1207,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fopendir(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fopendir(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok((resp.fi.unwrap().fh, flags)) @@ -1064,22 +1272,26 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.freaddir(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.freaddir(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } let mut dirs = vec![]; + // let clrs: Vec = resp.dirs.iter().map(|dir| dir.name.clone().into()).collect(); + // self.cache.clr(clrs); for dir in resp.dirs { dirs.push(DirectoryEntry { name: dir.name.into(), @@ -1098,10 +1310,10 @@ if resp.ret != 0 { /// * `flags`: the file access flags passed to the `opendir` call. fn releasedir(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32) -> ResultEmpty { if fh == 6666 { - return Ok(()) + return Ok(()); } let request = tonic::Request::new(Releasedir { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), fi: Some(FileInfo { fh, ..FileInfo::default() @@ -1114,19 +1326,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.freleasedir(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.freleasedir(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } Ok(()) @@ -1225,7 +1439,7 @@ if resp.ret != 0 { mode as u32 } let request = tonic::Request::new(Access { - path: path.to_str().unwrap().to_string(), + path: path.to_string_lossy().into_owned(), mask: mask2mode(mask as i32), ..Access::default() }); @@ -1235,19 +1449,21 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.faccess(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.faccess(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::EACCES); } Ok(()) @@ -1285,21 +1501,24 @@ if resp.ret != 0 { Err(_) => return Err(libc::EAGAIN), }; let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fcreate(request)); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.fcreate(request)); (client.clone(), resp) }; self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { - println!("get resp err: {:?}", e); + log::error!("get resp err: {:?}", e); return Err(libc::ENOMSG); } }; - println!("get resp: {:?}", resp); -if resp.ret != 0 { + log::trace!("get resp: {:?}", resp); + if resp.ret != 0 { return Err(libc::ENOMSG); } + self.cache.clr(vec![path.to_string()]); let fi = resp.fi.unwrap(); if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) { return Ok(CreatedEntry { @@ -1312,14 +1531,14 @@ if resp.ret != 0 { Err(libc::ENOMSG) } } -use std::{env, vec}; + + + impl LwsVfsIns { pub async fn new(json: &str) -> Result> { let config = Config::new(json)?; let addr = format!("http://{}:{}", config.get_addr(), config.get_port()); let rpcs = RpcPool::new(10, addr).await?; - let async_rt = tokio::runtime::Runtime::new().unwrap(); - let mut vir_root = VirFs::new(); for mount in config.get_mount() { vir_root.add(mount); @@ -1327,10 +1546,27 @@ impl LwsVfsIns { Ok(LwsVfsIns { config, rpcs, - async_rt, vir_root, + cache:FileAttrCacheManager::new(), }) } + pub fn init(&self) -> Result<(), Box> { + let request = tonic::Request::new(HelloRequest { + name: "Ekko lws hello".into(), + }); + let (rpc, resp) = { + let client = self.rpcs.get()?; + let mut rpc = client.write().unwrap(); + let rt = rpc.rt.clone(); + let rpc = &mut (rpc.client); + let resp = rt.block_on(rpc.say_hello(request)); + (client.clone(), resp) + }; + self.rpcs.put(rpc); + log::trace!("resp={:?}", resp); + Ok(()) + } + pub async fn hello(&self) -> Result<(), Box> { let request = tonic::Request::new(HelloRequest { name: "Ekko lws hello".into(), @@ -1342,7 +1578,7 @@ impl LwsVfsIns { (client.clone(), resp) }; self.rpcs.put(rpc); - println!("resp={:?}", resp); + log::trace!("resp={:?}", resp); Ok(()) } @@ -1351,10 +1587,10 @@ impl LwsVfsIns { F: FilesystemMT + Sync + Send + 'static, { let args: Vec = env::args_os().collect(); - println!("args is {:?}", args); + log::info!("args is {:?}", args); let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=passthrufs")]; fuse_mt::mount( - fuse_mt::FuseMT::new(file_system, 1), + fuse_mt::FuseMT::new(file_system, 10), &args[2], &fuse_args[..], )?;