CertTransparencySearch/ct_scan.py

1508 lines
63 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import re
import shutil
import subprocess
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import psycopg
from cryptography import x509
from cryptography.x509 import general_name
from cryptography.x509.oid import ExtensionOID
from cryptography.x509.oid import NameOID
from psycopg.rows import dict_row
QUERY_SQL = """
WITH ci AS (
SELECT
min(sub.certificate_id) AS id,
min(sub.issuer_ca_id) AS issuer_ca_id,
x509_commonName(sub.certificate) AS common_name,
x509_subjectName(sub.certificate) AS subject_dn,
x509_notBefore(sub.certificate) AS not_before,
x509_notAfter(sub.certificate) AS not_after,
encode(x509_serialNumber(sub.certificate), 'hex') AS serial_number,
sub.certificate AS certificate
FROM (
SELECT cai.*
FROM certificate_and_identities cai
WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate)
AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\'
LIMIT %(max_candidates)s
) sub
GROUP BY sub.certificate
)
SELECT
ci.id,
ci.issuer_ca_id,
ca.name AS issuer_name,
ci.common_name,
ci.subject_dn,
ci.not_before,
ci.not_after,
cl.first_seen,
ci.serial_number,
coalesce(cl.revoked, 0) AS revoked_count,
rev.revocation_date,
rev.reason_code,
rev.last_seen_check_date,
crl_state.active_crl_count,
crl_state.last_checked AS crl_last_checked,
ci.certificate
FROM ci
JOIN ca ON ca.id = ci.issuer_ca_id
JOIN certificate_lifecycle cl ON cl.certificate_id = ci.id
LEFT JOIN LATERAL (
SELECT
cr.revocation_date,
cr.reason_code,
cr.last_seen_check_date
FROM crl_revoked cr
WHERE cr.ca_id = ci.issuer_ca_id
AND cr.serial_number = decode(ci.serial_number, 'hex')
ORDER BY cr.last_seen_check_date DESC NULLS LAST
LIMIT 1
) rev ON TRUE
LEFT JOIN LATERAL (
SELECT
count(*) FILTER (
WHERE crl.error_message IS NULL
AND crl.next_update > now() AT TIME ZONE 'UTC'
) AS active_crl_count,
max(crl.last_checked) AS last_checked
FROM crl
WHERE crl.ca_id = ci.issuer_ca_id
) crl_state ON TRUE
WHERE ci.not_before <= now() AT TIME ZONE 'UTC'
AND ci.not_after >= now() AT TIME ZONE 'UTC'
AND cl.certificate_type = 'Certificate'
ORDER BY cl.first_seen DESC NULLS LAST, ci.id DESC;
"""
RAW_MATCH_COUNT_SQL = """
SELECT count(*)
FROM certificate_and_identities cai
WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate)
AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\'
"""
REVOCATION_REASONS = {
1: "keyCompromise",
2: "cACompromise",
3: "affiliationChanged",
4: "superseded",
5: "cessationOfOperation",
6: "certificateHold",
8: "removeFromCRL",
9: "privilegeWithdrawn",
10: "aACompromise",
}
PRECERT_POISON_OID = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3")
@dataclass
class DatabaseRecord:
domain: str
certificate_id: int
issuer_ca_id: int
issuer_name: str
common_name: str | None
subject_dn: str | None
not_before: datetime
not_after: datetime
first_seen: datetime | None
serial_number: str
revoked_count: int
revocation_date: datetime | None
reason_code: int | None
last_seen_check_date: datetime | None
active_crl_count: int
crl_last_checked: datetime | None
certificate_der: bytes
@dataclass
class CertificateHit:
fingerprint_sha256: str
subject_cn: str
validity_not_before: datetime
validity_not_after: datetime
san_entries: list[str]
revocation_status: str
revocation_date: datetime | None
revocation_reason: str | None
revocation_note: str | None
crtsh_crl_timestamp: datetime | None
matched_domains: set[str] = field(default_factory=set)
first_seen: datetime | None = None
crtsh_certificate_ids: set[int] = field(default_factory=set)
serial_numbers: set[str] = field(default_factory=set)
issuer_names: set[str] = field(default_factory=set)
issuer_ca_ids: set[int] = field(default_factory=set)
@dataclass
class VerificationStats:
input_rows: int = 0
unique_leaf_certificates: int = 0
non_leaf_filtered: int = 0
precertificate_poison_filtered: int = 0
@dataclass
class CertificateGroup:
group_id: str
group_type: str
member_indices: list[int]
member_count: int
distinct_subject_cn_count: int
distinct_exact_content_count: int
numbered_cn_patterns: set[str]
matched_domains: set[str]
subject_cns: set[str]
first_seen_min: datetime | None
first_seen_max: datetime | None
valid_from_min: datetime
valid_to_max: datetime
revocation_counts: Counter
@dataclass
class ScanStats:
generated_at_utc: str
configured_domains: list[str]
unique_leaf_certificates: int
groups_total: int
groups_multi_member: int
groups_singleton: int
groups_by_type: dict[str, int]
verification: VerificationStats
@dataclass
class IssuerTrustInfo:
issuer_name: str
issuer_ca_ids: set[int]
server_auth_contexts: set[str]
major_webpki: bool
def load_domains(path: Path) -> list[str]:
domains: list[str] = []
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip().lower()
if not line or line.startswith("#"):
continue
if line.startswith("*."):
line = line[2:]
domains.append(line)
unique_domains = sorted(set(domains))
if not unique_domains:
raise ValueError(f"No domains found in {path}")
return unique_domains
def escape_like(value: str) -> str:
return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def utc_iso(value: datetime | None) -> str:
if value is None:
return "n/a"
if value.tzinfo is None:
value = value.replace(tzinfo=UTC)
else:
value = value.astimezone(UTC)
return value.isoformat(timespec="seconds").replace("+00:00", "Z")
def serialize_datetime(value: datetime | None) -> str | None:
return utc_iso(value) if value is not None else None
def parse_datetime(value: str | None) -> datetime | None:
if value is None:
return None
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
def cache_path(cache_dir: Path, domain: str) -> Path:
safe_domain = "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in domain)
return cache_dir / f"{safe_domain}.json"
def record_to_cache_payload(record: DatabaseRecord) -> dict[str, Any]:
return {
"domain": record.domain,
"certificate_id": record.certificate_id,
"issuer_ca_id": record.issuer_ca_id,
"issuer_name": record.issuer_name,
"common_name": record.common_name,
"subject_dn": record.subject_dn,
"not_before": serialize_datetime(record.not_before),
"not_after": serialize_datetime(record.not_after),
"first_seen": serialize_datetime(record.first_seen),
"serial_number": record.serial_number,
"revoked_count": record.revoked_count,
"revocation_date": serialize_datetime(record.revocation_date),
"reason_code": record.reason_code,
"last_seen_check_date": serialize_datetime(record.last_seen_check_date),
"active_crl_count": record.active_crl_count,
"crl_last_checked": serialize_datetime(record.crl_last_checked),
"certificate_der_b64": base64.b64encode(record.certificate_der).decode("ascii"),
}
def record_from_cache_payload(payload: dict[str, Any]) -> DatabaseRecord:
return DatabaseRecord(
domain=payload["domain"],
certificate_id=int(payload["certificate_id"]),
issuer_ca_id=int(payload["issuer_ca_id"]),
issuer_name=payload["issuer_name"],
common_name=payload.get("common_name"),
subject_dn=payload.get("subject_dn"),
not_before=parse_datetime(payload["not_before"]) or datetime.min,
not_after=parse_datetime(payload["not_after"]) or datetime.min,
first_seen=parse_datetime(payload.get("first_seen")),
serial_number=payload["serial_number"],
revoked_count=int(payload["revoked_count"]),
revocation_date=parse_datetime(payload.get("revocation_date")),
reason_code=payload.get("reason_code"),
last_seen_check_date=parse_datetime(payload.get("last_seen_check_date")),
active_crl_count=int(payload["active_crl_count"]),
crl_last_checked=parse_datetime(payload.get("crl_last_checked")),
certificate_der=base64.b64decode(payload["certificate_der_b64"]),
)
def load_cached_records(cache_dir: Path, domain: str, ttl_seconds: int, max_candidates: int) -> list[DatabaseRecord] | None:
path = cache_path(cache_dir, domain)
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
if payload.get("version") != 1:
return None
if payload.get("max_candidates") != max_candidates:
return None
cached_at = parse_datetime(payload.get("cached_at"))
if cached_at is None:
return None
age = time.time() - cached_at.replace(tzinfo=UTC).timestamp()
if age > ttl_seconds:
return None
return [record_from_cache_payload(item) for item in payload.get("records", [])]
def store_cached_records(cache_dir: Path, domain: str, max_candidates: int, records: list[DatabaseRecord]) -> None:
cache_dir.mkdir(parents=True, exist_ok=True)
payload = {
"version": 1,
"cached_at": utc_iso(datetime.now(UTC)),
"max_candidates": max_candidates,
"records": [record_to_cache_payload(record) for record in records],
}
cache_path(cache_dir, domain).write_text(
json.dumps(payload, indent=2, sort_keys=True),
encoding="utf-8",
)
def connect() -> psycopg.Connection:
return psycopg.connect(
host="crt.sh",
port=5432,
dbname="certwatch",
user="guest",
password="guest",
connect_timeout=5,
sslmode="disable",
autocommit=True,
application_name="ct_transparency_search",
)
def query_domain(domain: str, max_candidates: int, attempts: int, verbose: bool) -> list[DatabaseRecord]:
params = {
"domain": domain,
"name_pattern": f"%{escape_like(domain)}%",
"max_candidates": max_candidates,
}
raw_match_count = query_raw_match_count(domain=domain, attempts=attempts, verbose=verbose)
if raw_match_count > max_candidates:
raise ValueError(
f"domain={domain} raw identity matches={raw_match_count} exceed max_candidates={max_candidates}; "
f"increase --max-candidates-per-domain to at least {raw_match_count} for a complete result set"
)
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
with connect() as conn, conn.cursor(row_factory=dict_row) as cur:
cur.execute(QUERY_SQL, params)
rows = cur.fetchall()
return [row_to_record(domain, row) for row in rows]
except Exception as exc:
last_error = exc
if attempt == attempts:
break
if verbose:
print(
f"[warn] domain={domain} attempt={attempt}/{attempts} failed: {exc}",
file=sys.stderr,
)
time.sleep(min(2 ** attempt, 10))
assert last_error is not None
raise last_error
def query_raw_match_count(domain: str, attempts: int, verbose: bool) -> int:
params = {
"domain": domain,
"name_pattern": f"%{escape_like(domain)}%",
}
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
with connect() as conn, conn.cursor() as cur:
cur.execute(RAW_MATCH_COUNT_SQL, params)
row = cur.fetchone()
return int(row[0])
except Exception as exc:
last_error = exc
if attempt == attempts:
break
if verbose:
print(
f"[warn] domain={domain} raw-count attempt={attempt}/{attempts} failed: {exc}",
file=sys.stderr,
)
time.sleep(min(2 ** attempt, 10))
assert last_error is not None
raise last_error
def row_to_record(domain: str, row: dict[str, Any]) -> DatabaseRecord:
return DatabaseRecord(
domain=domain,
certificate_id=int(row["id"]),
issuer_ca_id=int(row["issuer_ca_id"]),
issuer_name=row["issuer_name"],
common_name=row["common_name"],
subject_dn=row["subject_dn"],
not_before=row["not_before"],
not_after=row["not_after"],
first_seen=row["first_seen"],
serial_number=row["serial_number"],
revoked_count=int(row["revoked_count"]),
revocation_date=row["revocation_date"],
reason_code=row["reason_code"],
last_seen_check_date=row["last_seen_check_date"],
active_crl_count=int(row["active_crl_count"] or 0),
crl_last_checked=row["crl_last_checked"],
certificate_der=bytes(row["certificate"]),
)
def extract_san_entries(cert: x509.Certificate) -> list[str]:
try:
extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.ExtensionNotFound:
return []
entries: list[str] = []
for name in extension.value:
entries.append(format_general_name(name))
return sorted(set(entries), key=str.casefold)
def format_general_name(name: general_name.GeneralName) -> str:
if isinstance(name, x509.DNSName):
return f"DNS:{name.value}"
if isinstance(name, x509.RFC822Name):
return f"EMAIL:{name.value}"
if isinstance(name, x509.UniformResourceIdentifier):
return f"URI:{name.value}"
if isinstance(name, x509.IPAddress):
return f"IP:{name.value}"
if isinstance(name, x509.RegisteredID):
return f"RID:{name.value.dotted_string}"
if isinstance(name, x509.DirectoryName):
return f"DIR:{name.value.rfc4514_string()}"
if isinstance(name, x509.OtherName):
encoded = base64.b64encode(name.value).decode("ascii")
return f"OTHER:{name.type_id.dotted_string}:{encoded}"
return str(name)
def extract_common_name(cert: x509.Certificate) -> str | None:
attributes = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
if not attributes:
return None
return attributes[0].value
def has_precertificate_poison(cert: x509.Certificate) -> bool:
try:
cert.extensions.get_extension_for_oid(PRECERT_POISON_OID)
except x509.ExtensionNotFound:
return False
return True
def is_leaf_certificate(cert: x509.Certificate) -> tuple[bool, str]:
if has_precertificate_poison(cert):
return (False, "precertificate_poison")
try:
basic_constraints = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
if basic_constraints.ca:
return (False, "basic_constraints_ca")
except x509.ExtensionNotFound:
pass
try:
key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
if key_usage.key_cert_sign:
return (False, "key_cert_sign")
except x509.ExtensionNotFound:
pass
return (True, "leaf")
def revocation_fields(record: DatabaseRecord) -> tuple[str, datetime | None, str | None, datetime | None, str | None]:
if record.revoked_count > 0:
reason: str | None = None
if record.reason_code in REVOCATION_REASONS:
reason = REVOCATION_REASONS[record.reason_code]
elif record.reason_code not in (None, 0):
reason = f"unknown({record.reason_code})"
return ("revoked", record.revocation_date, reason, record.last_seen_check_date, None)
if record.active_crl_count > 0:
return ("not_revoked", None, None, record.crl_last_checked, None)
return ("unknown", None, None, record.crl_last_checked, "no fresh crt.sh CRL data")
def revocation_priority(status: str) -> int:
return {
"unknown": 0,
"not_revoked": 1,
"revoked": 2,
}[status]
def build_hits(records: list[DatabaseRecord]) -> tuple[list[CertificateHit], VerificationStats]:
verification = VerificationStats(input_rows=len(records))
hits: dict[str, CertificateHit] = {}
for record in records:
cert = x509.load_der_x509_certificate(record.certificate_der)
is_leaf, reason = is_leaf_certificate(cert)
if not is_leaf:
if reason == "precertificate_poison":
verification.precertificate_poison_filtered += 1
else:
verification.non_leaf_filtered += 1
continue
fingerprint_hex = hashlib.sha256(record.certificate_der).hexdigest()
subject_cn = record.common_name or extract_common_name(cert) or "-"
revocation_status, revocation_date, revocation_reason, crtsh_crl_timestamp, revocation_note = revocation_fields(record)
hit = hits.get(fingerprint_hex)
if hit is None:
hit = CertificateHit(
fingerprint_sha256=fingerprint_hex,
subject_cn=subject_cn,
validity_not_before=record.not_before,
validity_not_after=record.not_after,
san_entries=extract_san_entries(cert),
revocation_status=revocation_status,
revocation_date=revocation_date,
revocation_reason=revocation_reason,
revocation_note=revocation_note,
crtsh_crl_timestamp=crtsh_crl_timestamp,
matched_domains={record.domain},
first_seen=record.first_seen,
crtsh_certificate_ids={record.certificate_id},
serial_numbers={record.serial_number},
issuer_names={record.issuer_name},
issuer_ca_ids={record.issuer_ca_id},
)
hits[fingerprint_hex] = hit
continue
hit.matched_domains.add(record.domain)
hit.crtsh_certificate_ids.add(record.certificate_id)
hit.serial_numbers.add(record.serial_number)
hit.issuer_names.add(record.issuer_name)
hit.issuer_ca_ids.add(record.issuer_ca_id)
if hit.first_seen is None or (record.first_seen is not None and record.first_seen < hit.first_seen):
hit.first_seen = record.first_seen
if revocation_priority(revocation_status) > revocation_priority(hit.revocation_status):
hit.revocation_status = revocation_status
hit.revocation_date = revocation_date
hit.revocation_reason = revocation_reason
hit.revocation_note = revocation_note
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is not None and crtsh_crl_timestamp is not None:
if crtsh_crl_timestamp > hit.crtsh_crl_timestamp:
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is None:
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
ordered_hits = sorted(
hits.values(),
key=lambda hit: (
sorted(hit.matched_domains),
hit.subject_cn.casefold(),
hit.validity_not_before,
hit.fingerprint_sha256,
),
)
verification.unique_leaf_certificates = len(ordered_hits)
return (ordered_hits, verification)
def canonicalize_subject_cn(subject_cn: str) -> str:
subject_cn = subject_cn.lower()
if subject_cn.startswith("www."):
return subject_cn[4:]
return subject_cn
def normalize_counter_pattern(hostname: str) -> str | None:
normalized = re.sub(r"\d+", "#", canonicalize_subject_cn(hostname))
if normalized == canonicalize_subject_cn(hostname):
return None
return normalized
class UnionFind:
def __init__(self, size: int) -> None:
self.parent = list(range(size))
self.rank = [0] * size
def find(self, value: int) -> int:
while self.parent[value] != value:
self.parent[value] = self.parent[self.parent[value]]
value = self.parent[value]
return value
def union(self, left: int, right: int) -> None:
left_root = self.find(left)
right_root = self.find(right)
if left_root == right_root:
return
if self.rank[left_root] < self.rank[right_root]:
left_root, right_root = right_root, left_root
self.parent[right_root] = left_root
if self.rank[left_root] == self.rank[right_root]:
self.rank[left_root] += 1
def build_groups(hits: list[CertificateHit]) -> list[CertificateGroup]:
if not hits:
return []
canonical_cns_by_pattern: dict[str, set[str]] = defaultdict(set)
for hit in hits:
pattern = normalize_counter_pattern(hit.subject_cn)
if pattern is not None:
canonical_cns_by_pattern[pattern].add(canonicalize_subject_cn(hit.subject_cn))
qualifying_patterns = {
pattern
for pattern, canonical_cns in canonical_cns_by_pattern.items()
if len(canonical_cns) > 1
}
components: dict[tuple[str, str], list[int]] = defaultdict(list)
for index, hit in enumerate(hits):
canonical_cn = canonicalize_subject_cn(hit.subject_cn)
pattern = normalize_counter_pattern(hit.subject_cn)
if pattern in qualifying_patterns:
components[("pattern", pattern)].append(index)
else:
components[("exact", canonical_cn)].append(index)
provisional_groups: list[CertificateGroup] = []
for (family_kind, family_key), member_indices in components.items():
member_hits = [hits[index] for index in member_indices]
subject_cns = {hit.subject_cn for hit in member_hits}
unique_san_profiles = {tuple(hit.san_entries) for hit in member_hits}
numbered_patterns = {family_key} if family_kind == "pattern" else set()
group_type = "numbered_cn_pattern" if family_kind == "pattern" else "exact_endpoint_family"
first_seen_values = [hit.first_seen for hit in member_hits if hit.first_seen is not None]
provisional_groups.append(
CertificateGroup(
group_id="",
group_type=group_type,
member_indices=sorted(member_indices),
member_count=len(member_indices),
distinct_subject_cn_count=len(subject_cns),
distinct_exact_content_count=len(unique_san_profiles),
numbered_cn_patterns=numbered_patterns,
matched_domains={domain for hit in member_hits for domain in hit.matched_domains},
subject_cns=subject_cns,
first_seen_min=min(first_seen_values) if first_seen_values else None,
first_seen_max=max(first_seen_values) if first_seen_values else None,
valid_from_min=min(hit.validity_not_before for hit in member_hits),
valid_to_max=max(hit.validity_not_after for hit in member_hits),
revocation_counts=Counter(hit.revocation_status for hit in member_hits),
)
)
provisional_groups.sort(
key=lambda group: (
-group.member_count,
group.group_type,
min(canonicalize_subject_cn(value) for value in group.subject_cns),
)
)
for position, group in enumerate(provisional_groups, start=1):
group.group_id = f"G{position:04d}"
return provisional_groups
def describe_group_basis(group: CertificateGroup) -> str:
if group.group_type == "numbered_cn_pattern":
pattern = next(iter(group.numbered_cn_patterns))
return f"CN pattern with running-number slot: `{pattern}`"
base = min(canonicalize_subject_cn(value) for value in group.subject_cns)
return f"Same endpoint CN family (exact CN; `www.` grouped with base name): `{base}`"
def primary_issuer_name(hit: CertificateHit) -> str:
return sorted(hit.issuer_names)[0]
def query_issuer_trust(hits: list[CertificateHit]) -> dict[str, IssuerTrustInfo]:
issuer_name_to_ca_ids: dict[str, set[int]] = defaultdict(set)
for hit in hits:
issuer_name_to_ca_ids[primary_issuer_name(hit)].update(hit.issuer_ca_ids)
all_ca_ids = sorted({ca_id for ca_ids in issuer_name_to_ca_ids.values() for ca_id in ca_ids})
contexts_by_ca_id: dict[int, set[str]] = defaultdict(set)
if all_ca_ids:
query = """
SELECT ctp.ca_id, tc.ctx
FROM ca_trust_purpose ctp
JOIN trust_context tc ON tc.id = ctp.trust_context_id
JOIN trust_purpose tp ON tp.id = ctp.trust_purpose_id
WHERE ctp.ca_id = ANY(%s)
AND tp.purpose = 'Server Authentication'
AND ctp.is_time_valid = TRUE
AND ctp.disabled_from IS NULL
"""
with connect() as conn, conn.cursor() as cur:
cur.execute(query, (all_ca_ids,))
for ca_id, trust_context in cur.fetchall():
contexts_by_ca_id[int(ca_id)].add(str(trust_context))
major_contexts = {"Mozilla", "Chrome", "Apple", "Microsoft", "Android"}
results: dict[str, IssuerTrustInfo] = {}
for issuer_name, ca_ids in issuer_name_to_ca_ids.items():
merged_contexts = {ctx for ca_id in ca_ids for ctx in contexts_by_ca_id.get(ca_id, set())}
results[issuer_name] = IssuerTrustInfo(
issuer_name=issuer_name,
issuer_ca_ids=set(ca_ids),
server_auth_contexts=merged_contexts,
major_webpki=major_contexts.issubset(merged_contexts),
)
return results
def status_marker(status: str) -> str:
return {
"not_revoked": "OK ",
"revoked": "REV",
"unknown": "UNK",
}[status]
def one_line_revocation(hit: CertificateHit) -> str:
if hit.revocation_status == "revoked":
detail = f"revoked {utc_iso(hit.revocation_date)}" if hit.revocation_date else "revoked"
if hit.revocation_reason:
detail += f", reason={hit.revocation_reason}"
return detail
if hit.revocation_status == "unknown":
if hit.revocation_note:
return f"unknown, {hit.revocation_note}"
return "unknown"
return "not revoked"
def san_tail_split(domain: str) -> tuple[list[str], str]:
labels = domain.split(".")
common_second_level = {"ac", "co", "com", "edu", "gov", "net", "org"}
suffix_len = 2
if len(labels) >= 3 and len(labels[-1]) == 2 and labels[-2] in common_second_level:
suffix_len = 3
if len(labels) <= suffix_len:
return ([], domain)
return (labels[:-suffix_len], ".".join(labels[-suffix_len:]))
def build_san_tree_lines(san_entries: list[str]) -> list[str]:
return build_san_tree_lines_with_style(san_entries, ascii_only=False)
def build_san_tree_units_with_style(san_entries: list[str], ascii_only: bool) -> list[list[str]]:
dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")})
other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")})
tree: dict[str, Any] = {}
for domain in dns_entries:
prefix_labels, tail = san_tail_split(domain)
cursor = tree
for label in prefix_labels:
cursor = cursor.setdefault(label, {})
cursor.setdefault(tail, {})
def render(node: dict[str, Any], prefix: str = "") -> list[str]:
lines: list[str] = []
keys = sorted(node.keys(), key=str.casefold)
for index, key in enumerate(keys):
is_last = index == len(keys) - 1
if ascii_only:
connector = "`- " if is_last else "|- "
else:
connector = "└─ " if is_last else "├─ "
lines.append(prefix + connector + key)
child = node[key]
if ascii_only:
child_prefix = prefix + (" " if is_last else "| ")
else:
child_prefix = prefix + (" " if is_last else "")
lines.extend(render(child, child_prefix))
return lines
units: list[list[str]] = []
for key in sorted(tree.keys(), key=str.casefold):
units.append(render({key: tree[key]}))
for entry in other_entries:
units.append([f"{'*' if ascii_only else ''} {entry}"])
if not units:
units.append([f"{'*' if ascii_only else ''} -"])
return units
def build_san_tree_chunks_with_style(
san_entries: list[str],
ascii_only: bool,
max_lines_per_chunk: int = 24,
) -> list[list[str]]:
chunks: list[list[str]] = []
current_chunk: list[str] = []
current_lines = 0
def flush_current_chunk() -> None:
nonlocal current_chunk, current_lines
if current_chunk:
chunks.append(current_chunk)
current_chunk = []
current_lines = 0
for unit in build_san_tree_units_with_style(san_entries, ascii_only=ascii_only):
if len(unit) > max_lines_per_chunk:
flush_current_chunk()
for start in range(0, len(unit), max_lines_per_chunk):
chunks.append(unit[start : start + max_lines_per_chunk])
continue
if current_chunk and current_lines + len(unit) > max_lines_per_chunk:
flush_current_chunk()
current_chunk.extend(unit)
current_lines += len(unit)
flush_current_chunk()
return chunks
def build_san_tree_lines_with_style(san_entries: list[str], ascii_only: bool) -> list[str]:
lines: list[str] = []
for chunk in build_san_tree_chunks_with_style(
san_entries,
ascii_only=ascii_only,
max_lines_per_chunk=10_000,
):
lines.extend(chunk)
return lines
def group_hits_by_issuer(hits: list[CertificateHit]) -> tuple[dict[str, list[CertificateHit]], list[str]]:
issuer_hits: dict[str, list[CertificateHit]] = defaultdict(list)
for hit in hits:
issuer_hits[primary_issuer_name(hit)].append(hit)
ordered_issuers = sorted(
issuer_hits,
key=lambda issuer_name: (-len(issuer_hits[issuer_name]), issuer_name.casefold()),
)
return issuer_hits, ordered_issuers
def latex_escape(value: str) -> str:
replacements = {
"\\": r"\textbackslash{}",
"&": r"\&",
"%": r"\%",
"$": r"\$",
"#": r"\#",
"_": r"\_",
"{": r"\{",
"}": r"\}",
"~": r"\textasciitilde{}",
"^": r"\textasciicircum{}",
}
return "".join(replacements.get(char, char) for char in value)
def summarize_san_patterns(san_entries: list[str]) -> dict[str, Any]:
dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")}, key=str.casefold)
other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")}, key=str.casefold)
zone_counts: Counter[str] = Counter()
normalized_pattern_counts: Counter[str] = Counter()
wildcard_count = 0
numbered_count = 0
for domain in dns_entries:
normalized_domain = domain[2:] if domain.startswith("*.") else domain
if domain.startswith("*."):
wildcard_count += 1
if re.search(r"\d", normalized_domain):
numbered_count += 1
prefix_labels, tail = san_tail_split(normalized_domain)
zone_counts[tail] += 1
normalized_prefix = ".".join(re.sub(r"\d+", "#", label) for label in prefix_labels if label)
if normalized_prefix:
normalized_pattern_counts[f"{normalized_prefix}.{tail}"] += 1
else:
normalized_pattern_counts[tail] += 1
repeating_patterns = [
(pattern, count)
for pattern, count in normalized_pattern_counts.most_common(6)
if count > 1
]
return {
"dns_count": len(dns_entries),
"other_count": len(other_entries),
"wildcard_count": wildcard_count,
"numbered_count": numbered_count,
"zone_count": len(zone_counts),
"top_zones": zone_counts.most_common(6),
"repeating_patterns": repeating_patterns,
}
def latex_status_badge(status: str) -> str:
return {
"not_revoked": r"\StatusOK{}",
"revoked": r"\StatusREV{}",
"unknown": r"\StatusUNK{}",
}[status]
def latex_webpki_badge(value: bool) -> str:
return r"\WebPKIYes{}" if value else r"\WebPKINo{}"
def render_markdown_report(
path: Path,
hits: list[CertificateHit],
groups: list[CertificateGroup],
stats: ScanStats,
issuer_trust: dict[str, IssuerTrustInfo],
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
lines: list[str] = []
lines.append("# Certificate CN Family Report")
lines.append("")
lines.append(f"Generated: {stats.generated_at_utc}")
lines.append(f"Configured domains: {', '.join(stats.configured_domains)}")
lines.append("")
lines.append("## What This File Contains")
lines.append("")
lines.append("- Chapters are built from Subject CN construction only.")
lines.append("- If multiple concrete CNs share the same numbered schema, they are grouped together.")
lines.append("- Otherwise the chapter is one endpoint family; `www.` is grouped with the base name as a low-signal convenience.")
lines.append("- SAN entries are shown only inside each Subject CN subsection.")
lines.append("- All certificates shown here are verified leaf certificates.")
lines.append("")
lines.append("## Issuer Overview")
lines.append("")
for issuer_name in ordered_issuers:
trust = issuer_trust[issuer_name]
ca_ids = ", ".join(str(value) for value in sorted(trust.issuer_ca_ids))
trust_label = "YES" if trust.major_webpki else "NO"
lines.append(
f"- {issuer_name} | certificates={len(issuer_hits[issuer_name])} | WebPKI server-auth in major stores={trust_label} | ca_id={ca_ids}"
)
lines.append("")
lines.append("## Leaf-Certificate Assurance")
lines.append("")
lines.append("- SQL filter: `certificate_lifecycle.certificate_type = 'Certificate'`")
lines.append("- Local filter: precertificate poison absent, `BasicConstraints.ca != true`, `KeyUsage.keyCertSign != true`")
lines.append(f"- Verified leaf certificates kept: {stats.unique_leaf_certificates}")
lines.append(f"- Non-leaf filtered after download: {stats.verification.non_leaf_filtered}")
lines.append(f"- Precertificate poison filtered after download: {stats.verification.precertificate_poison_filtered}")
lines.append("")
for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
trust = issuer_trust[issuer_name]
issuer_title = f"Issuer {issuer_position:02d} {issuer_name}"
lines.append(f"## {issuer_title}")
lines.append("")
lines.append(f"- Certificates under issuer: {len(issuer_hits[issuer_name])}")
lines.append(
f"- WebPKI server-auth in major stores (Mozilla, Chrome, Apple, Microsoft, Android): {'YES' if trust.major_webpki else 'NO'}"
)
lines.append(
f"- Server-auth trust contexts seen in crt.sh live trust data: {', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none'}"
)
lines.append(f"- Issuer CA IDs: {', '.join(str(value) for value in sorted(trust.issuer_ca_ids))}")
lines.append("")
issuer_groups = build_groups(issuer_hits[issuer_name])
for family_index, group in enumerate(issuer_groups, start=1):
member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
chapter_title = f"Family {family_index:02d} {describe_group_basis(group)}"
lines.append(f"### {chapter_title}")
lines.append("")
lines.append(f"- Certificates in chapter: {group.member_count}")
lines.append(f"- Concrete Subject CNs: {group.distinct_subject_cn_count}")
lines.append(f"- Distinct SAN profiles in chapter: {group.distinct_exact_content_count}")
lines.append(f"- Matched domains: {', '.join(sorted(group.matched_domains))}")
lines.append(f"- Family validity span: {utc_iso(group.valid_from_min)} -> {utc_iso(group.valid_to_max)}")
if group.first_seen_min and group.first_seen_max:
lines.append(f"- First seen span: {utc_iso(group.first_seen_min)} -> {utc_iso(group.first_seen_max)}")
lines.append(f"- Revocation mix: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown")
lines.append("")
hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
for hit in member_hits:
hits_by_subject[hit.subject_cn].append(hit)
ordered_subjects = sorted(
hits_by_subject.keys(),
key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
)
for subject_cn in ordered_subjects:
subject_hits = sorted(
hits_by_subject[subject_cn],
key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
)
lines.append(f"#### Subject CN: `{subject_cn}`")
lines.append("")
lines.append(f"- Certificates under this CN: {len(subject_hits)}")
lines.append(f"- Validity span under this CN: {utc_iso(min(hit.validity_not_before for hit in subject_hits))} -> {utc_iso(max(hit.validity_not_after for hit in subject_hits))}")
san_profiles: dict[tuple[str, ...], list[CertificateHit]] = defaultdict(list)
for hit in subject_hits:
san_profiles[tuple(hit.san_entries)].append(hit)
profile_size_counts = Counter(len(profile) for profile in san_profiles)
unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
lines.append(f"- Distinct SAN profiles under this CN: {len(san_profiles)}")
lines.append(
"- SAN profile sizes seen: "
+ ", ".join(
f"{size} SAN x {count}"
for size, count in sorted(profile_size_counts.items())
)
)
lines.append("")
lines.append("Validity history")
lines.append("")
for hit in subject_hits:
crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
lines.append(
f"- [{status_marker(hit.revocation_status)}] {utc_iso(hit.validity_not_before)} -> {utc_iso(hit.validity_not_after)} | SANs={len(hit.san_entries)} | crt.sh={crtsh_ids} | {one_line_revocation(hit)}"
)
lines.append("")
lines.append("SAN structure")
lines.append("")
lines.append("```text")
for tree_line in build_san_tree_lines(unique_san_entries):
lines.append(tree_line)
lines.append("```")
lines.append("")
lines.append("---")
lines.append("")
lines.append("## Statistics")
lines.append("")
lines.append(f"- Unique leaf certificates: {stats.unique_leaf_certificates}")
lines.append(f"- CN-family chapters: {stats.groups_total}")
lines.append(f"- Chapters with more than one certificate: {stats.groups_multi_member}")
lines.append(f"- Single-certificate chapters: {stats.groups_singleton}")
lines.append(f"- Numbered CN pattern chapters: {stats.groups_by_type.get('numbered_cn_pattern', 0)}")
lines.append(f"- Exact endpoint chapters: {stats.groups_by_type.get('exact_endpoint_family', 0)}")
lines.append("")
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def render_latex_report(
path: Path,
hits: list[CertificateHit],
groups: list[CertificateGroup],
stats: ScanStats,
issuer_trust: dict[str, IssuerTrustInfo],
show_page_numbers: bool = True,
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
revoked_total = sum(1 for hit in hits if hit.revocation_status == "revoked")
unknown_total = sum(1 for hit in hits if hit.revocation_status == "unknown")
not_revoked_total = sum(1 for hit in hits if hit.revocation_status == "not_revoked")
lines: list[str] = [
r"\documentclass[11pt]{article}",
r"\usepackage[a4paper,margin=18mm]{geometry}",
r"\usepackage{fontspec}",
r"\usepackage[table]{xcolor}",
r"\usepackage{microtype}",
r"\usepackage{hyperref}",
r"\usepackage{xurl}",
r"\usepackage{array}",
r"\usepackage{booktabs}",
r"\usepackage{tabularx}",
r"\usepackage{longtable}",
r"\usepackage{enumitem}",
r"\usepackage{titlesec}",
r"\usepackage[most]{tcolorbox}",
r"\usepackage{fancyvrb}",
r"\usepackage{needspace}",
r"\defaultfontfeatures{Ligatures=TeX,Scale=MatchLowercase}",
r"\definecolor{Ink}{HTML}{17202A}",
r"\definecolor{Muted}{HTML}{667085}",
r"\definecolor{Line}{HTML}{D0D5DD}",
r"\definecolor{Panel}{HTML}{F8FAFC}",
r"\definecolor{Accent}{HTML}{0F766E}",
r"\definecolor{AccentSoft}{HTML}{E6F4F1}",
r"\definecolor{AccentLine}{HTML}{74C4B8}",
r"\definecolor{Warn}{HTML}{9A6700}",
r"\definecolor{WarnSoft}{HTML}{FFF4DB}",
r"\definecolor{Danger}{HTML}{B42318}",
r"\definecolor{DangerSoft}{HTML}{FEE4E2}",
r"\definecolor{OkText}{HTML}{065F46}",
r"\definecolor{OkSoft}{HTML}{DCFCE7}",
r"\definecolor{UnknownText}{HTML}{9A6700}",
r"\definecolor{UnknownSoft}{HTML}{FEF3C7}",
r"\hypersetup{colorlinks=true,linkcolor=Accent,urlcolor=Accent,pdfauthor={CertTransparencySearch},pdftitle={Certificate Transparency Endpoint Atlas}}",
r"\setlength{\parindent}{0pt}",
r"\setlength{\parskip}{6pt}",
r"\setlength{\emergencystretch}{3em}",
r"\setlength{\footskip}{24pt}",
r"\setlength{\tabcolsep}{4.2pt}",
r"\renewcommand{\arraystretch}{1.12}",
r"\raggedbottom",
r"\setcounter{tocdepth}{2}",
rf"\pagestyle{{{'plain' if show_page_numbers else 'empty'}}}",
r"\titleformat{\section}{\sffamily\bfseries\LARGE\color{Ink}\raggedright}{\thesection}{0.8em}{}",
r"\titleformat{\subsection}{\sffamily\bfseries\Large\color{Ink}\raggedright}{\thesubsection}{0.8em}{}",
r"\titleformat{\subsubsection}{\sffamily\bfseries\normalsize\color{Ink}\raggedright}{\thesubsubsection}{0.8em}{}",
r"\tcbset{",
r" panel/.style={enhanced,breakable,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=white,colframe=Line},",
r" hero/.style={panel,colback=Ink,colframe=Ink,left=14pt,right=14pt,top=14pt,bottom=14pt},",
r" summary/.style={panel,colback=Panel,colframe=Line},",
r" issuerpanel/.style={panel,colback=Panel,colframe=Ink!45},",
r" familypanel/.style={panel,colback=AccentSoft,colframe=AccentLine},",
r" subjectpanel/.style={panel,colback=white,colframe=Line},",
r" treepanel/.style={enhanced,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=Panel,colframe=AccentLine},",
r"}",
r"\newcommand{\DomainChip}[1]{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=AccentSoft]{\sffamily\footnotesize\texttt{#1}}}",
r"\newcommand{\MetricChip}[2]{\tcbox[on line,boxrule=0pt,arc=3pt,left=6pt,right=6pt,top=3pt,bottom=3pt,colback=Panel]{\sffamily\footnotesize\textcolor{Muted}{#1}\hspace{0.45em}\textbf{#2}}}",
r"\newcommand{\StatusOK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{OK}}}",
r"\newcommand{\StatusREV}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{REV}}}",
r"\newcommand{\StatusUNK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=UnknownSoft]{\sffamily\bfseries\footnotesize\textcolor{UnknownText}{UNK}}}",
r"\newcommand{\WebPKIYes}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{WebPKI: YES}}}",
r"\newcommand{\WebPKINo}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{WebPKI: NO}}}",
r"\begin{document}",
r"\begin{titlepage}",
r"\thispagestyle{empty}",
r"\vspace*{20mm}",
r"\begin{tcolorbox}[hero]",
r"{\color{white}\sffamily\bfseries\fontsize{24}{28}\selectfont Certificate Transparency Endpoint Atlas\par}",
r"\vspace{4pt}",
r"{\color{white}\Large Currently valid leaf certificates matching the configured domains\par}",
r"\vspace{12pt}",
r"{\color{white}\sffamily\small This artefact is optimized for review: issuer-first navigation, CN-family grouping, certificate timelines, and SAN structure blocks designed to be read rather than decoded.}",
r"\end{tcolorbox}",
r"\vspace{10mm}",
r"\begin{tcolorbox}[summary]",
rf"\textbf{{Generated}}: {latex_escape(stats.generated_at_utc)}\par",
r"\textbf{Configured domains}: " + " ".join(
rf"\DomainChip{{{latex_escape(domain)}}}" for domain in stats.configured_domains
),
r"\par\medskip",
r"\MetricChip{Leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
+ r"\MetricChip{CN families}{" + str(stats.groups_total) + r"}" + " "
+ r"\MetricChip{Numbered families}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
+ r"\MetricChip{Exact families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}",
r"\par\medskip",
r"\MetricChip{Not revoked}{" + str(not_revoked_total) + r"}" + " "
+ r"\MetricChip{Revoked}{" + str(revoked_total) + r"}" + " "
+ r"\MetricChip{Unknown}{" + str(unknown_total) + r"}",
r"\end{tcolorbox}",
r"\vfill",
r"{\sffamily\small\textcolor{Muted}{Same scan, three outputs: Markdown for editor preview, LaTeX for source control, PDF for distribution.}}",
r"\end{titlepage}",
r"\tableofcontents",
r"\clearpage",
r"\section*{Executive Summary}",
r"\addcontentsline{toc}{section}{Executive Summary}",
r"\begin{tcolorbox}[summary]",
r"\textbf{Reading guide}\par",
r"Major chapters are exact issuer names. Inside each issuer, families are derived only from the construction of the Subject CN. Each concrete Subject CN then gets its own certificate timeline and a SAN structure panel.\par",
r"\medskip",
r"\textbf{Leaf-only assurance}\par",
r"SQL excludes entries whose lifecycle type is not \texttt{Certificate}. Local parsing then rejects any artifact with precertificate poison, \texttt{BasicConstraints.ca = true}, or \texttt{KeyUsage.keyCertSign = true}.",
r"\end{tcolorbox}",
r"\begin{tcolorbox}[summary]",
r"\textbf{Issuer landscape}\par",
r"\medskip",
r"\begin{tabularx}{\linewidth}{>{\raggedright\arraybackslash}X >{\raggedleft\arraybackslash}p{1.7cm} >{\raggedleft\arraybackslash}p{1.9cm} >{\raggedleft\arraybackslash}p{2.0cm}}",
r"\toprule",
r"Issuer & Certificates & Share & WebPKI \\",
r"\midrule",
]
total_hits = len(hits) if hits else 1
for issuer_name in ordered_issuers:
issuer_count = len(issuer_hits[issuer_name])
share = f"{issuer_count / total_hits:.1%}"
lines.append(
rf"{latex_escape(issuer_name)} & {issuer_count} & {latex_escape(share)} & {latex_webpki_badge(issuer_trust[issuer_name].major_webpki)} \\"
)
lines.extend(
[
r"\bottomrule",
r"\end{tabularx}",
r"\end{tcolorbox}",
]
)
for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
trust = issuer_trust[issuer_name]
issuer_groups = build_groups(issuer_hits[issuer_name])
lines.extend(
[
r"\clearpage",
rf"\section{{Issuer {issuer_position:02d}: {latex_escape(issuer_name)}}}",
r"\begin{tcolorbox}[issuerpanel]",
r"\MetricChip{Certificates}{" + str(len(issuer_hits[issuer_name])) + r"}" + " "
+ r"\MetricChip{Families}{" + str(len(issuer_groups)) + r"}" + " "
+ latex_webpki_badge(trust.major_webpki),
r"\par\medskip",
rf"\textbf{{Trust contexts seen in crt.sh live data}}: {latex_escape(', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none')}\par",
rf"\textbf{{Issuer CA IDs}}: {latex_escape(', '.join(str(value) for value in sorted(trust.issuer_ca_ids)))}",
r"\end{tcolorbox}",
]
)
for family_index, group in enumerate(issuer_groups, start=1):
member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
lines.extend(
[
r"\Needspace{14\baselineskip}",
rf"\subsection{{Family {family_index:02d}: {latex_escape(describe_group_basis(group).replace('`', ''))}}}",
r"\begin{tcolorbox}[familypanel]",
r"\MetricChip{Certificates}{" + str(group.member_count) + r"}" + " "
+ r"\MetricChip{Concrete CNs}{" + str(group.distinct_subject_cn_count) + r"}" + " "
+ r"\MetricChip{Distinct SAN profiles}{" + str(group.distinct_exact_content_count) + r"}",
r"\par\medskip",
rf"\textbf{{Matched domains}}: {' '.join(rf'\DomainChip{{{latex_escape(domain)}}}' for domain in sorted(group.matched_domains))}\par",
rf"\textbf{{Family validity span}}: \texttt{{{latex_escape(utc_iso(group.valid_from_min))}}} to \texttt{{{latex_escape(utc_iso(group.valid_to_max))}}}\par",
(
rf"\textbf{{First seen span}}: \texttt{{{latex_escape(utc_iso(group.first_seen_min))}}} to \texttt{{{latex_escape(utc_iso(group.first_seen_max))}}}\par"
if group.first_seen_min and group.first_seen_max
else ""
),
rf"\textbf{{Revocation mix}}: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown",
r"\end{tcolorbox}",
]
)
hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
for hit in member_hits:
hits_by_subject[hit.subject_cn].append(hit)
ordered_subjects = sorted(
hits_by_subject.keys(),
key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
)
for subject_cn in ordered_subjects:
subject_hits = sorted(
hits_by_subject[subject_cn],
key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
)
san_summary = summarize_san_patterns(sorted({entry for hit in subject_hits for entry in hit.san_entries}))
unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
lines.extend(
[
r"\Needspace{18\baselineskip}",
rf"\subsubsection{{Subject CN: {latex_escape(subject_cn)}}}",
r"\begin{tcolorbox}[subjectpanel]",
r"\MetricChip{Certificates under this CN}{" + str(len(subject_hits)) + r"}" + " "
+ r"\MetricChip{Distinct SAN profiles}{" + str(len({tuple(hit.san_entries) for hit in subject_hits})) + r"}" + " "
+ r"\MetricChip{Unique SAN entries}{" + str(len(unique_san_entries)) + r"}",
r"\par\medskip",
rf"\textbf{{Validity span under this CN}}: \texttt{{{latex_escape(utc_iso(min(hit.validity_not_before for hit in subject_hits)))}}} to \texttt{{{latex_escape(utc_iso(max(hit.validity_not_after for hit in subject_hits)))}}}",
r"\par\medskip",
r"\textbf{Certificate timeline}",
r"\begin{itemize}[leftmargin=1.4em,itemsep=0.55em,topsep=0.4em]",
]
)
for hit in subject_hits:
crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
lines.extend(
[
r"\item "
+ latex_status_badge(hit.revocation_status)
+ " "
+ rf"\texttt{{{latex_escape(utc_iso(hit.validity_not_before))}}} to \texttt{{{latex_escape(utc_iso(hit.validity_not_after))}}}",
rf"\newline \textcolor{{Muted}}{{SANs: {len(hit.san_entries)} \quad crt.sh: {latex_escape(crtsh_ids)} \quad {latex_escape(one_line_revocation(hit))}}}",
]
)
tree_chunks = build_san_tree_chunks_with_style(
unique_san_entries,
ascii_only=True,
max_lines_per_chunk=24,
)
lines.extend(
[
r"\end{itemize}",
r"\medskip",
r"\textbf{SAN pattern snapshot}",
r"\par\medskip",
r"\MetricChip{DNS SANs}{" + str(san_summary["dns_count"]) + r"}" + " "
+ r"\MetricChip{Other SANs}{" + str(san_summary["other_count"]) + r"}" + " "
+ r"\MetricChip{Wildcard SANs}{" + str(san_summary["wildcard_count"]) + r"}" + " "
+ r"\MetricChip{Numbered SANs}{" + str(san_summary["numbered_count"]) + r"}" + " "
+ r"\MetricChip{DNS zones}{" + str(san_summary["zone_count"]) + r"}",
r"\par\medskip",
rf"\textbf{{Dominant zones}}: {latex_escape(', '.join(f'{zone} ({count})' for zone, count in san_summary['top_zones']) if san_summary['top_zones'] else 'none')}",
r"\par",
rf"\textbf{{Repeating host schemas}}: {latex_escape(', '.join(f'{pattern} ({count})' for pattern, count in san_summary['repeating_patterns']) if san_summary['repeating_patterns'] else 'mostly one-off SAN hostnames')}",
(
rf"\par\medskip\textcolor{{Muted}}{{The SAN structure below is shown in {len(tree_chunks)} intact panels so the visual grouping is not broken across a page.}}"
if len(tree_chunks) > 1
else ""
),
r"\end{tcolorbox}",
]
)
for tree_chunk_index, tree_lines in enumerate(tree_chunks, start=1):
tree_title = (
"SAN Structure"
if len(tree_chunks) == 1
else f"SAN Structure ({tree_chunk_index}/{len(tree_chunks)})"
)
tree_needspace = max(12, min(len(tree_lines) + 7, 32))
lines.extend(
[
rf"\Needspace{{{tree_needspace}\baselineskip}}",
rf"\begin{{tcolorbox}}[treepanel,title={{{latex_escape(tree_title)}}}]",
r"\begin{Verbatim}[fontsize=\footnotesize]",
]
)
lines.extend(tree_lines)
lines.extend(
[
r"\end{Verbatim}",
r"\end{tcolorbox}",
]
)
lines.extend(
[
r"\clearpage",
r"\section*{Statistics}",
r"\addcontentsline{toc}{section}{Statistics}",
r"\begin{tcolorbox}[summary]",
r"\MetricChip{Unique leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
+ r"\MetricChip{CN-family chapters}{" + str(stats.groups_total) + r"}" + " "
+ r"\MetricChip{Multi-certificate chapters}{" + str(stats.groups_multi_member) + r"}" + " "
+ r"\MetricChip{Singleton chapters}{" + str(stats.groups_singleton) + r"}",
r"\par\medskip",
r"\MetricChip{Numbered CN patterns}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
+ r"\MetricChip{Exact endpoint families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}" + " "
+ r"\MetricChip{Non-leaf filtered}{" + str(stats.verification.non_leaf_filtered) + r"}" + " "
+ r"\MetricChip{Precert poison filtered}{" + str(stats.verification.precertificate_poison_filtered) + r"}",
r"\end{tcolorbox}",
r"\end{document}",
]
)
path.write_text("\n".join(line for line in lines if line != "") + "\n", encoding="utf-8")
def cleanup_latex_auxiliary_files(tex_path: Path, pdf_output: Path) -> None:
generated_base = pdf_output.parent / tex_path.stem
for suffix in (".aux", ".log", ".out", ".toc"):
candidate = generated_base.with_suffix(suffix)
if candidate.exists():
candidate.unlink()
def compile_latex_to_pdf(tex_path: Path, pdf_output: Path, engine: str) -> None:
engine_path = shutil.which(engine)
if engine_path is None:
raise RuntimeError(f"LaTeX engine not found: {engine}")
tex_path = tex_path.resolve()
pdf_output = pdf_output.resolve()
pdf_output.parent.mkdir(parents=True, exist_ok=True)
compile_cmd = [
engine_path,
"-interaction=nonstopmode",
"-halt-on-error",
"-output-directory",
str(pdf_output.parent),
str(tex_path),
]
for _ in range(2):
result = subprocess.run(
compile_cmd,
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
message = (result.stdout + "\n" + result.stderr).strip()
raise RuntimeError(
"LaTeX compilation failed.\n"
+ "\n".join(message.splitlines()[-40:])
)
generated_pdf = pdf_output.parent / f"{tex_path.stem}.pdf"
if generated_pdf != pdf_output:
generated_pdf.replace(pdf_output)
cleanup_latex_auxiliary_files(tex_path, pdf_output)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Search crt.sh for currently valid certificates matching configured domain fragments.",
)
parser.add_argument(
"--domains-file",
type=Path,
default=Path("domains.local.txt"),
help="Text file containing one domain fragment per line.",
)
parser.add_argument(
"--output",
type=Path,
default=Path("output/current-valid-certificates.md"),
help="Readable single-file markdown report to write.",
)
parser.add_argument(
"--latex-output",
type=Path,
default=Path("output/current-valid-certificates.tex"),
help="Readable single-file LaTeX report to write.",
)
parser.add_argument(
"--pdf-output",
type=Path,
default=Path("output/current-valid-certificates.pdf"),
help="Compiled PDF report to write.",
)
parser.add_argument(
"--pdf-engine",
default="xelatex",
help="LaTeX engine used to compile the PDF report.",
)
parser.add_argument(
"--skip-pdf",
action="store_true",
help="Write Markdown and LaTeX outputs but skip PDF compilation.",
)
parser.add_argument(
"--cache-dir",
type=Path,
default=Path(".cache/ct-search"),
help="Directory for cached per-domain query results.",
)
parser.add_argument(
"--cache-ttl-seconds",
type=int,
default=900,
help="Reuse cached database results younger than this many seconds.",
)
parser.add_argument(
"--max-candidates-per-domain",
type=int,
default=10000,
help="Maximum raw crt.sh identity rows to inspect per domain fragment.",
)
parser.add_argument(
"--retries",
type=int,
default=3,
help="Retry count for replica/recovery conflicts from crt.sh.",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Suppress progress output.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
domains = load_domains(args.domains_file)
all_records: list[DatabaseRecord] = []
for domain in domains:
cached = load_cached_records(
cache_dir=args.cache_dir,
domain=domain,
ttl_seconds=args.cache_ttl_seconds,
max_candidates=args.max_candidates_per_domain,
)
if cached is not None:
if not args.quiet:
print(f"[cache] domain={domain} records={len(cached)}", file=sys.stderr)
all_records.extend(cached)
continue
if not args.quiet:
print(f"[query] domain={domain}", file=sys.stderr)
records = query_domain(
domain=domain,
max_candidates=args.max_candidates_per_domain,
attempts=args.retries,
verbose=not args.quiet,
)
if not args.quiet:
print(f"[done] domain={domain} records={len(records)}", file=sys.stderr)
store_cached_records(args.cache_dir, domain, args.max_candidates_per_domain, records)
all_records.extend(records)
hits, verification = build_hits(all_records)
groups = build_groups(hits)
scan_stats = ScanStats(
generated_at_utc=utc_iso(datetime.now(UTC)),
configured_domains=domains,
unique_leaf_certificates=len(hits),
groups_total=len(groups),
groups_multi_member=sum(1 for group in groups if group.member_count > 1),
groups_singleton=sum(1 for group in groups if group.member_count == 1),
groups_by_type=dict(Counter(group.group_type for group in groups)),
verification=verification,
)
issuer_trust = query_issuer_trust(hits)
render_markdown_report(args.output, hits, groups, scan_stats, issuer_trust)
render_latex_report(args.latex_output, hits, groups, scan_stats, issuer_trust)
if not args.skip_pdf:
compile_latex_to_pdf(args.latex_output, args.pdf_output, args.pdf_engine)
if not args.quiet:
print(
f"[report] hits={len(hits)} groups={len(groups)} markdown={args.output} latex={args.latex_output}"
+ ("" if args.skip_pdf else f" pdf={args.pdf_output}"),
file=sys.stderr,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())