diff --git a/Cargo.toml b/Cargo.toml index 23454f5..fa0b7c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [[bin]] # Bin to run the HelloWorld gRPC client -name = "lws-client" +name = "lws_vfs_client" path = "src/client.rs" [dependencies] diff --git a/config.json b/config.json new file mode 100755 index 0000000..869c88e --- /dev/null +++ b/config.json @@ -0,0 +1,9 @@ +{ + "mount": { + "c:\\": "/l3c", + "d:\\": "/l0d", + "f:\\": "/l0e" + }, + "port": 7412, + "addr": "192.168.0.110" +} \ No newline at end of file diff --git a/proto/lws.proto b/proto/lws.proto old mode 100644 new mode 100755 index 834b5b4..0181575 --- a/proto/lws.proto +++ b/proto/lws.proto @@ -49,7 +49,7 @@ message HelloRequest { string name = 1; } message HelloReply { string message = 1; } message file_info { - int32 flags = 1; + uint32 flags = 1; uint32 fh_old = 2; bool direct_io = 3; uint64 fh = 10; @@ -102,9 +102,14 @@ message access { int32 ret = 15; } +message direntry { + string name = 1; + uint32 kind = 2; +} + message readdir { string path = 1; - repeated string dirs = 2; + repeated direntry dirs = 2; uint32 offset = 3; file_info fi = 4; int32 ret = 15; @@ -120,7 +125,7 @@ message read { string path = 1; bytes buff = 2; int64 size = 3; - int64 offset = 4; + uint64 offset = 4; file_info fi = 5; int32 ret = 15; } @@ -128,8 +133,8 @@ message read { message write { string path = 1; bytes buff = 2; - int64 size = 3; - int64 offset = 4; + uint64 size = 3; + uint64 offset = 4; file_info fi = 5; int32 ret = 15; } @@ -160,6 +165,7 @@ message chown { message release { string path = 1; file_info fi = 2; + uint32 flush = 3; int32 ret = 15; } diff --git a/src/client.rs b/src/client.rs index 3a8f21a..b435479 100755 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ use lws_client::LwsVfsIns; #[tokio::main] async fn main() -> Result<(), Box> { - let mut lws_ins = match LwsVfsIns::new("config.json").await{ + let lws_ins = match LwsVfsIns::new("config.json").await{ Ok(ins) => ins, Err(e) => { println!("Error creating lws server instance: {:?}", e); diff --git a/src/lib.rs b/src/lib.rs index 6390ba4..5ec0fd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,13 @@ -use lws_client::{Getattr, HelloRequest, FileInfo}; -use std::borrow::BorrowMut; -use std::cell::RefCell; -use std::{io::Read as _}; -use std::fs::File; +use lws_client::{Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write}; +use tonic::transport::Channel as RpcChannel; +use std::io::{Read as IoRead, Error as IoError, ErrorKind}; use std::error::Error; +use std::fs::File; use std::collections::HashMap; use serde_json::{self, Value}; use lws_client::lws_vfs_client::LwsVfsClient; use std::ffi::{OsStr, OsString}; - pub mod lws_client { tonic::include_proto!("lws_vfs"); //导入lws vfs proto buffer } @@ -58,24 +56,205 @@ impl Config { pub fn get_port(&self) -> u16 { self.port } - pub fn get_addr(&self) -> &String { &self.addr } + + pub fn get_mount(&self) -> Vec<&String> { + let mut ret = vec![]; + for (_path, mount) in &self.mount_map { + ret.push(mount); + } + ret + } +} +use std::sync::{Arc,Mutex, RwLock, Condvar}; +struct RpcPool { + clients: Arc>>>>, + index: Arc>, + wait: Arc<(Mutex<()>,Condvar)>, } -struct vir_fs { - name: String, +struct Rpc { + pub client: LwsVfsClient, + pub in_use: bool, +} + +impl RpcPool { + async fn new(size: usize, uri: String) -> Result> { + let mut clients = Vec::with_capacity(size); + let uri = uri.to_ascii_lowercase(); + println!("create rpc pool {}", uri); + let channel = RpcChannel::from_shared(uri)?.connect().await?; + for _ in 0..size { + let client = LwsVfsClient::new(channel.clone()); + clients.push(Arc::new(RwLock::new(Rpc { + client, + in_use: false, + }))); + } + let wait =(Mutex::new(()), Condvar::new()); + Ok(Self { + clients: Arc::new(Mutex::new(clients)), + index: Arc::new(RwLock::new(0)), + wait:Arc::new(wait), + }) + } + + fn get(&self) -> Result>, Box> { + // 优化遍历的逻辑 + let find_idle = |rpc_pool:&Self| -> Result>, Box>{ + let cs = self.clients.clone(); + let clients = &(cs.lock().unwrap()); + let size = clients.len(); + let index = *rpc_pool.index.read().unwrap(); + for i in 0..size { + let idx = (index + i) % size; + let entry = clients[idx].read().unwrap(); + if !entry.in_use { + drop(entry); + let mut entry = clients[idx].write().unwrap(); + entry.in_use = true; + return Ok(clients[idx].clone()); + } + } + Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client fail"))) + }; + loop { + if let Ok(rpc) = find_idle(self) { + return Ok(rpc); + } + let (mutex, cond) = &(*self.wait.clone()); + let timeout = Duration::from_millis(100); + let (_unused, timeout_res) = cond.wait_timeout(mutex.lock().unwrap(), timeout).unwrap(); + if timeout_res.timed_out() { + println!("Timed out!"); + break; + } + } + Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client time out"))) + } + + fn put(&self, client: Arc>) { + let mut i = 0; + for entry in self.clients.lock().unwrap().iter_mut() { + if Arc::ptr_eq(&entry, &client) { + let mut item = entry.write().unwrap(); + item.in_use = false; + *self.index.write().unwrap() = i; + let (mutex, cond) = &(*self.wait.clone()); + let _unused = mutex.lock().unwrap(); + cond.notify_one(); + break; + } + i += 1; + } + } +} + +struct VirFs { + _name: String, + stat: lws_client::Fstat, + sub: HashMap, + is_root: bool, +} + +impl VirFs { + fn create (name:T, is_dir: bool) -> VirFs + where T: ToString + { + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64; + VirFs { + _name : name.to_string(), + stat : lws_client::Fstat { + fst_mode: 0o0755 | {if is_dir {libc::S_IFDIR} else {0}} as u64, + fst_ino: 0, + fst_dev: 0, + fst_rdev: 0, + fst_nlink: 0, + fst_uid: 0, + fst_gid: 0, + fst_size: 0, + fst_atime: now, + fst_mtime: now, + fst_ctime: now, + fst_blksize: 0, + fst_blocks: 0 + }, + sub: HashMap::new(), + is_root: true, + } + } + pub fn new() -> VirFs { + let mut node = VirFs::create("/", true); + node.is_root = true; + node + } + + pub fn add (&mut self, name:T) + where T: ToString + { + let mut node = VirFs::create(name.to_string(), true); + node.is_root = false; + self.sub.insert(name.to_string(), node); + } + + pub fn find (&self, name:T) -> Option<&VirFs> + where T: ToString + { + let name = name.to_string(); + let mut vname = name.split('/').filter(|x| x.len() > 0); + self.match_path(&mut vname) + } + + pub fn match_path <'a> (&self, name_list: &mut impl Iterator) -> Option<&VirFs> { + if let Some(current) = name_list.next() { + match self.sub.get(current){ + Some(v) => { + if let Some(sub) = v.match_path(name_list) { + Some(sub) + } else { + Some(v) + } + } + None => None, + } + } else { + None + } + } + + pub fn readdir(&self) -> Vec { + let mut ret = vec![]; + for name in self.sub.keys(){ + ret.push(name.to_string()); + } + ret + } } pub struct LwsVfsIns { pub config: Config, - rpc: RefCell>, + rpcs: RpcPool, async_rt: tokio::runtime::Runtime, + vir_root :VirFs +} + +fn mode_to_filetype(mode: libc::mode_t) -> FileType { + match mode & libc::S_IFMT { + libc::S_IFDIR => FileType::Directory, + libc::S_IFREG => FileType::RegularFile, + libc::S_IFLNK => FileType::Symlink, + libc::S_IFBLK => FileType::BlockDevice, + libc::S_IFCHR => FileType::CharDevice, + libc::S_IFIFO => FileType::NamedPipe, + libc::S_IFSOCK => FileType::Socket, + _ => { panic!("unknown file type"); } + } } use fuse_mt::*; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::time::{Duration, SystemTime}; impl FilesystemMT for LwsVfsIns { /// Called on mount, before any other function. @@ -85,6 +264,7 @@ impl FilesystemMT for LwsVfsIns { Ok(()) }, Err(e) =>{ + println!("hello err: {:?}", e); Err(libc::ENOSYS) } } @@ -98,16 +278,68 @@ impl FilesystemMT for LwsVfsIns { /// Get the attributes of a filesystem entry. /// /// * `fh`: a file handle if this is called on an open file. - fn getattr(&self, _req: RequestInfo, path: &Path, _fh: Option) -> ResultEntry { + fn getattr(&self, _req: RequestInfo, path: &Path, fh: Option) -> ResultEntry { + let time = |timens| { + let secs = timens / (1000 * 1000); + let nanos = timens % (1000 * 1000); + SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32) + }; + let file_attr = |fst:&Fstat|{ + FileAttr { + size: fst.fst_size, + blocks: fst.fst_blocks, + atime: time(fst.fst_atime), + mtime: time(fst.fst_mtime), + ctime: time(fst.fst_ctime), + crtime: SystemTime::UNIX_EPOCH, + kind: mode_to_filetype(fst.fst_mode as u32), + perm: fst.fst_mode as u16, + nlink: fst.fst_nlink as u32, + uid: fst.fst_uid as u32, + gid: fst.fst_gid as u32, + rdev: fst.fst_rdev as u32, + flags: 0, + } + }; + let path = path.to_str().unwrap(); + if let Some(node) = self.vir_root.find(path) { + return Ok(( + Duration::new(0,0), + file_attr(&node.stat), + )) + } let request = tonic::Request::new(Getattr { - path : path.to_str().unwrap().into(), - fi : FileInfo { - - }, + path : path.into(), + fi : Some(FileInfo { + fh : match fh { + Some(f) => f, + None => 0, + }, + ..FileInfo::default() + }), ..Getattr::default() }); - let response = self.async_rt.block_on(self.rpc.borrow_mut().fgetattr(request)); - Err(libc::ENOSYS) + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fgetattr(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG) + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(( + Duration::new(0,0), + file_attr(&resp.stat.unwrap()), + )) } // The following operations in the FUSE C API are all one kernel call: setattr @@ -134,8 +366,30 @@ impl FilesystemMT for LwsVfsIns { /// /// * `fh`: a file handle if this is called on an open file. /// * `size`: size in bytes to set as the file's length. - fn truncate(&self, _req: RequestInfo, _path: &Path, _fh: Option, _size: u64) -> ResultEmpty { - Err(libc::ENOSYS) + fn truncate(&self, _req: RequestInfo, path: &Path, _fh: Option, size: u64) -> ResultEmpty { + let request = tonic::Request::new(Truncate { + path : path.to_str().unwrap().into(), + size: size as i64, + ..Truncate::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.ftruncate(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG) + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Set timestamps of a filesystem entry. @@ -143,14 +397,51 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: a file handle if this is called on an open file. /// * `atime`: the time of last access. /// * `mtime`: the time of last modification. - fn utimens(&self, _req: RequestInfo, _path: &Path, _fh: Option, _atime: Option, _mtime: Option) -> ResultEmpty { - Err(libc::ENOSYS) - } + fn utimens(&self, _req: RequestInfo, path: &Path, _fh: Option, atime: Option, mtime: Option) -> ResultEmpty { - /// Set timestamps of a filesystem entry (with extra options only used on MacOS). - #[allow(clippy::too_many_arguments)] - fn utimens_macos(&self, _req: RequestInfo, _path: &Path, _fh: Option, _crtime: Option, _chgtime: Option, _bkuptime: Option, _flags: Option) -> ResultEmpty { - Err(libc::ENOSYS) + let time = move |time| -> Timespec{ + let time = match time { + Some(time) => time, + None => SystemTime::now(), + }; + match time.duration_since(SystemTime::UNIX_EPOCH){ + Ok(dur) => { + Timespec{ + tv_sec:dur.as_secs() as i32, + tv_nsec: dur.subsec_nanos() as i64, + } + }, + Err(_e) => { + Timespec::default() + } + } + }; + let at = time(atime); + let mt = time(mtime); + + let request = tonic::Request::new(Utimens { + path : path.to_str().unwrap().into(), + ts: vec![at, mt], + ..Utimens::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.futimens(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG) + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } // END OF SETATTR FUNCTIONS @@ -175,24 +466,97 @@ impl FilesystemMT for LwsVfsIns { /// * `parent`: path to the directory to make the directory under. /// * `name`: name of the directory. /// * `mode`: permissions for the new directory. - fn mkdir(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _mode: u32) -> ResultEntry { - Err(libc::ENOSYS) + fn mkdir(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32) -> ResultEntry { + let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); + if let Some(_unused) = self.vir_root.find(&path) { + return Err(libc::ENOSYS) + } + let request = tonic::Request::new(Mkdir { + path: path.to_string(), + mode, + ..Mkdir::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fmkdir(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + self.getattr(req, Path::new(&path), None) } /// Remove a file. /// /// * `parent`: path to the directory containing the file to delete. /// * `name`: name of the file to delete. - fn unlink(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr) -> ResultEmpty { - Err(libc::ENOSYS) + fn unlink(&self, _req: RequestInfo, parent: &Path, name: &OsStr) -> ResultEmpty { + let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); + let request = tonic::Request::new(Unlink { + path: path.to_string(), + ..Unlink::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.funlink(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Remove a directory. /// /// * `parent`: path to the directory containing the directory to delete. /// * `name`: name of the directory to delete. - fn rmdir(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr) -> ResultEmpty { - Err(libc::ENOSYS) + fn rmdir(&self, _req: RequestInfo, parent: &Path, name: &OsStr) -> ResultEmpty { + let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); + if let Some(_unused) = self.vir_root.find(&path) { + return Err(libc::ENOSYS) + } + let request = tonic::Request::new(Rmdir { + path: path.to_string(), + ..Rmdir::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frmdir(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Create a symbolic link. @@ -210,8 +574,33 @@ impl FilesystemMT for LwsVfsIns { /// * `name`: name of the existing entry. /// * `newparent`: path to the directory it should be renamed into (may be the same as `parent`). /// * `newname`: name of the new entry. - fn rename(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _newparent: &Path, _newname: &OsStr) -> ResultEmpty { - Err(libc::ENOSYS) + fn rename(&self, _req: RequestInfo, parent: &Path, name: &OsStr, _newparent: &Path, _newname: &OsStr) -> ResultEmpty { + let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); + if let Some(_unused) = self.vir_root.find(&path) { + return Err(libc::ENOSYS) + } + let request = tonic::Request::new(Rename { + path: path.to_string(), + ..Rename::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frename(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Create a hard link. @@ -231,8 +620,34 @@ impl FilesystemMT for LwsVfsIns { /// Return a tuple of (file handle, flags). The file handle will be passed to any subsequent /// calls that operate on the file, and can be any value you choose, though it should allow /// your filesystem to identify the file opened even without any path info. - fn open(&self, _req: RequestInfo, _path: &Path, _flags: u32) -> ResultOpen { - Err(libc::ENOSYS) + fn open(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen { + let request = tonic::Request::new(Open { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + flags, + ..FileInfo::default() + }), + ..Open::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fopen(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + let fi = resp.fi.unwrap(); + Ok((fi.fh, fi.flags)) } /// Read from a file. @@ -249,8 +664,34 @@ impl FilesystemMT for LwsVfsIns { /// the result data as a slice, or an error code. /// /// Return the return value from the `callback` function. - fn read(&self, _req: RequestInfo, _path: &Path, _fh: u64, _offset: u64, _size: u32, callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult) -> CallbackResult { - callback(Err(libc::ENOSYS)) + fn read(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, _size: u32, callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult) -> CallbackResult { + let request = tonic::Request::new(Read { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + offset, + ..Read::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return callback(Err(libc::ENOSYS)), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fread(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return callback(Err(libc::ENOMSG)); + }, + }; + if resp.ret != 0 { + return callback(Err(libc::ENOSYS)); + } + callback(Ok(&resp.buff)) } /// Write to a file. @@ -262,8 +703,36 @@ impl FilesystemMT for LwsVfsIns { /// * `flags`: /// /// Return the number of bytes written. - fn write(&self, _req: RequestInfo, _path: &Path, _fh: u64, _offset: u64, _data: Vec, _flags: u32) -> ResultWrite { - Err(libc::ENOSYS) + fn write(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, data: Vec, _flags: u32) -> ResultWrite { + let request = tonic::Request::new(Write { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + offset, + size: data.len() as u64, + buff: data, + ..Write::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fwrite(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(resp.buff.len() as u32) } /// Called each time a program calls `close` on an open file. @@ -277,8 +746,33 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: file handle returned from the `open` call. /// * `lock_owner`: if the filesystem supports locking (`setlk`, `getlk`), remove all locks /// belonging to this lock owner. - fn flush(&self, _req: RequestInfo, _path: &Path, _fh: u64, _lock_owner: u64) -> ResultEmpty { - Err(libc::ENOSYS) + fn flush(&self, _req: RequestInfo, path: &Path, fh: u64, _lock_owner: u64) -> ResultEmpty { + let request = tonic::Request::new(Flush { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + ..Flush::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fflush(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Called when an open file is closed. @@ -292,8 +786,34 @@ impl FilesystemMT for LwsVfsIns { /// * `lock_owner`: if the filesystem supports locking (`setlk`, `getlk`), remove all locks /// belonging to this lock owner. /// * `flush`: whether pending data must be flushed or not. - fn release(&self, _req: RequestInfo, _path: &Path, _fh: u64, _flags: u32, _lock_owner: u64, _flush: bool) -> ResultEmpty { - Err(libc::ENOSYS) + fn release(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32, _lock_owner: u64, flush: bool) -> ResultEmpty { + let request = tonic::Request::new(Release { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + flush:flush as u32, + ..Release::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.frelease(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Write out any pending changes of a file. @@ -317,8 +837,29 @@ impl FilesystemMT for LwsVfsIns { /// Return a tuple of (file handle, flags). The file handle will be passed to any subsequent /// calls that operate on the directory, and can be any value you choose, though it should /// allow your filesystem to identify the directory opened even without any path info. - fn opendir(&self, _req: RequestInfo, _path: &Path, _flags: u32) -> ResultOpen { - Err(libc::ENOSYS) + fn opendir(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen { + let request = tonic::Request::new(Opendir { + path: path.to_str().unwrap().to_string(), + ..Opendir::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fopendir(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok((resp.fi.unwrap().fh, flags)) } /// Get the entries of a directory. @@ -327,8 +868,64 @@ impl FilesystemMT for LwsVfsIns { /// * `fh`: file handle returned from the `opendir` call. /// /// Return all the entries of the directory. - fn readdir(&self, _req: RequestInfo, _path: &Path, _fh: u64) -> ResultReaddir { - Err(libc::ENOSYS) + fn readdir(&self, _req: RequestInfo, path: &Path, fh: u64) -> ResultReaddir { + fn filetype(kind: u32) -> FileType { + match kind { + 0 => FileType::RegularFile, + 1 => FileType::Directory, + 2 => FileType::Symlink, + 3 => FileType::CharDevice, + 4 => FileType::BlockDevice, + 5 => FileType::NamedPipe, + 6 => FileType::Socket, + _ => FileType::RegularFile, + } + } + let path = path.to_str().unwrap(); + if let Some(node) = self.vir_root.find(&path) { + let mut dirs = vec![]; + let resp = node.readdir(); + for name in resp { + dirs.push(DirectoryEntry{ + name: name.into(), + kind: FileType::Directory, + }); + } + return Ok(dirs); + } + let request = tonic::Request::new(Readdir { + path: path.to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + ..Readdir::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.freaddir(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + let mut dirs = vec![]; + for dir in resp.dirs { + dirs.push(DirectoryEntry{ + name: dir.name.into(), + kind: filetype(dir.kind), + }); + } + Ok(dirs) } /// Close an open directory. @@ -338,8 +935,33 @@ impl FilesystemMT for LwsVfsIns { /// * `path`: path to the directory. /// * `fh`: file handle returned from the `opendir` call. /// * `flags`: the file access flags passed to the `opendir` call. - fn releasedir(&self, _req: RequestInfo, _path: &Path, _fh: u64, _flags: u32) -> ResultEmpty { - Err(libc::ENOSYS) + fn releasedir(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32) -> ResultEmpty { + let request = tonic::Request::new(Releasedir { + path: path.to_str().unwrap().to_string(), + fi:Some(FileInfo{ + fh, + ..FileInfo::default() + }), + ..Releasedir::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.freleasedir(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Write out any pending changes to a directory. @@ -409,8 +1031,30 @@ impl FilesystemMT for LwsVfsIns { /// /// Return `Ok(())` if all requested permissions are allowed, otherwise return `Err(EACCES)` /// or other error code as appropriate (e.g. `ENOENT` if the file doesn't exist). - fn access(&self, _req: RequestInfo, _path: &Path, _mask: u32) -> ResultEmpty { - Err(libc::ENOSYS) + fn access(&self, _req: RequestInfo, path: &Path, mask: u32) -> ResultEmpty { + let request = tonic::Request::new(Access { + path: path.to_str().unwrap().to_string(), + mask, + ..Access::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.faccess(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + Ok(()) } /// Create and open a new file. @@ -422,32 +1066,82 @@ impl FilesystemMT for LwsVfsIns { /// /// Return a `CreatedEntry` (which contains the new file's attributes as well as a file handle /// -- see documentation on `open` for more info on that). - fn create(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _mode: u32, _flags: u32) -> ResultCreate { + fn create(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32, flags: u32) -> ResultCreate { + let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap()); + if let Some(_unused) = self.vir_root.find(&path) { + return Err(libc::ENOSYS) + } + let request = tonic::Request::new(Create { + path: path.clone(), + mode, + ..Create::default() + }); + let client = match self.rpcs.get() { + Ok(c) => c, + Err(_) => return Err(libc::ENOSYS), + }; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.fcreate(request)); + self.rpcs.put(client.clone()); + let resp = match resp { + Ok(resp) => resp.into_inner(), + Err(e) => { + println!("get resp err: {:?}", e); + return Err(libc::ENOMSG); + }, + }; + if resp.ret != 0 { + return Err(libc::ENOSYS); + } + let fi = resp.fi.unwrap(); + if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) { + return Ok(CreatedEntry { + fh: fi.fh, + flags, + attr, + ttl + }); + } Err(libc::ENOSYS) } } -use std::env; +use std::{env, vec}; impl LwsVfsIns { pub async fn new(json: &str) -> Result> { let config = Config::new(json)?; let addr = format!("http://{}:{}", config.get_addr(), config.get_port()); - //connect to server - let rpc = RefCell::new(LwsVfsClient::connect(addr).await?); + let rpcs = RpcPool::new(10, addr).await?; let async_rt = tokio::runtime::Runtime::new().unwrap(); + + let mut vir_root = VirFs::new(); + for mount in config.get_mount() { + vir_root.add(mount); + } Ok(LwsVfsIns { config, - rpc, + rpcs, async_rt, + vir_root, }) } pub fn hello(&self) -> Result<(), Box> { let request = tonic::Request::new(HelloRequest { name: "Ekko lws hello".into(), }); - let response = self.async_rt.block_on(self.rpc.borrow_mut().say_hello(request))?; - println!("RESPONSE={:?}", response); - Ok(()) + let client = self.rpcs.get()?; + let mut rpc = client.write().unwrap(); + let resp = self.async_rt.block_on(rpc.client.say_hello(request)); + self.rpcs.put(client.clone()); + match resp { + Ok(resp) => { + println!("resp={:?}", resp); + Ok(()) + }, + Err(status) => { + Err(Box::new(IoError::new(ErrorKind::Other, format!("get rpc resp fail sta{:?}", status)))) + } + } } pub fn mount(file_system:F) -> Result<(), Box>