#!/usr/bin/python3 from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json import os from pathlib import Path from license import LicenseManager from my_log import get_logger log = get_logger() global LIC_MANAGER LIC_MANAGER = None class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): def __init__(self, *args, **kwargs): self.handler_map = { '/vcast/lic_use_info': self.send_lic_info, '/vcast/request_lic': self.send_request_lic, '/vcast/queue_info': self.send_queue_info, } super().__init__(*args, **kwargs) def log_message(self, format, *args): return # 重写该方法以禁用日志输出 def do_GET(self): if self.path in self.handler_map: self.handler_map[self.path]() self.wfile.flush() self.close_connection = True return path = self.path if path == '/': path = '/index.html' path = path.strip('/') if not self.send_file(path): self.send_404() self.wfile.flush() self.close_connection = True def send_lic_info(self): data = LIC_MANAGER.get_licenses_info() resp_str = json.dumps(data).encode('utf-8') self.send_response(200) self.send_header('Content-type', 'text/json') self.send_header('Content-Length', len(resp_str)) self.end_headers() self.wfile.write(resp_str) self.wfile.flush() self.close_connection = True def send_queue_info(self): data = LIC_MANAGER.get_wait_queue() resp_str = json.dumps(data).encode('utf-8') self.send_response(200) self.send_header('Content-type', 'text/json') self.send_header('Content-Length', len(resp_str)) self.end_headers() self.wfile.write(resp_str) self.wfile.flush() self.close_connection = True def send_request_lic(self): content_length = int(self.headers.get('Content-Length')) data = self.rfile.read(content_length) data = json.loads(data) log.info(f"request license: {data['user']}@{data['host']}") resp = LIC_MANAGER.request(data["user"], data["host"]) resp_str = json.dumps(resp).encode('utf-8') self.send_response(200) self.send_header('Content-type', 'text/json') self.send_header('Content-Length', len(resp_str)) self.end_headers() self.wfile.write(resp_str) self.wfile.flush() self.close_connection = True def send_404(self): self.send_response(404) self.send_header('Content-type', 'text/html') resp_str = "
The page you are looking for does not exist.
".encode('utf-8') self.send_header('Content-Length', len(resp_str)) self.end_headers() self.wfile.write(resp_str) self.wfile.flush() self.close_connection = True def send_file(self, file_path) -> bool: file_path =str(Path(os.path.abspath(__file__)).parent.joinpath(file_path)) log.info(f"clinet request: [{file_path}]") if not os.path.isfile(file_path): return False self.send_response(200) mime_type = self.get_mime_type(file_path) self.send_header('Content-type', mime_type) if mime_type.startswith('text/'): self.send_header('Content-Disposition', f'inline; filename={os.path.basename(file_path)}') else: self.send_header('Content-Disposition', f'attachment; filename={os.path.basename(file_path)}') self.end_headers() with open(file_path, 'rb') as f: self.wfile.write(f.read()) return True def get_mime_type(self, file_path): ext = os.path.splitext(file_path)[1] if ext == '.txt': return 'text/plain' elif ext == '.js': return 'application/javascript' elif ext == '.html': return 'text/html' elif ext == '.css': return 'text/css' # 添加更多 MIME 类型根据需要 return 'application/octet-stream' # 默认类型 def run(server_class=ThreadingHTTPServer, handler_class=SimpleHTTPRequestHandler, port=8088): server_address = ('0.0.0.0', port) httpd = server_class(server_address, handler_class) log.info(f'Starting threaded server on http://{server_address[0]}:{server_address[1]} ...') httpd.serve_forever() if __name__ == "__main__": LIC_MANAGER = LicenseManager() try: if len(os.sys.argv) > 1: run(port=int(os.sys.argv[1])) else: run() except KeyboardInterrupt: log.info("Keyboard interrupt received, shutting down...") finally: LIC_MANAGER.exit() log.info("Server stopped")