浅谈软件许可认证

浅谈软件许可认证

Coast23

前言

之所以想写这么一篇文章,是因为计网实验要求用 Socket API 实现一个简单的在线并发式浮动授权应用,于是就有了写这篇文章的 motivation。这个计网实验的代码实现本身没什么好聊的,但相较之下,软件许可认证就是一个 significant 且 realistic 的 topic,涉及了应用层协议设计、网络通信机制以及软件系统安全等多个领域,感觉很值得一聊。

不过,我对这些领域只懂一点点皮毛,所以也没办法讲出深刻的东西(又水了一篇博文,哈哈),讲的东西也不一定对,若有错误,还恳请大佬指正。

软件许可认证简介

很多时候,我们不希望软件能给所有人使用,而是只有特定群体(比如付钱的、有权的)才能使用,于是就需要软件许可认证来解决这个问题。

软件许可认证起到一个门禁的作用,用于区分哪些人能进(有授权),哪些人不能进(未授权)。

但只要有门的存在,就必然会有翻窗的、破门的人在。几乎不存在不使用盗版软件、盗版系统的人吧(毕竟白嫖那可是人之常情~),甚至出现了像吾爱破解这样的讨论破解技术的论坛。

因此,如何阻止非授权的使用,如何保护软件,是一个很重要的技术问题,事关软件开发者能否安心赚钱。

常见软件许可认证方式

可以将软件许可认证按是否联网分为两大类:离线认证与在线认证。

在线认证需要有一个授权服务器,优点是控制力强,防破解能力也强,缺点是强依赖于网络,会产生长期维护服务器和数据库的成本。

相较之下,离线认证轻量得多,一般会内置验证算法,通过验证 key 是否合法来授权。相较在线认证要不安全得多,一旦被破解并上传到网络,就很难赚钱了。

离线认证常见模式

序列号/注册码验证(Serial Key / CDKey)

最简单的做法。判断是否为真,其中是校验算法,一般会是哈希或者非对称加密算法,且不依赖于硬件。

一个有效的能全网通用,安全性约等于没有。当然,如果加上用于 CDKey 的生成、校验和销毁的服务器,就会变成一种可靠的在线许可认证模式。

基于机器码的离线授权

软件可以读取当前计算机的不变的硬件信息(如 CPU 序列号,主板序列号,网卡 MAC 等),基于这些生成一串机器码(HWID)。

用户把机器码发给软件开发者,软件开发者使用私钥(一般是用 RSA/ECC)对机器码和授权信息做签名,生成 .lic 或者 .key 之类的授权文件或授权码,发给用户。

软件导入这个授权文件,用内置的公钥验证签名和硬件信息,能匹配才解锁。

优点是安全性很高,缺点嘛,只要用户更换硬件或者设备就得重发授权申请。

若用户群体的范围小,则很适合用这个方法。

简单示例:

1

硬件加密狗

尽管用过,但不是很了解。

原理大概就是,内置一个硬件 ID,或内置不可读的加密算法,和软件做 Challenge-Response 身份认证,或将软件的核心算法移植到硬件芯片中运行。此外,物理硬件应该也会有一些防攻击手段。

通过将软件的 secret 封装到唯一且不可复制的物理设备中,使其相比纯软件方案更难以 bypass。

在线认证常见模式

账号订阅制

在线应用基本上都会给用户分配账号,用户通过打钱来获取相应的软件服务。绝大部分在线软件的商业模式都是这样的。

在线激活验证

软件首次使用或定期向厂商服务器发送“激活请求”,服务器核验后返回授权凭证和状态。优点是易于管理与升级,支持功能开关和试用。

浮动授权(Floating Authorization)

在局域网或广域网上部署认证服务器,客户端启动时签出一个授权席位,关闭时将其归还,适合校企批量采购的少量授权。

其它

不论是哪种许可认证方式,客户端的校验逻辑都极为重要,必须防范本地的逆向或绕过攻击。至少也要做代码混淆或者加壳(如 Themida,VMP 等壳)。

对于在线认证,通讯加密是必要的。

实现一个简单的浮动许可模型

实验要求

某远程桌面连接软件 A 需要有一个许可证程序来保证合法运行。这样的许可证(License)是带有限制的。用户提供支付费用,获得一个若干人(如 10 人或 50 人)的许可证。规则如下:

