From 26a8aecfb4bf6994a3303c9ea8aa6bc08a2ff72b Mon Sep 17 00:00:00 2001 From: Begild Date: Tue, 14 Jan 2025 09:01:00 +0800 Subject: [PATCH] =?UTF-8?q?"=E4=BF=AE=E6=94=B9client=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E5=B0=86queue=E7=9A=84get=E9=98=BB=E5=A1=9E=E5=BC=8F?= =?UTF-8?q?=E6=94=B9=E6=88=90timout=E8=BD=AE=E8=AF=A2=E7=9A=84=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E4=BB=A5=E4=BE=BF=E4=BA=8E=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E6=94=AF=E6=8C=81ctrc=20+c=E4=B8=8D=E8=BF=87=E9=80=80=E5=87=BA?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E8=BF=98=E5=AD=98=E5=9C=A8=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E9=9C=80=E8=A6=81=E4=BF=AE=E6=94=B9=E3=80=82"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- license_client.py | 145 ++++++++++++++++++++++------------------------ 1 file changed, 69 insertions(+), 76 deletions(-) diff --git a/license_client.py b/license_client.py index 2c7e6e7..e335898 100755 --- a/license_client.py +++ b/license_client.py @@ -10,8 +10,7 @@ import sys from my_log import get_logger from mock_lmutil import mock_license_add_user, mock_license_remove_user, _read_sample_file, _write_sample_file import queue -import select # 添加 select 模块导入 -import signal +import select log = get_logger() @@ -31,25 +30,27 @@ class LicenseDispatcherClient: self.lic: queue.Queue = queue.Queue() self.current_lic: str = None self.dispatcher_timer: threading.Timer = None # Add missing attribute - self._lock = threading.Lock() # Add thread safety self.heartbeat_thread: threading.Thread | None = None # Add thread reference def request(self) -> str: - """Request license, block until license is obtained or user interrupts""" + """Request license, block until license is obtained""" response = self._request_license() - if response['status'] == 'refuse': - log.error(response['msg']) - return None - elif response['status'] == 'wait_dispatch': + if response['status'] == 'wait_dispatch': # Start heartbeat thread log.info("Waiting for license allocation...") self._start_dispatcher_client(response['dispatcher_server']['port']) - lic = self.get_lic() - return lic + else: + log.warning(f"{response['status']}: {response['msg']}") + return response['status'] def get_lic(self) -> str: """Get the assigned license""" - self.current_lic = self.lic.get() + while not self.exit_event.is_set(): + try: + self.current_lic = self.lic.get(timeout=0.1) + return self.current_lic + except queue.Empty: + continue return self.current_lic def _request_license(self) -> dict: @@ -71,7 +72,7 @@ class LicenseDispatcherClient: return json.loads(response_data) except (socket.timeout, ConnectionRefusedError) as e: log.error(f"Connection error: {e}") - return {"status": "refuse", "msg": str(e)} + return {"status": "connect_error", "msg": str(e)} except Exception as e: log.error(f"Requesting license error: {e}") return {"status": "error", "msg": str(e)} @@ -83,7 +84,11 @@ class LicenseDispatcherClient: """Start dispatcher connection with server""" log.info(f"Starting dispatcher: {port}") self.dispatcher_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.dispatcher_socket.connect((SERVER_HOST, port)) + try: + self.dispatcher_socket.connect((SERVER_HOST, port)) + except Exception as e: + log.error(f"Failed to connect to dispatcher: {e}") + return start_time = time.time() def heartbeat_loop(): @@ -100,13 +105,13 @@ class LicenseDispatcherClient: # Wait for response with timeout readable, _, _ = select.select([self.dispatcher_socket], [], [], 0.5) - + if self.exit_event.is_set(): + break if self.dispatcher_socket in readable: data = self.dispatcher_socket.recv(1024) if not data: log.warning("Connection closed by server") break - response = json.loads(data.decode()) if response.get('type') == 'lic_dispatch': self.lic.put(response['lic']) @@ -121,12 +126,20 @@ class LicenseDispatcherClient: break elif response.get('type') == 'heartbeat_ack': wait_time = int(time.time() - start_time) - print(f"\rwaiting... {wait_time}s", end='', flush=True) + if wait_time < 60: + print(f"\rwaiting... {wait_time}s", end='', flush=True) + elif wait_time < 3600: + print(f"\rwaiting... {wait_time / 60}m {wait_time}s", end='', flush=True) + else: + print(f"\rwaiting... {wait_time / 3600}h {wait_time / 60}m {wait_time}s", end='', flush=True) elif response.get('type') == 'queue': wait_time = int(time.time() - start_time) log.info(f"waiting... {wait_time}s") time.sleep(0.5) # Small sleep to prevent CPU spinning + except KeyboardInterrupt: + log.info("Keyboard interrupt received, shutting down...") + break except Exception as e: log.error(f"Error in heartbeat loop: {e}") break @@ -134,7 +147,7 @@ class LicenseDispatcherClient: self.lic.put(None) # Signal end of heartbeat loop # Start heartbeat thread - self.heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True) + self.heartbeat_thread = threading.Thread(target=heartbeat_loop) self.heartbeat_thread.start() def release(self) -> None: @@ -150,37 +163,22 @@ class LicenseDispatcherClient: if self.heartbeat_thread and self.heartbeat_thread.is_alive(): self.heartbeat_thread.join(timeout=1.0) - with self._lock: - if self.dispatcher_socket: - try: - self.dispatcher_socket.shutdown(socket.SHUT_RDWR) - except Exception: - pass - self.dispatcher_socket.close() - self.dispatcher_socket = None + if self.dispatcher_socket: + try: + self.dispatcher_socket.shutdown(socket.SHUT_RDWR) + except Exception: + pass + self.dispatcher_socket.close() + self.dispatcher_socket = None self.release() self.lic.put(None) except Exception as e: log.error(f"Error during exit: {e}") -# Global flag for exit control -is_exiting = False clients: dict[str, LicenseDispatcherClient] = {} - -def signal_handler(signum, frame): - """Handle interrupt signals""" - global is_exiting - if is_exiting: - log.info("\nForce exiting...") - sys.exit(1) - - log.info("\nReceived interrupt signal, cleaning up...") - is_exiting = True - cleanup_and_exit() - -def cleanup_and_exit(): - """Cleanup all resources and exit""" +def cleanup_clients(): + """清理所有客户端资源""" for client_key, client in list(clients.items()): try: log.info(f"Cleaning up client: {client_key}") @@ -189,43 +187,35 @@ def cleanup_and_exit(): except Exception as e: log.error(f"Error cleaning up client {client_key}: {e}") log.info("\nProgram exited") - sys.exit(0) def main() -> None: - # Register signal handlers - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - # Make sure SIGINT is not ignored - if os.name == 'posix': - signal.siginterrupt(signal.SIGINT, True) + try: + while True: + show_menu() + cmd = input("Please select an operation: ").strip() - while not is_exiting: - show_menu() - # Set timeout for input operations - cmd = input("Please select an operation: ").strip() - - if is_exiting: # Check if exit flag was set during input - break - - if cmd == '1': - create_client() - elif cmd == '2': - release_client() - elif cmd == '3': - show_clients() - elif cmd == '4': - break - else: - print("Invalid command") - if not is_exiting: - cleanup_and_exit() + if cmd == '1': + create_client() + elif cmd == '2': + release_client() + elif cmd == '3': + show_clients() + elif cmd == '4': + break + else: + print("Invalid command") + except KeyboardInterrupt: + print("\nReceived keyboard interrupt, cleaning up...") + finally: + cleanup_clients() def create_client(): """Create new license client""" - global is_exiting - username = input("Please enter username: ").strip() - hostname = input("Please enter hostname: ").strip() - if is_exiting: + try: + username = input("Please enter username: ").strip() + hostname = input("Please enter hostname: ").strip() + except KeyboardInterrupt: + print("\nOperation cancelled") return if not username or not hostname: @@ -238,14 +228,17 @@ def create_client(): return client = LicenseDispatcherClient(user=username, host=hostname) - lic = client.request() - if lic: + status = client.request() + if status == 'wait_dispatch': + lic = client.get_lic() clients[client_key] = client - # Add mock license here mock_license_add_user(lic, username, hostname) print(f"{client_key} successfully created and requested license: {lic}") else: - print(f"{client_key} License request failed") + if status == 'connect_error': + print(f"{client_key} License Server connect error, You can try again later") + else: + print(f"{client_key} License request failed: {status}") def release_client(): """Release existing license"""