1. 解决访问慢的问题。增加缓存机制,默认10秒。提高访问速度。

2. 测试新建文件,修改文件,删除文件等操作均可。但是vim编辑存在swap文件访问时序的问题。会先创建swap文件然后又删除导致报错,但是不影响文件编辑。
3. 修改日志库,使用log代替println宏。
This commit is contained in:
Ekko.bao 2024-08-04 17:57:26 +08:00
parent 6ae463369d
commit 80c7a14ff2
4 changed files with 453 additions and 138 deletions

79
Cargo.lock generated
View File

@ -17,6 +17,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0453232ace82dee0dd0b4c87a59bd90f7b53b314f3e0f61fe2ee7c8a16482289"
[[package]]
name = "aho-corasick"
version = "1.1.3"
@ -65,6 +71,17 @@ dependencies = [
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.3.0"
@ -179,6 +196,19 @@ version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@ -216,8 +246,6 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fuse_mt"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e098b8dc4cd32e9ba31d9c8cdfef11271d8191233c64c2a671432ff19d354948"
dependencies = [
"fuser",
"libc",
@ -315,6 +343,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -333,6 +370,15 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.9"
@ -373,6 +419,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.30"
@ -466,8 +518,11 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
name = "lws_client"
version = "0.1.0"
dependencies = [
"env_logger",
"fuse_mt",
"hashbrown 0.9.1",
"libc",
"log",
"prost",
"serde",
"serde_json",
@ -526,7 +581,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.9",
"libc",
]
@ -871,6 +926,15 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]]
name = "threadpool"
version = "1.8.1"
@ -1088,6 +1152,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@ -13,8 +13,11 @@ prost = "0.12"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
fuse_mt = "0.6.1"
fuse_mt = { path = "third_party/fuse-mt" }
libc = "0.2"
hashbrown = "0.9.0"
log = "0.4"
env_logger = "0.8"
[build-dependencies]
tonic-build = "0.11"

View File

@ -1,30 +1,33 @@
use lws_client::LwsVfsIns;
use std::thread;
extern crate log;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let lws_ins = match LwsVfsIns::new("config.json").await {
Ok(ins) => ins,
Err(e) => {
println!("Error creating lws server instance: {:?}", e);
log::error!("Error creating lws server instance: {:?}", e);
return Err(e);
}
};
println!("start hello process");
log::info!("lws client instance created");
match lws_ins.hello().await {
Err(e) => {
println!("lws client instance hello err {:?}", e);
log::error!("lws client instance hello err {:?}", e);
return Err(e);
}
_ => {}
}
println!("start mount process");
log::info!("start mount process");
let handle = thread::spawn(move ||{
match LwsVfsIns::mount(lws_ins) {
Ok(_) => {
Ok::<i32, String>(0)
},
Err(e) => {
println!("mount err {:?}", e);
log::error!("mount err {:?}", e);
Ok::<i32, String>(-1)
}
}
@ -34,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
},
Err(e) => {
eprintln!("mount thread start err {:?}", e);
log::error!("mount thread start err {:?}", e);
Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "mount fail")))
}
}

View File

