vcast_license_usage_server/server.py

136 lines
4.9 KiB
Python
Raw Permalink Normal View History

2025-01-05 09:59:40 +08:00
#!/usr/bin/python3
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
2025-01-05 09:59:40 +08:00
import json
import os
from pathlib import Path
from license import LicenseManager
from my_log import get_logger
log = get_logger()
2025-01-05 09:59:40 +08:00
global LIC_MANAGER
LIC_MANAGER = None
2025-01-05 09:59:40 +08:00
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.handler_map = {
'/vcast/lic_use_info': self.send_lic_info,
'/vcast/request_lic': self.send_request_lic,
'/vcast/queue_info': self.send_queue_info,
}
super().__init__(*args, **kwargs)
2025-01-05 09:59:40 +08:00
def log_message(self, format, *args):
return # 重写该方法以禁用日志输出
def do_GET(self):
if self.path in self.handler_map:
self.handler_map[self.path]()
self.wfile.flush()
self.close_connection = True
2025-01-05 09:59:40 +08:00
return
path = self.path
if path == '/':
path = '/index.html'
path = path.strip('/')
if not self.send_file(path):
self.send_404()
self.wfile.flush()
self.close_connection = True
def send_lic_info(self):
data = LIC_MANAGER.get_licenses_info()
resp_str = json.dumps(data).encode('utf-8')
self.send_response(200)
self.send_header('Content-type', 'text/json')
self.send_header('Content-Length', len(resp_str))
self.end_headers()
self.wfile.write(resp_str)
self.wfile.flush()
self.close_connection = True
def send_queue_info(self):
data = LIC_MANAGER.get_wait_queue()
resp_str = json.dumps(data).encode('utf-8')
self.send_response(200)
self.send_header('Content-type', 'text/json')
self.send_header('Content-Length', len(resp_str))
self.end_headers()
self.wfile.write(resp_str)
self.wfile.flush()
self.close_connection = True
def send_request_lic(self):
content_length = int(self.headers.get('Content-Length'))
data = self.rfile.read(content_length)
data = json.loads(data)
log.info(f"request license: {data['user']}@{data['host']}")
resp = LIC_MANAGER.request(data["user"], data["host"])
resp_str = json.dumps(resp).encode('utf-8')
self.send_response(200)
self.send_header('Content-type', 'text/json')
self.send_header('Content-Length', len(resp_str))
self.end_headers()
self.wfile.write(resp_str)
self.wfile.flush()
self.close_connection = True
2025-01-05 09:59:40 +08:00
def send_404(self):
self.send_response(404)
self.send_header('Content-type', 'text/html')
resp_str = "<h1>404 Not Found</h1><p>The page you are looking for does not exist.</p>".encode('utf-8')
self.send_header('Content-Length', len(resp_str))
2025-01-05 09:59:40 +08:00
self.end_headers()
self.wfile.write(resp_str)
self.wfile.flush()
self.close_connection = True
2025-01-05 09:59:40 +08:00
def send_file(self, file_path) -> bool:
file_path =str(Path(os.path.abspath(__file__)).parent.joinpath(file_path))
log.info(f"clinet request: [{file_path}]")
2025-01-05 09:59:40 +08:00
if not os.path.isfile(file_path):
return False
self.send_response(200)
mime_type = self.get_mime_type(file_path)
self.send_header('Content-type', mime_type)
if mime_type.startswith('text/'):
self.send_header('Content-Disposition', f'inline; filename={os.path.basename(file_path)}')
else:
self.send_header('Content-Disposition', f'attachment; filename={os.path.basename(file_path)}')
self.end_headers()
with open(file_path, 'rb') as f:
self.wfile.write(f.read())
return True
def get_mime_type(self, file_path):
ext = os.path.splitext(file_path)[1]
if ext == '.txt':
return 'text/plain'
elif ext == '.js':
return 'application/javascript'
elif ext == '.html':
return 'text/html'
elif ext == '.css':
return 'text/css'
# 添加更多 MIME 类型根据需要
return 'application/octet-stream' # 默认类型
def run(server_class=ThreadingHTTPServer, handler_class=SimpleHTTPRequestHandler, port=8088):
2025-01-05 09:59:40 +08:00
server_address = ('0.0.0.0', port)
httpd = server_class(server_address, handler_class)
log.info(f'Starting threaded server on http://{server_address[0]}:{server_address[1]} ...')
2025-01-05 09:59:40 +08:00
httpd.serve_forever()
if __name__ == "__main__":
LIC_MANAGER = LicenseManager()
try:
if len(os.sys.argv) > 1:
run(port=int(os.sys.argv[1]))
else:
run()
except KeyboardInterrupt:
log.info("Keyboard interrupt received, shutting down...")
finally:
LIC_MANAGER.exit()
log.info("Server stopped")