""" License Server — Secure Edition Security: TLS 1.2+ encryption, length-prefixed framing (solves TCP streaming issues), anti-replay via timestamp + nonce, strong license keys via secrets module, server identity via certificate fingerprint (TOFU). """
import os import json import time import struct import socket import ssl import secrets import hashlib import threading from collections import deque
from rich.console import Console from rich.table import Table
# Shared rich console used for all terminal output (menus, tables, live log).
console = Console()
HOST = "0.0.0.0" PORT = 11451 LICENSE_FILE = "licenses.json" SESSION_FILE = "sessions.json" CERT_FILE = "server_cert.pem" KEY_FILE = "server_key.pem" FINGERPRINT_FILE = "server_fingerprint.txt" HEARTBEAT_TIMEOUT = 60 NONCE_WINDOW = 300 MAX_PAYLOAD = 10 * 1024 * 1024
file_lock = threading.Lock() log_buffer = deque(maxlen=1000) log_lock = threading.Lock() LOG_MODE = False
def log(msg):
    """Record a timestamped log line and echo it when the live view is open.

    The line is always appended to the bounded ``log_buffer``; it is printed
    to the console only while ``LOG_MODE`` is true.

    Args:
        msg: Text to log. It may contain client-supplied data.
    """
    line = f"[{time.strftime('%H:%M:%S')}] {msg}"
    with log_lock:
        log_buffer.append(line)
    if LOG_MODE:
        # Log lines embed untrusted client input (license keys, client IDs,
        # exception text). Printing them with markup enabled would let a
        # value such as "[/x]" raise a rich MarkupError, so print literally.
        console.print(line, markup=False)