1、某组织管理员在购买许可证时,输入用户名、口令和许可证类型,许可证程序返回一个由 10 个数字组成的序列号。
2、该组织的用户第一次使用软件 A 时,输入序列号。
3、该组织用户运行软件时,向许可证服务器发送验证。
4、许可证服务器查询得到该序列号的使用人数,如果未到达上限,则返回授权指令;否则,返回拒绝指令。
5、软件 A 得到授权指令,允许用户使用软件。否则,提示用户稍后再试,退出程序。

当软件 A 或非正常退出(崩溃被其它程序中止)时,许可证服务器应在扣除使用人数时剔除它。可以使软件 A 定期(如:30 分钟)向服务器报告其状态,超过一定时间没有收到报告时,则认定崩溃。

当许可证服务器崩溃时,软件 A 应能在重新启动时恢复。许可证服务器重启后,如果新的软件 A 前来连接,服务器不可以因接受其连接而拒绝已认证用户的连接。

简单分析 & 实现

显然这是一个简单的浮动许可模型。

对于传输层协议的选择,显然我们不能容忍丢包,因此选择 TCP 而非 UDP

TCP Socket 的流程很工作化,服务端 bind -> listen -> accept,客户端 connect,然后互相 send/recv 即可。

针对实验要求的容灾机制,可以通过以下两点实现:

  1. 客户端心跳(Heartbeat):客户端定时发送心跳包,服务端记录最后心跳时间,超时则剔除,避免客户端崩溃导致僵尸占用。
  2. 服务端持久化(Persistence):服务端把状态(License 和 Session)实时写入本地 JSON(这个小场景没必要起数据库),避免服务端崩溃导致数据丢失。

然后把认证规则模拟一下即可。

很容易借助 LLM 搓出花里胡哨的界面。我懒得搓 UI,只用 Pythonrich 库简单包装了一下。

# server.py
import os
import json
import time
import socket
import random
import threading
from collections import deque
from rich.console import Console
from rich.table import Table

console = Console()

HOST = "0.0.0.0"
PORT = 11451

LICENSE_FILE = "licenses.json"
SESSION_FILE = "sessions.json"
HEARTBEAT_TIMEOUT = 60

lock = threading.Lock()

log_buffer = deque(maxlen = 1000)
log_lock = threading.Lock()
LOG_MODE = False

def log(msg):
global LOG_MODE
line = f"[{time.strftime('%H:%M:%S')}] {msg}"

with log_lock:
log_buffer.append(line)

if LOG_MODE:
console.print(line)

def load_json(path):
try:
with open(path, "r", encoding = "utf-8") as f:
return json.load(f)
except Exception as e:
return {}

def save_json(path, data):
with lock:
with open(path, "w", encoding = "utf-8") as f:
json.dump(data, f, indent = 4)

def generate_license():
return "".join(str(random.randint(0, 9)) for _ in range(10))

class LicenseServer:
def __init__(self):
self.licenses = load_json(LICENSE_FILE)
self.sessions = load_json(SESSION_FILE)

def cleanup(self):
now = int(time.time())
changed = False

for lic, clients in list(self.sessions.items()):
for cid, info in list(clients.items()):
if now - info["last_heartbeat"] > HEARTBEAT_TIMEOUT:
log(f"Session expired | License={lic} ClientID={cid}")
del clients[cid]
changed = True

if not clients:
del self.sessions[lic]

if changed:
save_json(SESSION_FILE, self.sessions)

def create_license(self, username, password, max_users):
key = generate_license()
self.licenses[key] = {
"username": username,
"password": password,
"max_users": max_users,
"created_at": int(time.time())
}
save_json(LICENSE_FILE, self.licenses)
return key

def authorize(self, lic, client_id):
self.cleanup()

if lic not in self.licenses:
return False, "Invalid license"

max_users = self.licenses[lic]["max_users"]
clients = self.sessions.setdefault(lic, {})

if client_id in clients:
clients[client_id]["last_heartbeat"] = int(time.time())
save_json(SESSION_FILE, self.sessions)
return True, "OK"

if len(clients) >= max_users:
return False, "Limit reached"

clients[client_id] = {
"last_heartbeat": int(time.time())
}
save_json(SESSION_FILE, self.sessions)
return True, "OK"

def heartbeat(self, lic, client_id):
if lic not in self.sessions:
return False
if client_id not in self.sessions[lic]:
return False

self.sessions[lic][client_id]["last_heartbeat"] = int(time.time())
save_json(SESSION_FILE, self.sessions)
return True

