Compare commits

...

2 Commits

4 changed files with 129 additions and 101 deletions

View File

@ -19,6 +19,8 @@ hashbrown = "0.9.0"
log = "0.4"
env_logger = "0.8"
clap = "3.0"
nix = { version = "0.29.0", features = ["process", "feature"]}
signal-hook = "0.3.17"
[build-dependencies]
tonic-build = "0.11"

View File

@ -1,3 +1,10 @@
# lws_client
lws的客户端。运行在linux服务器上。使用fuser
lws的客户端。运行在linux服务器上。使用fuser
如果要编译libfue3.0 的版本
需要将如下的命令撤销还原回来原来的路径将优先链接libfuse3
```bash
mv /usr/local/lib/x86_64-linux-gnu/ /usr/local/lib/x86_64-linux-gnu_fuse3
```

View File

@ -1,49 +1,15 @@
use clap::{App, Arg};
use lws_client::LwsVfsIns;
use std::thread;
use std::{process, thread};
use std::process::Command;
extern crate log;
use nix::unistd::{fork, ForkResult, getpid, getppid};
use signal_hook::consts::{SIGCHLD, SIGINT};
use signal_hook::iterator::Signals;
use std::env;
const DEFAULT_PORT: u16 = 33444;
const DEFAULT_CACHE_LIFE: u32 = 10_000;
// use std::process::{Command,Stdio};
// fn get_ssh_clinet() -> String {
// // Run `who` command
// let who = Command::new("who")
// .stdout(Stdio::piped())
// .spawn().unwrap();
// // Run `grep $(whoami)` command
// let whoami = Command::new("whoami")
// .stdout(Stdio::piped())
// .output().unwrap();
// let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string();
// let grep = Command::new("grep")
// .arg(&user)
// .stdin(Stdio::from(who.stdout.unwrap()))
// .stdout(Stdio::piped())
// .spawn().unwrap();
// // Run `awk '{print $5}'` command
// let awk = Command::new("awk")
// .arg("{print $5}")
// .stdin(Stdio::from(grep.stdout.unwrap()))
// .stdout(Stdio::piped())
// .spawn().unwrap();
// // Collect the output
// let output = awk.wait_with_output().unwrap();
// let output = String::from_utf8_lossy(&output.stdout).to_string();
// println!("output: {}",output);
// let mut iter = output.split("(");
// iter.next();
// let output = iter.next().unwrap().split(")").collect::<Vec<&str>>()[0].to_string();
// println!("output: {}",output);
// output
// }
fn get_ssh_clinet() -> String {
// 获取特定环境变量的值
let client = env::var("SSH_CLIENT").expect("only support auto get ssh connection");
@ -123,6 +89,43 @@ fn param_parser() -> (String, String, u32) {
(server, mount_point, cache_life)
}
fn umount_point(mount: &String) {
log::info!("fusermount -u {}", mount);
let output = Command::new("fusermount")
.arg("-u")
.arg(mount)
.output()
.expect("umount command execaute failed");
if output.status.success(){
log::info!("{}", format!("umount {} success", mount));
} else {
if let Some(code) = output.status.code() {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr:String = String::from_utf8_lossy(&output.stderr).to_string();
// 不存在的话就不需要umount了
if stderr.contains("not found") {
log::info!("{}", format!("not need umount {}", mount));
return;
}
log::error!("{}", format!("umount {} fail: {}", mount, code));
log::error!("{} {}", stdout, stderr);
} else {
log::error!("{}", format!("umount {} interrupted", mount));
}
}
}
fn set_cleanup(mount: &String) {
// 捕获 SIGCHLD 信号
log::info!("waiting for SIGCHLD SIGINT signal");
let mut signals = Signals::new(&[SIGCHLD, SIGINT]).expect("Error creating signals");
for _ in signals.forever() {
log::info!("Catch SIGCHLD||SIGINT, exit....");
umount_point(&mount);
break;
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
@ -134,6 +137,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
return Err(e);
}
};
// 尝试卸载此前的挂载点
umount_point(&mount_point);
child_process(&mount_point);
log::info!("lws client instance created");
match lws_ins.hello().await {
Err(e) => {
@ -160,4 +166,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)))
}
}
}
fn child_process(mount: &String) {
match unsafe{fork()} {
Ok(ForkResult::Parent { child: _ }) => {
// no thing
}
Ok(ForkResult::Child) => {
let pid = getpid();
let ppid = getppid();
log::info!("parent: {}, child: {}", ppid, pid);
set_cleanup(mount);
}
Err(err) => {
log::error!("fork fail: {}", err);
process::exit(1);
}
}
}

View File