def load_json(path):
    """Load a JSON object from *path*, falling back to an empty dict.

    Args:
        path: File to read (UTF-8 encoded JSON).

    Returns:
        The decoded dict, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object at top level.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError. Anything else is a
        # programming error and should surface instead of being swallowed.
        return {}
    # Callers index the result like a dict; a top-level list/number would
    # only fail later in a less obvious place.
    return data if isinstance(data, dict) else {}
def save_json(path, data):
    """Atomically write *data* to *path* as indented JSON.

    The data is first written to ``<path>.tmp`` and then moved over the
    target with ``os.replace``, so a crash or serialisation error mid-write
    never leaves a truncated file behind. Writes are serialised through
    ``file_lock``.

    Args:
        path: Destination file.
        data: JSON-serialisable object.

    Raises:
        OSError: If the file cannot be written.
        TypeError, ValueError: If *data* is not JSON-serialisable.
    """
    tmp_path = f"{path}.tmp"
    with file_lock:
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=4)
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave a partial temp file around; the previous
            # contents of *path* are still intact at this point.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
def generate_self_signed_cert():
    """Create a self-signed TLS certificate, private key and fingerprint file.

    Writes ``CERT_FILE`` (PEM certificate), ``KEY_FILE`` (unencrypted PKCS#8
    PEM key, created with owner-only permissions) and ``FINGERPRINT_FILE``
    (hex SHA-256 of the DER certificate).

    Returns:
        The hex SHA-256 fingerprint, or ``None`` when the ``cryptography``
        package is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.hazmat.primitives import serialization
        import ipaddress
        import datetime as dt
    except ImportError:
        console.print("[red]Error: 'cryptography' library required.[/red]")
        console.print("Install: [bold]pip install cryptography[/bold]")
        return None

    console.print("[yellow]Generating self-signed TLS certificate …[/yellow]")

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Beijing"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Beijing"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "LicenseServer"),
        x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
    ])

    # Use a single timestamp so the validity window is exactly 3650 days.
    now = dt.datetime.now(dt.timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + dt.timedelta(days=3650))
        .add_extension(
            x509.SubjectAlternativeName([
                x509.DNSName("localhost"),
                x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
            ]),
            critical=False,
        )
        .sign(private_key, hashes.SHA256())
    )

    with open(CERT_FILE, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    key_bytes = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    # The key is stored unencrypted, so it must not be readable by other
    # users: create it with mode 0600 instead of the umask default.
    key_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    key_fd = os.open(KEY_FILE, key_flags, 0o600)
    with os.fdopen(key_fd, "wb") as f:
        try:
            # The mode passed to os.open only applies to newly created
            # files; tighten a pre-existing file before writing the key.
            os.chmod(KEY_FILE, 0o600)
        except OSError:
            pass
        f.write(key_bytes)

    fingerprint = hashlib.sha256(
        cert.public_bytes(serialization.Encoding.DER)
    ).hexdigest()

    with open(FINGERPRINT_FILE, "w") as f:
        f.write(fingerprint)

    console.print("[green]Certificate generated.[/green]")
    console.print(f"[bold]SHA256 Fingerprint:[/bold] [cyan]{fingerprint}[/cyan]")
    return fingerprint
def _cert_sha256_fingerprint(cert_path):
    """Return the hex SHA-256 of the first PEM certificate in *cert_path*."""
    header = "-----BEGIN CERTIFICATE-----"
    footer = "-----END CERTIFICATE-----"
    with open(cert_path, "r", encoding="utf-8", errors="replace") as f:
        pem = f.read()
    # Isolate the first certificate so extra text or a chain does not
    # confuse the PEM decoder. str.index raises ValueError when absent.
    start = pem.index(header)
    end = pem.index(footer, start) + len(footer)
    der = ssl.PEM_cert_to_DER_cert(pem[start:end])
    return hashlib.sha256(der).hexdigest()


def get_or_create_cert():
    """Return the server certificate fingerprint, creating a cert if needed.

    When both ``CERT_FILE`` and ``KEY_FILE`` exist, the fingerprint is always
    recomputed from the certificate itself. ``FINGERPRINT_FILE`` is only a
    convenience copy and is rewritten when missing or out of date, so a
    replaced certificate can never be advertised with a stale fingerprint.
    Otherwise a new self-signed certificate is generated.

    Returns:
        The hex SHA-256 fingerprint, or ``None`` if the certificate could
        not be read or generated.
    """
    if not (os.path.exists(CERT_FILE) and os.path.exists(KEY_FILE)):
        return generate_self_signed_cert()

    try:
        fp = _cert_sha256_fingerprint(CERT_FILE)
    except (OSError, ValueError) as exc:
        # markup=False: the exception text is not trusted markup.
        console.print(
            f"Cannot read TLS certificate {CERT_FILE}: {exc}",
            style="red",
            markup=False,
        )
        return None

    cached = None
    if os.path.exists(FINGERPRINT_FILE):
        with open(FINGERPRINT_FILE) as f:
            cached = f.read().strip()

    if cached != fp:
        if cached is not None:
            console.print(
                "[yellow]Stored fingerprint did not match the certificate; "
                "fingerprint file updated.[/yellow]"
            )
        with open(FINGERPRINT_FILE, "w") as f:
            f.write(fp)

    console.print("[green]Using existing TLS certificate.[/green]")
    console.print(f"[bold]SHA256 Fingerprint:[/bold] [cyan]{fp}[/cyan]")
    return fp
def send_msg(sock, data):
    """Serialise *data* to JSON and send it as one length-prefixed frame.

    Frame layout: a 4-byte big-endian unsigned length followed by the
    UTF-8 encoded JSON body.
    """
    body = json.dumps(data).encode("utf-8")
    header = struct.pack("!I", len(body))
    # Header and body go out in a single sendall call.
    sock.sendall(header + body)
def recv_msg(sock, max_payload=None):
    """Read one length-prefixed JSON message from the stream.

    Frame layout: a 4-byte big-endian unsigned length followed by that many
    bytes of UTF-8 encoded JSON.

    Args:
        sock: Connected socket-like object providing ``recv``.
        max_payload: Largest accepted body size in bytes. Defaults to the
            module-level ``MAX_PAYLOAD``.

    Returns:
        The decoded JSON value, or ``None`` if the peer closed the
        connection before a full frame arrived or the declared length
        exceeds the limit.

    Raises:
        ValueError: If the body is not valid UTF-8 JSON (includes
            ``json.JSONDecodeError`` and ``UnicodeDecodeError``).
        OSError: On socket errors.
    """

    def _read_exact(n):
        # Collect chunks and join once; repeated bytes += re-copies the
        # whole buffer on every chunk, which is quadratic for big frames.
        chunks = []
        remaining = n
        while remaining:
            # Cap each read so a large declared length does not request a
            # huge buffer before any data has actually arrived.
            chunk = sock.recv(min(remaining, 65536))
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    header = _read_exact(4)
    if header is None:
        return None

    length = struct.unpack("!I", header)[0]
    limit = MAX_PAYLOAD if max_payload is None else max_payload
    if length > limit:
        return None

    payload = _read_exact(length)
    if payload is None:
        return None
    return json.loads(payload.decode("utf-8"))
class NonceStore:
    """Tracks seen nonces to detect replay attacks.

    Args:
        window: Accepted timestamp skew in seconds. Defaults to the
            module-level ``NONCE_WINDOW``.
    """

    def __init__(self, window=None):
        self._window = NONCE_WINDOW if window is None else window
        self._nonces = {}  # nonce -> time after which it may be forgotten
        self._lock = threading.Lock()

    def check_and_add(self, nonce, now):
        """Register *nonce*; return False if it was already seen.

        Args:
            nonce: Hashable nonce value taken from the request.
            now: Current time in seconds.

        Returns:
            True if the nonce is new (and is now remembered), False if it
            is a duplicate.
        """
        with self._lock:
            self._cleanup(now)
            if nonce in self._nonces:
                return False
            # A request is accepted while |now - timestamp| <= window, so a
            # message stamped `window` seconds in the future stays valid
            # for up to 2 * window after it is first seen. The nonce must be
            # remembered at least that long or such a message could be
            # replayed once a shorter retention had expired.
            self._nonces[nonce] = now + 2 * self._window
            return True

    def _cleanup(self, now):
        """Forget nonces whose retention time has passed."""
        expired = [n for n, exp in self._nonces.items() if now > exp]
        for n in expired:
            del self._nonces[n]
def generate_license():
    """Return a random license key of the form ``LC-xxxx-xxxx-xxxx-xxxx``.

    The key carries 16 lowercase hex digits drawn from ``secrets``.
    """
    token = secrets.token_hex(8)
    groups = (token[start:start + 4] for start in range(0, 16, 4))
    return "LC-" + "-".join(groups)
class LicenseServer:
    """In-memory license and session registry persisted to JSON files.

    ``licenses`` maps license key -> license record; ``sessions`` maps
    license key -> {client_id: {"last_heartbeat": epoch_seconds}}.

    Every method that reads or mutates this state takes ``self._lock``,
    because requests are served from one thread per connection. Without it
    the ``max_users`` check-then-add can be raced past the limit, and the
    dicts can change size while they are being serialised to disk.
    """

    def __init__(self):
        self.licenses = load_json(LICENSE_FILE)
        self.sessions = load_json(SESSION_FILE)
        self.nonce_store = NonceStore()
        # Re-entrant because authorize() calls cleanup() while holding it.
        self._lock = threading.RLock()

    def cleanup(self):
        """Drop sessions whose last heartbeat is older than the timeout."""
        now = int(time.time())
        changed = False

        with self._lock:
            # Iterate over snapshots: entries are deleted inside the loops.
            for lic, clients in list(self.sessions.items()):
                for cid, info in list(clients.items()):
                    if now - info["last_heartbeat"] > HEARTBEAT_TIMEOUT:
                        log(f"Session expired | License={lic} ClientID={cid}")
                        del clients[cid]
                        changed = True
                if not clients:
                    del self.sessions[lic]

            if changed:
                save_json(SESSION_FILE, self.sessions)

    def create_license(self, username, password, max_users):
        """Create, persist and return a new license key.

        Args:
            username: Owner name stored with the license.
            password: Stored with the license record.
            max_users: Maximum number of concurrent client sessions.

        Returns:
            The newly generated license key.
        """
        with self._lock:
            key = generate_license()
            # Never overwrite an existing license on a key collision.
            while key in self.licenses:
                key = generate_license()
            # NOTE(review): the password is persisted in plain text; the
            # stored format is kept as-is here for compatibility with
            # existing license files.
            self.licenses[key] = {
                "username": username,
                "password": password,
                "max_users": max_users,
                "created_at": int(time.time()),
            }
            save_json(LICENSE_FILE, self.licenses)
        return key

    def authorize(self, lic, client_id):
        """Register (or refresh) a client session under a license.

        Args:
            lic: License key supplied by the client.
            client_id: Client identifier supplied by the client.

        Returns:
            ``(ok, message)`` where message is one of "OK",
            "Invalid license", "Invalid client ID" or "Limit reached".
        """
        with self._lock:
            self.cleanup()

            # Both values come straight from untrusted JSON. Non-string
            # values (lists, dicts, null, numbers) are rejected up front:
            # unhashable ones would raise on dict lookup, and non-string
            # keys do not survive a JSON save/load round trip.
            if not isinstance(lic, str) or lic not in self.licenses:
                return False, "Invalid license"
            if not isinstance(client_id, str):
                return False, "Invalid client ID"

            max_users = self.licenses[lic]["max_users"]
            clients = self.sessions.setdefault(lic, {})

            if client_id in clients:
                # Known client: just refresh its heartbeat.
                clients[client_id]["last_heartbeat"] = int(time.time())
                save_json(SESSION_FILE, self.sessions)
                return True, "OK"

            if len(clients) >= max_users:
                return False, "Limit reached"

            clients[client_id] = {"last_heartbeat": int(time.time())}
            save_json(SESSION_FILE, self.sessions)
            return True, "OK"

    def heartbeat(self, lic, client_id):
        """Refresh the heartbeat of an existing session.

        Returns:
            True if the session exists and was refreshed, False otherwise
            (including when either argument is not a string).
        """
        if not isinstance(lic, str) or not isinstance(client_id, str):
            return False

        with self._lock:
            clients = self.sessions.get(lic)
            if clients is None or client_id not in clients:
                return False

            clients[client_id]["last_heartbeat"] = int(time.time())
            save_json(SESSION_FILE, self.sessions)
            return True
# Module-wide LicenseServer instance used by the request handlers and the CLI.
server = LicenseServer()
def validate_request(req):
    """Check a decoded request for freshness and replay.

    Args:
        req: Decoded JSON request. Expected to be a dict carrying a numeric
            ``timestamp`` (epoch seconds) and a string ``nonce`` of at
            least 16 characters.

    Returns:
        ``(ok, message)``; message is "OK" on success or the rejection
        reason otherwise.
    """
    # Everything in the request is untrusted: validate types before doing
    # arithmetic or dict lookups with it.
    if not isinstance(req, dict):
        return False, "Invalid request"

    now = int(time.time())
    ts = req.get("timestamp", 0)
    nonce = req.get("nonce", "")

    # bool is an int subclass and is rejected explicitly. NaN (ts != ts)
    # must be rejected too: every comparison against NaN is False, so it
    # would otherwise slip past the window check below.
    if isinstance(ts, bool) or not isinstance(ts, (int, float)) or ts != ts:
        return False, "Invalid timestamp"
    if not isinstance(nonce, str):
        return False, "Invalid nonce"

    if abs(now - ts) > NONCE_WINDOW:
        return False, "Request expired or clock skew too large"

    if len(nonce) < 16:
        return False, "Invalid nonce"

    if not server.nonce_store.check_and_add(nonce, now):
        return False, "Duplicate request (possible replay attack)"

    return True, "OK"
def handle_client(conn, addr):
    """Serve one client connection until it closes or fails.

    Reads framed JSON requests in a loop, validates each one and dispatches
    on its ``action`` ("auth" or "heartbeat"). A JSON response is sent for
    every request. The connection is always closed on exit.

    Args:
        conn: Connected (TLS-wrapped) socket.
        addr: Peer address tuple; ``addr[0]`` is the IP used in log lines.
    """
    ip = addr[0]

    try:
        while True:
            try:
                req = recv_msg(conn)
            except ValueError as e:
                # Body was not valid UTF-8 JSON. The whole frame has been
                # consumed, so the stream is still in sync: report the
                # error and keep serving instead of dropping the client.
                log(f"[BAD REQUEST] | IP={ip} Reason={e}")
                send_msg(conn, {"ok": False, "msg": "Malformed request"})
                continue

            if req is None:
                break

            if not isinstance(req, dict):
                # Valid JSON but not an object (e.g. a list or a number).
                log(f"[BAD REQUEST] | IP={ip} Reason=request is not a JSON object")
                send_msg(conn, {"ok": False, "msg": "Malformed request"})
                continue

            valid, msg = validate_request(req)
            if not valid:
                log(f"[REPLAY] | IP={ip} Reason={msg}")
                send_msg(conn, {"ok": False, "msg": msg})
                continue

            action = req.get("action")
            lic = req.get("license")
            cid = req.get("client_id")

            if action == "auth":
                ok, msg = server.authorize(lic, cid)
                log(f"[AUTH] | IP={ip} License={lic} ClientID={cid} Result={msg}")
                send_msg(conn, {"ok": ok, "msg": msg})
            elif action == "heartbeat":
                ok = server.heartbeat(lic, cid)
                log(f"[HEARTBEAT] | IP={ip} License={lic} ClientID={cid} Status={ok}")
                send_msg(conn, {"ok": ok})
            else:
                send_msg(conn, {"ok": False, "msg": "Unknown action"})

    except OSError as e:
        # ssl.SSLError and ConnectionError are both OSError subclasses.
        log(f"Connection error | IP={ip} Error={e}")
    except Exception as e:
        # Last-resort boundary so one bad client cannot kill its thread
        # without leaving a trace in the log.
        log(f"Unexpected error | IP={ip} Error={e}")
    finally:
        try:
            conn.close()
        except Exception:
            pass
        log(f"Disconnected | IP={ip}")
def create_ssl_context():
    """Build the server-side TLS context.

    The context requires TLS 1.2 or newer and presents the certificate and
    key stored in ``CERT_FILE`` / ``KEY_FILE``.
    """
    context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
    return context
def _handshake_and_serve(ssl_ctx, raw_conn, addr, handshake_timeout=10):
    """Run the TLS handshake for one accepted socket, then serve it."""
    try:
        # Bound the handshake so a peer that connects and then stays
        # silent cannot hold this thread forever.
        raw_conn.settimeout(handshake_timeout)
        conn = ssl_ctx.wrap_socket(raw_conn, server_side=True)
        # Back to blocking mode for the request loop.
        conn.settimeout(None)
    except OSError as e:
        # Covers ssl.SSLError as well as resets and timeouts, which are
        # plain OSError subclasses.
        log(f"TLS handshake failed | IP={addr[0]} Error={e}")
        try:
            raw_conn.close()
        except OSError:
            pass
        return

    handle_client(conn, addr)


def socket_server():
    """Accept TCP connections forever and serve each in its own thread.

    The TLS handshake is performed inside the per-connection thread rather
    than in the accept loop, so a slow or stalled handshake cannot block
    other clients from connecting.
    """
    # Load the certificate before opening the port so a TLS setup failure
    # does not leave a bound-but-dead listener behind.
    ssl_ctx = create_ssl_context()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen(5)
        log(f"Secure server listening on {HOST}:{PORT} (TLS 1.2+)")

        while True:
            try:
                raw_conn, addr = s.accept()
            except OSError as e:
                # e.g. the peer aborted before accept() returned. Keep the
                # listener alive; pause briefly to avoid a hot error loop.
                log(f"Accept failed | Error={e}")
                time.sleep(0.1)
                continue

            log(f"Connected | IP={addr[0]}:{addr[1]}")

            t = threading.Thread(
                target=_handshake_and_serve,
                args=(ssl_ctx, raw_conn, addr),
                daemon=True,
            )
            t.start()
def show_menu():
    """Clear the screen and print the main menu, one option per line."""
    console.clear()
    # The options previously ran together on a single line; each entry now
    # sits on its own line so the menu is readable.
    console.print("""
[bold cyan]Secure License Server[/bold cyan]

[1] Create License
[2] Show Licenses
[3] Show Sessions
[4] View Logs
[5] Show Certificate Fingerprint
[6] Exit
""")
def create_license():
    """Interactively create a license and display the resulting key.

    Prompts for username, password and the maximum number of concurrent
    users (1-1000). Invalid input prints an error and returns without
    creating anything.
    """
    # Local import: only needed here to print user text literally.
    from rich.markup import escape

    u = input("Username: ").strip()
    p = input("Password: ").strip()
    if not u or not p:
        console.print("[red]Username and password cannot be empty.[/red]")
        return

    try:
        n = int(input("Max users: ").strip())
        if n < 1 or n > 1000:
            raise ValueError
    except ValueError:
        console.print("[red]Max users must be an integer between 1 and 1000.[/red]")
        return

    key = server.create_license(u, p, n)
    log(f"License created | Key={key} User={u} Max={n}")

    table = Table(title="License Created")
    table.add_column("Key", style="cyan")
    table.add_column("User")
    table.add_column("Max")
    # Escape the username: table cells are parsed as rich markup, so a
    # name containing square brackets would be mangled or raise.
    table.add_row(key, escape(u), str(n))
    console.print(table)
def show_licenses():
    """Print a table of all licenses (key, user, max users)."""
    # Local import: only needed here to print stored text literally.
    from rich.markup import escape

    if not server.licenses:
        console.print("[yellow]No licenses found.[/yellow]")
        return

    table = Table(title="Licenses")
    table.add_column("Key", style="cyan")
    table.add_column("User")
    table.add_column("Max")
    # Work on a snapshot so the dict cannot change size mid-iteration, and
    # escape stored text because table cells are parsed as rich markup.
    for k, v in list(server.licenses.items()):
        table.add_row(escape(str(k)), escape(str(v["username"])), str(v["max_users"]))
    console.print(table)
def show_sessions():
    """Print a table of active sessions (license, client ID, last heartbeat)."""
    # Local import: only needed here to print client text literally.
    from rich.markup import escape

    if not server.sessions:
        console.print("[yellow]No active sessions.[/yellow]")
        return

    table = Table(title="Active Sessions")
    table.add_column("License")
    table.add_column("Client ID")
    table.add_column("Last Heartbeat")
    # Sessions are mutated by the client-handler threads, so iterate over
    # snapshots to avoid "dictionary changed size during iteration".
    for lic, clients in list(server.sessions.items()):
        for cid, info in list(clients.items()):
            # Client IDs come from the network. Table cells are parsed as
            # rich markup, so an ID like "[/x]" would raise and take the
            # CLI down unless it is escaped.
            table.add_row(
                escape(str(lic)),
                escape(str(cid)),
                str(info["last_heartbeat"]),
            )
    console.print(table)
def log_view():
    """Show buffered log lines, then stream new ones until Enter is pressed.

    Sets the module-level ``LOG_MODE`` flag while the view is open so that
    ``log`` echoes new lines to the console.
    """
    global LOG_MODE
    LOG_MODE = True
    try:
        console.clear()
        console.print("[bold]Live Log[/bold] (press Enter to return)\n")
        with log_lock:
            for line in log_buffer:
                # Log lines contain client-supplied text; print them
                # literally so stray markup cannot raise or restyle output.
                console.print(line, markup=False)
        input()
    finally:
        # Always leave live mode, even if input() raises (EOF, Ctrl-C);
        # otherwise log output would keep printing over the menu.
        LOG_MODE = False
def show_fingerprint():
    """Print the stored certificate fingerprint, or an error if it is missing."""
    # Guard clause: nothing to show without the fingerprint file.
    if not os.path.exists(FINGERPRINT_FILE):
        console.print("[red]Fingerprint file not found.[/red]")
        return

    with open(FINGERPRINT_FILE) as handle:
        fingerprint = handle.read().strip()

    console.print(f"[bold]Server SHA256 Fingerprint:[/bold]\n[cyan]{fingerprint}[/cyan]")
    console.print(
        "\n[yellow]Give this fingerprint to clients so they can verify the server.[/yellow]"
    )
def cli():
    """Run the interactive admin menu until the user chooses Exit."""
    # Menu entries that run an action and then wait for Enter.
    paused_actions = {
        "1": create_license,
        "2": show_licenses,
        "3": show_sessions,
        "5": show_fingerprint,
    }

    while True:
        show_menu()
        choice = input("> ").strip()

        action = paused_actions.get(choice)
        if action is not None:
            action()
            input("\nPress Enter …")
        elif choice == "4":
            # The log view handles its own "press Enter" prompt.
            log_view()
        elif choice == "6":
            console.print("[yellow]Shutting down …[/yellow]")
            os._exit(0)
        else:
            console.print("[red]Invalid choice.[/red]")
            time.sleep(0.5)
if __name__ == "__main__": console.print("[bold cyan]Secure License Server[/bold cyan]\n")
fp = get_or_create_cert() if fp is None: console.print("[red]Cannot set up TLS — exiting.[/red]") os._exit(1)
threading.Thread(target=socket_server, daemon=True).start() cli()
|