Compare commits
3 Commits
b60feeb7ed
...
master
Author | SHA1 | Date | |
---|---|---|---|
acece94fae | |||
fa5b137db1 | |||
94c9751c00 |
197
Cargo.lock
generated
197
Cargo.lock
generated
@ -190,6 +190,12 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "cfg_aliases"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.2.25"
|
||||
@ -214,6 +220,26 @@ dependencies = [
|
||||
"os_str_bytes",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs"
|
||||
version = "4.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
|
||||
dependencies = [
|
||||
"dirs-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs-sys"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"redox_users",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.13.0"
|
||||
@ -526,36 +552,38 @@ version = "0.2.155"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
|
||||
|
||||
[[package]]
|
||||
name = "libredox"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.4.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "lws_client"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"env_logger",
|
||||
"fuse_mt",
|
||||
"hashbrown 0.9.1",
|
||||
"libc",
|
||||
"log",
|
||||
"prost",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.7.3"
|
||||
@ -600,6 +628,18 @@ version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.29.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
"cfg-if",
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.16.0"
|
||||
@ -641,6 +681,29 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.3.1"
|
||||
@ -812,6 +875,26 @@ dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_users"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"libredox",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.5"
|
||||
@ -872,6 +955,12 @@ version = "1.0.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.204"
|
||||
@ -903,6 +992,25 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"signal-hook-registry",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.9"
|
||||
@ -928,6 +1036,28 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sstar_lws_vfs"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"dirs",
|
||||
"env_logger",
|
||||
"fuse_mt",
|
||||
"hashbrown 0.9.1",
|
||||
"libc",
|
||||
"log",
|
||||
"nix",
|
||||
"parking_lot",
|
||||
"prost",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"signal-hook",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.10.0"
|
||||
@ -978,6 +1108,26 @@ version = "0.16.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9"
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.65"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.65"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "threadpool"
|
||||
version = "1.8.1"
|
||||
@ -1201,7 +1351,7 @@ version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||
dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1228,6 +1378,15 @@ dependencies = [
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.59.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.48.5"
|
||||
|
10
Cargo.toml
10
Cargo.toml
@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "lws_client"
|
||||
name = "sstar_lws_vfs"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
@ -7,6 +7,10 @@ edition = "2021"
|
||||
name = "lws_vfs_client"
|
||||
path = "src/client.rs"
|
||||
|
||||
# [[bin]]
|
||||
# name = "sscp"
|
||||
# path = "src/sscp.rs"
|
||||
|
||||
[dependencies]
|
||||
tonic = "0.11"
|
||||
prost = "0.12"
|
||||
@ -19,6 +23,10 @@ hashbrown = "0.9.0"
|
||||
log = "0.4"
|
||||
env_logger = "0.8"
|
||||
clap = "3.0"
|
||||
nix = { version = "0.29.0", features = ["process", "feature"]}
|
||||
signal-hook = "0.3.17"
|
||||
parking_lot = "0.12"
|
||||
dirs = "4.0"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.11"
|
||||
|
@ -1,3 +1,10 @@
|
||||
# lws_client
|
||||
|
||||
lws的客户端。运行在linux服务器上。使用fuser
|
||||
|
||||
|
||||
如果要编译libfue3.0 的版本
|
||||
需要将如下的命令撤销,还原回来原来的路径,将优先链接libfuse3
|
||||
```bash
|
||||
mv /usr/local/lib/x86_64-linux-gnu/ /usr/local/lib/x86_64-linux-gnu_fuse3
|
||||
```
|
10
config.json
10
config.json
@ -1,9 +1,9 @@
|
||||
{
|
||||
"mount": {
|
||||
"c:\\": "/l3c",
|
||||
"d:\\": "/l0d",
|
||||
"f:\\": "/l0e"
|
||||
"系统": "/l3c",
|
||||
"ekko.bao_ubuntu (\\\\192.168.0.112)": "/l0d",
|
||||
"softerware": "/l0e",
|
||||
"nas (\\\\192.168.0.117)": "/l0v"
|
||||
},
|
||||
"port": 7412,
|
||||
"addr": "192.168.0.111"
|
||||
"port": 33444
|
||||
}
|
185
src/client.rs
185
src/client.rs
@ -1,49 +1,21 @@
|
||||
use clap::{App, Arg};
|
||||
use lws_client::LwsVfsIns;
|
||||
use std::thread;
|
||||
use sstar_lws_vfs::LwsVfsIns;
|
||||
use std::{process, thread};
|
||||
use std::process::Command;
|
||||
extern crate log;
|
||||
|
||||
use nix::unistd::{fork, ForkResult, getpid, getppid};
|
||||
// use signal_hook::consts::{SIGCHLD, SIGINT, SIGTERM};
|
||||
// use signal_hook::iterator::Signals;
|
||||
use std::env;
|
||||
use libc::prctl;
|
||||
use std::time::Duration;
|
||||
use std::fs::{File, create_dir_all};
|
||||
use std::io::Write;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use dirs;
|
||||
|
||||
const DEFAULT_PORT: u16 = 33444;
|
||||
const DEFAULT_CACHE_LIFE: u32 = 10_000;
|
||||
// use std::process::{Command,Stdio};
|
||||
// fn get_ssh_clinet() -> String {
|
||||
// // Run `who` command
|
||||
// let who = Command::new("who")
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Run `grep $(whoami)` command
|
||||
// let whoami = Command::new("whoami")
|
||||
// .stdout(Stdio::piped())
|
||||
// .output().unwrap();
|
||||
// let user = String::from_utf8_lossy(&whoami.stdout).trim().to_string();
|
||||
|
||||
// let grep = Command::new("grep")
|
||||
// .arg(&user)
|
||||
// .stdin(Stdio::from(who.stdout.unwrap()))
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Run `awk '{print $5}'` command
|
||||
// let awk = Command::new("awk")
|
||||
// .arg("{print $5}")
|
||||
// .stdin(Stdio::from(grep.stdout.unwrap()))
|
||||
// .stdout(Stdio::piped())
|
||||
// .spawn().unwrap();
|
||||
|
||||
// // Collect the output
|
||||
// let output = awk.wait_with_output().unwrap();
|
||||
// let output = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
// println!("output: {}",output);
|
||||
// let mut iter = output.split("(");
|
||||
// iter.next();
|
||||
// let output = iter.next().unwrap().split(")").collect::<Vec<&str>>()[0].to_string();
|
||||
// println!("output: {}",output);
|
||||
// output
|
||||
// }
|
||||
|
||||
fn get_ssh_clinet() -> String {
|
||||
// 获取特定环境变量的值
|
||||
let client = env::var("SSH_CLIENT").expect("only support auto get ssh connection");
|
||||
@ -123,10 +95,115 @@ fn param_parser() -> (String, String, u32) {
|
||||
(server, mount_point, cache_life)
|
||||
}
|
||||
|
||||
fn umount_point(mount: &String) {
|
||||
log::info!("fusermount -u {}", mount);
|
||||
let output = Command::new("fusermount")
|
||||
.arg("-u")
|
||||
.arg(mount)
|
||||
.output()
|
||||
.expect("umount command execaute failed");
|
||||
if output.status.success(){
|
||||
log::info!("{}", format!("umount {} success", mount));
|
||||
} else {
|
||||
if let Some(code) = output.status.code() {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
let stderr:String = String::from_utf8_lossy(&output.stderr).to_string();
|
||||
// 不存在的话就不需要umount了
|
||||
if stderr.contains("not found") {
|
||||
log::info!("{}", format!("not need umount {}", mount));
|
||||
return;
|
||||
}
|
||||
log::error!("{}", format!("umount {} fail: {}", mount, code));
|
||||
log::error!("{} {}", stdout, stderr);
|
||||
} else {
|
||||
log::error!("{}", format!("umount {} interrupted", mount));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
fn set_cleanup(mount: &String) {
|
||||
// 捕获 SIGCHLD 信号
|
||||
// log::info!("waiting for SIGCHLD SIGINT signal");
|
||||
// let mut signals = Signals::new(&[SIGCHLD, SIGINT, SIGTERM]).expect("Error creating signals");
|
||||
let mout_point = mount.clone();
|
||||
// 子线程监控父进程是否退出
|
||||
let th: thread::JoinHandle<()> = thread::spawn( move || {
|
||||
let lws_client_pid = getppid();
|
||||
loop {
|
||||
let ppid = getppid();
|
||||
if 1 == ppid.as_raw() {
|
||||
log::info!("lws_client({}) has been exited! try cleanup", lws_client_pid);
|
||||
umount_point(&mout_point);
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_micros(100));
|
||||
}
|
||||
});
|
||||
// for sig in signals.forever() {
|
||||
// match sig {
|
||||
// SIGCHLD | SIGINT | SIGTERM=> {
|
||||
// log::info!("Catch SIGCHLD|SIGINT|SIGTERM signal");
|
||||
// umount_point(&mount);
|
||||
// break;
|
||||
// }
|
||||
// _ => {
|
||||
// // 处理其他信号
|
||||
// log::info!("Catch signal {} ....", sig);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
th.join().unwrap();
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct LwsConfig {
|
||||
server: String,
|
||||
mount_point: String,
|
||||
cache_life: u32,
|
||||
}
|
||||
|
||||
fn save_config(server: &str, mount_point: &str, cache_life: u32) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Get the user's home directory using dirs crate
|
||||
let home_dir = match dirs::home_dir() {
|
||||
Some(path) => path,
|
||||
None => return Err("Unable to get the user's home directory".into()),
|
||||
};
|
||||
// Create the configuration directory
|
||||
let config_dir = home_dir.join(".config").join("lws_client");
|
||||
create_dir_all(&config_dir)?;
|
||||
|
||||
// Create the configuration file
|
||||
let config_file = config_dir.join("config.json");
|
||||
|
||||
// Prepare the configuration data
|
||||
let config = LwsConfig {
|
||||
server: server.to_string(),
|
||||
mount_point: mount_point.to_string(),
|
||||
cache_life,
|
||||
};
|
||||
|
||||
// Serialize to JSON
|
||||
let config_json = serde_json::to_string_pretty(&config)?;
|
||||
|
||||
// Write to the file
|
||||
let mut file = File::create(config_file)?;
|
||||
file.write_all(config_json.as_bytes())?;
|
||||
|
||||
log::info!("Configuration saved to ~/.config/lws_client/config.json");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
let (server, mount_point, cache_life) = param_parser();
|
||||
|
||||
// Save the configuration
|
||||
if let Err(e) = save_config(&server, &mount_point, cache_life) {
|
||||
log::warn!("Failed to save configuration: {:?}", e);
|
||||
}
|
||||
|
||||
let lws_ins = match LwsVfsIns::new(&server, cache_life).await {
|
||||
Ok(ins) => ins,
|
||||
Err(e) => {
|
||||
@ -134,6 +211,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
// 尝试卸载此前的挂载点
|
||||
umount_point(&mount_point);
|
||||
child_process(&mount_point);
|
||||
log::info!("lws client instance created");
|
||||
match lws_ins.hello().await {
|
||||
Err(e) => {
|
||||
@ -160,4 +240,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fn child_process(mount: &String) {
|
||||
match unsafe{fork()} {
|
||||
Ok(ForkResult::Parent { child: _ }) => {
|
||||
// no thing
|
||||
}
|
||||
Ok(ForkResult::Child) => {
|
||||
let pid = getpid();
|
||||
let ppid = getppid();
|
||||
log::info!("parent: {}, child: {}", ppid, pid);
|
||||
unsafe {
|
||||
prctl(libc::PR_SET_NAME, b"lws_monitor\0".as_ptr() as usize, 0, 0, 0);
|
||||
}
|
||||
set_cleanup(mount);
|
||||
process::exit(0);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("fork fail: {}", err);
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
355
src/lib.rs
355
src/lib.rs
@ -7,12 +7,13 @@ use lws_client::{
|
||||
use serde_json::{self, Value};
|
||||
use std::error::Error;
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::File;
|
||||
use std::io::{Error as IoError, ErrorKind, Read as IoRead};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::{env, path, vec};
|
||||
use std::io::{Error as IoError, ErrorKind};
|
||||
use std::thread::{self};
|
||||
use std::{env, vec};
|
||||
use tokio::runtime::{Builder, Runtime};
|
||||
use tonic::transport::Channel as RpcChannel;
|
||||
use std::sync::Arc;
|
||||
use parking_lot::{Mutex, RwLock, Condvar};
|
||||
extern crate log;
|
||||
|
||||
pub mod lws_client {
|
||||
@ -55,10 +56,9 @@ impl Config {
|
||||
ret
|
||||
}
|
||||
}
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
struct RpcPool {
|
||||
clients: Arc<Mutex<Vec<Arc<RwLock<Rpc>>>>>,
|
||||
index: Arc<RwLock<usize>>,
|
||||
clients: Vec<Arc<RwLock<Rpc>>>, // 使用本地 Vec 而不是包裹在 Arc<Mutex<>> 中
|
||||
available: Mutex<Vec<usize>>, // 跟踪可用客户端的索引
|
||||
wait: Arc<(Mutex<()>, Condvar)>,
|
||||
}
|
||||
|
||||
@ -73,67 +73,75 @@ struct Rpc {
|
||||
impl RpcPool {
|
||||
async fn new(size: usize, uri: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let mut clients = Vec::with_capacity(size);
|
||||
let mut available = Vec::with_capacity(size);
|
||||
let uri = uri.to_ascii_lowercase();
|
||||
log::info!("create rpc pool size: {}", size);
|
||||
|
||||
let shared_rt = Arc::new(
|
||||
Builder::new_multi_thread()
|
||||
.worker_threads(10)
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let channel = RpcChannel::from_shared(uri)?.connect().await?;
|
||||
for _ in 0..size {
|
||||
|
||||
for i in 0..size {
|
||||
let client = LwsVfsClient::new(channel.clone());
|
||||
let (tx, rx) = mpsc::channel();
|
||||
clients.push(Arc::new(RwLock::new(Rpc {
|
||||
client,
|
||||
in_use: false,
|
||||
rt: Arc::new(
|
||||
Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.build()
|
||||
.unwrap(),
|
||||
),
|
||||
rt: shared_rt.clone(),
|
||||
tx: Arc::new(tx),
|
||||
rx: Arc::new(Mutex::new(rx)),
|
||||
})));
|
||||
available.push(i); // 所有客户端初始时都是可用的
|
||||
}
|
||||
|
||||
let wait = (Mutex::new(()), Condvar::new());
|
||||
Ok(Self {
|
||||
clients: Arc::new(Mutex::new(clients)),
|
||||
index: Arc::new(RwLock::new(0)),
|
||||
clients,
|
||||
available: Mutex::new(available),
|
||||
wait: Arc::new(wait),
|
||||
})
|
||||
}
|
||||
|
||||
fn get(&self) -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>> {
|
||||
// TODO: 优化遍历的逻辑
|
||||
let find_idle = |rpc_pool: &Self| -> Result<Arc<RwLock<Rpc>>, Box<dyn Error>> {
|
||||
let cs = self.clients.clone();
|
||||
let clients = &(cs.lock().unwrap());
|
||||
let size = clients.len();
|
||||
let index = *rpc_pool.index.read().unwrap();
|
||||
for i in 0..size {
|
||||
let idx = (index + i) % size;
|
||||
let entry = clients[idx].read().unwrap();
|
||||
if !entry.in_use {
|
||||
drop(entry);
|
||||
let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> =
|
||||
clients[idx].write().unwrap();
|
||||
entry.in_use = true;
|
||||
return Ok(clients[idx].clone());
|
||||
const MAX_RETRIES: usize = 30;
|
||||
|
||||
for _ in 0..MAX_RETRIES {
|
||||
// 尝试获取可用客户端
|
||||
let client_opt = {
|
||||
let mut available = self.available.lock();
|
||||
if let Some(idx) = available.pop() {
|
||||
let client = self.clients[idx].clone();
|
||||
{
|
||||
// 在单独的作用域中借用 client
|
||||
let mut rpc = client.write();
|
||||
rpc.in_use = true;
|
||||
}
|
||||
Some(client) // 现在可以安全地移动 client
|
||||
} else {
|
||||
// 如果无法获取可用客户端,返回 None
|
||||
None
|
||||
}
|
||||
Err(Box::new(IoError::new(
|
||||
ErrorKind::Other,
|
||||
"get free rpc client fail",
|
||||
)))
|
||||
};
|
||||
loop {
|
||||
if let Ok(rpc) = find_idle(self) {
|
||||
return Ok(rpc);
|
||||
|
||||
if let Some(client) = client_opt {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
// 等待通知
|
||||
let (mutex, cond) = &(*self.wait.clone());
|
||||
let timeout = Duration::from_millis(100);
|
||||
let (_unused, timeout_res) = cond.wait_timeout(mutex.lock().unwrap(), timeout).unwrap();
|
||||
if timeout_res.timed_out() {
|
||||
break;
|
||||
let mut guard = mutex.lock();
|
||||
let timeout = Duration::from_millis(50);
|
||||
let result = cond.wait_for(&mut guard, timeout);
|
||||
if !result.timed_out() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
log::warn!("get free rpc client time out");
|
||||
Err(Box::new(IoError::new(
|
||||
ErrorKind::Other,
|
||||
@ -142,19 +150,27 @@ impl RpcPool {
|
||||
}
|
||||
|
||||
fn put(&self, client: Arc<RwLock<Rpc>>) {
|
||||
let mut i = 0;
|
||||
for entry in self.clients.lock().unwrap().iter_mut() {
|
||||
if Arc::ptr_eq(&entry, &client) {
|
||||
let mut item = entry.write().unwrap();
|
||||
item.in_use = false;
|
||||
*self.index.write().unwrap() = i;
|
||||
let (mutex, cond) = &(*self.wait.clone());
|
||||
let _unused = mutex.lock().unwrap();
|
||||
// 找到客户端在数组中的索引
|
||||
for (idx, c) in self.clients.iter().enumerate() {
|
||||
if Arc::ptr_eq(c, &client) {
|
||||
// 设置客户端为可用
|
||||
let mut entry = c.write();
|
||||
entry.in_use = false;
|
||||
|
||||
// 将索引添加到可用列表
|
||||
{
|
||||
let mut available = self.available.lock();
|
||||
available.push(idx);
|
||||
}
|
||||
|
||||
// 通知等待的线程
|
||||
let (_mutex, cond) = &(*self.wait.clone());
|
||||
cond.notify_one();
|
||||
break;
|
||||
return;
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
log::warn!("Attempted to return unknown RPC client to pool");
|
||||
}
|
||||
}
|
||||
|
||||
@ -163,10 +179,11 @@ struct VirFs {
|
||||
stat: lws_client::Fstat,
|
||||
sub: HashMap<String, VirFs>,
|
||||
is_root: bool,
|
||||
is_link_remote: bool
|
||||
}
|
||||
|
||||
impl VirFs {
|
||||
fn create<T>(name: T, is_dir: bool) -> VirFs
|
||||
fn create<T>(name: T, is_dir: bool, is_link_remote:bool) -> VirFs
|
||||
where
|
||||
T: ToString,
|
||||
{
|
||||
@ -199,15 +216,17 @@ impl VirFs {
|
||||
},
|
||||
sub: HashMap::new(),
|
||||
is_root: true,
|
||||
is_link_remote,
|
||||
}
|
||||
}
|
||||
pub fn new() -> VirFs {
|
||||
let mut node = VirFs::create("/", true);
|
||||
let mut node = VirFs::create("/", true, false);
|
||||
node.is_root = true;
|
||||
node
|
||||
}
|
||||
|
||||
pub fn add<T>(&mut self, name: T)
|
||||
// 添加子目录
|
||||
pub fn add<T>(&mut self, name: T, is_link_remote: bool)
|
||||
where
|
||||
T: ToString,
|
||||
{
|
||||
@ -219,7 +238,7 @@ impl VirFs {
|
||||
.collect::<Vec<&str>>();
|
||||
let name = ret[0];
|
||||
log::debug!("add fs {}", name);
|
||||
let mut node = VirFs::create(name.to_string(), true);
|
||||
let mut node = VirFs::create(name.to_string(), true, is_link_remote);
|
||||
node.is_root = false;
|
||||
self.sub.insert(name.to_string(), node);
|
||||
}
|
||||
@ -229,34 +248,22 @@ impl VirFs {
|
||||
T: ToString,
|
||||
{
|
||||
let name = name.to_string();
|
||||
let mut vname = name.split('/').filter(|x| x.len() > 0);
|
||||
//log::info!("try match {}", name);
|
||||
if let Some(_curr) = vname.next() {
|
||||
return None;
|
||||
}
|
||||
Some(self)
|
||||
//self.match_path(&mut vname)
|
||||
let mut vname = name.split('/').filter(|x| !x.is_empty());
|
||||
self._match_path(&mut vname)
|
||||
}
|
||||
|
||||
pub fn match_path<'a>(&self, name_list: &mut impl Iterator<Item = &'a str>) -> Option<&VirFs> {
|
||||
pub fn _match_path<'a>(&self, name_list: &mut impl Iterator<Item = &'a str>) -> Option<&VirFs> {
|
||||
if let Some(current) = name_list.next() {
|
||||
match self.sub.get(current) {
|
||||
Some(v) => {
|
||||
// try match sub dir
|
||||
if let Some(sub) = v.match_path(name_list) {
|
||||
Some(sub)
|
||||
// sub dirs not match
|
||||
self.sub.get(current).and_then(|v| v._match_path(name_list))
|
||||
} else {
|
||||
// 如果是链接到远程的虚拟目录,则不匹配,需要由远端获取属性
|
||||
if self.is_link_remote {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
// find over
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn readdir(&self) -> Vec<String> {
|
||||
let mut ret = vec![];
|
||||
@ -283,9 +290,12 @@ fn current_time_in_ms() -> u32 {
|
||||
}
|
||||
impl FileAttrCache {
|
||||
pub fn new(attr: FileAttrInfo, cache_life: u32) -> FileAttrCache {
|
||||
let now = current_time_in_ms();
|
||||
let exp_time = now + cache_life;
|
||||
log::trace!("Creating cache, current time: {}ms, expiration time: {}ms, lifetime: {}ms", now, exp_time, cache_life);
|
||||
FileAttrCache {
|
||||
attr,
|
||||
exp_time: current_time_in_ms() + cache_life,
|
||||
exp_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -313,7 +323,7 @@ impl FileAttrCacheCtx {
|
||||
}
|
||||
|
||||
pub struct FileAttrCacheManager {
|
||||
handle: thread::JoinHandle<()>,
|
||||
_handle: thread::JoinHandle<()>,
|
||||
tx: mpsc::Sender<FileAttrCacheMsg>,
|
||||
}
|
||||
|
||||
@ -322,16 +332,16 @@ impl FileAttrCacheManager {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let cache_ctx = FileAttrCacheCtx::new(rx);
|
||||
let handle = thread::spawn(move || {
|
||||
Self::cache_manager(cache_ctx, cache_life);
|
||||
Self::cache_manager_thread(cache_ctx, cache_life);
|
||||
});
|
||||
FileAttrCacheManager { handle, tx }
|
||||
FileAttrCacheManager { _handle:handle, tx }
|
||||
}
|
||||
|
||||
fn cache_manager(mut ctx: FileAttrCacheCtx, cache_life: u32) {
|
||||
let rx = ctx.rx.lock().unwrap();
|
||||
fn cache_manager_thread(mut ctx: FileAttrCacheCtx, cache_life: u32) {
|
||||
let rx = ctx.rx.lock();
|
||||
fn invalid_cache(cache: &mut HashMap<String, FileAttrCache>) -> u32 {
|
||||
let mut cache_size = 0;
|
||||
let curr = current_time_in_ms();
|
||||
let curr: u32 = current_time_in_ms();
|
||||
let del: Vec<String> = cache
|
||||
.iter()
|
||||
.filter(|&(_, val)| val.exp_time < curr)
|
||||
@ -347,6 +357,7 @@ impl FileAttrCacheManager {
|
||||
loop {
|
||||
match rx.recv_timeout(Duration::from_millis(200)) {
|
||||
Ok(req) => {
|
||||
// 只有当缓存大小超过阈值时才进行清理
|
||||
if ctx.cache_size > 256 {
|
||||
ctx.cache_size -= invalid_cache(&mut ctx.cache);
|
||||
}
|
||||
@ -357,7 +368,16 @@ impl FileAttrCacheManager {
|
||||
}
|
||||
FileAttrCacheMsg::Get(path, chn) => match ctx.cache.get(&path) {
|
||||
Some(attr) => {
|
||||
let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr)));
|
||||
// 检查缓存是否已过期
|
||||
let curr = current_time_in_ms();
|
||||
if attr.exp_time >= curr {
|
||||
let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr.clone())));
|
||||
} else {
|
||||
// 缓存已过期,移除并返回None
|
||||
ctx.cache.remove(&path);
|
||||
ctx.cache_size -= 1;
|
||||
let _ = chn.send(FileAttrCacheMsg::GetReply(path, None));
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let _ = chn.send(FileAttrCacheMsg::GetReply(path, None));
|
||||
@ -373,6 +393,7 @@ impl FileAttrCacheManager {
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// 定期清理过期缓存,而不是每次都清理
|
||||
ctx.cache_size -= invalid_cache(&mut ctx.cache);
|
||||
}
|
||||
}
|
||||
@ -389,7 +410,8 @@ impl FileAttrCacheManager {
|
||||
tx: &Arc<mpsc::Sender<FileAttrCacheMsg>>,
|
||||
rx: &Arc<Mutex<mpsc::Receiver<FileAttrCacheMsg>>>,
|
||||
) -> Option<FileAttrInfo> {
|
||||
//send get gile attribute request
|
||||
//send get file attribute request
|
||||
log::trace!("Requesting cache: {}", path);
|
||||
let _ = self
|
||||
.tx
|
||||
.send(FileAttrCacheMsg::Get(path.to_string(), tx.clone()));
|
||||
@ -398,27 +420,30 @@ impl FileAttrCacheManager {
|
||||
loop {
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed >= total_timeout {
|
||||
log::debug!("Cache request timeout: {}", path);
|
||||
return None;
|
||||
}
|
||||
let timeout = total_timeout - elapsed;
|
||||
let resp = {
|
||||
match rx.lock().unwrap().recv_timeout(timeout) {
|
||||
Ok(resp) => resp,
|
||||
Err(_e) => return None,
|
||||
}
|
||||
let rx = rx.lock();
|
||||
rx.recv_timeout(timeout)
|
||||
};
|
||||
match resp {
|
||||
FileAttrCacheMsg::GetReply(rpath, resp) => {
|
||||
if *path == rpath {
|
||||
//message is not match
|
||||
return resp;
|
||||
Ok(msg) => {
|
||||
if let FileAttrCacheMsg::GetReply(_, Some(attr)) = msg {
|
||||
log::trace!("Cache hit: {}", path);
|
||||
return Some(attr);
|
||||
}
|
||||
log::trace!("Cache miss: {}", path);
|
||||
return None;
|
||||
},
|
||||
Err(_e) => {
|
||||
log::debug!("Error receiving cache response: {}", path);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn clr(&self, paths: Vec<String>) {
|
||||
let _ = self.tx.send(FileAttrCacheMsg::Clr(paths));
|
||||
@ -432,6 +457,16 @@ pub struct LwsVfsIns {
|
||||
cache: FileAttrCacheManager,
|
||||
}
|
||||
|
||||
/// 将返回值转换为 Result<(), i32> 类型,给rpc使用
|
||||
macro_rules! ret2result {
|
||||
($ret:expr) => {
|
||||
match $ret {
|
||||
0 => Ok(()),
|
||||
_ => Err($ret),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn mode_to_filetype(mode: libc::mode_t) -> FileType {
|
||||
match mode & libc::S_IFMT {
|
||||
libc::S_IFDIR => FileType::Directory,
|
||||
@ -503,7 +538,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
if let Some(attr) = self.cache.get(&path, &rpc.tx, &rpc.rx) {
|
||||
let d = client.clone();
|
||||
drop(rpc);
|
||||
@ -541,8 +576,8 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
self.cache.set(&path, Err(libc::ENOENT));
|
||||
return Err(libc::ENOENT);
|
||||
self.cache.set(&path, Err(resp.ret));
|
||||
return Err(resp.ret);
|
||||
}
|
||||
let attr = file_attr(&resp.stat.unwrap(), &req);
|
||||
self.cache.set(&path, Ok(attr.clone()));
|
||||
@ -591,7 +626,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.ftruncate(request));
|
||||
@ -606,10 +641,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Set timestamps of a filesystem entry.
|
||||
@ -651,7 +683,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.futimens(request));
|
||||
@ -666,10 +698,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
// END OF SETATTR FUNCTIONS
|
||||
@ -716,7 +745,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fmkdir(request));
|
||||
@ -732,7 +761,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
return Err(resp.ret)
|
||||
}
|
||||
self.getattr(req, Path::new(&path), None)
|
||||
}
|
||||
@ -752,7 +781,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.funlink(request));
|
||||
@ -768,10 +797,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
self.cache.clr(vec![path]);
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Remove a directory.
|
||||
@ -792,7 +818,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.frmdir(request));
|
||||
@ -807,10 +833,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Create a symbolic link.
|
||||
@ -855,7 +878,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.frename(request));
|
||||
@ -870,10 +893,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Create a hard link.
|
||||
@ -913,7 +933,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fopen(request));
|
||||
@ -929,7 +949,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
return Err(resp.ret)
|
||||
}
|
||||
let fi = resp.fi.unwrap();
|
||||
Ok((fi.fh, fi.flags))
|
||||
@ -973,7 +993,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return callback(Err(libc::ENOMSG)),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fread(request));
|
||||
@ -989,7 +1009,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get fread resp size is: {}", resp.size);
|
||||
if resp.ret != 0 {
|
||||
return callback(Err(libc::ENOMSG));
|
||||
return callback(Err(resp.ret));
|
||||
}
|
||||
callback(Ok(&resp.buff))
|
||||
}
|
||||
@ -1028,7 +1048,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fwrite(request));
|
||||
@ -1044,7 +1064,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
return Err(resp.ret)
|
||||
}
|
||||
Ok(resp.size as u32)
|
||||
}
|
||||
@ -1074,7 +1094,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fflush(request));
|
||||
@ -1089,10 +1109,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Called when an open file is closed.
|
||||
@ -1130,7 +1147,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.frelease(request));
|
||||
@ -1145,11 +1162,8 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
ret2result!(resp.ret)
|
||||
// self.cache.clr(vec![path]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write out any pending changes of a file.
|
||||
@ -1187,7 +1201,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fopendir(request));
|
||||
@ -1203,7 +1217,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
return Err(resp.ret);
|
||||
}
|
||||
Ok((resp.fi.unwrap().fh, flags))
|
||||
}
|
||||
@ -1252,7 +1266,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.freaddir(request));
|
||||
@ -1268,7 +1282,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
return Err(resp.ret)
|
||||
}
|
||||
let mut dirs = vec![];
|
||||
// let clrs: Vec<String> = resp.dirs.iter().map(|dir| dir.name.clone().into()).collect();
|
||||
@ -1306,7 +1320,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.freleasedir(request));
|
||||
@ -1321,10 +1335,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::ENOMSG);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Write out any pending changes to a directory.
|
||||
@ -1419,8 +1430,13 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
mode as u32
|
||||
}
|
||||
// log::info!("access path: {:?}, mask: {:?}", path, mask);
|
||||
let path = path.to_string_lossy().into_owned();
|
||||
if let Some(_) = self.vir_root.find(&path) {
|
||||
return Ok(());
|
||||
}
|
||||
let request = tonic::Request::new(Access {
|
||||
path: path.to_string_lossy().into_owned(),
|
||||
path: path,
|
||||
mask: mask2mode(mask as i32),
|
||||
..Access::default()
|
||||
});
|
||||
@ -1429,7 +1445,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.faccess(request));
|
||||
@ -1444,10 +1460,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
};
|
||||
log::trace!("get resp: {:?}", resp);
|
||||
if resp.ret != 0 {
|
||||
return Err(libc::EACCES);
|
||||
}
|
||||
Ok(())
|
||||
ret2result!(resp.ret)
|
||||
}
|
||||
|
||||
/// Create and open a new file.
|
||||
@ -1481,7 +1494,7 @@ impl FilesystemMT for LwsVfsIns {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(libc::EAGAIN),
|
||||
};
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.fcreate(request));
|
||||
@ -1501,15 +1514,17 @@ impl FilesystemMT for LwsVfsIns {
|
||||
}
|
||||
self.cache.clr(vec![path.to_string()]);
|
||||
let fi = resp.fi.unwrap();
|
||||
if let Ok((ttl, attr)) = self.getattr(req, Path::new(&path), Some(fi.fh)) {
|
||||
return Ok(CreatedEntry {
|
||||
match self.getattr(req, Path::new(&path), Some(fi.fh)) {
|
||||
Ok((ttl, attr)) => {
|
||||
Ok(CreatedEntry {
|
||||
fh: fi.fh,
|
||||
flags,
|
||||
attr,
|
||||
ttl,
|
||||
});
|
||||
})
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
Err(libc::ENOMSG)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1521,7 +1536,7 @@ impl LwsVfsIns {
|
||||
let config = LwsVfsIns::fetch_config(&rpcs).await?;
|
||||
let mut vir_root = VirFs::new();
|
||||
for mount in config.get_mount() {
|
||||
vir_root.add(mount);
|
||||
vir_root.add(mount, true);
|
||||
}
|
||||
Ok(LwsVfsIns {
|
||||
config,
|
||||
@ -1537,7 +1552,7 @@ impl LwsVfsIns {
|
||||
..GetConfig::default()
|
||||
});
|
||||
let client = rpcs.get().unwrap();
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rpc.get_config(request).await;
|
||||
(client.clone(), resp)
|
||||
@ -1564,7 +1579,7 @@ impl LwsVfsIns {
|
||||
});
|
||||
let (rpc, resp) = {
|
||||
let client = self.rpcs.get()?;
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let rt = rpc.rt.clone();
|
||||
let rpc = &mut (rpc.client);
|
||||
let resp = rt.block_on(rpc.say_hello(request));
|
||||
@ -1581,7 +1596,7 @@ impl LwsVfsIns {
|
||||
});
|
||||
let (rpc, resp) = {
|
||||
let client = self.rpcs.get()?;
|
||||
let mut rpc = client.write().unwrap();
|
||||
let mut rpc = client.write();
|
||||
let resp = rpc.client.say_hello(request).await?;
|
||||
(client.clone(), resp)
|
||||
};
|
||||
@ -1594,7 +1609,15 @@ impl LwsVfsIns {
|
||||
where
|
||||
F: FilesystemMT + Sync + Send + 'static,
|
||||
{
|
||||
let fuse_args = [OsStr::new("-o"), OsStr::new("fsname=lws_vfs")];
|
||||
let user = match env::var("USER") {
|
||||
Ok(user) => user,
|
||||
Err(_) => {
|
||||
log::warn!("Can not get user name, use 'unknown'");
|
||||
"unknown".to_string()
|
||||
},
|
||||
};
|
||||
let name = format!("fsname=lws_vfs@{}", user);
|
||||
let fuse_args = [OsStr::new("-o"), OsStr::new(name.as_str())];
|
||||
fuse_mt::mount(
|
||||
fuse_mt::FuseMT::new(file_system, 10),
|
||||
&mount_point,
|
||||
|
29
sscp_tool/setup.py
Normal file
29
sscp_tool/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
setup(
|
||||
name="sscp",
|
||||
version="0.1.0",
|
||||
description="从L3向L0传输文件的简化工具",
|
||||
author="Your Name",
|
||||
author_email="your.email@example.com",
|
||||
packages=find_packages(),
|
||||
py_modules=["sscp"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"sscp=sscp:main",
|
||||
],
|
||||
},
|
||||
classifiers=[
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.6",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
],
|
||||
python_requires=">=3.6",
|
||||
)
|
203
sscp_tool/sscp.py
Normal file
203
sscp_tool/sscp.py
Normal file
@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
sscp - 简化从L3到L0的文件传输工具
|
||||
用法: sscp <源文件/目录> [<源文件/目录>...] <目标路径>
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import subprocess
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
def parse_arguments():
|
||||
"""解析命令行参数"""
|
||||
parser = argparse.ArgumentParser(description='从L3向L0传输文件的简化工具')
|
||||
parser.add_argument('sources', nargs='+', help='源文件或目录路径')
|
||||
parser.add_argument('target', help='目标路径')
|
||||
parser.add_argument('-v', '--verbose', action='store_true', help='显示详细信息')
|
||||
parser.add_argument('-r', '--recursive', action='store_true', help='递归处理目录')
|
||||
return parser.parse_args()
|
||||
|
||||
def get_mount_points():
|
||||
"""从配置文件获取挂载点信息"""
|
||||
config_path = Path.home() / ".config" / "sscp" / "config.json"
|
||||
|
||||
if not config_path.exists():
|
||||
print(f"警告: 找不到配置文件 {config_path},将使用默认挂载点 ~/mnt")
|
||||
return {str(Path.home() / "mnt"): "/l0"}
|
||||
|
||||
with open(config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
# 从配置中提取挂载点信息
|
||||
mount_points = []
|
||||
mount_points = config['mounts'].copy()
|
||||
return mount_points
|
||||
def encode_path(path):
|
||||
"""
|
||||
对L0路径进行编码
|
||||
将 /l0x/path/to/file 编码为 l0x@path@to@file
|
||||
同时处理路径中已有的@符号
|
||||
"""
|
||||
# 先将已有的@符号替换为特殊序列
|
||||
path_escaped = path.replace('@', '@@')
|
||||
|
||||
# 将路径中的斜杠替换为@符号
|
||||
encoded_path = path_escaped.replace('/', '@')
|
||||
|
||||
# 确保路径以l0开头
|
||||
if not encoded_path.startswith('@l0'):
|
||||
raise ValueError(f"无法识别的L0路径格式: {path}")
|
||||
|
||||
# 移除开头的@
|
||||
return encoded_path[1:]
|
||||
|
||||
def create_symlink(source_path, encoded_path):
|
||||
"""创建源文件或目录到临时位置的软链接"""
|
||||
temp_dir = Path("/tmp/sscp")
|
||||
temp_dir.mkdir(exist_ok=True)
|
||||
|
||||
temp_link = temp_dir / encoded_path
|
||||
# 确保父目录存在
|
||||
temp_link.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 如果已存在,先删除
|
||||
if temp_link.exists():
|
||||
if temp_link.is_dir() and not temp_link.is_symlink():
|
||||
shutil.rmtree(temp_link)
|
||||
else:
|
||||
temp_link.unlink()
|
||||
|
||||
# 创建软链接
|
||||
os.symlink(os.path.abspath(source_path), temp_link)
|
||||
return temp_link
|
||||
|
||||
def call_ssbuild(encoded_path, verbose=False):
|
||||
"""调用ssbuild工具发送文件或目录"""
|
||||
try:
|
||||
cmd = ["ssbuild", str(encoded_path)]
|
||||
if verbose:
|
||||
print(f"执行命令: {' '.join(cmd)}")
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
return result.stdout
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"调用ssbuild失败: {e}")
|
||||
print(f"错误输出: {e.stderr}")
|
||||
sys.exit(1)
|
||||
except FileNotFoundError:
|
||||
print("错误: 找不到ssbuild命令,请确保它已安装并在PATH中")
|
||||
sys.exit(1)
|
||||
|
||||
def is_mount_path(path, mount_points):
|
||||
"""检查路径是否在挂载点中"""
|
||||
path_str = str(path)
|
||||
for mount_point in mount_points:
|
||||
if path_str.startswith(mount_point):
|
||||
return mount_point
|
||||
return None
|
||||
|
||||
def process_source(source_path, target_path, mount_points, verbose=False):
|
||||
"""处理单个源文件或目录"""
|
||||
source_path = Path(source_path).resolve()
|
||||
target_path = Path(target_path)
|
||||
|
||||
# 检查源是否存在
|
||||
if not source_path.exists():
|
||||
print(f"错误: 源 '{source_path}' 不存在")
|
||||
return False
|
||||
|
||||
# 如果目标是目录,添加源文件名
|
||||
if target_path.is_dir() or (len(mount_points) > 0 and not target_path.suffix):
|
||||
final_target = target_path / source_path.name
|
||||
else:
|
||||
final_target = target_path
|
||||
|
||||
# 检查目标路径是否在挂载点中
|
||||
mount_point = is_mount_path(final_target, mount_points)
|
||||
if not mount_point:
|
||||
print(f"错误: 目标路径 '{final_target}' 不在已知的挂载目录中")
|
||||
print(f"有效的挂载点: {', '.join(mount_points.keys())}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 从挂载路径提取L0路径部分
|
||||
target_str = str(final_target)
|
||||
relative_path = target_str[len(mount_point):]
|
||||
|
||||
# 构建完整的L0路径
|
||||
l0_path = mount_points[mount_point] + relative_path
|
||||
|
||||
# 对L0路径进行编码
|
||||
encoded_path = encode_path(l0_path)
|
||||
|
||||
if verbose:
|
||||
print(f"源路径: {source_path}")
|
||||
print(f"目标路径: {final_target}")
|
||||
print(f"挂载点: {mount_point}")
|
||||
print(f"L0路径: {l0_path}")
|
||||
print(f"编码后的路径: {encoded_path}")
|
||||
|
||||
# 创建源的软链接
|
||||
temp_link = create_symlink(source_path, encoded_path)
|
||||
|
||||
if verbose:
|
||||
print(f"创建软链接: {source_path} -> {temp_link}")
|
||||
|
||||
# 调用ssbuild工具发送文件或目录
|
||||
print(f"正在发送: {source_path} -> {final_target}")
|
||||
output = call_ssbuild(temp_link, verbose)
|
||||
|
||||
if verbose and output:
|
||||
print(f"ssbuild输出: {output}")
|
||||
|
||||
print(f"已成功传输到 {final_target}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理 {source_path} 时出错: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
args = parse_arguments()
|
||||
|
||||
# 获取挂载点信息
|
||||
mount_points = get_mount_points()
|
||||
|
||||
if args.verbose:
|
||||
print(f"已识别的挂载点: {mount_points}")
|
||||
|
||||
# 最后一个参数是目标路径
|
||||
target = args.sources.pop()
|
||||
sources = args.sources
|
||||
|
||||
if not sources:
|
||||
print("错误: 至少需要一个源文件或目录")
|
||||
sys.exit(1)
|
||||
|
||||
# 处理每个源文件或目录
|
||||
success_count = 0
|
||||
for source in sources:
|
||||
if process_source(source, target, mount_points, args.verbose):
|
||||
success_count += 1
|
||||
|
||||
# 报告结果
|
||||
if success_count == len(sources):
|
||||
print(f"所有 {len(sources)} 个文件/目录已成功传输")
|
||||
else:
|
||||
print(f"传输完成: {success_count}/{len(sources)} 个文件/目录成功")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user