From acece94fae4775a361e4f4d0e33b6ff8e00f9051 Mon Sep 17 00:00:00 2001 From: "ekko.bao" Date: Mon, 31 Mar 2025 10:46:21 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=96=B0=E5=A2=9E=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=AF=B9=E4=BA=8E=E9=A9=B1=E5=8A=A8=E5=99=A8=E6=98=BE=E7=A4=BA?= =?UTF-8?q?=E5=90=8D=E7=A7=B0=E7=9A=84=E8=8E=B7=E5=8F=96=EF=BC=8C=E9=80=9A?= =?UTF-8?q?=E8=BF=87=E6=98=BE=E7=A4=BA=E5=90=8D=E7=A7=B0=E7=9A=84=E5=8C=B9?= =?UTF-8?q?=E9=85=8D=E6=9B=B4=E5=8A=A0=E7=9B=B4=E8=A7=82=202.=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96rpc=20pool=E7=9A=84=E4=BD=BF=E7=94=A8=E3=80=82=203.=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BF=9D=E5=AD=98=E9=85=8D=E7=BD=AE=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E7=BB=99=E5=85=B6=E4=BB=96=E7=9A=84=E7=BB=84?= =?UTF-8?q?=E4=BB=B6=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 197 ++++++++++++++++++++++++++++++---- Cargo.toml | 8 +- config.json | 12 +-- src/client.rs | 100 +++++++++++++++-- src/lib.rs | 259 +++++++++++++++++++++++++-------------------- sscp_tool/setup.py | 29 +++++ sscp_tool/sscp.py | 203 +++++++++++++++++++++++++++++++++++ 7 files changed, 657 insertions(+), 151 deletions(-) create mode 100644 sscp_tool/setup.py create mode 100644 sscp_tool/sscp.py diff --git a/Cargo.lock b/Cargo.lock index e81119c..3822f1b 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,6 +190,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clap" version = "3.2.25" @@ -214,6 +220,26 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "either" version = "1.13.0" @@ -526,36 +552,38 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" -[[package]] -name = "lws_client" -version = "0.1.0" -dependencies = [ - "clap", - "env_logger", - "fuse_mt", - "hashbrown 0.9.1", - "libc", - "log", - "prost", - "serde", - "serde_json", - "tokio", - "tonic", - "tonic-build", -] - [[package]] name = "matchit" version = "0.7.3" @@ -600,6 +628,18 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -641,6 +681,29 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -812,6 +875,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +dependencies = [ + "bitflags 2.6.0", +] + +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "regex" version = "1.10.5" @@ -872,6 +955,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.204" @@ -903,6 +992,25 @@ dependencies = [ "serde", ] +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -928,6 +1036,28 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "sstar_lws_vfs" +version = "0.1.0" +dependencies = [ + "clap", + "dirs", + "env_logger", + "fuse_mt", + "hashbrown 0.9.1", + "libc", + "log", + "nix", + "parking_lot", + "prost", + "serde", + "serde_json", + "signal-hook", + "tokio", + "tonic", + "tonic-build", +] + [[package]] name = "strsim" version = "0.10.0" @@ -978,6 +1108,26 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" +[[package]] +name = "thiserror" +version = "1.0.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -1201,7 +1351,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1228,6 +1378,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 0cbc565..4104b18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "lws_client" +name = "sstar_lws_vfs" version = "0.1.0" edition = "2021" @@ -7,6 +7,10 @@ edition = "2021" name = "lws_vfs_client" path = "src/client.rs" +# [[bin]] +# name = "sscp" +# path = "src/sscp.rs" + [dependencies] tonic = "0.11" prost = "0.12" @@ -21,6 +25,8 @@ env_logger = "0.8" clap = "3.0" nix = { version = "0.29.0", features = ["process", "feature"]} signal-hook = "0.3.17" +parking_lot = "0.12" +dirs = "4.0" [build-dependencies] tonic-build = "0.11" diff --git a/config.json b/config.json index 6a2f901..e4f1e1d 100755 --- a/config.json +++ b/config.json @@ -1,9 +1,9 @@ { "mount": { - "c:\\": "/l3c", - "d:\\": "/l0d", - "f:\\": "/l0e" + "系统": "/l3c", + "ekko.bao_ubuntu (\\\\192.168.0.112)": "/l0d", + "softerware": "/l0e", + "nas (\\\\192.168.0.117)": "/l0v" }, - "port": 7412, - "addr": "192.168.0.111" -} + "port": 33444 +} \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 50a1f9f..3466824 100755 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,18 @@ use clap::{App, Arg}; -use lws_client::LwsVfsIns; +use sstar_lws_vfs::LwsVfsIns; use std::{process, thread}; use std::process::Command; extern crate log; use nix::unistd::{fork, ForkResult, getpid, getppid}; -use signal_hook::consts::{SIGCHLD, SIGINT}; -use signal_hook::iterator::Signals; +// use signal_hook::consts::{SIGCHLD, SIGINT, SIGTERM}; +// use signal_hook::iterator::Signals; use std::env; +use libc::prctl; +use std::time::Duration; +use std::fs::{File, create_dir_all}; +use std::io::Write; +use serde::{Serialize, Deserialize}; +use dirs; const DEFAULT_PORT: u16 = 33444; const DEFAULT_CACHE_LIFE: u32 = 10_000; @@ -47,7 +53,7 @@ fn param_parser() -> (String, String, u32) { .max_values(1) .value_name("mount point") .required(true) - .help("mount point, eg: ~/mnt"), + .help("mount point, eg: ~/mnt"), ) .arg( Arg::with_name("c") @@ -117,19 +123,87 @@ fn umount_point(mount: &String) { } fn set_cleanup(mount: &String) { // 捕获 SIGCHLD 信号 - log::info!("waiting for SIGCHLD SIGINT signal"); - let mut signals = Signals::new(&[SIGCHLD, SIGINT]).expect("Error creating signals"); - for _ in signals.forever() { - log::info!("Catch SIGCHLD||SIGINT, exit...."); - umount_point(&mount); - break; - } + // log::info!("waiting for SIGCHLD SIGINT signal"); + // let mut signals = Signals::new(&[SIGCHLD, SIGINT, SIGTERM]).expect("Error creating signals"); + let mout_point = mount.clone(); + // 子线程监控父进程是否退出 + let th: thread::JoinHandle<()> = thread::spawn( move || { + let lws_client_pid = getppid(); + loop { + let ppid = getppid(); + if 1 == ppid.as_raw() { + log::info!("lws_client({}) has been exited! try cleanup", lws_client_pid); + umount_point(&mout_point); + break; + } + thread::sleep(Duration::from_micros(100)); + } + }); + // for sig in signals.forever() { + // match sig { + // SIGCHLD | SIGINT | SIGTERM=> { + // log::info!("Catch SIGCHLD|SIGINT|SIGTERM signal"); + // umount_point(&mount); + // break; + // } + // _ => { + // // 处理其他信号 + // log::info!("Catch signal {} ....", sig); + // } + // } + // } + th.join().unwrap(); +} + +#[derive(Serialize, Deserialize)] +struct LwsConfig { + server: String, + mount_point: String, + cache_life: u32, +} + +fn save_config(server: &str, mount_point: &str, cache_life: u32) -> Result<(), Box> { + // Get the user's home directory using dirs crate + let home_dir = match dirs::home_dir() { + Some(path) => path, + None => return Err("Unable to get the user's home directory".into()), + }; + // Create the configuration directory + let config_dir = home_dir.join(".config").join("lws_client"); + create_dir_all(&config_dir)?; + + // Create the configuration file + let config_file = config_dir.join("config.json"); + + // Prepare the configuration data + let config = LwsConfig { + server: server.to_string(), + mount_point: mount_point.to_string(), + cache_life, + }; + + // Serialize to JSON + let config_json = serde_json::to_string_pretty(&config)?; + + // Write to the file + let mut file = File::create(config_file)?; + file.write_all(config_json.as_bytes())?; + + log::info!("Configuration saved to ~/.config/lws_client/config.json"); + + Ok(()) } #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); let (server, mount_point, cache_life) = param_parser(); + + // Save the configuration + if let Err(e) = save_config(&server, &mount_point, cache_life) { + log::warn!("Failed to save configuration: {:?}", e); + } + let lws_ins = match LwsVfsIns::new(&server, cache_life).await { Ok(ins) => ins, Err(e) => { @@ -178,7 +252,11 @@ fn child_process(mount: &String) { let pid = getpid(); let ppid = getppid(); log::info!("parent: {}, child: {}", ppid, pid); + unsafe { + prctl(libc::PR_SET_NAME, b"lws_monitor\0".as_ptr() as usize, 0, 0, 0); + } set_cleanup(mount); + process::exit(0); } Err(err) => { log::error!("fork fail: {}", err); diff --git a/src/lib.rs b/src/lib.rs index 5b526e4..177d105 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ use std::thread::{self}; use std::{env, vec}; use tokio::runtime::{Builder, Runtime}; use tonic::transport::Channel as RpcChannel; +use std::sync::Arc; +use parking_lot::{Mutex, RwLock, Condvar}; extern crate log; pub mod lws_client { @@ -54,10 +56,9 @@ impl Config { ret } } -use std::sync::{Arc, Condvar, Mutex, RwLock}; struct RpcPool { - clients: Arc>>>>, - index: Arc>, + clients: Vec>>, // 使用本地 Vec 而不是包裹在 Arc> 中 + available: Mutex>, // 跟踪可用客户端的索引 wait: Arc<(Mutex<()>, Condvar)>, } @@ -72,67 +73,75 @@ struct Rpc { impl RpcPool { async fn new(size: usize, uri: String) -> Result> { let mut clients = Vec::with_capacity(size); + let mut available = Vec::with_capacity(size); let uri = uri.to_ascii_lowercase(); log::info!("create rpc pool size: {}", size); + + let shared_rt = Arc::new( + Builder::new_multi_thread() + .worker_threads(10) + .build() + .unwrap(), + ); + let channel = RpcChannel::from_shared(uri)?.connect().await?; - for _ in 0..size { + + for i in 0..size { let client = LwsVfsClient::new(channel.clone()); let (tx, rx) = mpsc::channel(); clients.push(Arc::new(RwLock::new(Rpc { client, in_use: false, - rt: Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .build() - .unwrap(), - ), + rt: shared_rt.clone(), tx: Arc::new(tx), rx: Arc::new(Mutex::new(rx)), }))); + available.push(i); // 所有客户端初始时都是可用的 } + let wait = (Mutex::new(()), Condvar::new()); Ok(Self { - clients: Arc::new(Mutex::new(clients)), - index: Arc::new(RwLock::new(0)), + clients, + available: Mutex::new(available), wait: Arc::new(wait), }) } fn get(&self) -> Result>, Box> { - // TODO: 优化遍历的逻辑 - let find_idle = |rpc_pool: &Self| -> Result>, Box> { - let cs = self.clients.clone(); - let clients = &(cs.lock().unwrap()); - let size = clients.len(); - let index = *rpc_pool.index.read().unwrap(); - for i in 0..size { - let idx = (index + i) % size; - let entry = clients[idx].read().unwrap(); - if !entry.in_use { - drop(entry); - let mut entry: std::sync::RwLockWriteGuard<'_, Rpc> = - clients[idx].write().unwrap(); - entry.in_use = true; - return Ok(clients[idx].clone()); + const MAX_RETRIES: usize = 30; + + for _ in 0..MAX_RETRIES { + // 尝试获取可用客户端 + let client_opt = { + let mut available = self.available.lock(); + if let Some(idx) = available.pop() { + let client = self.clients[idx].clone(); + { + // 在单独的作用域中借用 client + let mut rpc = client.write(); + rpc.in_use = true; + } + Some(client) // 现在可以安全地移动 client + } else { + // 如果无法获取可用客户端,返回 None + None } + }; + + if let Some(client) = client_opt { + return Ok(client); } - Err(Box::new(IoError::new( - ErrorKind::Other, - "get free rpc client fail", - ))) - }; - loop { - if let Ok(rpc) = find_idle(self) { - return Ok(rpc); - } + + // 等待通知 let (mutex, cond) = &(*self.wait.clone()); - let timeout = Duration::from_millis(100); - let (_unused, timeout_res) = cond.wait_timeout(mutex.lock().unwrap(), timeout).unwrap(); - if timeout_res.timed_out() { - break; + let mut guard = mutex.lock(); + let timeout = Duration::from_millis(50); + let result = cond.wait_for(&mut guard, timeout); + if !result.timed_out() { + continue; } } + log::warn!("get free rpc client time out"); Err(Box::new(IoError::new( ErrorKind::Other, @@ -141,19 +150,27 @@ impl RpcPool { } fn put(&self, client: Arc>) { - let mut i = 0; - for entry in self.clients.lock().unwrap().iter_mut() { - if Arc::ptr_eq(&entry, &client) { - let mut item = entry.write().unwrap(); - item.in_use = false; - *self.index.write().unwrap() = i; - let (mutex, cond) = &(*self.wait.clone()); - let _unused = mutex.lock().unwrap(); + // 找到客户端在数组中的索引 + for (idx, c) in self.clients.iter().enumerate() { + if Arc::ptr_eq(c, &client) { + // 设置客户端为可用 + let mut entry = c.write(); + entry.in_use = false; + + // 将索引添加到可用列表 + { + let mut available = self.available.lock(); + available.push(idx); + } + + // 通知等待的线程 + let (_mutex, cond) = &(*self.wait.clone()); cond.notify_one(); - break; + return; } - i += 1; } + + log::warn!("Attempted to return unknown RPC client to pool"); } } @@ -162,10 +179,11 @@ struct VirFs { stat: lws_client::Fstat, sub: HashMap, is_root: bool, + is_link_remote: bool } impl VirFs { - fn create(name: T, is_dir: bool) -> VirFs + fn create(name: T, is_dir: bool, is_link_remote:bool) -> VirFs where T: ToString, { @@ -198,15 +216,17 @@ impl VirFs { }, sub: HashMap::new(), is_root: true, + is_link_remote, } } pub fn new() -> VirFs { - let mut node = VirFs::create("/", true); + let mut node = VirFs::create("/", true, false); node.is_root = true; node } - pub fn add(&mut self, name: T) + // 添加子目录 + pub fn add(&mut self, name: T, is_link_remote: bool) where T: ToString, { @@ -218,7 +238,7 @@ impl VirFs { .collect::>(); let name = ret[0]; log::debug!("add fs {}", name); - let mut node = VirFs::create(name.to_string(), true); + let mut node = VirFs::create(name.to_string(), true, is_link_remote); node.is_root = false; self.sub.insert(name.to_string(), node); } @@ -228,32 +248,20 @@ impl VirFs { T: ToString, { let name = name.to_string(); - let mut vname = name.split('/').filter(|x| x.len() > 0); - //log::info!("try match {}", name); - if let Some(_curr) = vname.next() { - return None; - } - Some(self) - //self.match_path(&mut vname) + let mut vname = name.split('/').filter(|x| !x.is_empty()); + self._match_path(&mut vname) } pub fn _match_path<'a>(&self, name_list: &mut impl Iterator) -> Option<&VirFs> { if let Some(current) = name_list.next() { - match self.sub.get(current) { - Some(v) => { - // try match sub dir - if let Some(sub) = v._match_path(name_list) { - Some(sub) - // sub dirs not match - } else { - None - } - } - None => None, - } + self.sub.get(current).and_then(|v| v._match_path(name_list)) } else { - // find over - Some(self) + // 如果是链接到远程的虚拟目录,则不匹配,需要由远端获取属性 + if self.is_link_remote { + None + } else { + Some(self) + } } } @@ -282,9 +290,12 @@ fn current_time_in_ms() -> u32 { } impl FileAttrCache { pub fn new(attr: FileAttrInfo, cache_life: u32) -> FileAttrCache { + let now = current_time_in_ms(); + let exp_time = now + cache_life; + log::trace!("Creating cache, current time: {}ms, expiration time: {}ms, lifetime: {}ms", now, exp_time, cache_life); FileAttrCache { attr, - exp_time: current_time_in_ms() + cache_life, + exp_time, } } } @@ -321,16 +332,16 @@ impl FileAttrCacheManager { let (tx, rx) = mpsc::channel(); let cache_ctx = FileAttrCacheCtx::new(rx); let handle = thread::spawn(move || { - Self::cache_manager(cache_ctx, cache_life); + Self::cache_manager_thread(cache_ctx, cache_life); }); FileAttrCacheManager { _handle:handle, tx } } - fn cache_manager(mut ctx: FileAttrCacheCtx, cache_life: u32) { - let rx = ctx.rx.lock().unwrap(); + fn cache_manager_thread(mut ctx: FileAttrCacheCtx, cache_life: u32) { + let rx = ctx.rx.lock(); fn invalid_cache(cache: &mut HashMap) -> u32 { let mut cache_size = 0; - let curr = current_time_in_ms(); + let curr: u32 = current_time_in_ms(); let del: Vec = cache .iter() .filter(|&(_, val)| val.exp_time < curr) @@ -346,6 +357,7 @@ impl FileAttrCacheManager { loop { match rx.recv_timeout(Duration::from_millis(200)) { Ok(req) => { + // 只有当缓存大小超过阈值时才进行清理 if ctx.cache_size > 256 { ctx.cache_size -= invalid_cache(&mut ctx.cache); } @@ -356,7 +368,16 @@ impl FileAttrCacheManager { } FileAttrCacheMsg::Get(path, chn) => match ctx.cache.get(&path) { Some(attr) => { - let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr))); + // 检查缓存是否已过期 + let curr = current_time_in_ms(); + if attr.exp_time >= curr { + let _ = chn.send(FileAttrCacheMsg::GetReply(path, Some(attr.attr.clone()))); + } else { + // 缓存已过期,移除并返回None + ctx.cache.remove(&path); + ctx.cache_size -= 1; + let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); + } } None => { let _ = chn.send(FileAttrCacheMsg::GetReply(path, None)); @@ -372,6 +393,7 @@ impl FileAttrCacheManager { } } Err(_) => { + // 定期清理过期缓存,而不是每次都清理 ctx.cache_size -= invalid_cache(&mut ctx.cache); } } @@ -388,7 +410,8 @@ impl FileAttrCacheManager { tx: &Arc>, rx: &Arc>>, ) -> Option { - //send get gile attribute request + //send get file attribute request + log::trace!("Requesting cache: {}", path); let _ = self .tx .send(FileAttrCacheMsg::Get(path.to_string(), tx.clone())); @@ -397,26 +420,29 @@ impl FileAttrCacheManager { loop { let elapsed = start.elapsed(); if elapsed >= total_timeout { + log::debug!("Cache request timeout: {}", path); return None; } let timeout = total_timeout - elapsed; let resp = { - match rx.lock().unwrap().recv_timeout(timeout) { - Ok(resp) => resp, - Err(_e) => return None, - } + let rx = rx.lock(); + rx.recv_timeout(timeout) }; match resp { - FileAttrCacheMsg::GetReply(rpath, resp) => { - if *path == rpath { - //message is not match - return resp; + Ok(msg) => { + if let FileAttrCacheMsg::GetReply(_, Some(attr)) = msg { + log::trace!("Cache hit: {}", path); + return Some(attr); } + log::trace!("Cache miss: {}", path); + return None; + }, + Err(_e) => { + log::debug!("Error receiving cache response: {}", path); + return None; } - _ => break, } } - None } pub fn clr(&self, paths: Vec) { @@ -512,7 +538,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); if let Some(attr) = self.cache.get(&path, &rpc.tx, &rpc.rx) { let d = client.clone(); drop(rpc); @@ -600,7 +626,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.ftruncate(request)); @@ -657,7 +683,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.futimens(request)); @@ -719,7 +745,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fmkdir(request)); @@ -755,7 +781,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.funlink(request)); @@ -792,7 +818,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.frmdir(request)); @@ -852,7 +878,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.frename(request)); @@ -907,7 +933,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fopen(request)); @@ -967,7 +993,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return callback(Err(libc::ENOMSG)), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fread(request)); @@ -1022,7 +1048,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fwrite(request)); @@ -1068,7 +1094,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fflush(request)); @@ -1121,7 +1147,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.frelease(request)); @@ -1175,7 +1201,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fopendir(request)); @@ -1240,7 +1266,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.freaddir(request)); @@ -1294,7 +1320,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.freleasedir(request)); @@ -1404,8 +1430,13 @@ impl FilesystemMT for LwsVfsIns { } mode as u32 } + // log::info!("access path: {:?}, mask: {:?}", path, mask); + let path = path.to_string_lossy().into_owned(); + if let Some(_) = self.vir_root.find(&path) { + return Ok(()); + } let request = tonic::Request::new(Access { - path: path.to_string_lossy().into_owned(), + path: path, mask: mask2mode(mask as i32), ..Access::default() }); @@ -1414,7 +1445,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.faccess(request)); @@ -1463,7 +1494,7 @@ impl FilesystemMT for LwsVfsIns { Ok(c) => c, Err(_) => return Err(libc::EAGAIN), }; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.fcreate(request)); @@ -1505,7 +1536,7 @@ impl LwsVfsIns { let config = LwsVfsIns::fetch_config(&rpcs).await?; let mut vir_root = VirFs::new(); for mount in config.get_mount() { - vir_root.add(mount); + vir_root.add(mount, true); } Ok(LwsVfsIns { config, @@ -1521,7 +1552,7 @@ impl LwsVfsIns { ..GetConfig::default() }); let client = rpcs.get().unwrap(); - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rpc = &mut (rpc.client); let resp = rpc.get_config(request).await; (client.clone(), resp) @@ -1548,7 +1579,7 @@ impl LwsVfsIns { }); let (rpc, resp) = { let client = self.rpcs.get()?; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let rt = rpc.rt.clone(); let rpc = &mut (rpc.client); let resp = rt.block_on(rpc.say_hello(request)); @@ -1565,7 +1596,7 @@ impl LwsVfsIns { }); let (rpc, resp) = { let client = self.rpcs.get()?; - let mut rpc = client.write().unwrap(); + let mut rpc = client.write(); let resp = rpc.client.say_hello(request).await?; (client.clone(), resp) }; diff --git a/sscp_tool/setup.py b/sscp_tool/setup.py new file mode 100644 index 0000000..ad0ee93 --- /dev/null +++ b/sscp_tool/setup.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from setuptools import setup, find_packages + +setup( + name="sscp", + version="0.1.0", + description="从L3向L0传输文件的简化工具", + author="Your Name", + author_email="your.email@example.com", + packages=find_packages(), + py_modules=["sscp"], + entry_points={ + "console_scripts": [ + "sscp=sscp:main", + ], + }, + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + ], + python_requires=">=3.6", +) \ No newline at end of file diff --git a/sscp_tool/sscp.py b/sscp_tool/sscp.py new file mode 100644 index 0000000..3df7334 --- /dev/null +++ b/sscp_tool/sscp.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +sscp - 简化从L3到L0的文件传输工具 +用法: sscp <源文件/目录> [<源文件/目录>...] <目标路径> +""" + +import os +import sys +import json +import argparse +import subprocess +import re +import shutil +from pathlib import Path + +def parse_arguments(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description='从L3向L0传输文件的简化工具') + parser.add_argument('sources', nargs='+', help='源文件或目录路径') + parser.add_argument('target', help='目标路径') + parser.add_argument('-v', '--verbose', action='store_true', help='显示详细信息') + parser.add_argument('-r', '--recursive', action='store_true', help='递归处理目录') + return parser.parse_args() + +def get_mount_points(): + """从配置文件获取挂载点信息""" + config_path = Path.home() / ".config" / "sscp" / "config.json" + + if not config_path.exists(): + print(f"警告: 找不到配置文件 {config_path},将使用默认挂载点 ~/mnt") + return {str(Path.home() / "mnt"): "/l0"} + + with open(config_path, 'r') as f: + config = json.load(f) + + # 从配置中提取挂载点信息 + mount_points = [] + mount_points = config['mounts'].copy() + return mount_points +def encode_path(path): + """ + 对L0路径进行编码 + 将 /l0x/path/to/file 编码为 l0x@path@to@file + 同时处理路径中已有的@符号 + """ + # 先将已有的@符号替换为特殊序列 + path_escaped = path.replace('@', '@@') + + # 将路径中的斜杠替换为@符号 + encoded_path = path_escaped.replace('/', '@') + + # 确保路径以l0开头 + if not encoded_path.startswith('@l0'): + raise ValueError(f"无法识别的L0路径格式: {path}") + + # 移除开头的@ + return encoded_path[1:] + +def create_symlink(source_path, encoded_path): + """创建源文件或目录到临时位置的软链接""" + temp_dir = Path("/tmp/sscp") + temp_dir.mkdir(exist_ok=True) + + temp_link = temp_dir / encoded_path + # 确保父目录存在 + temp_link.parent.mkdir(parents=True, exist_ok=True) + + # 如果已存在,先删除 + if temp_link.exists(): + if temp_link.is_dir() and not temp_link.is_symlink(): + shutil.rmtree(temp_link) + else: + temp_link.unlink() + + # 创建软链接 + os.symlink(os.path.abspath(source_path), temp_link) + return temp_link + +def call_ssbuild(encoded_path, verbose=False): + """调用ssbuild工具发送文件或目录""" + try: + cmd = ["ssbuild", str(encoded_path)] + if verbose: + print(f"执行命令: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + check=True, + capture_output=True, + text=True + ) + return result.stdout + except subprocess.CalledProcessError as e: + print(f"调用ssbuild失败: {e}") + print(f"错误输出: {e.stderr}") + sys.exit(1) + except FileNotFoundError: + print("错误: 找不到ssbuild命令,请确保它已安装并在PATH中") + sys.exit(1) + +def is_mount_path(path, mount_points): + """检查路径是否在挂载点中""" + path_str = str(path) + for mount_point in mount_points: + if path_str.startswith(mount_point): + return mount_point + return None + +def process_source(source_path, target_path, mount_points, verbose=False): + """处理单个源文件或目录""" + source_path = Path(source_path).resolve() + target_path = Path(target_path) + + # 检查源是否存在 + if not source_path.exists(): + print(f"错误: 源 '{source_path}' 不存在") + return False + + # 如果目标是目录,添加源文件名 + if target_path.is_dir() or (len(mount_points) > 0 and not target_path.suffix): + final_target = target_path / source_path.name + else: + final_target = target_path + + # 检查目标路径是否在挂载点中 + mount_point = is_mount_path(final_target, mount_points) + if not mount_point: + print(f"错误: 目标路径 '{final_target}' 不在已知的挂载目录中") + print(f"有效的挂载点: {', '.join(mount_points.keys())}") + return False + + try: + # 从挂载路径提取L0路径部分 + target_str = str(final_target) + relative_path = target_str[len(mount_point):] + + # 构建完整的L0路径 + l0_path = mount_points[mount_point] + relative_path + + # 对L0路径进行编码 + encoded_path = encode_path(l0_path) + + if verbose: + print(f"源路径: {source_path}") + print(f"目标路径: {final_target}") + print(f"挂载点: {mount_point}") + print(f"L0路径: {l0_path}") + print(f"编码后的路径: {encoded_path}") + + # 创建源的软链接 + temp_link = create_symlink(source_path, encoded_path) + + if verbose: + print(f"创建软链接: {source_path} -> {temp_link}") + + # 调用ssbuild工具发送文件或目录 + print(f"正在发送: {source_path} -> {final_target}") + output = call_ssbuild(temp_link, verbose) + + if verbose and output: + print(f"ssbuild输出: {output}") + + print(f"已成功传输到 {final_target}") + return True + + except Exception as e: + print(f"处理 {source_path} 时出错: {e}") + return False + +def main(): + args = parse_arguments() + + # 获取挂载点信息 + mount_points = get_mount_points() + + if args.verbose: + print(f"已识别的挂载点: {mount_points}") + + # 最后一个参数是目标路径 + target = args.sources.pop() + sources = args.sources + + if not sources: + print("错误: 至少需要一个源文件或目录") + sys.exit(1) + + # 处理每个源文件或目录 + success_count = 0 + for source in sources: + if process_source(source, target, mount_points, args.verbose): + success_count += 1 + + # 报告结果 + if success_count == len(sources): + print(f"所有 {len(sources)} 个文件/目录已成功传输") + else: + print(f"传输完成: {success_count}/{len(sources)} 个文件/目录成功") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file