server = LicenseServer()

def handle_client(conn, addr):
ip = addr[0]

try:
while True:
data = conn.recv(4096)
if not data:
break

req = json.loads(data.decode())
action = req.get("action")
lic = req.get("license")
cid = req.get("client_id")

if action == "auth":
ok, msg = server.authorize(lic, cid)
log(f"[AUTH] | IP={ip} License={lic} ClientID={cid} Result={msg}")
resp = {"ok": ok, "msg": msg}

elif action == "heartbeat":
ok = server.heartbeat(lic, cid)
log(f"[HEARTBEAT] | IP={ip} License={lic} ClientID={cid} Status={ok}")
resp = {"ok": ok}

else:
resp = {"ok": False, "msg": "Unknown action"}

conn.send(json.dumps(resp).encode())

except Exception as e:
log(f"Connection error | IP={ip} Error={e}")

finally:
conn.close()
log(f"Disconnected | IP={ip}")

def socket_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(5)

log(f"Server started on {HOST}:{PORT}")

while True:
conn, addr = s.accept()
log(f"Connected | IP={addr[0]}:{addr[1]}")
t = threading.Thread(target = handle_client, args = (conn, addr), daemon = True)
t.start()

def show_menu():
console.clear()
console.print("""
[1] Create License
[2] Show Licenses
[3] Show Sessions
[4] View Logs
[5] Exit
""")

def create_license():
u = input("Username: ")
p = input("Password: ")

try:
n = int(input("Max users: "))
if n < 10 or n > 1000:
raise ValueError("Max-users must be between 10 and 1000.")
except Exception:
console.print("Invalid input. Max-users must be an integer between 10 and 1000.")
return

key = server.create_license(u, p, n)
log(f"License created: key={key}, user={u}, max={n}")

table = Table(title = "License Created")
table.add_column("Key")
table.add_column("User")
table.add_column("Max")
table.add_row(key, u, str(n))

console.print(table)

def show_licenses():
table = Table(title = "Licenses")
table.add_column("Key")
table.add_column("User")
table.add_column("Max")

for k, v in server.licenses.items():
table.add_row(k, v["username"], str(v["max_users"]))

console.print(table)


def show_sessions():
table = Table(title = "Sessions")
table.add_column("License")
table.add_column("ClientID")
table.add_column("Last Heartbeat")

for lic, clients in server.sessions.items():
for cid, info in clients.items():
table.add_row(lic, cid, str(info["last_heartbeat"]))

console.print(table)


def log_view():
global LOG_MODE
LOG_MODE = True

console.clear()
console.print("Log View (press ENTER to toggle)\n")

with log_lock:
for line in log_buffer:
console.print(line)

input()
LOG_MODE = False

def cli():
while True:
show_menu()
choice = input("> ").strip()

if choice == "1":
create_license()
input("\nPress Enter...")

elif choice == "2":
show_licenses()
input("\nPress Enter...")

elif choice == "3":
show_sessions()
input("\nPress Enter...")

elif choice == "4":
log_view()

elif choice == "5":
os._exit(0)

if __name__ == "__main__":
threading.Thread(target = socket_server, daemon = True).start()
cli()
# client.py
import json
import time
import uuid
import socket
from rich.prompt import Prompt
from rich.console import Console

console = Console()

SERVER_IP = "127.0.0.1"
PORT = 11451
HEARTBEAT_TIME = 15


class Client:
def __init__(self):
self.client_id = str(uuid.uuid4())

def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((SERVER_IP, PORT))

def send(self, data):
try:
self.sock.send(json.dumps(data).encode())
resp = self.sock.recv(4096)
return json.loads(resp.decode())
except Exception as e:
console.print(f"Failed to send data - {e}")
return {}

def auth_loop(self):
while True:
# SERVER_IP = input("Server IP: ").strip()
# PORT = int(input("Server port: ").strip())
# lic = input("License: ").strip()
SERVER_IP = Prompt.ask("Server IP", default = "127.0.0.1")
PORT = int(Prompt.ask("Server port", default = "11451"))

try:
self.connect()
except Exception as e:
console.print(f"Failed to connect to {SERVER_IP}:{PORT} - {e}")
self.sock.close()
continue

lic = Prompt.ask("License")

resp = self.send({
"action": "auth",
"license": lic,
"client_id": self.client_id
})