@ -7,10 +7,9 @@ use lws_client::{
use serde_json::{self, Value};
use std::error::Error;
use std::ffi::OsStr;
use std::fs::File;
use std::io::{Error as IoError, ErrorKind, Read as IoRead};
use std::thread::{self, JoinHandle};
use std::{env, path, vec};
use std::io::{Error as IoError, ErrorKind};
use std::thread::{self};
use std::{env, vec};
use tokio::runtime::{Builder, Runtime};
use tonic::transport::Channel as RpcChannel;
extern crate log;
@ -238,12 +237,12 @@ impl VirFs {
//self.match_path(&mut vname)
}
pub fn match_path<'a>(&self, name_list: &mut impl Iterator<Item = &'a str>) -> Option<&VirFs> {
pub fn _match_path<'a>(&self, name_list: &mut impl Iterator<Item = &'a str>) -> Option<&VirFs> {
if let Some(current) = name_list.next() {
match self.sub.get(current) {
Some(v) => {
// try match sub dir
if let Some(sub) = v.match_path(name_list) {
if let Some(sub) = v._match_path(name_list) {
Some(sub)
// sub dirs not match
} else {
@ -313,7 +312,7 @@ impl FileAttrCacheCtx {
}
pub struct FileAttrCacheManager {
handle: thread::JoinHandle<()>,
_handle: thread::JoinHandle<()>,
tx: mpsc::Sender<FileAttrCacheMsg>,
}
@ -324,7 +323,7 @@ impl FileAttrCacheManager {
let handle = thread::spawn(move || {
Self::cache_manager(cache_ctx, cache_life);
});
FileAttrCacheManager { handle, tx }
FileAttrCacheManager { _handle:handle, tx }
}
fn cache_manager(mut ctx: FileAttrCacheCtx, cache_life: u32) {
@ -432,6 +431,16 @@ pub struct LwsVfsIns {
cache: FileAttrCacheManager,
}
/// 将返回值转换为 Result<(), i32> 类型给rpc使用
macro_rules! ret2result {
($ret:expr) => {
match $ret {
0 => Ok(()),
_ => Err($ret),
}
};
}
fn mode_to_filetype(mode: libc::mode_t) -> FileType {
match mode & libc::S_IFMT {
libc::S_IFDIR => FileType::Directory,
@ -541,8 +550,8 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
self.cache.set(&path, Err(libc::ENOENT));
return Err(libc::ENOENT);
self.cache.set(&path, Err(resp.ret));
return Err(resp.ret);
}
let attr = file_attr(&resp.stat.unwrap(), &req);
self.cache.set(&path, Ok(attr.clone()));
@ -606,10 +615,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Set timestamps of a filesystem entry.
@ -666,10 +672,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
// END OF SETATTR FUNCTIONS
@ -732,7 +735,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
return Err(resp.ret)
}
self.getattr(req, Path::new(&path), None)
}
@ -768,10 +771,7 @@ impl FilesystemMT for LwsVfsIns {
};
self.cache.clr(vec![path]);
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Remove a directory.
@ -807,10 +807,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Create a symbolic link.
@ -870,10 +867,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Create a hard link.
@ -929,7 +923,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
return Err(resp.ret)
}
let fi = resp.fi.unwrap();
Ok((fi.fh, fi.flags))
@ -989,7 +983,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get fread resp size is: {}", resp.size);
if resp.ret != 0 {
return callback(Err(libc::ENOMSG));
return callback(Err(resp.ret));
}
callback(Ok(&resp.buff))
}
@ -1044,7 +1038,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
return Err(resp.ret)
}
Ok(resp.size as u32)
}
@ -1089,10 +1083,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Called when an open file is closed.
@ -1145,11 +1136,8 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
ret2result!(resp.ret)
// self.cache.clr(vec![path]);
Ok(())
}
/// Write out any pending changes of a file.
@ -1203,7 +1191,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
return Err(resp.ret);
}
Ok((resp.fi.unwrap().fh, flags))
}
@ -1268,7 +1256,7 @@ impl FilesystemMT for LwsVfsIns {
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
return Err(resp.ret)
}
let mut dirs = vec![];
// let clrs: Vec<String> = resp.dirs.iter().map(|dir| dir.name.clone().into()).collect();
@ -1321,10 +1309,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::ENOMSG);
}
Ok(())
ret2result!(resp.ret)
}
/// Write out any pending changes to a directory.
@ -1444,10 +1429,7 @@ impl FilesystemMT for LwsVfsIns {
}
};
log::trace!("get resp: {:?}", resp);
if resp.ret != 0 {
return Err(libc::EACCES);
}
Ok(())
ret2result!(resp.ret)
}
/// Create and open a new file.
@ -1501,15 +1483,17 @@ impl FilesystemMT for LwsVfsIns {
}
self.cache.clr(vec![path.to_string()]);
let fi = resp.fi.unwrap();
if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) {
return Ok(CreatedEntry {
fh: fi.fh,
flags,
attr,
ttl,
});
match self.getattr(req, Path::new(&path), Some(fi.fh)) {
Ok((ttl, attr)) => {
Ok(CreatedEntry {
fh: fi.fh,
flags,
attr,
ttl,
})
},
Err(e) => Err(e),
}
Err(libc::ENOMSG)
}
}
@ -1594,7 +1578,15 @@ impl LwsVfsIns {
where
F: FilesystemMT + Sync + Send + 'static,
{
let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=lws_vfs")];
let user = match env::var("USER") {
Ok(user) => user,
Err(_) => {
log::warn!("Can not get user name, use 'unknown'");
"unknown".to_string()
},
};
let name = format!("fsname=lws_vfs@{}", user);
let fuse_args = [OsStr::new("-o"), OsStr::new(name.as_str())];
fuse_mt::mount(
fuse_mt::FuseMT::new(file_system, 10),
&mount_point,