vcast_license_usage_server/license.py

267 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
import json
import queue
import re
import sys
import socket
import asyncio
import threading
import time
from typing import Dict, List
from datetime import datetime
from utils import *
import logger
log = logger.get_logger()
def _get_ip_address(domain):
try:
ip_address = socket.gethostbyname(domain)
# log.debug(f"get_ip_address {domain} {ip_address}")
return ip_address
except socket.gaierror as e:
# print(f"Error retrieving IP address: {e}")
return None
def _get_valid_licenses():
lic_id = 0
while True:
lic_id += 1
domain = f"szmaslic{lic_id:02d}"
if _get_ip_address(domain) is None:
break
yield "27003@" + domain
class UsedInfo:
def __init__(self):
self.user = ''
self.host = ''
self.lic = ''
self.start_time:datetime = None
def __str__(self):
time = datetime.now() - self.start_time
login_time = (time.days * 24 + time.seconds / 3600.0)
obj = {
"user":self.user,
"host":self.host,
'lic':self.lic,
'login':f'{login_time:.2f}h'
}
return str(obj)
def __repr__(self):
return self.__str__()
class LicenseInfo:
def __init__(self, name='', total=0, used=0):
self.name = name
self.total = total
self.used = used
self.used_info:List[UsedInfo] = []
def available(self):
return self.total - self.used
def __str__(self):
obj = {
"name":self.name,
"total":self.total,
"used":self.used,
"used_info":self.used_info
}
return str(obj)
def __repr__(self):
return self.__str__()
class LicenseManager:
def __init__(self):
self.wait_queue = queue.Queue()
self.ack_queue = queue.Queue()
self.licenses_stat = LicenseStatistic()
self.licenses:Dict[str,LicenseInfo] = {}
#启动定时获取
self.licenses_stat.lic_info_updater()
def lic_info_updater(self):
self.update()
if self.wait_queue.qsize() > 0:
for lic, available_cnt in self.licenses_stat.available():
for i in range(available_cnt):
req = self.wait_queue.get()
if req == None:
return
req["lic"] = lic
self.ack_queue.put(req)
threading.Timer(10, self.lic_info_updater).start() #每间隔一段时间更新一次lic的使用情况
def _in_wait_queue(self, user):
return any(user == _.user for _ in list(self.wait_queue))
def request(self, user:str, host:str):
#如果已经是在线的状态就不允许申请
if any(user == _.user for _ in self.licenses_stat.online_users()):
log.warning(f"{user} is online, please try again later")
return False
#如果是已经在队列中则返回等待
if self._in_wait_queue(user):
return None
data = {
"user":user,
"host":host,
"start_time":time.now()
}
self.wait_queue.put(data)
return False
def cancel(self):
pass
def release(self):
pass
def get_wait_queue(self):
return [_ for _ in self.wait_queue.queue]
class LicenseStatistic:
def __init__(self):
self.licenses:Dict[str,LicenseInfo] = {}
def update(self):
# 更新license信息使用异步获取
tasks = []
for lic in _get_valid_licenses():
log.trace(f'find valid lic: {lic}')
tasks.append(get_license_used_info(lic))
loop = asyncio.get_event_loop()
infos = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
for info in infos:
if info is not None:
self.licenses[info.name] = info
if not self.licenses:
log.warning("not vcast lic found")
def get_lic_list(self):
# 获取所有的license名称
return self.licenses.keys()
def available(self):
# 获取所有的可用数量
for _,lic in self.licenses.items():
value = lic.available()
if value > 0:
yield (lic.name, value) #lic的name和个数
def online_users(self):
# 获取在线用户
for used_info in self.used():
yield used_info.user #返回的是user name
def used(self):
# 获取所有的已使用信息
for _,info in self.licenses.items():
for used in info.used_info:
yield used #返回的是UsedInfo对象
def sorted(self):
info = {}
for used_info in self.used():
time = datetime.now() - used_info.start_time
login_time = (time.days * 24 + time.seconds / 3600.0)
info[used_info.user] = login_time
sorted_info = dict(sorted(info.items(), key=lambda item: item[1], reverse=True))
return sorted_info #返回的是字典key为username value为登录时间
def __str__(self):
string = str(self.licenses).replace("'", '"')
log.info(string)
obj = json.loads(string)
string = json.dumps(obj, indent = 4)
return string
def __repr__(self):
return self.__str__()
"""
{"27003@szmaslic03": {"name": "27003@szmaslic03", "total": 2, "used": 1, "used_info": "[{"name": "haoxiang.ran", "host": "szl3bc12808", "lic": "szmaslic03", "login": "0.41"}]"}, "27003@szmaslic04": {"name": "27003@szmaslic04", "total": 2, "used": 1, "used_info": "[{"name": "macro.yang", "host": "szl3bc12804", "lic": "szmaslic04", "login": "52.28"}]"}, "27003@szmaslic06": {"name": "27003@szmaslic06", "total": 2, "used": 2, "used_info": "[{"name": "zhenhao.he", "host": "szl3bc12804", "lic": "szmaslic06", "login": "0.20"}, {"name": "phillip.chan", "host": "szl3bc12810", "lic": "szmaslic06", "login": "0.16"}]"}, "27003@szmaslic07": {"name": "27003@szmaslic07", "total": 2, "used": 1, "used_info": "[{"name": "zhiyi.li", "host": "szl3bc12806", "lic": "szmaslic07", "login": "2.96"}]"}, "27003@szmaslic08": {"name": "27003@szmaslic08", "total": 2, "used": 2, "used_info": "[{"name": "soo.liu", "host": "szl3bc12809", "lic": "szmaslic08", "login": "6.55"}, {"name": "haichao.ou", "host": "szl3bc12806", "lic": "szmaslic08", "login": "5.93"}]"}, "27003@szmaslic09": {"name": "27003@szmaslic09", "total": 2, "used": 2, "used_info": "[{"name": "louie.liang", "host": "szl3bc12810", "lic": "szmaslic09", "login": "8.31"}, {"name": "zabbix", "host": "szl3bc06409", "lic": "szmaslic09", "login": "0.90"}]"}, "27003@szmaslic10": {"name": "27003@szmaslic10", "total": 2, "used": 2, "used_info": "[{"name": "harvey.li", "host": "szl3bc12810", "lic": "szmaslic10", "login": "57.38"}, {"name": "kw.hu", "host": "szl3bc12809", "lic": "szmaslic10", "login": "1.41"}]"}}
"""
async def get_license_used_info(lic):
server_used_info = f"timeout 1 /tools/software/vcast/flexlm/lmutil lmstat -a -c {lic} -a"
ret,out,err = shell(server_used_info, checkret=False)
# log.info(f"get_license_used_info {lic} {out} {ret} {err}")
if ret != 0:
return None
""" data like:
Users of VCAST_C_ENT_0800: (Total of 2 licenses issued; Total of 2 licenses in use)
"VCAST_C_ENT_0800" v23, vendor: vector, expiry: permanent(no expiration date)
vendor_string: CUST:64925:
floating license
zhenhao.he szl3bc12804 /dev/tty (v23) (szmaslic06/27003 337), start Thu 1/2 8:52
phillip.chan szl3bc12810 /dev/tty (v23) (szmaslic06/27003 233), start Thu 1/2 14:59
Users of CCAST_0801: (Total of 2 licenses issued; Total of 0 licenses in use)
"""
status = 'idle'
info = LicenseInfo(lic)
for line in out.splitlines():
line = line.strip()
if status == 'idle':
#Error getting status: Cannot find license file. (-1,359:2 "No such file or directory")
if line.startswith("Error getting status"):
log.warning(f"{lic}")
return None
if not line.startswith("Users of VCAST_C_ENT_0800:"):
continue
status = 'start'
#[Users of VCAST_C_ENT_0800: (Total of 2 licenses issued; Total of 1 license in use)
pattern = r"\(Total of (\d+).*Total of (\d+).*\)"
ret = re.findall(pattern, line)
if not ret:
log.warning(f"{lic} parser [{line}] fail")
return None
(total, used) = ret[0]
info.total = int(total)
info.used = int(used)
elif status == 'start':
#zhenhao.he szl3bc12804 /dev/tty (v23) (szmaslic06/27003 337), start Thu 1/2 8:52
pattern = '(\S+) (\S+).*\((\S+)/.*\), start (.*)'
ret = re.findall(pattern, line)
if not ret:
continue
used = UsedInfo()
(used.user,used.host,used.lic,time_string) = ret[0]
#处理跨年的情况
year = datetime.now().year
start_time = datetime.strptime(f"{year} {time_string}", "%Y %a %m/%d %H:%M")
if start_time > datetime.now():
year -= 1
start_time = datetime.strptime(f"{year} {time_string}", "%Y %a %m/%d %H:%M")
used.start_time = start_time
# log.info(f"{lic} => {used}")
info.used_info.append(used)
# Next section
elif line.startswith('Users of '):
break
return info
def draw_time_grpah(license_manager):
info = license_manager.sorted()
max_time = max(info.values())
name_len = max([len(_) for _ in info.keys()])
top = 100
for user, time in info.items():
curr = '' * max(int(top * time / max_time), 1)
name_remain = ' ' * (name_len - len(user))
log.info(f"{user}{name_remain} :{time:5.2f}h {curr}")
if __name__ == '__main__':
log.info("License status query...")
lm = LicenseStatistic()
log.info(lm)
available_lic = lm.available()
if not available_lic:
log.info("No license available")
log.info(f"All licenses is: {[str(_) for _ in lm.get_lic_list()]}")
for lic,number in lm.available():
log.info(f"{lic}: {number} lic available")
draw_time_grpah(lm)