if "ok" in resp.keys() and resp["ok"]:
console.print("Authorization successful")
return lic
else:
console.print(f"Authorization failed: {resp['msg']}")


def run(self):

license_key = self.auth_loop()

try:
while True:
time.sleep(HEARTBEAT_TIME)

resp = self.send({
"action": "heartbeat",
"license": license_key,
"client_id": self.client_id
})

if "ok" in resp.keys() and resp["ok"]:
console.print("Heartbeat OK")

else:
console.print("Lost authorization, re-auth required")
license_key = self.auth_loop()

except KeyboardInterrupt:
console.print("Client exit")


if __name__ == "__main__":
c = Client()
c.run()

服务端菜单:

创建 License:

列出 License:

启动客户端,并尝试认证:

服务端日志:

列出 Sessions:

若客户端收不到心跳包的响应(比如发生了 Server Down),则要求重新认证:

服务端会移除过期 Session,避免僵尸占用:

Discussion

这个实现乍一看没啥问题,已经完美契合了实验要求,但如果把这个模型扔进生产环境,从网络安全的角度来看,以下问题不可忽视:

  • 序列号由个数字组成,不够安全,很容易被爆破
  • 协议报文是简单的文本,没有定义报文格式(如 TLV 格式)。由于 TCP 是面向字节流的,并不懂得如何切分报文,直接 recv(4096) 极有可能产生粘包/半包问题
  • 数据在网络层是明文传输的。抓个包就能窃取序列号,最少最少也要套一层 TLS。
  • 没有任何数据包的防重放措施,攻击者截获有效流量后可以直接重放。应在应用层协议里加 Timestamp 或者 Nonce。
  • 客户端没验证服务器的身份,随便做中间人攻击

由此可见,想实现一个安全、可靠的软件许可认证模型,并不是一件简单的事。

附上 DeepSeek V4 Pro 给出的修复版本:

"""
License Server — Secure Edition
Security: TLS 1.2+ encryption, length-prefixed framing (solves TCP streaming issues),
anti-replay via timestamp + nonce, strong license keys via secrets module,
server identity via certificate fingerprint (TOFU).
"""

import os
import json
import time
import struct
import socket
import ssl
import secrets
import hashlib
import threading
from collections import deque

from rich.console import Console
from rich.table import Table

console = Console()

# --- Constants ---
HOST = "0.0.0.0"
PORT = 11451
LICENSE_FILE = "licenses.json"
SESSION_FILE = "sessions.json"
CERT_FILE = "server_cert.pem"
KEY_FILE = "server_key.pem"
FINGERPRINT_FILE = "server_fingerprint.txt"
HEARTBEAT_TIMEOUT = 60
NONCE_WINDOW = 300 # ±5 minutes for timestamp validation
MAX_PAYLOAD = 10 * 1024 * 1024 # 10 MB sanity limit

file_lock = threading.Lock()
log_buffer = deque(maxlen=1000)
log_lock = threading.Lock()
LOG_MODE = False


# --- Logging ---
def log(msg):
line = f"[{time.strftime('%H:%M:%S')}] {msg}"
with log_lock:
log_buffer.append(line)
if LOG_MODE:
console.print(line)


# --- JSON Persistence ---
def load_json(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}


def save_json(path, data):
with file_lock:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)


# --- Certificate Generation ---
def generate_self_signed_cert():
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import ipaddress
import datetime as dt
except ImportError:
console.print("[red]Error: 'cryptography' library required.[/red]")
console.print("Install: [bold]pip install cryptography[/bold]")
return None

console.print("[yellow]Generating self-signed TLS certificate …[/yellow]")

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Beijing"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Beijing"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "LicenseServer"),
x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
])

cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(dt.datetime.now(dt.timezone.utc))
.not_valid_after(dt.datetime.now(dt.timezone.utc) + dt.timedelta(days=3650))
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]),
critical=False,
)
.sign(private_key, hashes.SHA256())
)

with open(CERT_FILE, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
with open(KEY_FILE, "wb") as f:
f.write(private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
))

fingerprint = hashlib.sha256(
cert.public_bytes(serialization.Encoding.DER)
).hexdigest()

with open(FINGERPRINT_FILE, "w") as f:
f.write(fingerprint)

console.print("[green]Certificate generated.[/green]")
console.print(f"[bold]SHA256 Fingerprint:[/bold] [cyan]{fingerprint}[/cyan]")
return fingerprint


