diff --git a/src/client.rs b/src/client.rs index b435479..222d76d 100755 --- a/src/client.rs +++ b/src/client.rs @@ -1,19 +1,41 @@ -use lws_client::LwsVfsIns; -#[tokio::main] -async fn main() -> Result<(), Box> { - let lws_ins = match LwsVfsIns::new("config.json").await{ - Ok(ins) => ins, - Err(e) => { - println!("Error creating lws server instance: {:?}", e); - return Err(e); - } - }; - match lws_ins.hello(){ - Err(e) => { - println!("lws client instance hello err {:?}", e); - return Err(e); - }, - _ =>{}, - } - LwsVfsIns::mount(lws_ins) -} +use lws_client::LwsVfsIns; +use std::thread; +#[tokio::main] +async fn main() -> Result<(), Box> { + let lws_ins = match LwsVfsIns::new("config.json").await { + Ok(ins) => ins, + Err(e) => { + println!("Error creating lws server instance: {:?}", e); + return Err(e); + } + }; + println!("start hello process"); + match lws_ins.hello().await { + Err(e) => { + println!("lws client instance hello err {:?}", e); + return Err(e); + } + _ => {} + } + println!("start mount process"); + let handle = thread::spawn(move ||{ + match LwsVfsIns::mount(lws_ins) { + Ok(_) => { + Ok::(0) + }, + Err(e) => { + println!("mount err {:?}", e); + Ok::(-1) + } + } + }); + match handle.join() { + Ok(_) => { + Ok(()) + }, + Err(e) => { + eprintln!("mount thread start err {:?}", e); + Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "mount fail"))) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 5ec0fd1..ed639e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,15 @@ -use lws_client::{Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write}; -use tonic::transport::Channel as RpcChannel; -use std::io::{Read as IoRead, Error as IoError, ErrorKind}; -use std::error::Error; -use std::fs::File; -use std::collections::HashMap; -use serde_json::{self, Value}; use lws_client::lws_vfs_client::LwsVfsClient; +use lws_client::{ + Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, + Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, +}; +use serde_json::{self, Value}; +use std::collections::HashMap; +use std::error::Error; use std::ffi::{OsStr, OsString}; +use std::fs::File; +use std::io::{Error as IoError, ErrorKind, Read as IoRead}; +use tonic::transport::Channel as RpcChannel; pub mod lws_client { tonic::include_proto!("lws_vfs"); //导入lws vfs proto buffer @@ -29,7 +32,10 @@ impl Config { None => 5001, }; let addr: String = match json.get("addr") { - Some(addr) => addr.as_str().expect("expect addr is a String but its not").to_string(), + Some(addr) => addr + .as_str() + .expect("expect addr is a String but its not") + .to_string(), None => "localhost".to_string(), }; let mounts = match json.get("mount") { @@ -50,7 +56,11 @@ impl Config { ) }) .collect(); - Ok(Config { port, addr, mount_map }) + Ok(Config { + port, + addr, + mount_map, + }) } pub fn get_port(&self) -> u16 { @@ -68,11 +78,11 @@ impl Config { ret } } -use std::sync::{Arc,Mutex, RwLock, Condvar}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; struct RpcPool { clients: Arc>>>>, index: Arc>, - wait: Arc<(Mutex<()>,Condvar)>, + wait: Arc<(Mutex<()>, Condvar)>, } struct Rpc { @@ -85,7 +95,7 @@ impl RpcPool { let mut clients = Vec::with_capacity(size); let uri = uri.to_ascii_lowercase(); println!("create rpc pool {}", uri); - let channel = RpcChannel::from_shared(uri)?.connect().await?; + let channel = RpcChannel::from_shared(uri)?.connect().await?; for _ in 0..size { let client = LwsVfsClient::new(channel.clone()); clients.push(Arc::new(RwLock::new(Rpc { @@ -93,17 +103,17 @@ impl RpcPool { in_use: false, }))); } - let wait =(Mutex::new(()), Condvar::new()); + let wait = (Mutex::new(()), Condvar::new()); Ok(Self { clients: Arc::new(Mutex::new(clients)), index: Arc::new(RwLock::new(0)), - wait:Arc::new(wait), + wait: Arc::new(wait), }) } fn get(&self) -> Result>, Box> { // 优化遍历的逻辑 - let find_idle = |rpc_pool:&Self| -> Result>, Box>{ + let find_idle = |rpc_pool: &Self| -> Result>, Box> { let cs = self.clients.clone(); let clients = &(cs.lock().unwrap()); let size = clients.len(); @@ -118,7 +128,10 @@ impl RpcPool { return Ok(clients[idx].clone()); } } - Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client fail"))) + Err(Box::new(IoError::new( + ErrorKind::Other, + "get free rpc client fail", + ))) }; loop { if let Ok(rpc) = find_idle(self) { @@ -132,7 +145,10 @@ impl RpcPool { break; } } - Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client time out"))) + Err(Box::new(IoError::new( + ErrorKind::Other, + "get free rpc client time out", + ))) } fn put(&self, client: Arc>) { @@ -153,33 +169,43 @@ impl RpcPool { } struct VirFs { - _name: String, + name: String, stat: lws_client::Fstat, sub: HashMap, is_root: bool, } impl VirFs { - fn create (name:T, is_dir: bool) -> VirFs - where T: ToString + fn create(name: T, is_dir: bool) -> VirFs + where + T: ToString, { - let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; VirFs { - _name : name.to_string(), - stat : lws_client::Fstat { - fst_mode: 0o0755 | {if is_dir {libc::S_IFDIR} else {0}} as u64, + name: name.to_string(), + stat: lws_client::Fstat { + fst_mode: 0o0755 | { + if is_dir { + libc::S_IFDIR + } else { + 0 + } + } as u64, fst_ino: 0, fst_dev: 0, fst_rdev: 0, - fst_nlink: 0, - fst_uid: 0, - fst_gid: 0, + fst_nlink: 0, + fst_uid: 0, + fst_gid: 0, fst_size: 0, - fst_atime: now, - fst_mtime: now, - fst_ctime: now, - fst_blksize: 0, - fst_blocks: 0 + fst_atime: now, + fst_mtime: now, + fst_ctime: now, + fst_blksize: 0, + fst_blocks: 0, }, sub: HashMap::new(), is_root: true, @@ -191,42 +217,56 @@ impl VirFs { node } - pub fn add (&mut self, name:T) - where T: ToString + pub fn add(&mut self, name: T) + where + T: ToString, { let mut node = VirFs::create(name.to_string(), true); node.is_root = false; self.sub.insert(name.to_string(), node); } - pub fn find (&self, name:T) -> Option<&VirFs> - where T: ToString + pub fn find(&self, name: T) -> Option<&VirFs> + where + T: ToString, { let name = name.to_string(); let mut vname = name.split('/').filter(|x| x.len() > 0); - self.match_path(&mut vname) + println!("try match {:?}", vname); + if let Some(current) = vname.next() { + if self.name == current { + self.match_path(&mut vname) + } else { + None + } + } else { + None + } + } - pub fn match_path <'a> (&self, name_list: &mut impl Iterator) -> Option<&VirFs> { + pub fn match_path<'a>(&self, name_list: &mut impl Iterator) -> Option<&VirFs> { if let Some(current) = name_list.next() { - match self.sub.get(current){ + match self.sub.get(current) { Some(v) => { + // try match sub dir if let Some(sub) = v.match_path(name_list) { Some(sub) + // sub dirs not match } else { - Some(v) + None } } None => None, } - } else { - None + } else { // find over + Some(self) } } pub fn readdir(&self) -> Vec { let mut ret = vec![]; - for name in self.sub.keys(){ + for name in self.sub.keys() { ret.push(name.to_string()); } ret @@ -237,7 +277,7 @@ pub struct LwsVfsIns { pub config: Config, rpcs: RpcPool, async_rt: tokio::runtime::Runtime, - vir_root :VirFs + vir_root: VirFs, } fn mode_to_filetype(mode: libc::mode_t) -> FileType { @@ -247,9 +287,11 @@ fn mode_to_filetype(mode: libc::mode_t) -> FileType { libc::S_IFLNK => FileType::Symlink, libc::S_IFBLK => FileType::BlockDevice, libc::S_IFCHR => FileType::CharDevice, - libc::S_IFIFO => FileType::NamedPipe, + libc::S_IFIFO => FileType::NamedPipe, libc::S_IFSOCK => FileType::Socket, - _ => { panic!("unknown file type"); } + _ => { + panic!("unknown file type"); + } } } @@ -259,13 +301,15 @@ use std::time::{Duration, SystemTime}; impl FilesystemMT for LwsVfsIns { /// Called on mount, before any other function. fn init(&self, _req: RequestInfo) -> ResultEmpty { - match self.hello() { + let resp = self.async_rt.block_on(self.hello()); + match resp { Ok(()) => { + println!("init vir fs success"); Ok(()) }, - Err(e) =>{ + Err(e) => { println!("hello err: {:?}", e); - Err(libc::ENOSYS) + Err(libc::ENOMSG) } } } @@ -284,34 +328,29 @@ impl FilesystemMT for LwsVfsIns { let nanos = timens % (1000 * 1000); SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32) }; - let file_attr = |fst:&Fstat|{ - FileAttr { - size: fst.fst_size, - blocks: fst.fst_blocks, - atime: time(fst.fst_atime), - mtime: time(fst.fst_mtime), - ctime: time(fst.fst_ctime), - crtime: SystemTime::UNIX_EPOCH, - kind: mode_to_filetype(fst.fst_mode as u32), - perm: fst.fst_mode as u16, - nlink: fst.fst_nlink as u32, - uid: fst.fst_uid as u32, - gid: fst.fst_gid as u32, - rdev: fst.fst_rdev as u32, - flags: 0, - } + let file_attr = |fst: &Fstat| FileAttr { + size: fst.fst_size, + blocks: fst.fst_blocks, + atime: time(fst.fst_atime), + mtime: time(fst.fst_mtime), + ctime: time(fst.fst_ctime), + crtime: SystemTime::UNIX_EPOCH, + kind: mode_to_filetype(fst.fst_mode as u32), + perm: fst.fst_mode as u16, + nlink: fst.fst_nlink as u32, + uid: fst.fst_uid as u32, + gid: fst.fst_gid as u32, + rdev: fst.fst_rdev as u32, + flags: 0, }; let path = path.to_str().unwrap(); if let Some(node) = self.vir_root.find(path) { - return Ok(( - Duration::new(0,0), - file_attr(&node.stat), - )) + return Ok((Duration::new(0, 0), file_attr(&node.stat))); } let request = tonic::Request::new(Getattr { - path : path.into(), - fi : Some(FileInfo { - fh : match fh { + path: path.into(), + fi: Some(FileInfo { + fh: match fh { Some(f) => f, None => 0, }, @@ -319,27 +358,27 @@ impl FilesystemMT for LwsVfsIns { }), ..Getattr::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fgetattr(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fgetattr(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), - Err(e) => { + Err(e) => { println!("get resp err: {:?}", e); - return Err(libc::ENOMSG) - }, + return Err(libc::ENOMSG); + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } - Ok(( - Duration::new(0,0), - file_attr(&resp.stat.unwrap()), - )) + Ok((Duration::new(0, 0), file_attr(&resp.stat.unwrap()))) } // The following operations in the FUSE C API are all one kernel call: setattr @@ -350,7 +389,7 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: a file handle if this is called on an open file. /// * `mode`: the mode to change the file to. fn chmod(&self, _req: RequestInfo, _path: &Path, _fh: Option, _mode: u32) -> ResultEmpty { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Change the owner UID and/or group GID of a filesystem entry. @@ -358,8 +397,15 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: a file handle if this is called on an open file. /// * `uid`: user ID to change the file's owner to. If `None`, leave the UID unchanged. /// * `gid`: group ID to change the file's group to. If `None`, leave the GID unchanged. - fn chown(&self, _req: RequestInfo, _path: &Path, _fh: Option, _uid: Option, _gid: Option) -> ResultEmpty { - Err(libc::ENOSYS) + fn chown( + &self, + _req: RequestInfo, + _path: &Path, + _fh: Option, + _uid: Option, + _gid: Option, + ) -> ResultEmpty { + Err(libc::ENOMSG) } /// Set the length of a file. @@ -368,26 +414,29 @@ impl FilesystemMT for LwsVfsIns { /// * `size`: size in bytes to set as the file's length. fn truncate(&self, _req: RequestInfo, path: &Path, _fh: Option, size: u64) -> ResultEmpty { let request = tonic::Request::new(Truncate { - path : path.to_str().unwrap().into(), + path: path.to_str().unwrap().into(), size: size as i64, ..Truncate::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.ftruncate(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.ftruncate(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), - Err(e) => { + Err(e) => { println!("get resp err: {:?}", e); - return Err(libc::ENOMSG) - }, + return Err(libc::ENOMSG); + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -397,49 +446,54 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: a file handle if this is called on an open file. /// * `atime`: the time of last access. /// * `mtime`: the time of last modification. - fn utimens(&self, _req: RequestInfo, path: &Path, _fh: Option, atime: Option, mtime: Option) -> ResultEmpty { - - let time = move |time| -> Timespec{ - let time = match time { + fn utimens( + &self, + _req: RequestInfo, + path: &Path, + _fh: Option, + atime: Option, + mtime: Option, + ) -> ResultEmpty { + let time = move |time| -> Timespec { + let time = match time { Some(time) => time, None => SystemTime::now(), }; - match time.duration_since(SystemTime::UNIX_EPOCH){ - Ok(dur) => { - Timespec{ - tv_sec:dur.as_secs() as i32, - tv_nsec: dur.subsec_nanos() as i64, - } + match time.duration_since(SystemTime::UNIX_EPOCH) { + Ok(dur) => Timespec { + tv_sec: dur.as_secs() as i32, + tv_nsec: dur.subsec_nanos() as i64, }, - Err(_e) => { - Timespec::default() - } + Err(_e) => Timespec::default(), } }; let at = time(atime); let mt = time(mtime); - + let request = tonic::Request::new(Utimens { - path : path.to_str().unwrap().into(), + path: path.to_str().unwrap().into(), ts: vec![at, mt], ..Utimens::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.futimens(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.futimens(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), - Err(e) => { + Err(e) => { println!("get resp err: {:?}", e); - return Err(libc::ENOMSG) - }, + return Err(libc::ENOMSG); + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -448,7 +502,7 @@ impl FilesystemMT for LwsVfsIns { /// Read a symbolic link. fn readlink(&self, _req: RequestInfo, _path: &Path) -> ResultData { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Create a special file. @@ -457,8 +511,15 @@ impl FilesystemMT for LwsVfsIns { /// * `name`: name of the entry. /// * `mode`: mode for the new entry. /// * `rdev`: if mode has the bits `S_IFCHR` or `S_IFBLK` set, this is the major and minor numbers for the device file. Otherwise it should be ignored. - fn mknod(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _mode: u32, _rdev: u32) -> ResultEntry { - Err(libc::ENOSYS) + fn mknod( + &self, + _req: RequestInfo, + _parent: &Path, + _name: &OsStr, + _mode: u32, + _rdev: u32, + ) -> ResultEntry { + Err(libc::ENOMSG) } /// Create a directory. @@ -469,29 +530,32 @@ impl FilesystemMT for LwsVfsIns { fn mkdir(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32) -> ResultEntry { let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); if let Some(_unused) = self.vir_root.find(&path) { - return Err(libc::ENOSYS) + return Err(libc::ENOMSG); } let request = tonic::Request::new(Mkdir { path: path.to_string(), mode, ..Mkdir::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fmkdir(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fmkdir(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), - Err(e) => { + Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } self.getattr(req, Path::new(&path), None) } @@ -506,22 +570,25 @@ impl FilesystemMT for LwsVfsIns { path: path.to_string(), ..Unlink::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.funlink(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.funlink(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), - Err(e) => { + Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -533,28 +600,31 @@ impl FilesystemMT for LwsVfsIns { fn rmdir(&self, _req: RequestInfo, parent: &Path, name: &OsStr) -> ResultEmpty { let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); if let Some(_unused) = self.vir_root.find(&path) { - return Err(libc::ENOSYS) + return Err(libc::ENOMSG); } let request = tonic::Request::new(Rmdir { path: path.to_string(), ..Rmdir::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frmdir(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frmdir(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -564,8 +634,14 @@ impl FilesystemMT for LwsVfsIns { /// * `parent`: path to the directory to make the link in. /// * `name`: name of the symbolic link. /// * `target`: path (may be relative or absolute) to the target of the link. - fn symlink(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _target: &Path) -> ResultEntry { - Err(libc::ENOSYS) + fn symlink( + &self, + _req: RequestInfo, + _parent: &Path, + _name: &OsStr, + _target: &Path, + ) -> ResultEntry { + Err(libc::ENOMSG) } /// Rename a filesystem entry. @@ -574,31 +650,41 @@ impl FilesystemMT for LwsVfsIns { /// * `name`: name of the existing entry. /// * `newparent`: path to the directory it should be renamed into (may be the same as `parent`). /// * `newname`: name of the new entry. - fn rename(&self, _req: RequestInfo, parent: &Path, name: &OsStr, _newparent: &Path, _newname: &OsStr) -> ResultEmpty { + fn rename( + &self, + _req: RequestInfo, + parent: &Path, + name: &OsStr, + _newparent: &Path, + _newname: &OsStr, + ) -> ResultEmpty { let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); if let Some(_unused) = self.vir_root.find(&path) { - return Err(libc::ENOSYS) + return Err(libc::ENOMSG); } let request = tonic::Request::new(Rename { path: path.to_string(), ..Rename::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frename(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frename(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -608,8 +694,14 @@ impl FilesystemMT for LwsVfsIns { /// * `path`: path to an existing file. /// * `newparent`: path to the directory for the new link. /// * `newname`: name for the new link. - fn link(&self, _req: RequestInfo, _path: &Path, _newparent: &Path, _newname: &OsStr) -> ResultEntry { - Err(libc::ENOSYS) + fn link( + &self, + _req: RequestInfo, + _path: &Path, + _newparent: &Path, + _newname: &OsStr, + ) -> ResultEntry { + Err(libc::ENOMSG) } /// Open a file. @@ -623,28 +715,31 @@ impl FilesystemMT for LwsVfsIns { fn open(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen { let request = tonic::Request::new(Open { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { flags, ..FileInfo::default() }), ..Open::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fopen(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fopen(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } let fi = resp.fi.unwrap(); Ok((fi.fh, fi.flags)) @@ -664,32 +759,43 @@ impl FilesystemMT for LwsVfsIns { /// the result data as a slice, or an error code. /// /// Return the return value from the `callback` function. - fn read(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, _size: u32, callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult) -> CallbackResult { + fn read( + &self, + _req: RequestInfo, + path: &Path, + fh: u64, + offset: u64, + _size: u32, + callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult, + ) -> CallbackResult { let request = tonic::Request::new(Read { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), offset, ..Read::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return callback(Err(libc::ENOSYS)), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return callback(Err(libc::ENOMSG)), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fread(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fread(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return callback(Err(libc::ENOMSG)); - }, + } }; if resp.ret != 0 { - return callback(Err(libc::ENOSYS)); + return callback(Err(libc::ENOMSG)); } callback(Ok(&resp.buff)) } @@ -703,10 +809,18 @@ impl FilesystemMT for LwsVfsIns { /// * `flags`: /// /// Return the number of bytes written. - fn write(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, data: Vec, _flags: u32) -> ResultWrite { + fn write( + &self, + _req: RequestInfo, + path: &Path, + fh: u64, + offset: u64, + data: Vec, + _flags: u32, + ) -> ResultWrite { let request = tonic::Request::new(Write { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), @@ -715,22 +829,25 @@ impl FilesystemMT for LwsVfsIns { buff: data, ..Write::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fwrite(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fwrite(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(resp.buff.len() as u32) } @@ -749,28 +866,31 @@ impl FilesystemMT for LwsVfsIns { fn flush(&self, _req: RequestInfo, path: &Path, fh: u64, _lock_owner: u64) -> ResultEmpty { let request = tonic::Request::new(Flush { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), ..Flush::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fflush(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fflush(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -786,32 +906,43 @@ impl FilesystemMT for LwsVfsIns { /// * `lock_owner`: if the filesystem supports locking (`setlk`, `getlk`), remove all locks /// belonging to this lock owner. /// * `flush`: whether pending data must be flushed or not. - fn release(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32, _lock_owner: u64, flush: bool) -> ResultEmpty { + fn release( + &self, + _req: RequestInfo, + path: &Path, + fh: u64, + _flags: u32, + _lock_owner: u64, + flush: bool, + ) -> ResultEmpty { let request = tonic::Request::new(Release { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), - flush:flush as u32, + flush: flush as u32, ..Release::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frelease(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.frelease(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -824,7 +955,7 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: file handle returned from the `open` call. /// * `datasync`: if `false`, also write metadata, otherwise just write file data. fn fsync(&self, _req: RequestInfo, _path: &Path, _fh: u64, _datasync: bool) -> ResultEmpty { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Open a directory. @@ -842,22 +973,25 @@ impl FilesystemMT for LwsVfsIns { path: path.to_str().unwrap().to_string(), ..Opendir::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fopendir(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fopendir(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok((resp.fi.unwrap().fh, flags)) } @@ -873,12 +1007,12 @@ impl FilesystemMT for LwsVfsIns { match kind { 0 => FileType::RegularFile, 1 => FileType::Directory, - 2 => FileType::Symlink, + 2 => FileType::Symlink, 3 => FileType::CharDevice, 4 => FileType::BlockDevice, 5 => FileType::NamedPipe, 6 => FileType::Socket, - _ => FileType::RegularFile, + _ => FileType::RegularFile, } } let path = path.to_str().unwrap(); @@ -886,7 +1020,7 @@ impl FilesystemMT for LwsVfsIns { let mut dirs = vec![]; let resp = node.readdir(); for name in resp { - dirs.push(DirectoryEntry{ + dirs.push(DirectoryEntry { name: name.into(), kind: FileType::Directory, }); @@ -895,32 +1029,35 @@ impl FilesystemMT for LwsVfsIns { } let request = tonic::Request::new(Readdir { path: path.to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), ..Readdir::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.freaddir(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.freaddir(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } let mut dirs = vec![]; for dir in resp.dirs { - dirs.push(DirectoryEntry{ + dirs.push(DirectoryEntry { name: dir.name.into(), kind: filetype(dir.kind), }); @@ -938,28 +1075,31 @@ impl FilesystemMT for LwsVfsIns { fn releasedir(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32) -> ResultEmpty { let request = tonic::Request::new(Releasedir { path: path.to_str().unwrap().to_string(), - fi:Some(FileInfo{ + fi: Some(FileInfo { fh, ..FileInfo::default() }), ..Releasedir::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.freleasedir(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.freleasedir(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -968,7 +1108,7 @@ impl FilesystemMT for LwsVfsIns { /// /// Analogous to the `fsync` call. fn fsyncdir(&self, _req: RequestInfo, _path: &Path, _fh: u64, _datasync: bool) -> ResultEmpty { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Get filesystem statistics. @@ -977,7 +1117,7 @@ impl FilesystemMT for LwsVfsIns { /// /// See the `Statfs` struct for more details. fn statfs(&self, _req: RequestInfo, _path: &Path) -> ResultStatfs { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Set a file extended attribute. @@ -987,8 +1127,16 @@ impl FilesystemMT for LwsVfsIns { /// * `value`: the data to set the value to. /// * `flags`: can be either `XATTR_CREATE` or `XATTR_REPLACE`. /// * `position`: offset into the attribute value to write data. - fn setxattr(&self, _req: RequestInfo, _path: &Path, _name: &OsStr, _value: &[u8], _flags: u32, _position: u32) -> ResultEmpty { - Err(libc::ENOSYS) + fn setxattr( + &self, + _req: RequestInfo, + _path: &Path, + _name: &OsStr, + _value: &[u8], + _flags: u32, + _position: u32, + ) -> ResultEmpty { + Err(libc::ENOMSG) } /// Get a file extended attribute. @@ -1000,7 +1148,7 @@ impl FilesystemMT for LwsVfsIns { /// If `size` is 0, return `Xattr::Size(n)` where `n` is the size of the attribute data. /// Otherwise, return `Xattr::Data(data)` with the requested data. fn getxattr(&self, _req: RequestInfo, _path: &Path, _name: &OsStr, _size: u32) -> ResultXattr { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// List extended attributes for a file. @@ -1013,7 +1161,7 @@ impl FilesystemMT for LwsVfsIns { /// Otherwise, return `Xattr::Data(data)` where `data` is all the null-terminated attribute /// names. fn listxattr(&self, _req: RequestInfo, _path: &Path, _size: u32) -> ResultXattr { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Remove an extended attribute for a file. @@ -1021,7 +1169,7 @@ impl FilesystemMT for LwsVfsIns { /// * `path`: path to the file. /// * `name`: name of the attribute to remove. fn removexattr(&self, _req: RequestInfo, _path: &Path, _name: &OsStr) -> ResultEmpty { - Err(libc::ENOSYS) + Err(libc::ENOMSG) } /// Check for access to a file. @@ -1037,22 +1185,25 @@ impl FilesystemMT for LwsVfsIns { mask, ..Access::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.faccess(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.faccess(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } Ok(()) } @@ -1066,32 +1217,42 @@ impl FilesystemMT for LwsVfsIns { /// /// Return a `CreatedEntry` (which contains the new file's attributes as well as a file handle /// -- see documentation on `open` for more info on that). - fn create(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32, flags: u32) -> ResultCreate { + fn create( + &self, + req: RequestInfo, + parent: &Path, + name: &OsStr, + mode: u32, + flags: u32, + ) -> ResultCreate { let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); if let Some(_unused) = self.vir_root.find(&path) { - return Err(libc::ENOSYS) + return Err(libc::ENOMSG); } let request = tonic::Request::new(Create { path: path.clone(), mode, ..Create::default() }); - let client = match self.rpcs.get() { - Ok(c) => c, - Err(_) => return Err(libc::ENOSYS), + let (rpc, resp) = { + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOMSG), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fcreate(request)); + (client.clone(), resp) }; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.fcreate(request)); - self.rpcs.put(client.clone()); + self.rpcs.put(rpc); let resp = match resp { Ok(resp) => resp.into_inner(), Err(e) => { println!("get resp err: {:?}", e); return Err(libc::ENOMSG); - }, + } }; if resp.ret != 0 { - return Err(libc::ENOSYS); + return Err(libc::ENOMSG); } let fi = resp.fi.unwrap(); if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) { @@ -1099,18 +1260,17 @@ impl FilesystemMT for LwsVfsIns { fh: fi.fh, flags, attr, - ttl + ttl, }); } - Err(libc::ENOSYS) + Err(libc::ENOMSG) } - } use std::{env, vec}; impl LwsVfsIns { pub async fn new(json: &str) -> Result> { let config = Config::new(json)?; - let addr = format!("http://{}:{}", config.get_addr(), config.get_port()); + let addr = format!("http://{}:{}", config.get_addr(), config.get_port()); let rpcs = RpcPool::new(10, addr).await?; let async_rt = tokio::runtime::Runtime::new().unwrap(); @@ -1125,33 +1285,33 @@ impl LwsVfsIns { vir_root, }) } - pub fn hello(&self) -> Result<(), Box> { + pub async fn hello(&self) -> Result<(), Box> { let request = tonic::Request::new(HelloRequest { name: "Ekko lws hello".into(), }); - let client = self.rpcs.get()?; - let mut rpc = client.write().unwrap(); - let resp = self.async_rt.block_on(rpc.client.say_hello(request)); - self.rpcs.put(client.clone()); - match resp { - Ok(resp) => { - println!("resp={:?}", resp); - Ok(()) - }, - Err(status) => { - Err(Box::new(IoError::new(ErrorKind::Other, format!("get rpc resp fail sta{:?}", status)))) - } - } + let (rpc, resp) = { + let client = self.rpcs.get()?; + let mut rpc = client.write().unwrap(); + let resp = rpc.client.say_hello(request).await?; + (client.clone(), resp) + }; + self.rpcs.put(rpc); + println!("resp={:?}", resp); + Ok(()) } - - pub fn mount(file_system:F) -> Result<(), Box> - where - F: FilesystemMT + Sync + Send + 'static + + pub fn mount(file_system: F) -> Result<(), Box> + where + F: FilesystemMT + Sync + Send + 'static, { let args: Vec = env::args_os().collect(); + println!("args is {:?}", args); let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=passthrufs")]; - fuse_mt::mount(fuse_mt::FuseMT::new(file_system, 1), &args[2], &fuse_args[..])?; + fuse_mt::mount( + fuse_mt::FuseMT::new(file_system, 1), + &args[2], + &fuse_args[..], + )?; Ok(()) } } -