初版完成等待和服务器调试

1. 完成虚拟root节点的使用
2. 完成基础接口的实现和转发

等待完善自动获取登录IP的功能,config暂时需要配置端口
This commit is contained in:
Ekko.bao 2024-07-23 08:51:57 +08:00
parent 2f34dd807d
commit 022a31a489
5 changed files with 777 additions and 68 deletions

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "lws-client"
name = "lws_vfs_client"
path = "src/client.rs"
[dependencies]

9
config.json Executable file
View File

@ -0,0 +1,9 @@
{
"mount": {
"c:\\": "/l3c",
"d:\\": "/l0d",
"f:\\": "/l0e"
},
"port": 7412,
"addr": "192.168.0.110"
}

16
proto/lws.proto Normal file → Executable file
View File

@ -49,7 +49,7 @@ message HelloRequest { string name = 1; }
message HelloReply { string message = 1; }
message file_info {
int32 flags = 1;
uint32 flags = 1;
uint32 fh_old = 2;
bool direct_io = 3;
uint64 fh = 10;
@ -102,9 +102,14 @@ message access {
int32 ret = 15;
}
message direntry {
string name = 1;
uint32 kind = 2;
}
message readdir {
string path = 1;
repeated string dirs = 2;
repeated direntry dirs = 2;
uint32 offset = 3;
file_info fi = 4;
int32 ret = 15;
@ -120,7 +125,7 @@ message read {
string path = 1;
bytes buff = 2;
int64 size = 3;
int64 offset = 4;
uint64 offset = 4;
file_info fi = 5;
int32 ret = 15;
}
@ -128,8 +133,8 @@ message read {
message write {
string path = 1;
bytes buff = 2;
int64 size = 3;
int64 offset = 4;
uint64 size = 3;
uint64 offset = 4;
file_info fi = 5;
int32 ret = 15;
}
@ -160,6 +165,7 @@ message chown {
message release {
string path = 1;
file_info fi = 2;
uint32 flush = 3;
int32 ret = 15;
}

View File

@ -1,7 +1,7 @@
use lws_client::LwsVfsIns;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut lws_ins = match LwsVfsIns::new("config.json").await{
let lws_ins = match LwsVfsIns::new("config.json").await{
Ok(ins) => ins,
Err(e) => {
println!("Error creating lws server instance: {:?}", e);

View File

@ -1,15 +1,13 @@
use lws_client::{Getattr, HelloRequest, FileInfo};
use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::{io::Read as _};
use std::fs::File;
use lws_client::{Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read, Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write};
use tonic::transport::Channel as RpcChannel;
use std::io::{Read as IoRead, Error as IoError, ErrorKind};
use std::error::Error;
use std::fs::File;
use std::collections::HashMap;
use serde_json::{self, Value};
use lws_client::lws_vfs_client::LwsVfsClient;
use std::ffi::{OsStr, OsString};
pub mod lws_client {
tonic::include_proto!("lws_vfs"); //导入lws vfs proto buffer
}
@ -58,24 +56,205 @@ impl Config {
pub fn get_port(&self) -> u16 {
self.port
}
pub fn get_addr(&self) -> &String {
&self.addr
}
pub fn get_mount(&self) -> Vec<&String> {
let mut ret = vec![];
for (_path, mount) in &self.mount_map {
ret.push(mount);
}
ret
}
}
use std::sync::{Arc,Mutex, RwLock, Condvar};
struct RpcPool {
clients: Arc<Mutex<Vec<Arc<RwLock<Rpc>>>>>,
index: Arc<RwLock<usize>>,
wait: Arc<(Mutex<()>,Condvar)>,
}
struct vir_fs {
name: String,
struct Rpc {
pub client: LwsVfsClient<RpcChannel>,
pub in_use: bool,
}
impl RpcPool {
async fn new(size: usize, uri: String) -> Result<Self, Box<dyn std::error::Error>> {
let mut clients = Vec::with_capacity(size);
let uri = uri.to_ascii_lowercase();
println!("create rpc pool {}", uri);
let channel = RpcChannel::from_shared(uri)?.connect().await?;
for _ in 0..size {
let client = LwsVfsClient::new(channel.clone());
clients.push(Arc::new(RwLock::new(Rpc {
client,
in_use: false,
})));
}
let wait =(Mutex::new(()), Condvar::new());
Ok(Self {
clients: Arc::new(Mutex::new(clients)),
index: Arc::new(RwLock::new(0)),
wait:Arc::new(wait),
})
}
fn get(&self) -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>> {
// 优化遍历的逻辑
let find_idle = |rpc_pool:&Self| -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>>{
let cs = self.clients.clone();
let clients = &(cs.lock().unwrap());
let size = clients.len();
let index = *rpc_pool.index.read().unwrap();
for i in 0..size {
let idx = (index + i) % size;
let entry = clients[idx].read().unwrap();
if !entry.in_use {
drop(entry);
let mut entry = clients[idx].write().unwrap();
entry.in_use = true;
return Ok(clients[idx].clone());
}
}
Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client fail")))
};
loop {
if let Ok(rpc) = find_idle(self) {
return Ok(rpc);
}
let (mutex, cond) = &(*self.wait.clone());
let timeout = Duration::from_millis(100);
let (_unused, timeout_res) = cond.wait_timeout(mutex.lock().unwrap(), timeout).unwrap();
if timeout_res.timed_out() {
println!("Timed out!");
break;
}
}
Err(Box::new(IoError::new(ErrorKind::Other, "get free rpc client time out")))
}
fn put(&self, client: Arc<RwLock<Rpc>>) {
let mut i = 0;
for entry in self.clients.lock().unwrap().iter_mut() {
if Arc::ptr_eq(&entry, &client) {
let mut item = entry.write().unwrap();
item.in_use = false;
*self.index.write().unwrap() = i;
let (mutex, cond) = &(*self.wait.clone());
let _unused = mutex.lock().unwrap();
cond.notify_one();
break;
}
i += 1;
}
}
}
struct VirFs {
_name: String,
stat: lws_client::Fstat,
sub: HashMap<String, VirFs>,
is_root: bool,
}
impl VirFs {
fn create <T> (name:T, is_dir: bool) -> VirFs
where T: ToString
{
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64;
VirFs {
_name : name.to_string(),
stat : lws_client::Fstat {
fst_mode: 0o0755 | {if is_dir {libc::S_IFDIR} else {0}} as u64,
fst_ino: 0,
fst_dev: 0,
fst_rdev: 0,
fst_nlink: 0,
fst_uid: 0,
fst_gid: 0,
fst_size: 0,
fst_atime: now,
fst_mtime: now,
fst_ctime: now,
fst_blksize: 0,
fst_blocks: 0
},
sub: HashMap::new(),
is_root: true,
}
}
pub fn new() -> VirFs {
let mut node = VirFs::create("/", true);
node.is_root = true;
node
}
pub fn add <T> (&mut self, name:T)
where T: ToString
{
let mut node = VirFs::create(name.to_string(), true);
node.is_root = false;
self.sub.insert(name.to_string(), node);
}
pub fn find <T> (&self, name:T) -> Option<&VirFs>
where T: ToString
{
let name = name.to_string();
let mut vname = name.split('/').filter(|x| x.len() > 0);
self.match_path(&mut vname)
}
pub fn match_path <'a> (&self, name_list: &mut impl Iterator<Item = &'a str>) -> Option<&VirFs> {
if let Some(current) = name_list.next() {
match self.sub.get(current){
Some(v) => {
if let Some(sub) = v.match_path(name_list) {
Some(sub)
} else {
Some(v)
}
}
None => None,
}
} else {
None
}
}
pub fn readdir(&self) -> Vec<String> {
let mut ret = vec![];
for name in self.sub.keys(){
ret.push(name.to_string());
}
ret
}
}
pub struct LwsVfsIns {
pub config: Config,
rpc: RefCell<LwsVfsClient<tonic::transport::Channel>>,
rpcs: RpcPool,
async_rt: tokio::runtime::Runtime,
vir_root :VirFs
}
fn mode_to_filetype(mode: libc::mode_t) -> FileType {
match mode & libc::S_IFMT {
libc::S_IFDIR => FileType::Directory,
libc::S_IFREG => FileType::RegularFile,
libc::S_IFLNK => FileType::Symlink,
libc::S_IFBLK => FileType::BlockDevice,
libc::S_IFCHR => FileType::CharDevice,
libc::S_IFIFO => FileType::NamedPipe,
libc::S_IFSOCK => FileType::Socket,
_ => { panic!("unknown file type"); }
}
}
use fuse_mt::*;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::time::{Duration, SystemTime};
impl FilesystemMT for LwsVfsIns {
/// Called on mount, before any other function.
@ -85,6 +264,7 @@ impl FilesystemMT for LwsVfsIns {
Ok(())
},
Err(e) =>{
println!("hello err: {:?}", e);
Err(libc::ENOSYS)
}
}
@ -98,16 +278,68 @@ impl FilesystemMT for LwsVfsIns {
/// Get the attributes of a filesystem entry.
///
/// * `fh`: a file handle if this is called on an open file.
fn getattr(&self, _req: RequestInfo, path: &Path, _fh: Option<u64>) -> ResultEntry {
fn getattr(&self, _req: RequestInfo, path: &Path, fh: Option<u64>) -> ResultEntry {
let time = |timens| {
let secs = timens / (1000 * 1000);
let nanos = timens % (1000 * 1000);
SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32)
};
let file_attr = |fst:&Fstat|{
FileAttr {
size: fst.fst_size,
blocks: fst.fst_blocks,
atime: time(fst.fst_atime),
mtime: time(fst.fst_mtime),
ctime: time(fst.fst_ctime),
crtime: SystemTime::UNIX_EPOCH,
kind: mode_to_filetype(fst.fst_mode as u32),
perm: fst.fst_mode as u16,
nlink: fst.fst_nlink as u32,
uid: fst.fst_uid as u32,
gid: fst.fst_gid as u32,
rdev: fst.fst_rdev as u32,
flags: 0,
}
};
let path = path.to_str().unwrap();
if let Some(node) = self.vir_root.find(path) {
return Ok((
Duration::new(0,0),
file_attr(&node.stat),
))
}
let request = tonic::Request::new(Getattr {
path : path.to_str().unwrap().into(),
fi : FileInfo {
},
path : path.into(),
fi : Some(FileInfo {
fh : match fh {
Some(f) => f,
None => 0,
},
..FileInfo::default()
}),
..Getattr::default()
});
let response = self.async_rt.block_on(self.rpc.borrow_mut().fgetattr(request));
Err(libc::ENOSYS)
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fgetattr(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG)
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok((
Duration::new(0,0),
file_attr(&resp.stat.unwrap()),
))
}
// The following operations in the FUSE C API are all one kernel call: setattr
@ -134,8 +366,30 @@ impl FilesystemMT for LwsVfsIns {
///
/// * `fh`: a file handle if this is called on an open file.
/// * `size`: size in bytes to set as the file's length.
fn truncate(&self, _req: RequestInfo, _path: &Path, _fh: Option<u64>, _size: u64) -> ResultEmpty {
Err(libc::ENOSYS)
fn truncate(&self, _req: RequestInfo, path: &Path, _fh: Option<u64>, size: u64) -> ResultEmpty {
let request = tonic::Request::new(Truncate {
path : path.to_str().unwrap().into(),
size: size as i64,
..Truncate::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.ftruncate(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG)
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Set timestamps of a filesystem entry.
@ -143,14 +397,51 @@ impl FilesystemMT for LwsVfsIns {
/// * `fh`: a file handle if this is called on an open file.
/// * `atime`: the time of last access.
/// * `mtime`: the time of last modification.
fn utimens(&self, _req: RequestInfo, _path: &Path, _fh: Option<u64>, _atime: Option<SystemTime>, _mtime: Option<SystemTime>) -> ResultEmpty {
Err(libc::ENOSYS)
}
fn utimens(&self, _req: RequestInfo, path: &Path, _fh: Option<u64>, atime: Option<SystemTime>, mtime: Option<SystemTime>) -> ResultEmpty {
/// Set timestamps of a filesystem entry (with extra options only used on MacOS).
#[allow(clippy::too_many_arguments)]
fn utimens_macos(&self, _req: RequestInfo, _path: &Path, _fh: Option<u64>, _crtime: Option<SystemTime>, _chgtime: Option<SystemTime>, _bkuptime: Option<SystemTime>, _flags: Option<u32>) -> ResultEmpty {
Err(libc::ENOSYS)
let time = move |time| -> Timespec{
let time = match time {
Some(time) => time,
None => SystemTime::now(),
};
match time.duration_since(SystemTime::UNIX_EPOCH){
Ok(dur) => {
Timespec{
tv_sec:dur.as_secs() as i32,
tv_nsec: dur.subsec_nanos() as i64,
}
},
Err(_e) => {
Timespec::default()
}
}
};
let at = time(atime);
let mt = time(mtime);
let request = tonic::Request::new(Utimens {
path : path.to_str().unwrap().into(),
ts: vec![at, mt],
..Utimens::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.futimens(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG)
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
// END OF SETATTR FUNCTIONS
@ -175,24 +466,97 @@ impl FilesystemMT for LwsVfsIns {
/// * `parent`: path to the directory to make the directory under.
/// * `name`: name of the directory.
/// * `mode`: permissions for the new directory.
fn mkdir(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _mode: u32) -> ResultEntry {
Err(libc::ENOSYS)
fn mkdir(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32) -> ResultEntry {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
if let Some(_unused) = self.vir_root.find(&path) {
return Err(libc::ENOSYS)
}
let request = tonic::Request::new(Mkdir {
path: path.to_string(),
mode,
..Mkdir::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fmkdir(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
self.getattr(req, Path::new(&path), None)
}
/// Remove a file.
///
/// * `parent`: path to the directory containing the file to delete.
/// * `name`: name of the file to delete.
fn unlink(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr) -> ResultEmpty {
Err(libc::ENOSYS)
fn unlink(&self, _req: RequestInfo, parent: &Path, name: &OsStr) -> ResultEmpty {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
let request = tonic::Request::new(Unlink {
path: path.to_string(),
..Unlink::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.funlink(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Remove a directory.
///
/// * `parent`: path to the directory containing the directory to delete.
/// * `name`: name of the directory to delete.
fn rmdir(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr) -> ResultEmpty {
Err(libc::ENOSYS)
fn rmdir(&self, _req: RequestInfo, parent: &Path, name: &OsStr) -> ResultEmpty {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
if let Some(_unused) = self.vir_root.find(&path) {
return Err(libc::ENOSYS)
}
let request = tonic::Request::new(Rmdir {
path: path.to_string(),
..Rmdir::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frmdir(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Create a symbolic link.
@ -210,8 +574,33 @@ impl FilesystemMT for LwsVfsIns {
/// * `name`: name of the existing entry.
/// * `newparent`: path to the directory it should be renamed into (may be the same as `parent`).
/// * `newname`: name of the new entry.
fn rename(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _newparent: &Path, _newname: &OsStr) -> ResultEmpty {
Err(libc::ENOSYS)
fn rename(&self, _req: RequestInfo, parent: &Path, name: &OsStr, _newparent: &Path, _newname: &OsStr) -> ResultEmpty {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
if let Some(_unused) = self.vir_root.find(&path) {
return Err(libc::ENOSYS)
}
let request = tonic::Request::new(Rename {
path: path.to_string(),
..Rename::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frename(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Create a hard link.
@ -231,8 +620,34 @@ impl FilesystemMT for LwsVfsIns {
/// Return a tuple of (file handle, flags). The file handle will be passed to any subsequent
/// calls that operate on the file, and can be any value you choose, though it should allow
/// your filesystem to identify the file opened even without any path info.
fn open(&self, _req: RequestInfo, _path: &Path, _flags: u32) -> ResultOpen {
Err(libc::ENOSYS)
fn open(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen {
let request = tonic::Request::new(Open {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
flags,
..FileInfo::default()
}),
..Open::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fopen(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
let fi = resp.fi.unwrap();
Ok((fi.fh, fi.flags))
}
/// Read from a file.
@ -249,8 +664,34 @@ impl FilesystemMT for LwsVfsIns {
/// the result data as a slice, or an error code.
///
/// Return the return value from the `callback` function.
fn read(&self, _req: RequestInfo, _path: &Path, _fh: u64, _offset: u64, _size: u32, callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult) -> CallbackResult {
callback(Err(libc::ENOSYS))
fn read(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, _size: u32, callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult) -> CallbackResult {
let request = tonic::Request::new(Read {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
offset,
..Read::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return callback(Err(libc::ENOSYS)),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fread(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return callback(Err(libc::ENOMSG));
},
};
if resp.ret != 0 {
return callback(Err(libc::ENOSYS));
}
callback(Ok(&resp.buff))
}
/// Write to a file.
@ -262,8 +703,36 @@ impl FilesystemMT for LwsVfsIns {
/// * `flags`:
///
/// Return the number of bytes written.
fn write(&self, _req: RequestInfo, _path: &Path, _fh: u64, _offset: u64, _data: Vec<u8>, _flags: u32) -> ResultWrite {
Err(libc::ENOSYS)
fn write(&self, _req: RequestInfo, path: &Path, fh: u64, offset: u64, data: Vec<u8>, _flags: u32) -> ResultWrite {
let request = tonic::Request::new(Write {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
offset,
size: data.len() as u64,
buff: data,
..Write::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fwrite(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(resp.buff.len() as u32)
}
/// Called each time a program calls `close` on an open file.
@ -277,8 +746,33 @@ impl FilesystemMT for LwsVfsIns {
/// * `fh`: file handle returned from the `open` call.
/// * `lock_owner`: if the filesystem supports locking (`setlk`, `getlk`), remove all locks
/// belonging to this lock owner.
fn flush(&self, _req: RequestInfo, _path: &Path, _fh: u64, _lock_owner: u64) -> ResultEmpty {
Err(libc::ENOSYS)
fn flush(&self, _req: RequestInfo, path: &Path, fh: u64, _lock_owner: u64) -> ResultEmpty {
let request = tonic::Request::new(Flush {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
..Flush::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fflush(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Called when an open file is closed.
@ -292,8 +786,34 @@ impl FilesystemMT for LwsVfsIns {
/// * `lock_owner`: if the filesystem supports locking (`setlk`, `getlk`), remove all locks
/// belonging to this lock owner.
/// * `flush`: whether pending data must be flushed or not.
fn release(&self, _req: RequestInfo, _path: &Path, _fh: u64, _flags: u32, _lock_owner: u64, _flush: bool) -> ResultEmpty {
Err(libc::ENOSYS)
fn release(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32, _lock_owner: u64, flush: bool) -> ResultEmpty {
let request = tonic::Request::new(Release {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
flush:flush as u32,
..Release::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frelease(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Write out any pending changes of a file.
@ -317,8 +837,29 @@ impl FilesystemMT for LwsVfsIns {
/// Return a tuple of (file handle, flags). The file handle will be passed to any subsequent
/// calls that operate on the directory, and can be any value you choose, though it should
/// allow your filesystem to identify the directory opened even without any path info.
fn opendir(&self, _req: RequestInfo, _path: &Path, _flags: u32) -> ResultOpen {
Err(libc::ENOSYS)
fn opendir(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen {
let request = tonic::Request::new(Opendir {
path: path.to_str().unwrap().to_string(),
..Opendir::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fopendir(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok((resp.fi.unwrap().fh, flags))
}
/// Get the entries of a directory.
@ -327,8 +868,64 @@ impl FilesystemMT for LwsVfsIns {
/// * `fh`: file handle returned from the `opendir` call.
///
/// Return all the entries of the directory.
fn readdir(&self, _req: RequestInfo, _path: &Path, _fh: u64) -> ResultReaddir {
Err(libc::ENOSYS)
fn readdir(&self, _req: RequestInfo, path: &Path, fh: u64) -> ResultReaddir {
fn filetype(kind: u32) -> FileType {
match kind {
0 => FileType::RegularFile,
1 => FileType::Directory,
2 => FileType::Symlink,
3 => FileType::CharDevice,
4 => FileType::BlockDevice,
5 => FileType::NamedPipe,
6 => FileType::Socket,
_ => FileType::RegularFile,
}
}
let path = path.to_str().unwrap();
if let Some(node) = self.vir_root.find(&path) {
let mut dirs = vec![];
let resp = node.readdir();
for name in resp {
dirs.push(DirectoryEntry{
name: name.into(),
kind: FileType::Directory,
});
}
return Ok(dirs);
}
let request = tonic::Request::new(Readdir {
path: path.to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
..Readdir::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.freaddir(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
let mut dirs = vec![];
for dir in resp.dirs {
dirs.push(DirectoryEntry{
name: dir.name.into(),
kind: filetype(dir.kind),
});
}
Ok(dirs)
}
/// Close an open directory.
@ -338,8 +935,33 @@ impl FilesystemMT for LwsVfsIns {
/// * `path`: path to the directory.
/// * `fh`: file handle returned from the `opendir` call.
/// * `flags`: the file access flags passed to the `opendir` call.
fn releasedir(&self, _req: RequestInfo, _path: &Path, _fh: u64, _flags: u32) -> ResultEmpty {
Err(libc::ENOSYS)
fn releasedir(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32) -> ResultEmpty {
let request = tonic::Request::new(Releasedir {
path: path.to_str().unwrap().to_string(),
fi:Some(FileInfo{
fh,
..FileInfo::default()
}),
..Releasedir::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.freleasedir(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Write out any pending changes to a directory.
@ -409,8 +1031,30 @@ impl FilesystemMT for LwsVfsIns {
///
/// Return `Ok(())` if all requested permissions are allowed, otherwise return `Err(EACCES)`
/// or other error code as appropriate (e.g. `ENOENT` if the file doesn't exist).
fn access(&self, _req: RequestInfo, _path: &Path, _mask: u32) -> ResultEmpty {
Err(libc::ENOSYS)
fn access(&self, _req: RequestInfo, path: &Path, mask: u32) -> ResultEmpty {
let request = tonic::Request::new(Access {
path: path.to_str().unwrap().to_string(),
mask,
..Access::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.faccess(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
Ok(())
}
/// Create and open a new file.
@ -422,32 +1066,82 @@ impl FilesystemMT for LwsVfsIns {
///
/// Return a `CreatedEntry` (which contains the new file's attributes as well as a file handle
/// -- see documentation on `open` for more info on that).
fn create(&self, _req: RequestInfo, _parent: &Path, _name: &OsStr, _mode: u32, _flags: u32) -> ResultCreate {
fn create(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32, flags: u32) -> ResultCreate {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
if let Some(_unused) = self.vir_root.find(&path) {
return Err(libc::ENOSYS)
}
let request = tonic::Request::new(Create {
path: path.clone(),
mode,
..Create::default()
});
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::ENOSYS),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fcreate(request));
self.rpcs.put(client.clone());
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
},
};
if resp.ret != 0 {
return Err(libc::ENOSYS);
}
let fi = resp.fi.unwrap();
if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) {
return Ok(CreatedEntry {
fh: fi.fh,
flags,
attr,
ttl
});
}
Err(libc::ENOSYS)
}
}
use std::env;
use std::{env, vec};
impl LwsVfsIns {
pub async fn new(json: &str) -> Result<LwsVfsIns, Box<dyn Error>> {
let config = Config::new(json)?;
let addr = format!("http://{}:{}", config.get_addr(), config.get_port());
//connect to server
let rpc = RefCell::new(LwsVfsClient::connect(addr).await?);
let rpcs = RpcPool::new(10, addr).await?;
let async_rt = tokio::runtime::Runtime::new().unwrap();
let mut vir_root = VirFs::new();
for mount in config.get_mount() {
vir_root.add(mount);
}
Ok(LwsVfsIns {
config,
rpc,
rpcs,
async_rt,
vir_root,
})
}
pub fn hello(&self) -> Result<(), Box<dyn Error>> {
let request = tonic::Request::new(HelloRequest {
name: "Ekko lws hello".into(),
});
let response = self.async_rt.block_on(self.rpc.borrow_mut().say_hello(request))?;
println!("RESPONSE={:?}", response);
Ok(())
let client = self.rpcs.get()?;
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.say_hello(request));
self.rpcs.put(client.clone());
match resp {
Ok(resp) => {
println!("resp={:?}", resp);
Ok(())
},
Err(status) => {
Err(Box::new(IoError::new(ErrorKind::Other, format!("get rpc resp fail sta{:?}", status))))
}
}
}
pub fn mount<F>(file_system:F) -> Result<(), Box<dyn Error>>