1. 修改配置的解析方式,使用建立rpc链接的时候使用一个get_config的请求同步,这样不用维护windows和linux两个配置文件
2. 修改命令行参数使其更加符合CLI交互,使用clap库实现。 遗留问题: 权限全是0777。需要改一下
This commit is contained in:
@ -1,11 +1,100 @@
|
||||
use lws_client::LwsVfsIns;
|
||||
use std::thread;
|
||||
use clap::{Arg, App};
|
||||
extern crate log;
|
||||
|
||||
use std::env;
|
||||
|
||||
const DEFAULT_PORT: u16 = 33444;
|
||||
|
||||
// use std::process::{Command,Stdio};
|
||||
// fn get_ssh_clinet() -> String {
|
||||
// // Run `who` command
|
||||
// let who = Command::new("who")
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Run `grep $(whoami)` command
|
||||
// let whoami = Command::new("whoami")
|
||||
// .stdout(Stdio::piped())
|
||||
// .output().unwrap();
|
||||
// let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string();
|
||||
|
||||
// let grep = Command::new("grep")
|
||||
// .arg(&user)
|
||||
// .stdin(Stdio::from(who.stdout.unwrap()))
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Run `awk '{print $5}'` command
|
||||
// let awk = Command::new("awk")
|
||||
// .arg("{print $5}")
|
||||
// .stdin(Stdio::from(grep.stdout.unwrap()))
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Collect the output
|
||||
// let output = awk.wait_with_output().unwrap();
|
||||
// let output = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
// println!("output: {}",output);
|
||||
// let mut iter = output.split("(");
|
||||
// iter.next();
|
||||
// let output = iter.next().unwrap().split(")").collect::<Vec<&str>>()[0].to_string();
|
||||
// println!("output: {}",output);
|
||||
// output
|
||||
// }
|
||||
|
||||
fn get_ssh_clinet() -> String {
|
||||
// 获取特定环境变量的值
|
||||
let client = env::var("SSH_CLIENT").expect("only support auto get ssh connection");
|
||||
let client = client.split(' ').next().unwrap();
|
||||
client.to_string()
|
||||
}
|
||||
|
||||
fn param_parser() -> (String, String) {
|
||||
let matches = App::new("lws_client")
|
||||
.version("1.0")
|
||||
.author("Ekko.bao")
|
||||
.about("linux&windows shared filesystem")
|
||||
.arg(Arg::with_name("s")
|
||||
.short('s')
|
||||
.long("server")
|
||||
.takes_value(true)
|
||||
.value_name("server")
|
||||
.help(format!("server addr: [ip[:port]], default is [sshclient:{}]", DEFAULT_PORT).as_str()))
|
||||
.arg(Arg::with_name("m")
|
||||
.short('m')
|
||||
.long("mount")
|
||||
.takes_value(true)
|
||||
.value_name("mount point")
|
||||
.required(true)
|
||||
.help("mount point, eg: ~/mnt"))
|
||||
.get_matches();
|
||||
let server = match matches.value_of("s") {
|
||||
Some(server) => {
|
||||
let split = server.split(":").collect::<Vec<&str>>();
|
||||
if split.len() == 2 {
|
||||
server.to_string()
|
||||
} else {
|
||||
format!("{}:{}", split[0], DEFAULT_PORT)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
let ip = get_ssh_clinet();
|
||||
format!("{}:{}", ip, DEFAULT_PORT)
|
||||
}
|
||||
};
|
||||
log::info!("args server: [{}]", server);
|
||||
let mount_point = matches.value_of("m").unwrap().to_string();
|
||||
log::info!("args mount_point: [{}]", mount_point);
|
||||
(server, mount_point)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
let lws_ins = match LwsVfsIns::new("config.json").await {
|
||||
let (server, mount_point) = param_parser();
|
||||
let lws_ins = match LwsVfsIns::new(&server).await {
|
||||
Ok(ins) => ins,
|
||||
Err(e) => {
|
||||
log::error!("Error creating lws server instance: {:?}", e);
|
||||
@ -22,7 +111,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
log::info!("start mount process");
|
||||
let handle = thread::spawn(move ||{
|
||||
match LwsVfsIns::mount(lws_ins) {
|
||||
match LwsVfsIns::mount(&mount_point, lws_ins) {
|
||||
Ok(_) => {
|
||||
Ok::<i32, String>(0)
|
||||
},
|
||||
|
81
src/lib.rs
81
src/lib.rs
@ -1,16 +1,16 @@
|
||||
use lws_client::lws_vfs_client::LwsVfsClient;
|
||||
use lws_client::{
|
||||
Access, Create, FileInfo, Flush, Fstat, Getattr, HelloRequest, Mkdir, Open, Opendir, Read,
|
||||
Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write,
|
||||
Readdir, Release, Releasedir, Rename, Rmdir, Timespec, Truncate, Unlink, Utimens, Write, GetConfig,
|
||||
};
|
||||
use serde_json::{self, Value};
|
||||
use hashbrown::HashMap;
|
||||
use std::error::Error;
|
||||
use std::ffi::{OsStr, OsString};
|
||||
use std::ffi::{OsStr};
|
||||
use std::fs::File;
|
||||
use std::{env, path, vec};
|
||||
use std::io::{Error as IoError, ErrorKind, Read as IoRead};
|
||||
use tonic::transport::{channel, Channel as RpcChannel};
|
||||
use tonic::transport::{Channel as RpcChannel};
|
||||
use tokio::runtime::{Builder,Runtime};
|
||||
use std::thread::{self, JoinHandle};
|
||||
extern crate log;
|
||||
@ -21,28 +21,12 @@ pub mod lws_client {
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Config {
|
||||
port: u16,
|
||||
addr: String,
|
||||
mount_map: HashMap<String, String>,
|
||||
}
|
||||
impl Config {
|
||||
pub fn new(json: &str) -> Result<Config, Box<dyn Error>> {
|
||||
let mut file = File::open(json)?;
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer)?;
|
||||
let json: Value = serde_json::from_str(buffer.as_ref())?;
|
||||
let port: u16 = match json.get("port") {
|
||||
Some(port) => port.as_u64().expect("expect port is a number but its not") as u16,
|
||||
None => 5001,
|
||||
};
|
||||
let addr: String = match json.get("addr") {
|
||||
Some(addr) => addr
|
||||
.as_str()
|
||||
.expect("expect addr is a String but its not")
|
||||
.to_string(),
|
||||
None => "localhost".to_string(),
|
||||
};
|
||||
let mounts = match json.get("mount") {
|
||||
let json: Value = serde_json::from_str(json)?;
|
||||
let mounts = match json.get("mount_map") {
|
||||
Some(mounts) => mounts.as_object().unwrap(),
|
||||
None => {
|
||||
return Err(Box::new(std::io::Error::new(
|
||||
@ -61,19 +45,10 @@ impl Config {
|
||||
})
|
||||
.collect();
|
||||
Ok(Config {
|
||||
port,
|
||||
addr,
|
||||
mount_map,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_port(&self) -> u16 {
|
||||
self.port
|
||||
}
|
||||
pub fn get_addr(&self) -> &String {
|
||||
&self.addr
|
||||
}
|
||||
|
||||
pub fn get_mount(&self) -> Vec<&String> {
|
||||
let mut ret = vec![];
|
||||
for (_path, mount) in &self.mount_map {
|
||||
@ -101,7 +76,7 @@ impl RpcPool {
|
||||
async fn new(size: usize, uri: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let mut clients = Vec::with_capacity(size);
|
||||
let uri = uri.to_ascii_lowercase();
|
||||
log::info!("create rpc pool {}", uri);
|
||||
log::info!("create rpc pool size: {}", size);
|
||||
let channel = RpcChannel::from_shared(uri)?.connect().await?;
|
||||
for _ in 0..size {
|
||||
let client = LwsVfsClient::new(channel.clone());
|
||||
@ -198,7 +173,7 @@ impl VirFs {
|
||||
VirFs {
|
||||
_name: name.to_string(),
|
||||
stat: lws_client::Fstat {
|
||||
fst_mode: 0o0755 | {
|
||||
fst_mode: 0o0555 | {
|
||||
if is_dir {
|
||||
libc::S_IFDIR
|
||||
} else {
|
||||
@ -284,7 +259,7 @@ impl VirFs {
|
||||
for name in self.sub.keys() {
|
||||
ret.push(name.to_string());
|
||||
}
|
||||
log::info!("readdir: {:?}", ret);
|
||||
log::trace!("readdir: {:?}", ret);
|
||||
ret
|
||||
}
|
||||
}
|
||||
@ -1535,10 +1510,11 @@ impl FilesystemMT for LwsVfsIns {
|
||||
|
||||
|
||||
impl LwsVfsIns {
|
||||
pub async fn new(json: &str) -> Result<LwsVfsIns, Box<dyn Error>> {
|
||||
let config = Config::new(json)?;
|
||||
let addr = format!("http://{}:{}", config.get_addr(), config.get_port());
|
||||
pub async fn new(server: &String) -> Result<LwsVfsIns, Box<dyn Error>> {
|
||||
let addr = format!("http://{}", server);
|
||||
log::info!("lws server is {}", addr);
|
||||
let rpcs = RpcPool::new(10, addr).await?;
|
||||
let config = LwsVfsIns::fetch_config(&rpcs).await?;
|
||||
let mut vir_root = VirFs::new();
|
||||
for mount in config.get_mount() {
|
||||
vir_root.add(mount);
|
||||
@ -1550,6 +1526,31 @@ impl LwsVfsIns {
|
||||
cache:FileAttrCacheManager::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn fetch_config(rpcs: &RpcPool) -> Result<Config, Box<dyn Error>> {
|
||||
let (rpc, resp) = {
|
||||
let request = tonic::Request::new(GetConfig {
|
||||
..GetConfig::default()
|
||||
});
|
||||
let client = rpcs.get().unwrap();
|
||||
let mut rpc = client.write().unwrap();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rpc.get_config(request).await;
|
||||
(client.clone(), resp)
|
||||
};
|
||||
rpcs.put(rpc);
|
||||
let resp = match resp {
|
||||
Ok(resp) => resp.into_inner(),
|
||||
Err(e) => {
|
||||
log::error!("get resp err: {:?}", e);
|
||||
return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, "get config err")));
|
||||
}
|
||||
};
|
||||
let config = Config::new(&resp.config).expect("parse config err");
|
||||
log::info!("config: {:?}", config);
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<(), Box<dyn Error>> {
|
||||
let request = tonic::Request::new(HelloRequest {
|
||||
name: "Ekko lws hello".into(),
|
||||
@ -1582,16 +1583,14 @@ impl LwsVfsIns {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn mount<F>(file_system: F) -> Result<(), Box<dyn Error>>
|
||||
pub fn mount<F>(mount_point:&str, file_system: F) -> Result<(), Box<dyn Error>>
|
||||
where
|
||||
F: FilesystemMT + Sync + Send + 'static,
|
||||
{
|
||||
let args: Vec<OsString> = env::args_os().collect();
|
||||
log::info!("args is {:?}", args);
|
||||
let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=passthrufs")];
|
||||
let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=lws_vfs")];
|
||||
fuse_mt::mount(
|
||||
fuse_mt::FuseMT::new(file_system, 10),
|
||||
&args[2],
|
||||
&mount_point,
|
||||
&fuse_args[..],
|
||||
)?;
|
||||
Ok(())
|
||||
|
Reference in New Issue
Block a user