#!/usr/bin/env python3 from __future__ import annotations import argparse import base64 import hashlib import json import re import shutil import subprocess import sys import time from collections import Counter, defaultdict from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from typing import Any import psycopg from cryptography import x509 from cryptography.x509 import general_name from cryptography.x509.oid import ExtensionOID from cryptography.x509.oid import NameOID from psycopg.rows import dict_row QUERY_SQL = """ WITH ci AS ( SELECT min(sub.certificate_id) AS id, min(sub.issuer_ca_id) AS issuer_ca_id, x509_commonName(sub.certificate) AS common_name, x509_subjectName(sub.certificate) AS subject_dn, x509_notBefore(sub.certificate) AS not_before, x509_notAfter(sub.certificate) AS not_after, encode(x509_serialNumber(sub.certificate), 'hex') AS serial_number, sub.certificate AS certificate FROM ( SELECT cai.* FROM certificate_and_identities cai WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate) AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\' LIMIT %(max_candidates)s ) sub GROUP BY sub.certificate ) SELECT ci.id, ci.issuer_ca_id, ca.name AS issuer_name, ci.common_name, ci.subject_dn, ci.not_before, ci.not_after, cl.first_seen, ci.serial_number, coalesce(cl.revoked, 0) AS revoked_count, rev.revocation_date, rev.reason_code, rev.last_seen_check_date, crl_state.active_crl_count, crl_state.last_checked AS crl_last_checked, ci.certificate FROM ci JOIN ca ON ca.id = ci.issuer_ca_id JOIN certificate_lifecycle cl ON cl.certificate_id = ci.id LEFT JOIN LATERAL ( SELECT cr.revocation_date, cr.reason_code, cr.last_seen_check_date FROM crl_revoked cr WHERE cr.ca_id = ci.issuer_ca_id AND cr.serial_number = decode(ci.serial_number, 'hex') ORDER BY cr.last_seen_check_date DESC NULLS LAST LIMIT 1 ) rev ON TRUE LEFT JOIN LATERAL ( SELECT count(*) FILTER ( WHERE crl.error_message IS NULL AND crl.next_update > now() AT TIME ZONE 'UTC' ) AS active_crl_count, max(crl.last_checked) AS last_checked FROM crl WHERE crl.ca_id = ci.issuer_ca_id ) crl_state ON TRUE WHERE ci.not_before <= now() AT TIME ZONE 'UTC' AND ci.not_after >= now() AT TIME ZONE 'UTC' AND cl.certificate_type = 'Certificate' ORDER BY cl.first_seen DESC NULLS LAST, ci.id DESC; """ RAW_MATCH_COUNT_SQL = """ SELECT count(*) FROM certificate_and_identities cai WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate) AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\' """ REVOCATION_REASONS = { 1: "keyCompromise", 2: "cACompromise", 3: "affiliationChanged", 4: "superseded", 5: "cessationOfOperation", 6: "certificateHold", 8: "removeFromCRL", 9: "privilegeWithdrawn", 10: "aACompromise", } PRECERT_POISON_OID = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3") @dataclass class DatabaseRecord: domain: str certificate_id: int issuer_ca_id: int issuer_name: str common_name: str | None subject_dn: str | None not_before: datetime not_after: datetime first_seen: datetime | None serial_number: str revoked_count: int revocation_date: datetime | None reason_code: int | None last_seen_check_date: datetime | None active_crl_count: int crl_last_checked: datetime | None certificate_der: bytes @dataclass class CertificateHit: fingerprint_sha256: str subject_cn: str validity_not_before: datetime validity_not_after: datetime san_entries: list[str] revocation_status: str revocation_date: datetime | None revocation_reason: str | None revocation_note: str | None crtsh_crl_timestamp: datetime | None matched_domains: set[str] = field(default_factory=set) first_seen: datetime | None = None crtsh_certificate_ids: set[int] = field(default_factory=set) serial_numbers: set[str] = field(default_factory=set) issuer_names: set[str] = field(default_factory=set) issuer_ca_ids: set[int] = field(default_factory=set) @dataclass class VerificationStats: input_rows: int = 0 unique_leaf_certificates: int = 0 non_leaf_filtered: int = 0 precertificate_poison_filtered: int = 0 @dataclass class CertificateGroup: group_id: str group_type: str member_indices: list[int] member_count: int distinct_subject_cn_count: int distinct_exact_content_count: int numbered_cn_patterns: set[str] matched_domains: set[str] subject_cns: set[str] first_seen_min: datetime | None first_seen_max: datetime | None valid_from_min: datetime valid_to_max: datetime revocation_counts: Counter @dataclass class ScanStats: generated_at_utc: str configured_domains: list[str] unique_leaf_certificates: int groups_total: int groups_multi_member: int groups_singleton: int groups_by_type: dict[str, int] verification: VerificationStats @dataclass class IssuerTrustInfo: issuer_name: str issuer_ca_ids: set[int] server_auth_contexts: set[str] major_webpki: bool def load_domains(path: Path) -> list[str]: domains: list[str] = [] for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip().lower() if not line or line.startswith("#"): continue if line.startswith("*."): line = line[2:] domains.append(line) unique_domains = sorted(set(domains)) if not unique_domains: raise ValueError(f"No domains found in {path}") return unique_domains def escape_like(value: str) -> str: return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") def utc_iso(value: datetime | None) -> str: if value is None: return "n/a" if value.tzinfo is None: value = value.replace(tzinfo=UTC) else: value = value.astimezone(UTC) return value.isoformat(timespec="seconds").replace("+00:00", "Z") def serialize_datetime(value: datetime | None) -> str | None: return utc_iso(value) if value is not None else None def parse_datetime(value: str | None) -> datetime | None: if value is None: return None return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) def cache_path(cache_dir: Path, domain: str) -> Path: safe_domain = "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in domain) return cache_dir / f"{safe_domain}.json" def record_to_cache_payload(record: DatabaseRecord) -> dict[str, Any]: return { "domain": record.domain, "certificate_id": record.certificate_id, "issuer_ca_id": record.issuer_ca_id, "issuer_name": record.issuer_name, "common_name": record.common_name, "subject_dn": record.subject_dn, "not_before": serialize_datetime(record.not_before), "not_after": serialize_datetime(record.not_after), "first_seen": serialize_datetime(record.first_seen), "serial_number": record.serial_number, "revoked_count": record.revoked_count, "revocation_date": serialize_datetime(record.revocation_date), "reason_code": record.reason_code, "last_seen_check_date": serialize_datetime(record.last_seen_check_date), "active_crl_count": record.active_crl_count, "crl_last_checked": serialize_datetime(record.crl_last_checked), "certificate_der_b64": base64.b64encode(record.certificate_der).decode("ascii"), } def record_from_cache_payload(payload: dict[str, Any]) -> DatabaseRecord: return DatabaseRecord( domain=payload["domain"], certificate_id=int(payload["certificate_id"]), issuer_ca_id=int(payload["issuer_ca_id"]), issuer_name=payload["issuer_name"], common_name=payload.get("common_name"), subject_dn=payload.get("subject_dn"), not_before=parse_datetime(payload["not_before"]) or datetime.min, not_after=parse_datetime(payload["not_after"]) or datetime.min, first_seen=parse_datetime(payload.get("first_seen")), serial_number=payload["serial_number"], revoked_count=int(payload["revoked_count"]), revocation_date=parse_datetime(payload.get("revocation_date")), reason_code=payload.get("reason_code"), last_seen_check_date=parse_datetime(payload.get("last_seen_check_date")), active_crl_count=int(payload["active_crl_count"]), crl_last_checked=parse_datetime(payload.get("crl_last_checked")), certificate_der=base64.b64decode(payload["certificate_der_b64"]), ) def load_cached_records(cache_dir: Path, domain: str, ttl_seconds: int, max_candidates: int) -> list[DatabaseRecord] | None: path = cache_path(cache_dir, domain) if not path.exists(): return None try: payload = json.loads(path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return None if payload.get("version") != 1: return None if payload.get("max_candidates") != max_candidates: return None cached_at = parse_datetime(payload.get("cached_at")) if cached_at is None: return None age = time.time() - cached_at.replace(tzinfo=UTC).timestamp() if age > ttl_seconds: return None return [record_from_cache_payload(item) for item in payload.get("records", [])] def store_cached_records(cache_dir: Path, domain: str, max_candidates: int, records: list[DatabaseRecord]) -> None: cache_dir.mkdir(parents=True, exist_ok=True) payload = { "version": 1, "cached_at": utc_iso(datetime.now(UTC)), "max_candidates": max_candidates, "records": [record_to_cache_payload(record) for record in records], } cache_path(cache_dir, domain).write_text( json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8", ) def connect() -> psycopg.Connection: return psycopg.connect( host="crt.sh", port=5432, dbname="certwatch", user="guest", password="guest", connect_timeout=5, sslmode="disable", autocommit=True, application_name="ct_transparency_search", ) def query_domain(domain: str, max_candidates: int, attempts: int, verbose: bool) -> list[DatabaseRecord]: params = { "domain": domain, "name_pattern": f"%{escape_like(domain)}%", "max_candidates": max_candidates, } raw_match_count = query_raw_match_count(domain=domain, attempts=attempts, verbose=verbose) if raw_match_count > max_candidates: raise ValueError( f"domain={domain} raw identity matches={raw_match_count} exceed max_candidates={max_candidates}; " f"increase --max-candidates-per-domain to at least {raw_match_count} for a complete result set" ) last_error: Exception | None = None for attempt in range(1, attempts + 1): try: with connect() as conn, conn.cursor(row_factory=dict_row) as cur: cur.execute(QUERY_SQL, params) rows = cur.fetchall() return [row_to_record(domain, row) for row in rows] except Exception as exc: last_error = exc if attempt == attempts: break if verbose: print( f"[warn] domain={domain} attempt={attempt}/{attempts} failed: {exc}", file=sys.stderr, ) time.sleep(min(2 ** attempt, 10)) assert last_error is not None raise last_error def query_raw_match_count(domain: str, attempts: int, verbose: bool) -> int: params = { "domain": domain, "name_pattern": f"%{escape_like(domain)}%", } last_error: Exception | None = None for attempt in range(1, attempts + 1): try: with connect() as conn, conn.cursor() as cur: cur.execute(RAW_MATCH_COUNT_SQL, params) row = cur.fetchone() return int(row[0]) except Exception as exc: last_error = exc if attempt == attempts: break if verbose: print( f"[warn] domain={domain} raw-count attempt={attempt}/{attempts} failed: {exc}", file=sys.stderr, ) time.sleep(min(2 ** attempt, 10)) assert last_error is not None raise last_error def row_to_record(domain: str, row: dict[str, Any]) -> DatabaseRecord: return DatabaseRecord( domain=domain, certificate_id=int(row["id"]), issuer_ca_id=int(row["issuer_ca_id"]), issuer_name=row["issuer_name"], common_name=row["common_name"], subject_dn=row["subject_dn"], not_before=row["not_before"], not_after=row["not_after"], first_seen=row["first_seen"], serial_number=row["serial_number"], revoked_count=int(row["revoked_count"]), revocation_date=row["revocation_date"], reason_code=row["reason_code"], last_seen_check_date=row["last_seen_check_date"], active_crl_count=int(row["active_crl_count"] or 0), crl_last_checked=row["crl_last_checked"], certificate_der=bytes(row["certificate"]), ) def extract_san_entries(cert: x509.Certificate) -> list[str]: try: extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) except x509.ExtensionNotFound: return [] entries: list[str] = [] for name in extension.value: entries.append(format_general_name(name)) return sorted(set(entries), key=str.casefold) def format_general_name(name: general_name.GeneralName) -> str: if isinstance(name, x509.DNSName): return f"DNS:{name.value}" if isinstance(name, x509.RFC822Name): return f"EMAIL:{name.value}" if isinstance(name, x509.UniformResourceIdentifier): return f"URI:{name.value}" if isinstance(name, x509.IPAddress): return f"IP:{name.value}" if isinstance(name, x509.RegisteredID): return f"RID:{name.value.dotted_string}" if isinstance(name, x509.DirectoryName): return f"DIR:{name.value.rfc4514_string()}" if isinstance(name, x509.OtherName): encoded = base64.b64encode(name.value).decode("ascii") return f"OTHER:{name.type_id.dotted_string}:{encoded}" return str(name) def extract_common_name(cert: x509.Certificate) -> str | None: attributes = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) if not attributes: return None return attributes[0].value def has_precertificate_poison(cert: x509.Certificate) -> bool: try: cert.extensions.get_extension_for_oid(PRECERT_POISON_OID) except x509.ExtensionNotFound: return False return True def is_leaf_certificate(cert: x509.Certificate) -> tuple[bool, str]: if has_precertificate_poison(cert): return (False, "precertificate_poison") try: basic_constraints = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value if basic_constraints.ca: return (False, "basic_constraints_ca") except x509.ExtensionNotFound: pass try: key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value if key_usage.key_cert_sign: return (False, "key_cert_sign") except x509.ExtensionNotFound: pass return (True, "leaf") def revocation_fields(record: DatabaseRecord) -> tuple[str, datetime | None, str | None, datetime | None, str | None]: if record.revoked_count > 0: reason: str | None = None if record.reason_code in REVOCATION_REASONS: reason = REVOCATION_REASONS[record.reason_code] elif record.reason_code not in (None, 0): reason = f"unknown({record.reason_code})" return ("revoked", record.revocation_date, reason, record.last_seen_check_date, None) if record.active_crl_count > 0: return ("not_revoked", None, None, record.crl_last_checked, None) return ("unknown", None, None, record.crl_last_checked, "no fresh crt.sh CRL data") def revocation_priority(status: str) -> int: return { "unknown": 0, "not_revoked": 1, "revoked": 2, }[status] def build_hits(records: list[DatabaseRecord]) -> tuple[list[CertificateHit], VerificationStats]: verification = VerificationStats(input_rows=len(records)) hits: dict[str, CertificateHit] = {} for record in records: cert = x509.load_der_x509_certificate(record.certificate_der) is_leaf, reason = is_leaf_certificate(cert) if not is_leaf: if reason == "precertificate_poison": verification.precertificate_poison_filtered += 1 else: verification.non_leaf_filtered += 1 continue fingerprint_hex = hashlib.sha256(record.certificate_der).hexdigest() subject_cn = record.common_name or extract_common_name(cert) or "-" revocation_status, revocation_date, revocation_reason, crtsh_crl_timestamp, revocation_note = revocation_fields(record) hit = hits.get(fingerprint_hex) if hit is None: hit = CertificateHit( fingerprint_sha256=fingerprint_hex, subject_cn=subject_cn, validity_not_before=record.not_before, validity_not_after=record.not_after, san_entries=extract_san_entries(cert), revocation_status=revocation_status, revocation_date=revocation_date, revocation_reason=revocation_reason, revocation_note=revocation_note, crtsh_crl_timestamp=crtsh_crl_timestamp, matched_domains={record.domain}, first_seen=record.first_seen, crtsh_certificate_ids={record.certificate_id}, serial_numbers={record.serial_number}, issuer_names={record.issuer_name}, issuer_ca_ids={record.issuer_ca_id}, ) hits[fingerprint_hex] = hit continue hit.matched_domains.add(record.domain) hit.crtsh_certificate_ids.add(record.certificate_id) hit.serial_numbers.add(record.serial_number) hit.issuer_names.add(record.issuer_name) hit.issuer_ca_ids.add(record.issuer_ca_id) if hit.first_seen is None or (record.first_seen is not None and record.first_seen < hit.first_seen): hit.first_seen = record.first_seen if revocation_priority(revocation_status) > revocation_priority(hit.revocation_status): hit.revocation_status = revocation_status hit.revocation_date = revocation_date hit.revocation_reason = revocation_reason hit.revocation_note = revocation_note hit.crtsh_crl_timestamp = crtsh_crl_timestamp elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is not None and crtsh_crl_timestamp is not None: if crtsh_crl_timestamp > hit.crtsh_crl_timestamp: hit.crtsh_crl_timestamp = crtsh_crl_timestamp elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is None: hit.crtsh_crl_timestamp = crtsh_crl_timestamp ordered_hits = sorted( hits.values(), key=lambda hit: ( sorted(hit.matched_domains), hit.subject_cn.casefold(), hit.validity_not_before, hit.fingerprint_sha256, ), ) verification.unique_leaf_certificates = len(ordered_hits) return (ordered_hits, verification) def canonicalize_subject_cn(subject_cn: str) -> str: subject_cn = subject_cn.lower() if subject_cn.startswith("www."): return subject_cn[4:] return subject_cn def normalize_counter_pattern(hostname: str) -> str | None: normalized = re.sub(r"\d+", "#", canonicalize_subject_cn(hostname)) if normalized == canonicalize_subject_cn(hostname): return None return normalized class UnionFind: def __init__(self, size: int) -> None: self.parent = list(range(size)) self.rank = [0] * size def find(self, value: int) -> int: while self.parent[value] != value: self.parent[value] = self.parent[self.parent[value]] value = self.parent[value] return value def union(self, left: int, right: int) -> None: left_root = self.find(left) right_root = self.find(right) if left_root == right_root: return if self.rank[left_root] < self.rank[right_root]: left_root, right_root = right_root, left_root self.parent[right_root] = left_root if self.rank[left_root] == self.rank[right_root]: self.rank[left_root] += 1 def build_groups(hits: list[CertificateHit]) -> list[CertificateGroup]: if not hits: return [] canonical_cns_by_pattern: dict[str, set[str]] = defaultdict(set) for hit in hits: pattern = normalize_counter_pattern(hit.subject_cn) if pattern is not None: canonical_cns_by_pattern[pattern].add(canonicalize_subject_cn(hit.subject_cn)) qualifying_patterns = { pattern for pattern, canonical_cns in canonical_cns_by_pattern.items() if len(canonical_cns) > 1 } components: dict[tuple[str, str], list[int]] = defaultdict(list) for index, hit in enumerate(hits): canonical_cn = canonicalize_subject_cn(hit.subject_cn) pattern = normalize_counter_pattern(hit.subject_cn) if pattern in qualifying_patterns: components[("pattern", pattern)].append(index) else: components[("exact", canonical_cn)].append(index) provisional_groups: list[CertificateGroup] = [] for (family_kind, family_key), member_indices in components.items(): member_hits = [hits[index] for index in member_indices] subject_cns = {hit.subject_cn for hit in member_hits} unique_san_profiles = {tuple(hit.san_entries) for hit in member_hits} numbered_patterns = {family_key} if family_kind == "pattern" else set() group_type = "numbered_cn_pattern" if family_kind == "pattern" else "exact_endpoint_family" first_seen_values = [hit.first_seen for hit in member_hits if hit.first_seen is not None] provisional_groups.append( CertificateGroup( group_id="", group_type=group_type, member_indices=sorted(member_indices), member_count=len(member_indices), distinct_subject_cn_count=len(subject_cns), distinct_exact_content_count=len(unique_san_profiles), numbered_cn_patterns=numbered_patterns, matched_domains={domain for hit in member_hits for domain in hit.matched_domains}, subject_cns=subject_cns, first_seen_min=min(first_seen_values) if first_seen_values else None, first_seen_max=max(first_seen_values) if first_seen_values else None, valid_from_min=min(hit.validity_not_before for hit in member_hits), valid_to_max=max(hit.validity_not_after for hit in member_hits), revocation_counts=Counter(hit.revocation_status for hit in member_hits), ) ) provisional_groups.sort( key=lambda group: ( -group.member_count, group.group_type, min(canonicalize_subject_cn(value) for value in group.subject_cns), ) ) for position, group in enumerate(provisional_groups, start=1): group.group_id = f"G{position:04d}" return provisional_groups def describe_group_basis(group: CertificateGroup) -> str: if group.group_type == "numbered_cn_pattern": pattern = next(iter(group.numbered_cn_patterns)) return f"CN pattern with running-number slot: `{pattern}`" base = min(canonicalize_subject_cn(value) for value in group.subject_cns) return f"Same endpoint CN family (exact CN; `www.` grouped with base name): `{base}`" def primary_issuer_name(hit: CertificateHit) -> str: return sorted(hit.issuer_names)[0] def query_issuer_trust(hits: list[CertificateHit]) -> dict[str, IssuerTrustInfo]: issuer_name_to_ca_ids: dict[str, set[int]] = defaultdict(set) for hit in hits: issuer_name_to_ca_ids[primary_issuer_name(hit)].update(hit.issuer_ca_ids) all_ca_ids = sorted({ca_id for ca_ids in issuer_name_to_ca_ids.values() for ca_id in ca_ids}) contexts_by_ca_id: dict[int, set[str]] = defaultdict(set) if all_ca_ids: query = """ SELECT ctp.ca_id, tc.ctx FROM ca_trust_purpose ctp JOIN trust_context tc ON tc.id = ctp.trust_context_id JOIN trust_purpose tp ON tp.id = ctp.trust_purpose_id WHERE ctp.ca_id = ANY(%s) AND tp.purpose = 'Server Authentication' AND ctp.is_time_valid = TRUE AND ctp.disabled_from IS NULL """ with connect() as conn, conn.cursor() as cur: cur.execute(query, (all_ca_ids,)) for ca_id, trust_context in cur.fetchall(): contexts_by_ca_id[int(ca_id)].add(str(trust_context)) major_contexts = {"Mozilla", "Chrome", "Apple", "Microsoft", "Android"} results: dict[str, IssuerTrustInfo] = {} for issuer_name, ca_ids in issuer_name_to_ca_ids.items(): merged_contexts = {ctx for ca_id in ca_ids for ctx in contexts_by_ca_id.get(ca_id, set())} results[issuer_name] = IssuerTrustInfo( issuer_name=issuer_name, issuer_ca_ids=set(ca_ids), server_auth_contexts=merged_contexts, major_webpki=major_contexts.issubset(merged_contexts), ) return results def status_marker(status: str) -> str: return { "not_revoked": "OK ", "revoked": "REV", "unknown": "UNK", }[status] def one_line_revocation(hit: CertificateHit) -> str: if hit.revocation_status == "revoked": detail = f"revoked {utc_iso(hit.revocation_date)}" if hit.revocation_date else "revoked" if hit.revocation_reason: detail += f", reason={hit.revocation_reason}" return detail if hit.revocation_status == "unknown": if hit.revocation_note: return f"unknown, {hit.revocation_note}" return "unknown" return "not revoked" def san_tail_split(domain: str) -> tuple[list[str], str]: labels = domain.split(".") common_second_level = {"ac", "co", "com", "edu", "gov", "net", "org"} suffix_len = 2 if len(labels) >= 3 and len(labels[-1]) == 2 and labels[-2] in common_second_level: suffix_len = 3 if len(labels) <= suffix_len: return ([], domain) return (labels[:-suffix_len], ".".join(labels[-suffix_len:])) def build_san_tree_lines(san_entries: list[str]) -> list[str]: return build_san_tree_lines_with_style(san_entries, ascii_only=False) def build_san_tree_units_with_style(san_entries: list[str], ascii_only: bool) -> list[list[str]]: dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")}) other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")}) tree: dict[str, Any] = {} for domain in dns_entries: prefix_labels, tail = san_tail_split(domain) cursor = tree for label in prefix_labels: cursor = cursor.setdefault(label, {}) cursor.setdefault(tail, {}) def render(node: dict[str, Any], prefix: str = "") -> list[str]: lines: list[str] = [] keys = sorted(node.keys(), key=str.casefold) for index, key in enumerate(keys): is_last = index == len(keys) - 1 if ascii_only: connector = "`- " if is_last else "|- " else: connector = "└─ " if is_last else "├─ " lines.append(prefix + connector + key) child = node[key] if ascii_only: child_prefix = prefix + (" " if is_last else "| ") else: child_prefix = prefix + (" " if is_last else "│ ") lines.extend(render(child, child_prefix)) return lines units: list[list[str]] = [] for key in sorted(tree.keys(), key=str.casefold): units.append(render({key: tree[key]})) for entry in other_entries: units.append([f"{'*' if ascii_only else '•'} {entry}"]) if not units: units.append([f"{'*' if ascii_only else '•'} -"]) return units def build_san_tree_chunks_with_style( san_entries: list[str], ascii_only: bool, max_lines_per_chunk: int = 24, ) -> list[list[str]]: chunks: list[list[str]] = [] current_chunk: list[str] = [] current_lines = 0 def flush_current_chunk() -> None: nonlocal current_chunk, current_lines if current_chunk: chunks.append(current_chunk) current_chunk = [] current_lines = 0 for unit in build_san_tree_units_with_style(san_entries, ascii_only=ascii_only): if len(unit) > max_lines_per_chunk: flush_current_chunk() for start in range(0, len(unit), max_lines_per_chunk): chunks.append(unit[start : start + max_lines_per_chunk]) continue if current_chunk and current_lines + len(unit) > max_lines_per_chunk: flush_current_chunk() current_chunk.extend(unit) current_lines += len(unit) flush_current_chunk() return chunks def build_san_tree_lines_with_style(san_entries: list[str], ascii_only: bool) -> list[str]: lines: list[str] = [] for chunk in build_san_tree_chunks_with_style( san_entries, ascii_only=ascii_only, max_lines_per_chunk=10_000, ): lines.extend(chunk) return lines def group_hits_by_issuer(hits: list[CertificateHit]) -> tuple[dict[str, list[CertificateHit]], list[str]]: issuer_hits: dict[str, list[CertificateHit]] = defaultdict(list) for hit in hits: issuer_hits[primary_issuer_name(hit)].append(hit) ordered_issuers = sorted( issuer_hits, key=lambda issuer_name: (-len(issuer_hits[issuer_name]), issuer_name.casefold()), ) return issuer_hits, ordered_issuers def latex_escape(value: str) -> str: replacements = { "\\": r"\textbackslash{}", "&": r"\&", "%": r"\%", "$": r"\$", "#": r"\#", "_": r"\_", "{": r"\{", "}": r"\}", "~": r"\textasciitilde{}", "^": r"\textasciicircum{}", } return "".join(replacements.get(char, char) for char in value) def summarize_san_patterns(san_entries: list[str]) -> dict[str, Any]: dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")}, key=str.casefold) other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")}, key=str.casefold) zone_counts: Counter[str] = Counter() normalized_pattern_counts: Counter[str] = Counter() wildcard_count = 0 numbered_count = 0 for domain in dns_entries: normalized_domain = domain[2:] if domain.startswith("*.") else domain if domain.startswith("*."): wildcard_count += 1 if re.search(r"\d", normalized_domain): numbered_count += 1 prefix_labels, tail = san_tail_split(normalized_domain) zone_counts[tail] += 1 normalized_prefix = ".".join(re.sub(r"\d+", "#", label) for label in prefix_labels if label) if normalized_prefix: normalized_pattern_counts[f"{normalized_prefix}.{tail}"] += 1 else: normalized_pattern_counts[tail] += 1 repeating_patterns = [ (pattern, count) for pattern, count in normalized_pattern_counts.most_common(6) if count > 1 ] return { "dns_count": len(dns_entries), "other_count": len(other_entries), "wildcard_count": wildcard_count, "numbered_count": numbered_count, "zone_count": len(zone_counts), "top_zones": zone_counts.most_common(6), "repeating_patterns": repeating_patterns, } def latex_status_badge(status: str) -> str: return { "not_revoked": r"\StatusOK{}", "revoked": r"\StatusREV{}", "unknown": r"\StatusUNK{}", }[status] def latex_webpki_badge(value: bool) -> str: return r"\WebPKIYes{}" if value else r"\WebPKINo{}" def render_markdown_report( path: Path, hits: list[CertificateHit], groups: list[CertificateGroup], stats: ScanStats, issuer_trust: dict[str, IssuerTrustInfo], ) -> None: path.parent.mkdir(parents=True, exist_ok=True) issuer_hits, ordered_issuers = group_hits_by_issuer(hits) lines: list[str] = [] lines.append("# Certificate CN Family Report") lines.append("") lines.append(f"Generated: {stats.generated_at_utc}") lines.append(f"Configured domains: {', '.join(stats.configured_domains)}") lines.append("") lines.append("## What This File Contains") lines.append("") lines.append("- Chapters are built from Subject CN construction only.") lines.append("- If multiple concrete CNs share the same numbered schema, they are grouped together.") lines.append("- Otherwise the chapter is one endpoint family; `www.` is grouped with the base name as a low-signal convenience.") lines.append("- SAN entries are shown only inside each Subject CN subsection.") lines.append("- All certificates shown here are verified leaf certificates.") lines.append("") lines.append("## Issuer Overview") lines.append("") for issuer_name in ordered_issuers: trust = issuer_trust[issuer_name] ca_ids = ", ".join(str(value) for value in sorted(trust.issuer_ca_ids)) trust_label = "YES" if trust.major_webpki else "NO" lines.append( f"- {issuer_name} | certificates={len(issuer_hits[issuer_name])} | WebPKI server-auth in major stores={trust_label} | ca_id={ca_ids}" ) lines.append("") lines.append("## Leaf-Certificate Assurance") lines.append("") lines.append("- SQL filter: `certificate_lifecycle.certificate_type = 'Certificate'`") lines.append("- Local filter: precertificate poison absent, `BasicConstraints.ca != true`, `KeyUsage.keyCertSign != true`") lines.append(f"- Verified leaf certificates kept: {stats.unique_leaf_certificates}") lines.append(f"- Non-leaf filtered after download: {stats.verification.non_leaf_filtered}") lines.append(f"- Precertificate poison filtered after download: {stats.verification.precertificate_poison_filtered}") lines.append("") for issuer_position, issuer_name in enumerate(ordered_issuers, start=1): trust = issuer_trust[issuer_name] issuer_title = f"Issuer {issuer_position:02d} {issuer_name}" lines.append(f"## {issuer_title}") lines.append("") lines.append(f"- Certificates under issuer: {len(issuer_hits[issuer_name])}") lines.append( f"- WebPKI server-auth in major stores (Mozilla, Chrome, Apple, Microsoft, Android): {'YES' if trust.major_webpki else 'NO'}" ) lines.append( f"- Server-auth trust contexts seen in crt.sh live trust data: {', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none'}" ) lines.append(f"- Issuer CA IDs: {', '.join(str(value) for value in sorted(trust.issuer_ca_ids))}") lines.append("") issuer_groups = build_groups(issuer_hits[issuer_name]) for family_index, group in enumerate(issuer_groups, start=1): member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices] chapter_title = f"Family {family_index:02d} {describe_group_basis(group)}" lines.append(f"### {chapter_title}") lines.append("") lines.append(f"- Certificates in chapter: {group.member_count}") lines.append(f"- Concrete Subject CNs: {group.distinct_subject_cn_count}") lines.append(f"- Distinct SAN profiles in chapter: {group.distinct_exact_content_count}") lines.append(f"- Matched domains: {', '.join(sorted(group.matched_domains))}") lines.append(f"- Family validity span: {utc_iso(group.valid_from_min)} -> {utc_iso(group.valid_to_max)}") if group.first_seen_min and group.first_seen_max: lines.append(f"- First seen span: {utc_iso(group.first_seen_min)} -> {utc_iso(group.first_seen_max)}") lines.append(f"- Revocation mix: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown") lines.append("") hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list) for hit in member_hits: hits_by_subject[hit.subject_cn].append(hit) ordered_subjects = sorted( hits_by_subject.keys(), key=lambda value: (canonicalize_subject_cn(value), value.casefold()), ) for subject_cn in ordered_subjects: subject_hits = sorted( hits_by_subject[subject_cn], key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256), ) lines.append(f"#### Subject CN: `{subject_cn}`") lines.append("") lines.append(f"- Certificates under this CN: {len(subject_hits)}") lines.append(f"- Validity span under this CN: {utc_iso(min(hit.validity_not_before for hit in subject_hits))} -> {utc_iso(max(hit.validity_not_after for hit in subject_hits))}") san_profiles: dict[tuple[str, ...], list[CertificateHit]] = defaultdict(list) for hit in subject_hits: san_profiles[tuple(hit.san_entries)].append(hit) profile_size_counts = Counter(len(profile) for profile in san_profiles) unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries}) lines.append(f"- Distinct SAN profiles under this CN: {len(san_profiles)}") lines.append( "- SAN profile sizes seen: " + ", ".join( f"{size} SAN x {count}" for size, count in sorted(profile_size_counts.items()) ) ) lines.append("") lines.append("Validity history") lines.append("") for hit in subject_hits: crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids)) lines.append( f"- [{status_marker(hit.revocation_status)}] {utc_iso(hit.validity_not_before)} -> {utc_iso(hit.validity_not_after)} | SANs={len(hit.san_entries)} | crt.sh={crtsh_ids} | {one_line_revocation(hit)}" ) lines.append("") lines.append("SAN structure") lines.append("") lines.append("```text") for tree_line in build_san_tree_lines(unique_san_entries): lines.append(tree_line) lines.append("```") lines.append("") lines.append("---") lines.append("") lines.append("## Statistics") lines.append("") lines.append(f"- Unique leaf certificates: {stats.unique_leaf_certificates}") lines.append(f"- CN-family chapters: {stats.groups_total}") lines.append(f"- Chapters with more than one certificate: {stats.groups_multi_member}") lines.append(f"- Single-certificate chapters: {stats.groups_singleton}") lines.append(f"- Numbered CN pattern chapters: {stats.groups_by_type.get('numbered_cn_pattern', 0)}") lines.append(f"- Exact endpoint chapters: {stats.groups_by_type.get('exact_endpoint_family', 0)}") lines.append("") path.write_text("\n".join(lines) + "\n", encoding="utf-8") def render_latex_report( path: Path, hits: list[CertificateHit], groups: list[CertificateGroup], stats: ScanStats, issuer_trust: dict[str, IssuerTrustInfo], show_page_numbers: bool = True, ) -> None: path.parent.mkdir(parents=True, exist_ok=True) issuer_hits, ordered_issuers = group_hits_by_issuer(hits) revoked_total = sum(1 for hit in hits if hit.revocation_status == "revoked") unknown_total = sum(1 for hit in hits if hit.revocation_status == "unknown") not_revoked_total = sum(1 for hit in hits if hit.revocation_status == "not_revoked") lines: list[str] = [ r"\documentclass[11pt]{article}", r"\usepackage[a4paper,margin=18mm]{geometry}", r"\usepackage{fontspec}", r"\usepackage[table]{xcolor}", r"\usepackage{microtype}", r"\usepackage{hyperref}", r"\usepackage{xurl}", r"\usepackage{array}", r"\usepackage{booktabs}", r"\usepackage{tabularx}", r"\usepackage{longtable}", r"\usepackage{enumitem}", r"\usepackage{titlesec}", r"\usepackage[most]{tcolorbox}", r"\usepackage{fancyvrb}", r"\usepackage{needspace}", r"\defaultfontfeatures{Ligatures=TeX,Scale=MatchLowercase}", r"\definecolor{Ink}{HTML}{17202A}", r"\definecolor{Muted}{HTML}{667085}", r"\definecolor{Line}{HTML}{D0D5DD}", r"\definecolor{Panel}{HTML}{F8FAFC}", r"\definecolor{Accent}{HTML}{0F766E}", r"\definecolor{AccentSoft}{HTML}{E6F4F1}", r"\definecolor{AccentLine}{HTML}{74C4B8}", r"\definecolor{Warn}{HTML}{9A6700}", r"\definecolor{WarnSoft}{HTML}{FFF4DB}", r"\definecolor{Danger}{HTML}{B42318}", r"\definecolor{DangerSoft}{HTML}{FEE4E2}", r"\definecolor{OkText}{HTML}{065F46}", r"\definecolor{OkSoft}{HTML}{DCFCE7}", r"\definecolor{UnknownText}{HTML}{9A6700}", r"\definecolor{UnknownSoft}{HTML}{FEF3C7}", r"\hypersetup{colorlinks=true,linkcolor=Accent,urlcolor=Accent,pdfauthor={CertTransparencySearch},pdftitle={Certificate Transparency Endpoint Atlas}}", r"\setlength{\parindent}{0pt}", r"\setlength{\parskip}{6pt}", r"\setlength{\emergencystretch}{3em}", r"\setlength{\footskip}{24pt}", r"\setlength{\tabcolsep}{4.2pt}", r"\renewcommand{\arraystretch}{1.12}", r"\raggedbottom", r"\setcounter{tocdepth}{2}", rf"\pagestyle{{{'plain' if show_page_numbers else 'empty'}}}", r"\titleformat{\section}{\sffamily\bfseries\LARGE\color{Ink}\raggedright}{\thesection}{0.8em}{}", r"\titleformat{\subsection}{\sffamily\bfseries\Large\color{Ink}\raggedright}{\thesubsection}{0.8em}{}", r"\titleformat{\subsubsection}{\sffamily\bfseries\normalsize\color{Ink}\raggedright}{\thesubsubsection}{0.8em}{}", r"\tcbset{", r" panel/.style={enhanced,breakable,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=white,colframe=Line},", r" hero/.style={panel,colback=Ink,colframe=Ink,left=14pt,right=14pt,top=14pt,bottom=14pt},", r" summary/.style={panel,colback=Panel,colframe=Line},", r" issuerpanel/.style={panel,colback=Panel,colframe=Ink!45},", r" familypanel/.style={panel,colback=AccentSoft,colframe=AccentLine},", r" subjectpanel/.style={panel,colback=white,colframe=Line},", r" treepanel/.style={enhanced,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=Panel,colframe=AccentLine},", r"}", r"\newcommand{\DomainChip}[1]{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=AccentSoft]{\sffamily\footnotesize\texttt{#1}}}", r"\newcommand{\MetricChip}[2]{\tcbox[on line,boxrule=0pt,arc=3pt,left=6pt,right=6pt,top=3pt,bottom=3pt,colback=Panel]{\sffamily\footnotesize\textcolor{Muted}{#1}\hspace{0.45em}\textbf{#2}}}", r"\newcommand{\StatusOK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{OK}}}", r"\newcommand{\StatusREV}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{REV}}}", r"\newcommand{\StatusUNK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=UnknownSoft]{\sffamily\bfseries\footnotesize\textcolor{UnknownText}{UNK}}}", r"\newcommand{\WebPKIYes}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{WebPKI: YES}}}", r"\newcommand{\WebPKINo}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{WebPKI: NO}}}", r"\begin{document}", r"\begin{titlepage}", r"\thispagestyle{empty}", r"\vspace*{20mm}", r"\begin{tcolorbox}[hero]", r"{\color{white}\sffamily\bfseries\fontsize{24}{28}\selectfont Certificate Transparency Endpoint Atlas\par}", r"\vspace{4pt}", r"{\color{white}\Large Currently valid leaf certificates matching the configured domains\par}", r"\vspace{12pt}", r"{\color{white}\sffamily\small This artefact is optimized for review: issuer-first navigation, CN-family grouping, certificate timelines, and SAN structure blocks designed to be read rather than decoded.}", r"\end{tcolorbox}", r"\vspace{10mm}", r"\begin{tcolorbox}[summary]", rf"\textbf{{Generated}}: {latex_escape(stats.generated_at_utc)}\par", r"\textbf{Configured domains}: " + " ".join( rf"\DomainChip{{{latex_escape(domain)}}}" for domain in stats.configured_domains ), r"\par\medskip", r"\MetricChip{Leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " " + r"\MetricChip{CN families}{" + str(stats.groups_total) + r"}" + " " + r"\MetricChip{Numbered families}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " " + r"\MetricChip{Exact families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}", r"\par\medskip", r"\MetricChip{Not revoked}{" + str(not_revoked_total) + r"}" + " " + r"\MetricChip{Revoked}{" + str(revoked_total) + r"}" + " " + r"\MetricChip{Unknown}{" + str(unknown_total) + r"}", r"\end{tcolorbox}", r"\vfill", r"{\sffamily\small\textcolor{Muted}{Same scan, three outputs: Markdown for editor preview, LaTeX for source control, PDF for distribution.}}", r"\end{titlepage}", r"\tableofcontents", r"\clearpage", r"\section*{Executive Summary}", r"\addcontentsline{toc}{section}{Executive Summary}", r"\begin{tcolorbox}[summary]", r"\textbf{Reading guide}\par", r"Major chapters are exact issuer names. Inside each issuer, families are derived only from the construction of the Subject CN. Each concrete Subject CN then gets its own certificate timeline and a SAN structure panel.\par", r"\medskip", r"\textbf{Leaf-only assurance}\par", r"SQL excludes entries whose lifecycle type is not \texttt{Certificate}. Local parsing then rejects any artifact with precertificate poison, \texttt{BasicConstraints.ca = true}, or \texttt{KeyUsage.keyCertSign = true}.", r"\end{tcolorbox}", r"\begin{tcolorbox}[summary]", r"\textbf{Issuer landscape}\par", r"\medskip", r"\begin{tabularx}{\linewidth}{>{\raggedright\arraybackslash}X >{\raggedleft\arraybackslash}p{1.7cm} >{\raggedleft\arraybackslash}p{1.9cm} >{\raggedleft\arraybackslash}p{2.0cm}}", r"\toprule", r"Issuer & Certificates & Share & WebPKI \\", r"\midrule", ] total_hits = len(hits) if hits else 1 for issuer_name in ordered_issuers: issuer_count = len(issuer_hits[issuer_name]) share = f"{issuer_count / total_hits:.1%}" lines.append( rf"{latex_escape(issuer_name)} & {issuer_count} & {latex_escape(share)} & {latex_webpki_badge(issuer_trust[issuer_name].major_webpki)} \\" ) lines.extend( [ r"\bottomrule", r"\end{tabularx}", r"\end{tcolorbox}", ] ) for issuer_position, issuer_name in enumerate(ordered_issuers, start=1): trust = issuer_trust[issuer_name] issuer_groups = build_groups(issuer_hits[issuer_name]) lines.extend( [ r"\clearpage", rf"\section{{Issuer {issuer_position:02d}: {latex_escape(issuer_name)}}}", r"\begin{tcolorbox}[issuerpanel]", r"\MetricChip{Certificates}{" + str(len(issuer_hits[issuer_name])) + r"}" + " " + r"\MetricChip{Families}{" + str(len(issuer_groups)) + r"}" + " " + latex_webpki_badge(trust.major_webpki), r"\par\medskip", rf"\textbf{{Trust contexts seen in crt.sh live data}}: {latex_escape(', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none')}\par", rf"\textbf{{Issuer CA IDs}}: {latex_escape(', '.join(str(value) for value in sorted(trust.issuer_ca_ids)))}", r"\end{tcolorbox}", ] ) for family_index, group in enumerate(issuer_groups, start=1): member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices] lines.extend( [ r"\Needspace{14\baselineskip}", rf"\subsection{{Family {family_index:02d}: {latex_escape(describe_group_basis(group).replace('`', ''))}}}", r"\begin{tcolorbox}[familypanel]", r"\MetricChip{Certificates}{" + str(group.member_count) + r"}" + " " + r"\MetricChip{Concrete CNs}{" + str(group.distinct_subject_cn_count) + r"}" + " " + r"\MetricChip{Distinct SAN profiles}{" + str(group.distinct_exact_content_count) + r"}", r"\par\medskip", rf"\textbf{{Matched domains}}: {' '.join(rf'\DomainChip{{{latex_escape(domain)}}}' for domain in sorted(group.matched_domains))}\par", rf"\textbf{{Family validity span}}: \texttt{{{latex_escape(utc_iso(group.valid_from_min))}}} to \texttt{{{latex_escape(utc_iso(group.valid_to_max))}}}\par", ( rf"\textbf{{First seen span}}: \texttt{{{latex_escape(utc_iso(group.first_seen_min))}}} to \texttt{{{latex_escape(utc_iso(group.first_seen_max))}}}\par" if group.first_seen_min and group.first_seen_max else "" ), rf"\textbf{{Revocation mix}}: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown", r"\end{tcolorbox}", ] ) hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list) for hit in member_hits: hits_by_subject[hit.subject_cn].append(hit) ordered_subjects = sorted( hits_by_subject.keys(), key=lambda value: (canonicalize_subject_cn(value), value.casefold()), ) for subject_cn in ordered_subjects: subject_hits = sorted( hits_by_subject[subject_cn], key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256), ) san_summary = summarize_san_patterns(sorted({entry for hit in subject_hits for entry in hit.san_entries})) unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries}) lines.extend( [ r"\Needspace{18\baselineskip}", rf"\subsubsection{{Subject CN: {latex_escape(subject_cn)}}}", r"\begin{tcolorbox}[subjectpanel]", r"\MetricChip{Certificates under this CN}{" + str(len(subject_hits)) + r"}" + " " + r"\MetricChip{Distinct SAN profiles}{" + str(len({tuple(hit.san_entries) for hit in subject_hits})) + r"}" + " " + r"\MetricChip{Unique SAN entries}{" + str(len(unique_san_entries)) + r"}", r"\par\medskip", rf"\textbf{{Validity span under this CN}}: \texttt{{{latex_escape(utc_iso(min(hit.validity_not_before for hit in subject_hits)))}}} to \texttt{{{latex_escape(utc_iso(max(hit.validity_not_after for hit in subject_hits)))}}}", r"\par\medskip", r"\textbf{Certificate timeline}", r"\begin{itemize}[leftmargin=1.4em,itemsep=0.55em,topsep=0.4em]", ] ) for hit in subject_hits: crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids)) lines.extend( [ r"\item " + latex_status_badge(hit.revocation_status) + " " + rf"\texttt{{{latex_escape(utc_iso(hit.validity_not_before))}}} to \texttt{{{latex_escape(utc_iso(hit.validity_not_after))}}}", rf"\newline \textcolor{{Muted}}{{SANs: {len(hit.san_entries)} \quad crt.sh: {latex_escape(crtsh_ids)} \quad {latex_escape(one_line_revocation(hit))}}}", ] ) tree_chunks = build_san_tree_chunks_with_style( unique_san_entries, ascii_only=True, max_lines_per_chunk=24, ) lines.extend( [ r"\end{itemize}", r"\medskip", r"\textbf{SAN pattern snapshot}", r"\par\medskip", r"\MetricChip{DNS SANs}{" + str(san_summary["dns_count"]) + r"}" + " " + r"\MetricChip{Other SANs}{" + str(san_summary["other_count"]) + r"}" + " " + r"\MetricChip{Wildcard SANs}{" + str(san_summary["wildcard_count"]) + r"}" + " " + r"\MetricChip{Numbered SANs}{" + str(san_summary["numbered_count"]) + r"}" + " " + r"\MetricChip{DNS zones}{" + str(san_summary["zone_count"]) + r"}", r"\par\medskip", rf"\textbf{{Dominant zones}}: {latex_escape(', '.join(f'{zone} ({count})' for zone, count in san_summary['top_zones']) if san_summary['top_zones'] else 'none')}", r"\par", rf"\textbf{{Repeating host schemas}}: {latex_escape(', '.join(f'{pattern} ({count})' for pattern, count in san_summary['repeating_patterns']) if san_summary['repeating_patterns'] else 'mostly one-off SAN hostnames')}", ( rf"\par\medskip\textcolor{{Muted}}{{The SAN structure below is shown in {len(tree_chunks)} intact panels so the visual grouping is not broken across a page.}}" if len(tree_chunks) > 1 else "" ), r"\end{tcolorbox}", ] ) for tree_chunk_index, tree_lines in enumerate(tree_chunks, start=1): tree_title = ( "SAN Structure" if len(tree_chunks) == 1 else f"SAN Structure ({tree_chunk_index}/{len(tree_chunks)})" ) tree_needspace = max(12, min(len(tree_lines) + 7, 32)) lines.extend( [ rf"\Needspace{{{tree_needspace}\baselineskip}}", rf"\begin{{tcolorbox}}[treepanel,title={{{latex_escape(tree_title)}}}]", r"\begin{Verbatim}[fontsize=\footnotesize]", ] ) lines.extend(tree_lines) lines.extend( [ r"\end{Verbatim}", r"\end{tcolorbox}", ] ) lines.extend( [ r"\clearpage", r"\section*{Statistics}", r"\addcontentsline{toc}{section}{Statistics}", r"\begin{tcolorbox}[summary]", r"\MetricChip{Unique leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " " + r"\MetricChip{CN-family chapters}{" + str(stats.groups_total) + r"}" + " " + r"\MetricChip{Multi-certificate chapters}{" + str(stats.groups_multi_member) + r"}" + " " + r"\MetricChip{Singleton chapters}{" + str(stats.groups_singleton) + r"}", r"\par\medskip", r"\MetricChip{Numbered CN patterns}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " " + r"\MetricChip{Exact endpoint families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}" + " " + r"\MetricChip{Non-leaf filtered}{" + str(stats.verification.non_leaf_filtered) + r"}" + " " + r"\MetricChip{Precert poison filtered}{" + str(stats.verification.precertificate_poison_filtered) + r"}", r"\end{tcolorbox}", r"\end{document}", ] ) path.write_text("\n".join(line for line in lines if line != "") + "\n", encoding="utf-8") def cleanup_latex_auxiliary_files(tex_path: Path, pdf_output: Path) -> None: generated_base = pdf_output.parent / tex_path.stem for suffix in (".aux", ".log", ".out", ".toc"): candidate = generated_base.with_suffix(suffix) if candidate.exists(): candidate.unlink() def compile_latex_to_pdf(tex_path: Path, pdf_output: Path, engine: str) -> None: engine_path = shutil.which(engine) if engine_path is None: raise RuntimeError(f"LaTeX engine not found: {engine}") tex_path = tex_path.resolve() pdf_output = pdf_output.resolve() pdf_output.parent.mkdir(parents=True, exist_ok=True) compile_cmd = [ engine_path, "-interaction=nonstopmode", "-halt-on-error", "-output-directory", str(pdf_output.parent), str(tex_path), ] for _ in range(2): result = subprocess.run( compile_cmd, capture_output=True, text=True, check=False, ) if result.returncode != 0: message = (result.stdout + "\n" + result.stderr).strip() raise RuntimeError( "LaTeX compilation failed.\n" + "\n".join(message.splitlines()[-40:]) ) generated_pdf = pdf_output.parent / f"{tex_path.stem}.pdf" if generated_pdf != pdf_output: generated_pdf.replace(pdf_output) cleanup_latex_auxiliary_files(tex_path, pdf_output) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Search crt.sh for currently valid certificates matching configured domain fragments.", ) parser.add_argument( "--domains-file", type=Path, default=Path("domains.local.txt"), help="Text file containing one domain fragment per line.", ) parser.add_argument( "--output", type=Path, default=Path("output/current-valid-certificates.md"), help="Readable single-file markdown report to write.", ) parser.add_argument( "--latex-output", type=Path, default=Path("output/current-valid-certificates.tex"), help="Readable single-file LaTeX report to write.", ) parser.add_argument( "--pdf-output", type=Path, default=Path("output/current-valid-certificates.pdf"), help="Compiled PDF report to write.", ) parser.add_argument( "--pdf-engine", default="xelatex", help="LaTeX engine used to compile the PDF report.", ) parser.add_argument( "--skip-pdf", action="store_true", help="Write Markdown and LaTeX outputs but skip PDF compilation.", ) parser.add_argument( "--cache-dir", type=Path, default=Path(".cache/ct-search"), help="Directory for cached per-domain query results.", ) parser.add_argument( "--cache-ttl-seconds", type=int, default=900, help="Reuse cached database results younger than this many seconds.", ) parser.add_argument( "--max-candidates-per-domain", type=int, default=10000, help="Maximum raw crt.sh identity rows to inspect per domain fragment.", ) parser.add_argument( "--retries", type=int, default=3, help="Retry count for replica/recovery conflicts from crt.sh.", ) parser.add_argument( "--quiet", action="store_true", help="Suppress progress output.", ) return parser.parse_args() def main() -> int: args = parse_args() domains = load_domains(args.domains_file) all_records: list[DatabaseRecord] = [] for domain in domains: cached = load_cached_records( cache_dir=args.cache_dir, domain=domain, ttl_seconds=args.cache_ttl_seconds, max_candidates=args.max_candidates_per_domain, ) if cached is not None: if not args.quiet: print(f"[cache] domain={domain} records={len(cached)}", file=sys.stderr) all_records.extend(cached) continue if not args.quiet: print(f"[query] domain={domain}", file=sys.stderr) records = query_domain( domain=domain, max_candidates=args.max_candidates_per_domain, attempts=args.retries, verbose=not args.quiet, ) if not args.quiet: print(f"[done] domain={domain} records={len(records)}", file=sys.stderr) store_cached_records(args.cache_dir, domain, args.max_candidates_per_domain, records) all_records.extend(records) hits, verification = build_hits(all_records) groups = build_groups(hits) scan_stats = ScanStats( generated_at_utc=utc_iso(datetime.now(UTC)), configured_domains=domains, unique_leaf_certificates=len(hits), groups_total=len(groups), groups_multi_member=sum(1 for group in groups if group.member_count > 1), groups_singleton=sum(1 for group in groups if group.member_count == 1), groups_by_type=dict(Counter(group.group_type for group in groups)), verification=verification, ) issuer_trust = query_issuer_trust(hits) render_markdown_report(args.output, hits, groups, scan_stats, issuer_trust) render_latex_report(args.latex_output, hits, groups, scan_stats, issuer_trust) if not args.skip_pdf: compile_latex_to_pdf(args.latex_output, args.pdf_output, args.pdf_engine) if not args.quiet: print( f"[report] hits={len(hits)} groups={len(groups)} markdown={args.output} latex={args.latex_output}" + ("" if args.skip_pdf else f" pdf={args.pdf_output}"), file=sys.stderr, ) return 0 if __name__ == "__main__": raise SystemExit(main())