@ -4,12 +4,16 @@ use lws_client::{
Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write,
};
use serde_json::{self, Value};
use std::collections::HashMap;
use hashbrown::HashMap;
use std::error::Error;
use std::ffi::{OsStr, OsString};
use std::fs::File;
use std::{env, path, vec};
use std::io::{Error as IoError, ErrorKind, Read as IoRead};
use tonic::transport::Channel as RpcChannel;
use tonic::transport::{channel, Channel as RpcChannel};
use tokio::runtime::{Builder,Runtime};
use std::thread::{self, JoinHandle};
extern crate log;
pub mod lws_client {
tonic::include_proto!("lws_vfs"); //导入lws vfs proto buffer
@ -87,20 +91,27 @@ struct RpcPool {
struct Rpc {
pub client: LwsVfsClient<RpcChannel>,
pub rt: Arc<Runtime>,
pub in_use: bool,
pub tx: Arc<mpsc::Sender<FileAttrCacheMsg>>,
pub rx: Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>,
}
impl RpcPool {
async fn new(size: usize, uri: String) -> Result<Self, Box<dyn std::error::Error>> {
let mut clients = Vec::with_capacity(size);
let uri = uri.to_ascii_lowercase();
println!("create rpc pool {}", uri);
log::info!("create rpc pool {}", uri);
let channel = RpcChannel::from_shared(uri)?.connect().await?;
for _ in 0..size {
let client = LwsVfsClient::new(channel.clone());
let (tx,rx) = mpsc::channel();
clients.push(Arc::new(RwLock::new(Rpc {
client,
in_use: false,
rt: Arc::new(Builder::new_multi_thread().worker_threads(1).build().unwrap()),
tx: Arc::new(tx),
rx: Arc::new(Mutex::new(rx)),
})));
}
let wait = (Mutex::new(()), Condvar::new());
@ -112,7 +123,7 @@ impl RpcPool {
}
fn get(&self) -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>> {
// 优化遍历的逻辑
// TODO: 优化遍历的逻辑
let find_idle = |rpc_pool: &Self| -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>> {
let cs = self.clients.clone();
let clients = &(cs.lock().unwrap());
@ -123,7 +134,7 @@ impl RpcPool {
let entry = clients[idx].read().unwrap();
if !entry.in_use {
drop(entry);
let mut entry = clients[idx].write().unwrap();
let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = clients[idx].write().unwrap();
entry.in_use = true;
return Ok(clients[idx].clone());
}
@ -144,7 +155,7 @@ impl RpcPool {
break;
}
}
println!("get free rpc client time out");
log::warn!("get free rpc client time out");
Err(Box::new(IoError::new(
ErrorKind::Other,
"get free rpc client time out",
@ -222,9 +233,13 @@ impl VirFs {
T: ToString,
{
let name = name.to_string();
let ret = name.split('/').filter(|x| x.len() > 0).map(|y| y).collect::<Vec<&str>>();
let ret = name
.split('/')
.filter(|x| x.len() > 0)
.map(|y| y)
.collect::<Vec<&str>>();
let name = ret[0];
println!("add fs {}", name);
log::debug!("add fs {}", name);
let mut node = VirFs::create(name.to_string(), true);
node.is_root = false;
self.sub.insert(name.to_string(), node);
@ -236,7 +251,7 @@ impl VirFs {
{
let name = name.to_string();
let mut vname = name.split('/').filter(|x| x.len() > 0);
//println!("try match {}", name);
//log::info!("try match {}", name);
if let Some(_curr) = vname.next() {
return None;
}
@ -258,7 +273,8 @@ impl VirFs {
}
None => None,
}
} else { // find over
} else {
// find over
Some(self)
}
}
@ -268,16 +284,170 @@ impl VirFs {
for name in self.sub.keys() {
ret.push(name.to_string());
}
println!("readdir: {:?}", ret);
log::info!("readdir: {:?}", ret);
ret
}
}
type FileAttrInfo = Result<FileAttr, i32>;
pub struct FileAttrCache{
attr: FileAttrInfo,
exp_time: u32,
}
use std::time::UNIX_EPOCH;
fn current_time_in_ms() -> u32 {
let start = SystemTime::now();
start.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u32
}
impl FileAttrCache {
pub fn new(attr: FileAttrInfo) -> FileAttrCache{
FileAttrCache {
attr,
exp_time: current_time_in_ms() + 10_000,
}
}
}
use std::sync::mpsc;
pub enum FileAttrCacheMsg {
Get(String, Arc<mpsc::Sender<FileAttrCacheMsg>>),
GetReply(String, Option<FileAttrInfo>),
Set(String, FileAttrInfo),
Clr(Vec<String>),
}
pub struct FileAttrCacheCtx{
pub cache: HashMap<String, FileAttrCache>,
pub cache_size: u32,
rx: Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>,
}
impl FileAttrCacheCtx {
pub fn new(rx: mpsc::Receiver<FileAttrCacheMsg>) -> FileAttrCacheCtx {
FileAttrCacheCtx{
cache: HashMap::new(),
cache_size: 0,
rx: Arc::new(Mutex::new(rx)),
}
}
}
pub struct FileAttrCacheManager{
handle: thread::JoinHandle<()>,
tx: mpsc::Sender<FileAttrCacheMsg>,
}
impl FileAttrCacheManager {
pub fn new() -> FileAttrCacheManager{
let (tx, rx) = mpsc::channel();
let cache_ctx = FileAttrCacheCtx::new(rx);
let handle = thread::spawn(|| {
Self::cache_manager(cache_ctx);
});
FileAttrCacheManager{
handle,
tx,
}
}
fn cache_manager(mut ctx: FileAttrCacheCtx) {
let rx = ctx.rx.lock().unwrap();
fn invalid_cache(cache: &mut HashMap<String, FileAttrCache>) -> u32 {
let mut cache_size = 0;
let curr = current_time_in_ms();
let del:Vec<String> = cache.iter()
.filter(|&(_, val)| val.exp_time < curr)
.map(|(key, _)| key.clone())
.collect();
for key in del {
cache.remove(&key);
log::debug!("invalid cache: {}", key);
cache_size += 1;
}
cache_size
}
loop {
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(req) => {
if ctx.cache_size > 256 {
ctx.cache_size -= invalid_cache(&mut ctx.cache);
}
match req {
FileAttrCacheMsg::Set(path, attr) => {
ctx.cache.insert(path, FileAttrCache::new(attr));
ctx.cache_size += 1;
},
FileAttrCacheMsg::Get(path, chn) => {
match ctx.cache.get(&path) {
Some(attr) => {
let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr)));
},
None => {
let _ = chn.send(FileAttrCacheMsg::GetReply(path, None));
}
}
},
FileAttrCacheMsg::Clr(paths) => {
for path in paths.into_iter() {
ctx.cache.remove(&path);
ctx.cache_size -= 1;
}
},
_ => {}
}
},
Err(_) =>{
ctx.cache_size -= invalid_cache(&mut ctx.cache);
},
}
}
}
pub fn set(&self, path:&String, attr: FileAttrInfo){
let _ = self.tx.send(FileAttrCacheMsg::Set(path.to_string(), attr));
}
pub fn get(&self, path:&String, tx: &Arc<mpsc::Sender<FileAttrCacheMsg>>, rx: &Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>) -> Option<FileAttrInfo> {
//send get gile attribute request
let _ = self.tx.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone()));
let total_timeout = Duration::from_millis(100);
let start = Instant::now();
loop {
let elapsed = start.elapsed();
if elapsed >= total_timeout {
return None
}
let timeout = total_timeout - elapsed;
let resp = {
match rx.lock().unwrap().recv_timeout(timeout){
Ok(resp) => resp,
Err(_e) => return None,
}
};
match resp {
FileAttrCacheMsg::GetReply(rpath, resp) => {
if *path == rpath {
//message is not match
return resp;
}
},
_ => break,
}
}
None
}
pub fn clr(&self, paths: Vec<String>) {
let _ = self.tx.send(FileAttrCacheMsg::Clr(paths));
}
}
pub struct LwsVfsIns {
pub config: Config,
rpcs: RpcPool,
async_rt: tokio::runtime::Runtime,
vir_root: VirFs,
cache: FileAttrCacheManager,
}
fn mode_to_filetype(mode: libc::mode_t) -> FileType {
@ -297,18 +467,17 @@ fn mode_to_filetype(mode: libc::mode_t) -> FileType {
use fuse_mt::*;
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
impl FilesystemMT for LwsVfsIns {
/// Called on mount, before any other function.
fn init(&self, _req: RequestInfo) -> ResultEmpty {
let resp = self.async_rt.block_on(self.hello());
match resp {
match self.init() {
Ok(()) => {
println!("init vir fs success");
log::info!("init vir fs success");
Ok(())
},
}
Err(e) => {
println!("hello err: {:?}", e);
log::error!("hello err: {:?}", e);
Err(libc::ENOMSG)
}
}
@ -316,19 +485,18 @@ impl FilesystemMT for LwsVfsIns {
/// Called on filesystem unmount.
fn destroy(&self) {
// Nothing.
}
/// Get the attributes of a filesystem entry.
///
/// * `fh`: a file handle if this is called on an open file.
fn getattr(&self, _req: RequestInfo, path: &Path, fh: Option<u64>) -> ResultEntry {
fn getattr(&self, req: RequestInfo, path: &Path, fh: Option<u64>) -> ResultEntry {
let time = |timens| {
let secs = timens / (1000 * 1000);
let nanos = timens % (1000 * 1000);
SystemTime::UNIX_EPOCH + Duration::new(secs as u64, nanos as u32)
};
let file_attr = |fst: &Fstat| FileAttr {
let file_attr = |fst: &Fstat, req:&RequestInfo| FileAttr {
size: fst.fst_size,
blocks: fst.fst_blocks,
atime: time(fst.fst_atime),
@ -338,21 +506,36 @@ impl FilesystemMT for LwsVfsIns {
kind: mode_to_filetype(fst.fst_mode as u32),
perm: fst.fst_mode as u16,
nlink: fst.fst_nlink as u32,
uid: fst.fst_uid as u32,
gid: fst.fst_gid as u32,
uid: req.uid,
gid: req.gid,
rdev: fst.fst_rdev as u32,
flags: 0,
};
// println!("getattr {:?}", path);
// let path = ;
if let Some(node) = path.to_str() {
if let Some(node) = self.vir_root.find(node) {
return Ok((Duration::new(0, 0), file_attr(&node.stat)));
let path = path.to_string_lossy().into_owned();
if let Some(node) = self.vir_root.find(&path) {
return Ok((Duration::new(0, 0), file_attr(&node.stat, &req)));
}
log::trace!("lws getattr {:?}", path);
let (rpc, resp) = {
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
if let Some(attr) = self.cache.get(&path, &rpc.tx, &rpc.rx) {
let d = client.clone();
drop(rpc);
self.rpcs.put(d);
match attr {
Ok(attr) => return Ok((Instant::now().elapsed(), attr)),
Err(_e) => {
return Err(libc::ENOENT);
}
}
}
let request = tonic::Request::new(Getattr {
path: path.to_string_lossy().into_owned(),
path: path.to_string(),
fi: Some(FileInfo {
fh: match fh {
Some(f) => f,
@ -362,28 +545,27 @@ impl FilesystemMT for LwsVfsIns {
}),
..Getattr::default()
});
let (rpc, resp) = {
let client = match self.rpcs.get() {
Ok(c) => c,
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fgetattr(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fgetattr(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
self.cache.set(&path, Err(libc::ENOENT));
return Err(libc::ENOENT);
}
Ok((Duration::new(0, 0), file_attr(&resp.stat.unwrap())))
let attr = file_attr(&resp.stat.unwrap(), &req);
self.cache.set(&path, Ok(attr.clone()));
Ok((Instant::now().elapsed(), attr))
}
// The following operations in the FUSE C API are all one kernel call: setattr
@ -429,18 +611,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.ftruncate(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.ftruncate(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -487,18 +671,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.futimens(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.futimens(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -537,7 +723,7 @@ if resp.ret != 0 {
fn mkdir(&self, req: RequestInfo, parent: &Path, name: &OsStr, mode: u32) -> ResultEntry {
let path = format!("{}/{}", parent.to_str().unwrap(), name.to_str().unwrap());
if let Some(_unused) = self.vir_root.find(&path) {
return Err(libc::ENOSYS)
return Err(libc::ENOSYS);
}
let request = tonic::Request::new(Mkdir {
path: path.to_string(),
@ -550,18 +736,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fmkdir(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fmkdir(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -584,18 +772,21 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.funlink(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.funlink(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
self.cache.clr(vec![path]);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -621,18 +812,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frmdir(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.frmdir(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -682,18 +875,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frename(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.frename(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -724,9 +919,8 @@ if resp.ret != 0 {
/// calls that operate on the file, and can be any value you choose, though it should allow
/// your filesystem to identify the file opened even without any path info.
fn open(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen {
println!("open flags is {}", flags);
let request = tonic::Request::new(Open {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
fi: Some(FileInfo {
flags,
..FileInfo::default()
@ -739,18 +933,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fopen(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fopen(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -782,7 +978,7 @@ if resp.ret != 0 {
callback: impl FnOnce(ResultSlice<'_>) -> CallbackResult,
) -> CallbackResult {
let request = tonic::Request::new(Read {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
fi: Some(FileInfo {
fh,
..FileInfo::default()
@ -797,18 +993,20 @@ if resp.ret != 0 {
Err(_) => return callback(Err(libc::ENOMSG)),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fread(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fread(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return callback(Err(libc::ENOMSG));
}
};
println!("get resp: {:?}", resp);
log::trace!("get fread resp size is: {}", resp.size);
if resp.ret != 0 {
return callback(Err(libc::ENOMSG));
}
@ -834,7 +1032,7 @@ if resp.ret != 0 {
_flags: u32,
) -> ResultWrite {
let request = tonic::Request::new(Write {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
fi: Some(FileInfo {
fh,
..FileInfo::default()
@ -850,18 +1048,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fwrite(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fwrite(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -881,7 +1081,7 @@ if resp.ret != 0 {
/// belonging to this lock owner.
fn flush(&self, _req: RequestInfo, path: &Path, fh: u64, _lock_owner: u64) -> ResultEmpty {
let request = tonic::Request::new(Flush {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
fi: Some(FileInfo {
fh,
..FileInfo::default()
@ -894,18 +1094,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fflush(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fflush(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -932,8 +1134,9 @@ if resp.ret != 0 {
_lock_owner: u64,
flush: bool,
) -> ResultEmpty {
let path = path.to_string_lossy().into_owned();
let request = tonic::Request::new(Release {
path: path.to_str().unwrap().to_string(),
path: path.to_string(),
fi: Some(FileInfo {
fh,
..FileInfo::default()
@ -947,21 +1150,24 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.frelease(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.frelease(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
// self.cache.clr(vec![path]);
Ok(())
}
@ -989,7 +1195,7 @@ if resp.ret != 0 {
fn opendir(&self, _req: RequestInfo, path: &Path, flags: u32) -> ResultOpen {
let path = path.to_str().unwrap();
if let Some(_node) = self.vir_root.find(&path) {
return Ok((6666, flags))
return Ok((6666, flags));
}
let request = tonic::Request::new(Opendir {
path: path.to_string(),
@ -1001,18 +1207,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fopendir(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fopendir(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -1064,22 +1272,26 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.freaddir(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.freaddir(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
let mut dirs = vec![];
// let clrs: Vec<String> = resp.dirs.iter().map(|dir| dir.name.clone().into()).collect();
// self.cache.clr(clrs);
for dir in resp.dirs {
dirs.push(DirectoryEntry {
name: dir.name.into(),
@ -1098,10 +1310,10 @@ if resp.ret != 0 {
/// * `flags`: the file access flags passed to the `opendir` call.
fn releasedir(&self, _req: RequestInfo, path: &Path, fh: u64, _flags: u32) -> ResultEmpty {
if fh == 6666 {
return Ok(())
return Ok(());
}
let request = tonic::Request::new(Releasedir {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
fi: Some(FileInfo {
fh,
..FileInfo::default()
@ -1114,18 +1326,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.freleasedir(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.freleasedir(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
@ -1225,7 +1439,7 @@ if resp.ret != 0 {
mode as u32
}
let request = tonic::Request::new(Access {
path: path.to_str().unwrap().to_string(),
path: path.to_string_lossy().into_owned(),
mask: mask2mode(mask as i32),
..Access::default()
});
@ -1235,18 +1449,20 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.faccess(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.faccess(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::EACCES);
}
@ -1285,21 +1501,24 @@ if resp.ret != 0 {
Err(_) => return Err(libc::EAGAIN),
};
let mut rpc = client.write().unwrap();
let resp = self.async_rt.block_on(rpc.client.fcreate(request));
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.fcreate(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
println!("get resp err: {:?}", e);
log::error!("get resp err: {:?}", e);
return Err(libc::ENOMSG);
}
};
println!("get resp: {:?}", resp);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
self.cache.clr(vec![path.to_string()]);
let fi = resp.fi.unwrap();
if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) {
return Ok(CreatedEntry {
@ -1312,14 +1531,14 @@ if resp.ret != 0 {
Err(libc::ENOMSG)
}
}
use std::{env, vec};
impl LwsVfsIns {
pub async fn new(json: &str) -> Result<LwsVfsIns, Box<dyn Error>> {
let config = Config::new(json)?;
let addr = format!("http://{}:{}", config.get_addr(), config.get_port());
let rpcs = RpcPool::new(10, addr).await?;
let async_rt = tokio::runtime::Runtime::new().unwrap();
let mut vir_root = VirFs::new();
for mount in config.get_mount() {
vir_root.add(mount);
@ -1327,10 +1546,27 @@ impl LwsVfsIns {
Ok(LwsVfsIns {
config,
rpcs,
async_rt,
vir_root,
cache:FileAttrCacheManager::new(),
})
}
pub fn init(&self) -> Result<(), Box<dyn Error>> {
let request = tonic::Request::new(HelloRequest {
name: "Ekko lws hello".into(),
});
let (rpc, resp) = {
let client = self.rpcs.get()?;
let mut rpc = client.write().unwrap();
let rt = rpc.rt.clone();
let rpc = &mut (rpc.client);
let resp = rt.block_on(rpc.say_hello(request));
(client.clone(), resp)
};
self.rpcs.put(rpc);
log::trace!("resp={:?}", resp);
Ok(())
}
pub async fn hello(&self) -> Result<(), Box<dyn Error>> {
let request = tonic::Request::new(HelloRequest {
name: "Ekko lws hello".into(),
@ -1342,7 +1578,7 @@ impl LwsVfsIns {
(client.clone(), resp)
};
self.rpcs.put(rpc);
println!("resp={:?}", resp);
log::trace!("resp={:?}", resp);
Ok(())
}
@ -1351,10 +1587,10 @@ impl LwsVfsIns {
F: FilesystemMT + Sync + Send + 'static,
{
let args: Vec<OsString> = env::args_os().collect();
println!("args is {:?}", args);
log::info!("args is {:?}", args);
let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=passthrufs")];
fuse_mt::mount(
fuse_mt::FuseMT::new(file_system, 1),
fuse_mt::FuseMT::new(file_system, 10),
&args[2],
&fuse_args[..],
)?;