def get_or_create_cert():
if os.path.exists(CERT_FILE) and os.path.exists(KEY_FILE):
if os.path.exists(FINGERPRINT_FILE):
with open(FINGERPRINT_FILE) as f:
fp = f.read().strip()
else:
with open(CERT_FILE, "rb") as f:
from cryptography import x509
cert = x509.load_pem_x509_certificate(f.read())
from cryptography.hazmat.primitives import serialization
fp = hashlib.sha256(cert.public_bytes(serialization.Encoding.DER)).hexdigest()
with open(FINGERPRINT_FILE, "w") as f:
f.write(fp)
console.print("[green]Using existing TLS certificate.[/green]")
console.print(f"[bold]SHA256 Fingerprint:[/bold] [cyan]{fp}[/cyan]")
return fp
return generate_self_signed_cert()


# --- Message Framing (solves TCP sticky-packet / half-packet) ---
def send_msg(sock, data):
"""Length-prefixed JSON: 4-byte BE length + payload."""
payload = json.dumps(data).encode("utf-8")
sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_msg(sock):
"""Read a length-prefixed JSON message from the stream."""

def _read_exact(n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf

lb = _read_exact(4)
if lb is None:
return None
length = struct.unpack("!I", lb)[0]
if length > MAX_PAYLOAD:
return None

payload = _read_exact(length)
if payload is None:
return None
return json.loads(payload.decode("utf-8"))


# --- In-memory Nonce Store ---
class NonceStore:
"""Tracks seen nonces to detect replay attacks."""

def __init__(self):
self._nonces = {} # nonce → expiry_timestamp
self._lock = threading.Lock()

def check_and_add(self, nonce, now):
with self._lock:
self._cleanup(now)
if nonce in self._nonces:
return False
self._nonces[nonce] = now + NONCE_WINDOW
return True

def _cleanup(self, now):
expired = [n for n, exp in self._nonces.items() if now > exp]
for n in expired:
del self._nonces[n]


# --- Strong License Key Generation ---
def generate_license():
raw = secrets.token_hex(8) # 16 hex chars → 64 bits entropy
return f"LC-{raw[:4]}-{raw[4:8]}-{raw[8:12]}-{raw[12:16]}"


# --- License Server ---
class LicenseServer:
def __init__(self):
self.licenses = load_json(LICENSE_FILE)
self.sessions = load_json(SESSION_FILE)
self.nonce_store = NonceStore()

def cleanup(self):
now = int(time.time())
changed = False

for lic, clients in list(self.sessions.items()):
for cid, info in list(clients.items()):
if now - info["last_heartbeat"] > HEARTBEAT_TIMEOUT:
log(f"Session expired | License={lic} ClientID={cid}")
del clients[cid]
changed = True
if not clients:
del self.sessions[lic]

if changed:
save_json(SESSION_FILE, self.sessions)

def create_license(self, username, password, max_users):
key = generate_license()
self.licenses[key] = {
"username": username,
"password": password,
"max_users": max_users,
"created_at": int(time.time()),
}
save_json(LICENSE_FILE, self.licenses)
return key

def authorize(self, lic, client_id):
self.cleanup()

if lic not in self.licenses:
return False, "Invalid license"

max_users = self.licenses[lic]["max_users"]
clients = self.sessions.setdefault(lic, {})

# Existing client reconnecting
if client_id in clients:
clients[client_id]["last_heartbeat"] = int(time.time())
save_json(SESSION_FILE, self.sessions)
return True, "OK"

# New client
if len(clients) >= max_users:
return False, "Limit reached"

clients[client_id] = {"last_heartbeat": int(time.time())}
save_json(SESSION_FILE, self.sessions)
return True, "OK"

def heartbeat(self, lic, client_id):
if lic not in self.sessions:
return False
if client_id not in self.sessions[lic]:
return False

self.sessions[lic][client_id]["last_heartbeat"] = int(time.time())
save_json(SESSION_FILE, self.sessions)
return True


server = LicenseServer()


# --- Anti-replay Validation ---
def validate_request(req):
now = int(time.time())
ts = req.get("timestamp", 0)
nonce = req.get("nonce", "")

if abs(now - ts) > NONCE_WINDOW:
return False, "Request expired or clock skew too large"

if not nonce or len(nonce) < 16:
return False, "Invalid nonce"

if not server.nonce_store.check_and_add(nonce, now):
return False, "Duplicate request (possible replay attack)"

return True, "OK"


# --- Per-Client Handler ---
def handle_client(conn, addr):
ip = addr[0]

try:
while True:
req = recv_msg(conn)
if req is None:
break

valid, msg = validate_request(req)
if not valid:
log(f"[REPLAY] | IP={ip} Reason={msg}")
send_msg(conn, {"ok": False, "msg": msg})
continue

action = req.get("action")
lic = req.get("license")
cid = req.get("client_id")

if action == "auth":
ok, msg = server.authorize(lic, cid)
log(f"[AUTH] | IP={ip} License={lic} ClientID={cid} Result={msg}")
send_msg(conn, {"ok": ok, "msg": msg})

elif action == "heartbeat":
ok = server.heartbeat(lic, cid)
log(f"[HEARTBEAT] | IP={ip} License={lic} ClientID={cid} Status={ok}")
send_msg(conn, {"ok": ok})

else:
send_msg(conn, {"ok": False, "msg": "Unknown action"})

except (ssl.SSLError, ConnectionError, OSError) as e:
log(f"Connection error | IP={ip} Error={e}")
except Exception as e:
log(f"Unexpected error | IP={ip} Error={e}")
finally:
try:
conn.close()
except Exception:
pass
log(f"Disconnected | IP={ip}")


# --- TLS Socket Server ---
def create_ssl_context():
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(CERT_FILE, KEY_FILE)
return ctx


def socket_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen(5)

ssl_ctx = create_ssl_context()
log(f"Secure server listening on {HOST}:{PORT} (TLS 1.2+)")

while True:
raw_conn, addr = s.accept()
log(f"Connected | IP={addr[0]}:{addr[1]}")

try:
conn = ssl_ctx.wrap_socket(raw_conn, server_side=True)
except ssl.SSLError as e:
log(f"TLS handshake failed | IP={addr[0]} Error={e}")
try:
raw_conn.close()
except Exception:
pass
continue

t = threading.Thread(target=handle_client, args=(conn, addr), daemon=True)
t.start()


# --- Admin CLI ---
def show_menu():
console.clear()
console.print("""
[bold cyan]Secure License Server[/bold cyan]

[1] Create License
[2] Show Licenses
[3] Show Sessions
[4] View Logs
[5] Show Certificate Fingerprint
[6] Exit
""")


def create_license():
u = input("Username: ").strip()
p = input("Password: ").strip()
if not u or not p:
console.print("[red]Username and password cannot be empty.[/red]")
return

try:
n = int(input("Max users: ").strip())
if n < 1 or n > 1000:
raise ValueError
except ValueError:
console.print("[red]Max users must be an integer between 1 and 1000.[/red]")
return

key = server.create_license(u, p, n)
log(f"License created | Key={key} User={u} Max={n}")

table = Table(title="License Created")
table.add_column("Key", style="cyan")
table.add_column("User")
table.add_column("Max")
table.add_row(key, u, str(n))
console.print(table)


def show_licenses():
if not server.licenses:
console.print("[yellow]No licenses found.[/yellow]")
return

table = Table(title="Licenses")
table.add_column("Key", style="cyan")
table.add_column("User")
table.add_column("Max")
for k, v in server.licenses.items():
table.add_row(k, v["username"], str(v["max_users"]))
console.print(table)


def show_sessions():
if not server.sessions:
console.print("[yellow]No active sessions.[/yellow]")
return

table = Table(title="Active Sessions")
table.add_column("License")
table.add_column("Client ID")
table.add_column("Last Heartbeat")
for lic, clients in server.sessions.items():
for cid, info in clients.items():
table.add_row(lic, cid, str(info["last_heartbeat"]))
console.print(table)


def log_view():
global LOG_MODE
LOG_MODE = True
console.clear()
console.print("[bold]Live Log[/bold] (press Enter to return)\n")
with log_lock:
for line in log_buffer:
console.print(line)
input()
LOG_MODE = False


def show_fingerprint():
if os.path.exists(FINGERPRINT_FILE):
with open(FINGERPRINT_FILE) as f:
fp = f.read().strip()
console.print(f"[bold]Server SHA256 Fingerprint:[/bold]\n[cyan]{fp}[/cyan]")
console.print(
"\n[yellow]Give this fingerprint to clients so they can verify the server.[/yellow]"
)
else:
console.print("[red]Fingerprint file not found.[/red]")


def cli():
while True:
show_menu()
choice = input("> ").strip()

if choice == "1":
create_license()
input("\nPress Enter …")
elif choice == "2":
show_licenses()
input("\nPress Enter …")
elif choice == "3":
show_sessions()
input("\nPress Enter …")
elif choice == "4":
log_view()
elif choice == "5":
show_fingerprint()
input("\nPress Enter …")
elif choice == "6":
console.print("[yellow]Shutting down …[/yellow]")
os._exit(0)
else:
console.print("[red]Invalid choice.[/red]")
time.sleep(0.5)


if __name__ == "__main__":
console.print("[bold cyan]Secure License Server[/bold cyan]\n")

fp = get_or_create_cert()
if fp is None:
console.print("[red]Cannot set up TLS — exiting.[/red]")
os._exit(1)

threading.Thread(target=socket_server, daemon=True).start()
cli()
"""
License Client — Secure Edition
Security: TLS 1.2+ with TOFU certificate pinning, length-prefixed framing,
timestamp + nonce on every request, persistent client identity.
"""

import json
import time
import uuid
import struct
import socket
import ssl
import secrets
import hashlib
import os

from rich.prompt import Prompt
from rich.console import Console

console = Console()

SERVER_IP = "127.0.0.1"
PORT = 11451
HEARTBEAT_INTERVAL = 15
FINGERPRINT_FILE = "server_fingerprint.txt"
CLIENT_ID_FILE = "client_id.txt"


# --- Persistent client identity ---
def load_client_id():
if os.path.exists(CLIENT_ID_FILE):
with open(CLIENT_ID_FILE, "r") as f:
return f.read().strip()
cid = str(uuid.uuid4())
with open(CLIENT_ID_FILE, "w") as f:
f.write(cid)
return cid


# --- Fingerprint TOFU helpers ---
def load_saved_fingerprint():
if os.path.exists(FINGERPRINT_FILE):
with open(FINGERPRINT_FILE, "r") as f:
return f.read().strip()
return None


def save_fingerprint(fp):
with open(FINGERPRINT_FILE, "w") as f:
f.write(fp)


def get_server_fingerprint(sock):
"""Extract SHA256 fingerprint of the peer's certificate (DER form)."""
cert_der = sock.getpeercert(binary_form=True)
if cert_der is None:
return None
return hashlib.sha256(cert_der).hexdigest()


# --- Message Framing ---
def send_msg(sock, data):
payload = json.dumps(data).encode("utf-8")
sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_msg(sock):
def _read_exact(n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf

lb = _read_exact(4)
if lb is None:
return None
length = struct.unpack("!I", lb)[0]
if length > 10 * 1024 * 1024:
return None

payload = _read_exact(length)
if payload is None:
return None
return json.loads(payload.decode("utf-8"))


# --- Secure Client ---
class Client:
def __init__(self):
self.client_id = load_client_id()
self.saved_fp = load_saved_fingerprint()
self.sock = None

def _verify_server(self):
"""TOFU: verify server certificate fingerprint."""
fp = get_server_fingerprint(self.sock)
if fp is None:
console.print("[red]Server presented no certificate — aborting.[/red]")
return False

if self.saved_fp is not None:
if fp == self.saved_fp:
return True
else:
console.print("[red blink]WARNING: Server fingerprint changed![/red blink]")
console.print(f" Saved: [yellow]{self.saved_fp}[/yellow]")
console.print(f" Received: [yellow]{fp}[/yellow]")
console.print("[red]This could be a man-in-the-middle attack![/red]")
ans = Prompt.ask("Accept anyway?", choices=["y", "n"], default="n")
if ans == "y":
save_fingerprint(fp)
self.saved_fp = fp
return True
return False
else:
# First connection — TOFU
console.print(f"[yellow]First connection — server fingerprint:[/yellow]")
console.print(f" [cyan]{fp}[/cyan]")
ans = Prompt.ask("Trust this server?", choices=["y", "n"], default="y")
if ans == "y":
save_fingerprint(fp)
self.saved_fp = fp
console.print("[green]Fingerprint saved for future connections.[/green]")
return True
return False

def connect(self, host, port):
raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw.settimeout(10)

ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
# We verify the certificate manually (TOFU) — disable automatic check
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

try:
self.sock = ctx.wrap_socket(raw, server_hostname=host)
self.sock.connect((host, port))
except (ssl.SSLError, OSError) as e:
self.sock = None
raise ConnectionError(f"TLS connection failed: {e}") from e

if not self._verify_server():
self.sock.close()
self.sock = None
raise ConnectionError("Server identity verification failed")

console.print("[green]TLS connection established (identity verified).[/green]")

def _build_msg(self, **kwargs):
return {
**kwargs,
"client_id": self.client_id,
"timestamp": int(time.time()),
"nonce": secrets.token_hex(16),
}

def send(self, **kwargs):
if self.sock is None:
return {"ok": False, "msg": "Not connected"}

msg = self._build_msg(**kwargs)
try:
send_msg(self.sock, msg)
resp = recv_msg(self.sock)
if resp is None:
return {"ok": False, "msg": "Connection closed by server"}
return resp
except (ssl.SSLError, OSError) as e:
console.print(f"[red]Communication error: {e}[/red]")
return {"ok": False, "msg": str(e)}

def auth_loop(self):
while True:
host = Prompt.ask("Server IP", default="127.0.0.1")
port = int(Prompt.ask("Server port", default="11451"))

try:
self.connect(host, port)
except Exception as e:
console.print(f"[red]Failed to connect to {host}:{port}{e}[/red]")
continue

lic = Prompt.ask("License")

resp = self.send(action="auth", license=lic)
if resp.get("ok"):
console.print("[green]Authorization successful.[/green]")
return lic
else:
console.print(f"[red]Authorization failed: {resp.get('msg')}[/red]")
try:
self.sock.close()
except Exception:
pass
self.sock = None

def run(self):
license_key = self.auth_loop()

try:
while True:
time.sleep(HEARTBEAT_INTERVAL)

resp = self.send(action="heartbeat", license=license_key)
if resp.get("ok"):
console.print("[dim]Heartbeat OK[/dim]")
else:
console.print("[yellow]Lost authorization — re-authenticating …[/yellow]")
try:
self.sock.close()
except Exception:
pass
self.sock = None
license_key = self.auth_loop()
except KeyboardInterrupt:
console.print("\n[yellow]Client exit.[/yellow]")


if __name__ == "__main__":
c = Client()
c.run()

常见软件破解思路

了解攻击方的手段,有助于我们更好地防范软件许可认证的安全问题。

所谓的“破解”,其实就是绕过许可认证的过程,让软件误以为用户已经被授权了。

二进制可执行文件层面

从本地的客户端软件入手是最直接的破解手段。

简化一下许可认证的逻辑,无非就是:

if(auth(key)){
allow_use();
}
else{
deny_use();
}

转化为汇编看看呢?

mov eax, key
call auth

test eax, eax
jnz allow_use

deny_use:
; ...
call _exit

allow_use:
; ...
call use

如果能成功给客户端脱壳(一般来说是很难的),并成功定位到认证相关的汇编语句(一般要动态调试,静态分析很难),那么随便 patch 即可(比如条件跳转改成无条件跳转)。这是最简单的情况。

怎么防范这种攻击?加壳,虚拟机,花指令,反调试,反虚拟化,这样就可以极大地增加代码被逆向分析、内存注入、Hook 等攻击的难度和成本。

网络层面

回到计网的范畴,若能解析网络数据协议和验证逻辑,那就可以编写一个假的服务端来骗过本地的客户端(也就是前面提到的中间人攻击)。

通过配置代理,把发向官方服务器的验证请求,劫持到伪造的服务器上,由假的服务器返回假的授权响应,就轻松实现了 bypass。

针对 Windows 的 KMS 激活器就是这么干的。

小结

总的来说,软件许可认证并不是一个锦上添花的东西,而是现实商业软件不可或缺的一环。尽管破解与反破解的对抗是没有尽头的拉锯战,绝对的安全也并不存在,但至少也要有基本的网络验证与本地防逆向门槛吧。

对于开发者而言,为了保护好自己的饭碗,除了认真实现软件核心业务逻辑外,也要重视对软件资产本身的保护。倘若没有一套可靠的授权认证机制,爆肝出来的代码很容易就会被无成本地白嫖、肆意分发甚至盗卖。代码写得再好,能安心圈米,才是商业开发的最终 motivation 吧。

  • 标题: 浅谈软件许可认证
  • 作者: Coast23
  • 创建于 : 2026-05-14 11:16:02
  • 更新于 : 2026-05-14 17:06:23
  • 链接: https://coast23.github.io/2026/05/14/浅谈软件许可认证/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论