commit 0cf86b0587caa3419e8353e9e995130ff5acbd36 Author: saymrwulf Date: Sun Mar 29 11:40:06 2026 +0200 Initial public release diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5e097be --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.venv/ +__pycache__/ +.cache/ +output/ +*.local.txt +domains.txt +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a0b3e5 --- /dev/null +++ b/README.md @@ -0,0 +1,85 @@ +# Certificate Transparency Search + +This project scans Certificate Transparency for currently valid leaf certificates whose SAN sets contain configured search terms, verifies the certificates locally, inspects revocation state, classifies intended usage from EKU and KeyUsage, and scans the public DNS names exposed by the certificate corpus. + +The repository is designed for public source control: + +- real search terms live only in `domains.local.txt` +- generated artefacts live only in `output/` +- caches live only in `.cache/` + +None of those paths should be committed. + +## Setup + +```bash +python3 -m venv .venv +.venv/bin/python -m pip install -r requirements.txt +cp domains.example.txt domains.local.txt +``` + +Edit `domains.local.txt` with the real search terms you want to scan. + +## Safety Against Silent Undercounts + +The scanner now refuses to run if the configured per-domain candidate cap is lower than the live raw match count from crt.sh. This prevents silent truncation when the raw identity set is larger than the cap. + +## Core Inventory Report + +```bash +.venv/bin/python ct_scan.py \ + --domains-file domains.local.txt \ + --cache-ttl-seconds 0 \ + --output output/current-valid-certificates.md \ + --latex-output output/current-valid-certificates.tex \ + --pdf-output output/current-valid-certificates.pdf +``` + +This report is the issuer-first inventory view. 
+ +## Purpose Assessment + +```bash +.venv/bin/python ct_usage_assessment.py \ + --domains-file domains.local.txt \ + --cache-ttl-seconds 0 \ + --markdown-output output/certificate-purpose-assessment.md \ + --json-output output/certificate-purpose-assessment.json +``` + +This assessment classifies the current corpus into: + +- TLS server only +- TLS server and client auth +- client auth only +- S/MIME only +- code signing only + +## Consolidated Master Report + +```bash +.venv/bin/python ct_master_report.py \ + --domains-file domains.local.txt \ + --cache-ttl-seconds 0 \ + --dns-cache-ttl-seconds 86400 \ + --markdown-output output/consolidated-corpus-report.md \ + --latex-output output/consolidated-corpus-report.tex \ + --pdf-output output/consolidated-corpus-report.pdf +``` + +This is the main document for readers. It combines: + +- data-integrity and completeness proof +- certificate inventory and issuer analysis +- purpose assessment +- naming-pattern interpretation +- public DNS delivery analysis +- crosswalk between certificate structure and DNS structure +- confidence and limit statements + +## Public Repo Rules + +- Keep `domains.local.txt` local only. +- Never commit `output/`. +- Never commit `.cache/`. +- If you need a sample config in git, update `domains.example.txt`, not `domains.local.txt`. 
diff --git a/ct_dns_utils.py b/ct_dns_utils.py new file mode 100644 index 0000000..01feaaf --- /dev/null +++ b/ct_dns_utils.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import hashlib +import ipaddress +import json +import re +import subprocess +import time +from dataclasses import asdict, dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import ct_scan + + +@dataclass +class DnsObservation: + original_name: str + original_status: str + cname_chain: list[str] + terminal_name: str + terminal_status: str + a_records: list[str] + aaaa_records: list[str] + ptr_records: list[str] + classification: str + stack_signature: str + provider_hints: list[str] + + +def normalize_name(name: str) -> str: + return name.rstrip(".").lower() + + +def cache_key(value: str) -> str: + digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:16] + slug = re.sub(r"[^a-z0-9.-]+", "-", value.lower()).strip("-") + slug = slug[:80] or "item" + return f"v1-{slug}-{digest}.json" + + +def load_json_cache(cache_dir: Path, key: str, ttl_seconds: int) -> dict[str, Any] | None: + path = cache_dir / key + if not path.exists(): + return None + payload = json.loads(path.read_text(encoding="utf-8")) + cached_at = datetime.fromisoformat(payload["cached_at"].replace("Z", "+00:00")) + age = time.time() - cached_at.astimezone(UTC).timestamp() + if age > ttl_seconds: + return None + return payload + + +def store_json_cache(cache_dir: Path, key: str, payload: dict[str, Any]) -> None: + cache_dir.mkdir(parents=True, exist_ok=True) + enriched = dict(payload) + enriched["cached_at"] = ct_scan.utc_iso(datetime.now(UTC)) + (cache_dir / key).write_text(json.dumps(enriched, indent=2, sort_keys=True), encoding="utf-8") + + +def run_dig(name: str, rrtype: str, short: bool) -> str: + cmd = ["dig", "+time=2", "+tries=1"] + if short: + cmd.append("+short") + else: + cmd.extend(["+noall", "+comments", "+answer"]) + 
def run_dig(name: str, rrtype: str, short: bool) -> str:
    """Invoke the system `dig` for *name*/*rrtype* and return its raw stdout."""
    argv = ["dig", "+time=2", "+tries=1"]
    argv += ["+short"] if short else ["+noall", "+comments", "+answer"]
    argv += [name, rrtype]
    completed = subprocess.run(argv, capture_output=True, text=True, check=False)
    return completed.stdout


def dig_status(name: str, rrtype: str = "A") -> str:
    """Extract the response status (NOERROR, NXDOMAIN, ...) from a dig run."""
    raw = run_dig(name, rrtype, short=False)
    found = re.search(r"status:\s*([A-Z]+)", raw)
    if found is not None:
        return found.group(1)
    # dig produced output but no status banner: assume a normal answer.
    return "NOERROR" if raw.strip() else "UNKNOWN"


def dig_short(name: str, rrtype: str) -> list[str]:
    """Return normalized `dig +short` answer lines, skipping blank lines."""
    return [
        normalize_name(entry)
        for entry in run_dig(name, rrtype, short=True).splitlines()
        if entry.strip()
    ]


def parse_answer_section(output: str) -> list[tuple[str, str]]:
    """Parse (RRTYPE, rdata) pairs out of dig's ANSWER SECTION block."""
    rows: list[tuple[str, str]] = []
    answer_seen = False
    for candidate in output.splitlines():
        text = candidate.strip()
        if not text:
            continue
        if text.startswith(";; ANSWER SECTION:"):
            answer_seen = True
            continue
        if not answer_seen or text.startswith(";;"):
            continue
        hit = re.match(r"^\S+\s+\d+\s+IN\s+(\S+)\s+(.+)$", text)
        if hit is None:
            continue
        kind, value = hit.groups()
        rows.append((kind.upper(), normalize_name(value)))
    return rows


def is_ip_address(value: str) -> bool:
    """True when *value* parses as an IPv4 or IPv6 literal."""
    try:
        ipaddress.ip_address(value)
    except ValueError:
        return False
    return True


def classify_observation(chain: list[str], terminal_status: str, a_records: list[str], aaaa_records: list[str]) -> str:
    """Bucket one DNS observation into a coarse delivery class."""
    addressed = bool(a_records or aaaa_records)
    if chain:
        return "cname_to_address" if addressed else "dangling_cname"
    if addressed:
        return "direct_address"
    if terminal_status == "NXDOMAIN":
        return "nxdomain"
    if terminal_status == "NOERROR":
        return "no_data"
    return "other"
def infer_provider_hints(observation: DnsObservation) -> list[str]:
    """Derive provider labels from the names and addresses of one observation.

    Matching is substring-based over the combined name material (original
    name, CNAME chain, terminal name, PTR records), plus address-prefix
    checks for Microsoft edge ranges.  Returns ["Unclassified"] when nothing
    matches.
    """
    haystack = " ".join(
        [
            observation.original_name,
            *observation.cname_chain,
            observation.terminal_name,
            *observation.ptr_records,
        ]
    ).lower()
    labels: list[str] = []
    if "campaign.adobe.com" in haystack:
        labels.append("Adobe Campaign")
    if "cloudfront.net" in haystack:
        labels.append("AWS CloudFront")
    if "elb.amazonaws.com" in haystack or "compute.amazonaws.com" in haystack:
        labels.append("AWS")
    if "apigee.net" in haystack or "googleusercontent.com" in haystack:
        labels.append("Google Apigee")
    if "pegacloud.net" in haystack or ".pega.net" in haystack:
        labels.append("Pega Cloud")
    if "useinfinite.io" in haystack:
        labels.append("Infinite / agency alias")
    v4_edge = any(ip.startswith("13.107.") for ip in observation.a_records)
    v6_edge = any(ip.startswith("2620:1ec:") for ip in observation.aaaa_records)
    if v4_edge or v6_edge:
        labels.append("Microsoft Edge")
    return labels or ["Unclassified"]


def infer_stack_signature(observation: DnsObservation) -> str:
    """Collapse provider hints plus DNS classification into one stack label."""
    hints = infer_provider_hints(observation)
    if observation.classification == "nxdomain":
        return "No public DNS (NXDOMAIN)"
    if observation.classification == "no_data":
        return "No public address data"
    adobe = "Adobe Campaign" in hints
    if adobe and "AWS CloudFront" in hints:
        return "Adobe Campaign -> AWS CloudFront"
    if adobe and "AWS" in hints:
        return "Adobe Campaign -> AWS ALB"
    if adobe and observation.a_records:
        return "Adobe Campaign direct IP"
    if "AWS CloudFront" in hints:
        return "AWS CloudFront"
    if "Google Apigee" in hints:
        return "Google Apigee"
    if "Pega Cloud" in hints and "AWS" in hints:
        return "Pega Cloud -> AWS ALB"
    if "Infinite / agency alias" in hints and observation.classification == "dangling_cname":
        return "Dangling agency alias"
    if "Microsoft Edge" in hints:
        return "Direct Microsoft edge"
    if "AWS" in hints:
        return "Direct AWS"
    if observation.classification == "direct_address":
        return "Direct address (provider unclear)"
    if observation.classification == "cname_to_address":
        return "CNAME to address (provider unclear)"
    return hints[0]
def scan_name_cached(name: str, cache_dir: Path, ttl_seconds: int) -> DnsObservation:
    """Return a DnsObservation for *name*, served from the JSON cache when fresh.

    On a cache miss the name is scanned live and the result is written back,
    so repeated report runs inside the TTL window avoid re-querying DNS.
    """
    entry_key = cache_key(name)
    hit = load_json_cache(cache_dir, entry_key, ttl_seconds)
    if hit is None:
        fresh = scan_name_live(name)
        store_json_cache(cache_dir, entry_key, asdict(fresh))
        return fresh
    fields = dict(hit)
    fields.pop("cached_at", None)  # bookkeeping field, not part of the dataclass
    return DnsObservation(**fields)
def provider_explanations() -> dict[str, str]:
    """Glossary of provider names and DNS terms used in the generated reports."""
    entries = [
        ("Adobe Campaign", "A marketing and communication platform often used to send customer messages, email journeys, and campaign traffic. In DNS terms, it can sit in front of cloud infrastructure rather than hosting the final application by itself."),
        ("AWS", "Amazon Web Services, a large public cloud platform. In this report it usually means the endpoint ultimately lands on Amazon-hosted compute or load-balancing infrastructure."),
        ("AWS ALB", "AWS Application Load Balancer. A traffic-distribution front door that sends incoming web requests to one or more backend services."),
        ("AWS CloudFront", "Amazon's global content-delivery and edge network. It is often used to front websites, APIs, and static assets close to users."),
        ("Google Apigee", "An API gateway and API-management layer. If a hostname lands here, it usually means the public endpoint is being governed as an API product rather than being exposed directly from an application server."),
        ("Pega Cloud", "A managed hosting platform for Pega applications and workflow systems. It often fronts case-management or process-heavy applications."),
        ("Microsoft Edge", "Microsoft-operated edge infrastructure. In DNS this usually means the public name lands on Microsoft's front-door network rather than directly on a private application host."),
        ("Infinite / agency alias", "A third-party aliasing pattern typically used by an agency or service intermediary. It points traffic onward to the actual delivery platform."),
        ("CNAME", "A DNS alias record. It says one hostname is really another hostname, rather than directly mapping to an IP address."),
        ("A record", "A DNS record that maps a hostname to an IPv4 address."),
        ("AAAA record", "A DNS record that maps a hostname to an IPv6 address."),
        ("PTR record", "A reverse-DNS record. It maps an IP address back to a hostname and is useful as a provider clue, not as proof of ownership."),
        ("NXDOMAIN", "A DNS response meaning the name does not exist publicly."),
    ]
    return dict(entries)
def load_records(args: argparse.Namespace) -> tuple[list[str], list[ct_scan.DatabaseRecord], dict[str, int]]:
    """Load CT records for every configured domain, using the on-disk cache.

    Returns the domain list, the combined record list, and the live raw
    match count per domain (used later for the undercount safety check).
    """
    import sys  # proper import instead of the repeated __import__("sys") hack

    domains = ct_scan.load_domains(args.domains_file)
    records: list[ct_scan.DatabaseRecord] = []
    raw_match_counts: dict[str, int] = {}
    for domain in domains:
        raw_match_counts[domain] = ct_scan.query_raw_match_count(
            domain=domain, attempts=args.retries, verbose=not args.quiet
        )
        cached = ct_scan.load_cached_records(
            cache_dir=args.cache_dir,
            domain=domain,
            ttl_seconds=args.cache_ttl_seconds,
            max_candidates=args.max_candidates_per_domain,
        )
        if cached is not None:
            if not args.quiet:
                print(f"[cache] domain={domain} records={len(cached)}", file=sys.stderr)
            records.extend(cached)
            continue
        if not args.quiet:
            print(f"[query] domain={domain}", file=sys.stderr)
        queried = ct_scan.query_domain(
            domain=domain,
            max_candidates=args.max_candidates_per_domain,
            attempts=args.retries,
            verbose=not args.quiet,
        )
        ct_scan.store_cached_records(args.cache_dir, domain, args.max_candidates_per_domain, queried)
        records.extend(queried)
    return domains, records, raw_match_counts


def dns_names_from_hits(hits: list[ct_scan.CertificateHit]) -> list[str]:
    """Sorted unique DNS names collected from all SAN entries of all hits."""
    return sorted(
        {
            ct_dns_utils.normalize_name(entry[4:])
            for hit in hits
            for entry in hit.san_entries
            if entry.startswith("DNS:")
        }
    )


def enrich_dns(names: list[str], args: argparse.Namespace) -> list[ct_dns_utils.DnsObservation]:
    """Scan every name (cached), attach PTR data, and re-derive hints/signatures."""
    observations = [ct_dns_utils.scan_name_cached(name, args.dns_cache_dir, args.dns_cache_ttl_seconds) for name in names]
    unique_ips = sorted({ip for observation in observations for ip in (*observation.a_records, *observation.aaaa_records)})
    ptr_cache_dir = args.dns_cache_dir / "ptr"
    ip_ptrs = {ip: ct_dns_utils.ptr_lookup(ip, ptr_cache_dir, args.dns_cache_ttl_seconds) for ip in unique_ips}
    for observation in observations:
        observation.ptr_records = sorted(
            {
                ptr
                for ip in (*observation.a_records, *observation.aaaa_records)
                for ptr in ip_ptrs.get(ip, [])
            }
        )
        # PTR data can change the provider picture, so recompute both fields.
        observation.provider_hints = ct_dns_utils.infer_provider_hints(observation)
        observation.stack_signature = ct_dns_utils.infer_stack_signature(observation)
    return observations


def short_issuer_family(issuer_name: str) -> str:
    """Collapse a full issuer DN into a coarse CA-family label."""
    lowered = issuer_name.lower()
    if "amazon" in lowered:
        return "Amazon"
    if "sectigo" in lowered:
        return "Sectigo"
    if "comodo" in lowered:
        return "COMODO"
    if "google trust services" in lowered or "cn=we1" in lowered:
        return "Google Trust Services"
    return "Other"


def revocation_counts(hits: list[ct_scan.CertificateHit]) -> Counter[str]:
    """Tally hits per revocation status string."""
    return Counter(hit.revocation_status for hit in hits)


def is_www_pair(hit: ct_scan.CertificateHit) -> bool:
    """True when the SAN set is exactly one base name plus its `www.` form."""
    dns_names = sorted(entry[4:] for entry in hit.san_entries if entry.startswith("DNS:"))
    if len(dns_names) != 2:
        return False
    plain = [name for name in dns_names if not name.startswith("www.")]
    return len(plain) == 1 and f"www.{plain[0]}" in dns_names
def env_token_count(name: str) -> int:
    """Count how many environment-style tokens occur as substrings of *name*."""
    lowered = name.lower()
    return sum(token in lowered for token in ENV_TOKENS)


def dns_zone_count(hit: ct_scan.CertificateHit) -> int:
    """Number of distinct DNS zones covered by the hit's SAN DNS entries."""
    zones = {
        ct_scan.san_tail_split(entry[4:])[1]
        for entry in hit.san_entries
        if entry.startswith("DNS:")
    }
    return len(zones)


def group_member_hits(groups: list[ct_scan.CertificateGroup], hits: list[ct_scan.CertificateHit]) -> dict[str, list[ct_scan.CertificateHit]]:
    """Map each group id to the concrete hit objects behind its member indices."""
    return {
        group.group_id: [hits[position] for position in group.member_indices]
        for group in groups
    }


def stack_counts_for_hits(member_hits: list[ct_scan.CertificateHit], observation_by_name: dict[str, ct_dns_utils.DnsObservation]) -> Counter[str]:
    """Tally DNS stack signatures across all SAN DNS names of the given hits."""
    tally: Counter[str] = Counter()
    for hit in member_hits:
        dns_entries = (entry for entry in hit.san_entries if entry.startswith("DNS:"))
        for entry in dns_entries:
            observed = observation_by_name.get(ct_dns_utils.normalize_name(entry[4:]))
            if observed is not None:
                tally[observed.stack_signature] += 1
    return tally


def confirm_search_premise(hits: list[ct_scan.CertificateHit], domains: list[str]) -> tuple[int, int]:
    """Count hits that violate the scan premise.

    Returns a pair: (hits with no SAN DNS name containing any search term,
    hits whose Subject CN does not appear literally in the SAN DNS set).
    """
    no_matching_san = 0
    cn_outside_san = 0
    for hit in hits:
        san_dns = [entry[4:].lower() for entry in hit.san_entries if entry.startswith("DNS:")]
        if not any(term in name for name in san_dns for term in domains):
            no_matching_san += 1
        if hit.subject_cn.lower() not in san_dns:
            cn_outside_san += 1
    return no_matching_san, cn_outside_san
def provider_counts(observations: list[ct_dns_utils.DnsObservation]) -> Counter[str]:
    """Count provider hints across observations, ignoring the Unclassified bucket."""
    tally: Counter[str] = Counter()
    for observed in observations:
        tally.update(hint for hint in observed.provider_hints if hint != "Unclassified")
    return tally


def top_suffixes(hits: list[ct_scan.CertificateHit], limit: int = 8) -> list[tuple[str, int]]:
    """Most common parent suffixes of the Subject CNs (name minus its first label)."""
    tally: Counter[str] = Counter()
    for hit in hits:
        cn = hit.subject_cn.lower()
        parts = cn.split(".")
        tally[".".join(parts[1:]) if len(parts) > 1 else cn] += 1
    return tally.most_common(limit)


def top_env_tokens(hits: list[ct_scan.CertificateHit], limit: int = 10) -> list[tuple[str, int]]:
    """Frequency of environment-style tokens appearing inside Subject CNs."""
    tally: Counter[str] = Counter()
    for hit in hits:
        cn = hit.subject_cn.lower()
        tally.update(token for token in ENV_TOKENS if token in cn)
    return tally.most_common(limit)
It tends to expose fleet-style naming, repeated validity cycles, and many sibling hostnames.", + evidence=[ + f"Group basis: {ct_scan.describe_group_basis(group).replace('`', '')}.", + f"Certificates in family: {group.member_count}.", + f"Distinct Subject CNs in family: {group.distinct_subject_cn_count}.", + f"Top observed DNS delivery stacks: {', '.join(f'{label} ({count})' for label, count in stack_counts.most_common(3)) or 'none'}.", + ], + ) + ) + + matrix_hits = [hit for hit in hits if len(hit.san_entries) >= 12 and env_token_count(hit.subject_cn) >= 1] + if matrix_hits: + hit = max(matrix_hits, key=lambda item: (len(item.san_entries), dns_zone_count(item), item.subject_cn)) + zones = sorted({ct_scan.san_tail_split(entry[4:])[1] for entry in hit.san_entries if entry.startswith("DNS:")}) + examples.append( + ExampleBlock( + title="Environment matrix certificate", + subject_cn=hit.subject_cn, + why_it_matters="A large SAN set with environment-style labels usually means one certificate is covering a coordinated platform surface across test, release, support, or tenant slices.", + evidence=[ + f"SAN entries: {len(hit.san_entries)}.", + f"Distinct DNS zones in SAN set: {len(zones)}.", + f"Environment tokens visible in Subject CN: {env_token_count(hit.subject_cn)}.", + f"First DNS zones in SAN set: {', '.join(zones[:6])}.", + ], + ) + ) + + www_hits = [hit for hit in hits if is_www_pair(hit)] + if www_hits: + hit = min(www_hits, key=lambda item: (item.subject_cn.count("."), item.subject_cn)) + examples.append( + ExampleBlock( + title="Clean public front door", + subject_cn=hit.subject_cn, + why_it_matters="A two-name SAN pairing of the apex hostname with its www form is usually a deliberate customer-facing presentation rule rather than an internal platform rail.", + evidence=[ + f"SAN entries: {', '.join(entry[4:] for entry in hit.san_entries if entry.startswith('DNS:'))}.", + f"Issuer: {sorted(hit.issuer_names)[0]}.", + f"Revocation status: 
{hit.revocation_status}.", + ], + ) + ) + + cross_zone_hits = [hit for hit in hits if dns_zone_count(hit) > 1] + if cross_zone_hits: + hit = max(cross_zone_hits, key=lambda item: (dns_zone_count(item), len(item.san_entries), item.subject_cn)) + zones = sorted({ct_scan.san_tail_split(entry[4:])[1] for entry in hit.san_entries if entry.startswith("DNS:")}) + examples.append( + ExampleBlock( + title="Cross-zone bridge", + subject_cn=hit.subject_cn, + why_it_matters="When one certificate spans several DNS zones, it often reveals a shared service or a migration bridge between branded fronts and underlying service domains.", + evidence=[ + f"Distinct DNS zones in SAN set: {len(zones)}.", + f"Representative zones: {', '.join(zones[:8])}.", + f"SAN entries: {len(hit.san_entries)}.", + ], + ) + ) + + return examples + + +def build_group_digest( + groups: list[ct_scan.CertificateGroup], + hits: list[ct_scan.CertificateHit], + observation_by_name: dict[str, ct_dns_utils.DnsObservation], + limit: int = 20, +) -> list[dict[str, str]]: + digest: list[dict[str, str]] = [] + group_map = group_member_hits(groups, hits) + for group in groups[:limit]: + member_hits = group_map[group.group_id] + stack_counts = stack_counts_for_hits(member_hits, observation_by_name) + digest.append( + { + "group_id": group.group_id, + "basis": ct_scan.describe_group_basis(group).replace("`", ""), + "type": group.group_type, + "certificates": str(group.member_count), + "subjects": str(group.distinct_subject_cn_count), + "top_stacks": ", ".join(f"{label} ({count})" for label, count in stack_counts.most_common(3)) or "none", + } + ) + return digest + + +def summarize_for_report(args: argparse.Namespace) -> dict[str, object]: + domains, records, raw_match_counts = load_records(args) + hits, verification = ct_scan.build_hits(records) + groups = ct_scan.build_groups(hits) + issuer_trust = ct_scan.query_issuer_trust(hits) + classifications = ct_usage_assessment.build_classifications(hits, records) + 
purpose_summary = ct_usage_assessment.summarize(classifications, domains) + unique_dns_names = dns_names_from_hits(hits) + observations = enrich_dns(unique_dns_names, args) + observation_by_name = {observation.original_name: observation for observation in observations} + rev_counts = revocation_counts(hits) + provider_hint_counts = provider_counts(observations) + dns_class_counts = Counter(observation.classification for observation in observations) + dns_stack_counts = Counter(observation.stack_signature for observation in observations) + issuer_counts = Counter(ct_scan.primary_issuer_name(hit) for hit in hits) + issuer_family_counts = Counter(short_issuer_family(name) for name in issuer_counts.elements()) + missing_matching_san, subject_not_in_san = confirm_search_premise(hits, domains) + numbered_groups = [group for group in groups if group.group_type == "numbered_cn_pattern"] + public_www_pair_count = sum(1 for hit in hits if is_www_pair(hit)) + multi_zone_hit_count = sum(1 for hit in hits if dns_zone_count(hit) > 1) + examples = pick_examples(hits, groups, observation_by_name) + digest = build_group_digest(groups, hits, observation_by_name) + trusted_major = sum(1 for info in issuer_trust.values() if info.major_webpki) + current_day = datetime.now(UTC).date().isoformat() + + return { + "generated_at_utc": ct_scan.utc_iso(datetime.now(UTC)), + "current_day": current_day, + "domains": domains, + "raw_match_counts": raw_match_counts, + "cap": args.max_candidates_per_domain, + "hits": hits, + "groups": groups, + "verification": verification, + "issuer_trust": issuer_trust, + "purpose_summary": purpose_summary, + "classifications": classifications, + "unique_dns_names": unique_dns_names, + "observations": observations, + "observation_by_name": observation_by_name, + "rev_counts": rev_counts, + "provider_hint_counts": provider_hint_counts, + "dns_class_counts": dns_class_counts, + "dns_stack_counts": dns_stack_counts, + "issuer_counts": issuer_counts, + 
def md_bullets(items: list[str]) -> list[str]:
    """Render each item as a Markdown bullet line."""
    return ["- " + item for item in items]
lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + f"The scan now fails fast if the candidate cap is lower than the live raw match count. Current raw counts: {', '.join(f'{domain}={count}' for domain, count in report['raw_match_counts'].items())}.", + f"The live candidate cap used for this run was {report['cap']}, which is safely above the current raw counts.", + f"Leaf-only verification kept {report['verification'].unique_leaf_certificates} certificates and filtered {report['verification'].non_leaf_filtered} CA-style certificates and {report['verification'].precertificate_poison_filtered} precertificate-poison objects.", + f"Every certificate in scope still contains at least one DNS SAN containing one of the configured search terms; exceptions found: {report['missing_matching_san']}.", + ] + ) + ) + lines.append("") + lines.append("Certificate Transparency is the public logging layer for issued certificates. The scan starts there, then reads the actual X.509 certificate bytes, verifies that each object is a real leaf certificate, extracts SAN and Subject CN values, checks revocation state from crt.sh data, and then scans the DNS names seen in SANs.") + lines.append("") + lines.append("A **Subject CN** is the traditional primary name in a certificate. A **SAN** list is the modern list of all names the certificate covers. 
A **leaf certificate** is the endpoint certificate presented by a service, as distinct from a CA certificate used to sign other certificates.") + lines.append("") + lines.append("## Chapter 2: Certificate Corpus") + lines.append("") + lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + f"The issuer landscape is concentrated: {', '.join(f'{name} ({count})' for name, count in report['issuer_family_counts'].most_common())}.", + f"Revocation mix: {rev_counts.get('not_revoked', 0)} not revoked, {rev_counts.get('revoked', 0)} revoked, {rev_counts.get('unknown', 0)} unknown.", + f"Purpose split: {purpose_summary.category_counts.get('tls_server_only', 0)} server-only, {purpose_summary.category_counts.get('tls_server_and_client', 0)} server+client, and zero client-only, S/MIME, or code-signing certificates.", + f"All {len(hits)} Subject CN values appear literally in the SAN DNS set.", + ] + ) + ) + lines.append("") + lines.append("An **issuer CA** is the certificate authority that signed the endpoint certificate. A **WebPKI-trusted** issuer is one that browsers and operating systems currently trust for public TLS. In this corpus, all visible issuers are live server-auth issuers in the public trust ecosystem.") + lines.append("") + lines.append("### Issuer Breakdown") + lines.append("") + for issuer_name, count in report["issuer_counts"].most_common(): + trust = report["issuer_trust"][issuer_name] + lines.append(f"- `{issuer_name}`: {count} certificates | major WebPKI stores: {'yes' if trust.major_webpki else 'no'}") + lines.append("") + lines.append("### Purpose Assessment") + lines.append("") + for category, count in purpose_summary.category_counts.items(): + lines.append(f"- `{category}`: {count}") + lines.append("") + lines.append( + "An **Extended Key Usage (EKU)** value tells software what the certificate is allowed to do. " + f"Here the estate is entirely TLS-capable. 
The only nuance is that {purpose_summary.category_counts.get('tls_server_and_client', 0)} certificates also allow `clientAuth`. " + "That does not by itself prove a separate client-certificate estate; in context, they still look like hostname certificates issued from a permissive or older server template." + ) + lines.append("") + lines.append("## Chapter 3: Naming Architecture") + lines.append("") + lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + f"{len(report['numbered_groups'])} numbered CN families point to reusable service rails rather than one-off pages.", + f"{report['public_www_pair_count']} certificates use the clean public front-door pattern of a base name paired with `www`.", + f"{report['multi_zone_hit_count']} certificates span more than one DNS zone in SAN, which is usually a sign of shared platforms, migrations, or multi-brand exposure.", + f"Most common suffixes: {', '.join(f'{suffix} ({count})' for suffix, count in report['top_suffixes'])}.", + ] + ) + ) + lines.append("") + lines.append("Hostnames often look arbitrary because they are doing several jobs at once. 
Some names are for customers, some are for engineers, some encode environment state, and some preserve older platform lineage because renaming working infrastructure is costly.") + lines.append("") + lines.append("### Frequent Naming Tokens") + lines.append("") + for token, count in report["top_env_tokens"]: + lines.append(f"- `{token}`: {count}") + lines.append("") + lines.append("### Dynamic Examples") + lines.append("") + for example in report["examples"]: + lines.append(f"#### {example.title}") + lines.append("") + lines.append(f"- Subject CN: `{example.subject_cn}`") + lines.append(f"- Why it matters: {example.why_it_matters}") + for point in example.evidence: + lines.append(f"- Evidence: {point}") + lines.append("") + lines.append("## Chapter 4: DNS Delivery Architecture") + lines.append("") + lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + f"{len(report['unique_dns_names'])} unique DNS names were scanned from the SAN corpus.", + f"DNS classes: {', '.join(f'{label}={count}' for label, count in report['dns_class_counts'].most_common())}.", + f"Top delivery signatures: {', '.join(f'{label} ({count})' for label, count in report['dns_stack_counts'].most_common(6))}.", + "The DNS layer turns a large hostname set into a smaller number of delivery stacks: CDN edges, API gateways, load balancers, and specialist vendor platforms.", + ] + ) + ) + lines.append("") + lines.append("A **CNAME** is a DNS alias, meaning one hostname points to another hostname. An **A** or **AAAA** record is the final address mapping. An **NXDOMAIN** response means the public DNS name does not exist at the moment of the scan. 
That does not automatically invalidate the certificate-side finding, because certificate and DNS lifecycles can move at different speeds.") + lines.append("") + lines.append("### Delivery Stack Counts") + lines.append("") + for label, count in report["dns_stack_counts"].most_common(12): + lines.append(f"- `{label}`: {count}") + lines.append("") + lines.append("### Platform and Provider Explanations") + lines.append("") + glossary = ct_dns_utils.provider_explanations() + seen_terms = set() + for observation in report["observations"]: + seen_terms.update(observation.provider_hints) + for term in ["Adobe Campaign", "AWS", "AWS CloudFront", "AWS ALB", "Google Apigee", "Pega Cloud", "Microsoft Edge", "Infinite / agency alias", "CNAME", "A record", "AAAA record", "PTR record", "NXDOMAIN"]: + if term in glossary and (term in seen_terms or term in {"CNAME", "A record", "AAAA record", "PTR record", "NXDOMAIN", "AWS ALB"}): + lines.append(f"- **{term}**: {glossary[term]}") + lines.append("") + lines.append("## Chapter 5: Where The Certificate View and DNS View Meet") + lines.append("") + lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + "The certificate layer describes naming and trust; the DNS layer describes delivery and reachability. The same estate becomes legible only when both are read together.", + "Numbered CN families usually behave like shared operational rails in certificates and collapse into repeatable delivery stacks in DNS.", + "Cleaner public names tend to be the presentation layer, while denser SAN sets and multi-zone families tend to expose the platform layer underneath.", + ] + ) + ) + lines.append("") + lines.append("The common ground is operational reality. A brand or product team wants a recognisable public name. A platform team wants a stable service rail. A delivery team wants environment labels and routable front doors. 
Certificates and DNS show those layers from different angles, which is why the estate looks messy when read from only one side.") + lines.append("") + lines.append("### Top Family Digest") + lines.append("") + for row in report["group_digest"]: + lines.append( + f"- `{row['group_id']}` | {row['basis']} | type={row['type']} | certs={row['certificates']} | subjects={row['subjects']} | stacks={row['top_stacks']}" + ) + lines.append("") + lines.append("## Chapter 6: Confidence, Limits, and Claims") + lines.append("") + lines.append("**Management Summary**") + lines.append("") + lines.extend( + md_bullets( + [ + "Strongest claims: issuer trust, leaf-only status, SAN and Subject CN structure, purpose EKU split, DNS stack signatures, and recurring family patterns.", + "Medium-confidence claims: that the estate reflects a layered organisation with brand, platform, and delivery concerns superimposed on each other.", + "Lower-confidence claims: exact meanings of internal abbreviations or exact organisation-chart boundaries inferred from naming alone.", + ] + ) + ) + lines.append("") + lines.append("This report can prove what is visible in public certificate and DNS data. It cannot prove internal governance charts or the exact human meaning of every abbreviation. 
Where the report interprets rather than measures, it does so by tying the interpretation to repeated observable patterns.") + lines.append("") + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def tex_escape(value: str) -> str: + return ct_scan.latex_escape(value) + + +def render_latex(path: Path, report: dict[str, object]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + hits = report["hits"] + groups = report["groups"] + rev_counts = report["rev_counts"] + purpose_summary = report["purpose_summary"] + + lines: list[str] = [ + r"\documentclass[11pt]{article}", + r"\usepackage[a4paper,margin=18mm]{geometry}", + r"\usepackage{fontspec}", + r"\usepackage[table]{xcolor}", + r"\usepackage{microtype}", + r"\usepackage{hyperref}", + r"\usepackage{xurl}", + r"\usepackage{array}", + r"\usepackage{booktabs}", + r"\usepackage{tabularx}", + r"\usepackage{longtable}", + r"\usepackage{enumitem}", + r"\usepackage{fancyhdr}", + r"\usepackage{titlesec}", + r"\usepackage[most]{tcolorbox}", + r"\defaultfontfeatures{Ligatures=TeX,Scale=MatchLowercase}", + r"\setmainfont{Palatino}", + r"\setsansfont{Avenir Next}", + r"\setmonofont{Menlo}", + r"\definecolor{Ink}{HTML}{17202A}", + r"\definecolor{Muted}{HTML}{667085}", + r"\definecolor{Line}{HTML}{D0D5DD}", + r"\definecolor{Panel}{HTML}{F8FAFC}", + r"\definecolor{Accent}{HTML}{0F766E}", + r"\definecolor{AccentSoft}{HTML}{E6F4F1}", + r"\hypersetup{colorlinks=true,linkcolor=Accent,urlcolor=Accent,pdfauthor={CertTransparencySearch},pdftitle={Consolidated CT, Certificate, and DNS Report}}", + r"\setlength{\parindent}{0pt}", + r"\setlength{\parskip}{6pt}", + r"\setcounter{tocdepth}{2}", + r"\pagestyle{fancy}", + r"\fancyhf{}", + r"\fancyhead[L]{\sffamily\footnotesize Consolidated CT Report}", + r"\fancyhead[R]{\sffamily\footnotesize \nouppercase{\leftmark}}", + r"\fancyfoot[C]{\sffamily\footnotesize \thepage}", + r"\titleformat{\section}{\sffamily\bfseries\LARGE\color{Ink}}{\thesection}{0.8em}{}", + 
r"\titleformat{\subsection}{\sffamily\bfseries\Large\color{Ink}}{\thesubsection}{0.8em}{}", + r"\tcbset{panel/.style={enhanced,breakable,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=white,colframe=Line}}", + r"\newcommand{\SummaryBox}[1]{\begin{tcolorbox}[panel,colback=Panel]#1\end{tcolorbox}}", + r"\begin{document}", + r"\begin{titlepage}", + r"\vspace*{18mm}", + r"{\sffamily\bfseries\fontsize{24}{28}\selectfont Consolidated CT, Certificate, and DNS Report\par}", + r"\vspace{6pt}", + r"{\Large One document for the certificate corpus, naming system, DNS delivery view, and proof boundaries\par}", + r"\vspace{18pt}", + rf"\textbf{{Generated}}: {tex_escape(report['generated_at_utc'])}\par", + rf"\textbf{{Configured search terms file}}: {tex_escape(str(report['domains']))}\par", + r"\vspace{12pt}", + r"\SummaryBox{" + + rf"\textbf{{Headline}}: {len(hits)} leaf certificates, {len(groups)} CN families, {len(report['unique_dns_names'])} DNS names, " + + rf"{purpose_summary.category_counts.get('tls_server_only', 0)} strict server-auth certificates, " + + rf"{purpose_summary.category_counts.get('tls_server_and_client', 0)} dual-EKU certificates." 
+ + r"}", + r"\end{titlepage}", + r"\tableofcontents", + r"\clearpage", + ] + + def add_summary(items: list[str]) -> None: + lines.append(r"\SummaryBox{\textbf{Management Summary}\begin{itemize}[leftmargin=1.4em]") + for item in items: + lines.append(rf"\item {tex_escape(item)}") + lines.append(r"\end{itemize}}") + + lines.append(r"\section{Method, Integrity, and How To Read This}") + add_summary( + [ + f"The scanner now refuses to run if the candidate cap is lower than the live raw match count; current counts are {', '.join(f'{domain}={count}' for domain, count in report['raw_match_counts'].items())}.", + f"The live cap used for this run was {report['cap']}.", + f"Leaf-only verification kept {report['verification'].unique_leaf_certificates} certificates.", + f"Configured search-term coverage failures: {report['missing_matching_san']}.", + ] + ) + lines.append( + r"Certificate Transparency is the public logging layer for issued certificates. The report starts there, validates the actual X.509 certificate bytes, and then scans the DNS names exposed in SANs. A Subject CN is the traditional primary name in a certificate; a SAN list is the modern set of all names the certificate covers." 
+ ) + + lines.append(r"\section{Certificate Corpus}") + add_summary( + [ + f"{len(hits)} current leaf certificates are in scope.", + f"Revocation mix: not revoked={rev_counts.get('not_revoked', 0)}, revoked={rev_counts.get('revoked', 0)}, unknown={rev_counts.get('unknown', 0)}.", + f"Purpose split: server-only={purpose_summary.category_counts.get('tls_server_only', 0)}, server+client={purpose_summary.category_counts.get('tls_server_and_client', 0)}.", + f"All Subject CN values appear in SAN DNS names.", + ] + ) + lines.extend( + [ + r"\subsection{Issuer Breakdown}", + r"\begin{longtable}{>{\raggedright\arraybackslash}p{0.67\linewidth} >{\raggedleft\arraybackslash}p{0.12\linewidth} >{\raggedleft\arraybackslash}p{0.12\linewidth}}", + r"\toprule", + r"Issuer & Count & WebPKI \\", + r"\midrule", + ] + ) + for issuer_name, count in report["issuer_counts"].most_common(): + trust = report["issuer_trust"][issuer_name] + lines.append(rf"{tex_escape(issuer_name)} & {count} & {'yes' if trust.major_webpki else 'no'} \\") + lines.extend([r"\bottomrule", r"\end{longtable}"]) + lines.append(r"\subsection{Purpose Assessment}") + lines.append(r"\begin{itemize}[leftmargin=1.4em]") + for category, count in purpose_summary.category_counts.items(): + lines.append(rf"\item \texttt{{{tex_escape(category)}}}: {count}") + lines.append(r"\end{itemize}") + + lines.append(r"\section{Naming Architecture}") + add_summary( + [ + f"{len(report['numbered_groups'])} numbered CN families indicate reusable service rails.", + f"{report['public_www_pair_count']} certificates use a base-name plus www pairing.", + f"{report['multi_zone_hit_count']} certificates span more than one DNS zone in SAN.", + f"Most common suffixes are {', '.join(f'{suffix} ({count})' for suffix, count in report['top_suffixes'][:4])}.", + ] + ) + lines.append(r"\subsection{Representative Examples}") + for example in report["examples"]: + lines.append(r"\SummaryBox{") + lines.append(rf"\textbf{{{tex_escape(example.title)}}}\par") 
+ lines.append(rf"\textbf{{Subject CN}}: \texttt{{{tex_escape(example.subject_cn)}}}\par") + lines.append(tex_escape(example.why_it_matters) + r"\par") + lines.append(r"\begin{itemize}[leftmargin=1.4em]") + for point in example.evidence: + lines.append(rf"\item {tex_escape(point)}") + lines.append(r"\end{itemize}}") + + lines.append(r"\section{DNS Delivery Architecture}") + add_summary( + [ + f"{len(report['unique_dns_names'])} unique DNS names were scanned from SAN.", + f"Top delivery signatures are {', '.join(f'{label} ({count})' for label, count in report['dns_stack_counts'].most_common(5))}.", + "The DNS view reduces many hostnames into a smaller set of recurring delivery platforms.", + ] + ) + lines.extend( + [ + r"\subsection{Delivery Stack Counts}", + r"\begin{longtable}{>{\raggedright\arraybackslash}p{0.72\linewidth} >{\raggedleft\arraybackslash}p{0.16\linewidth}}", + r"\toprule", + r"Stack signature & Count \\", + r"\midrule", + ] + ) + for label, count in report["dns_stack_counts"].most_common(12): + lines.append(rf"{tex_escape(label)} & {count} \\") + lines.extend([r"\bottomrule", r"\end{longtable}"]) + + lines.append(r"\subsection{Platform Glossary}") + lines.append(r"\begin{longtable}{>{\raggedright\arraybackslash}p{0.22\linewidth} >{\raggedright\arraybackslash}p{0.70\linewidth}}") + lines.append(r"\toprule") + lines.append(r"Term & Explanation \\") + lines.append(r"\midrule") + glossary = ct_dns_utils.provider_explanations() + seen_terms = set() + for observation in report["observations"]: + seen_terms.update(observation.provider_hints) + for term in ["Adobe Campaign", "AWS", "AWS CloudFront", "AWS ALB", "Google Apigee", "Pega Cloud", "Microsoft Edge", "Infinite / agency alias", "CNAME", "A record", "AAAA record", "PTR record", "NXDOMAIN"]: + if term in glossary and (term in seen_terms or term in {"CNAME", "A record", "AAAA record", "PTR record", "NXDOMAIN", "AWS ALB"}): + lines.append(rf"{tex_escape(term)} & {tex_escape(glossary[term])} \\") + 
lines.extend([r"\bottomrule", r"\end{longtable}"]) + + lines.append(r"\section{Where The Certificate View and DNS View Meet}") + add_summary( + [ + "Certificates explain naming, trust, and purpose; DNS explains routing, reachability, and platform landing points.", + "Numbered families usually behave like shared service rails, while clean two-name SAN pairs usually behave like public presentation fronts.", + "The estate becomes coherent when brand, platform, and delivery are treated as different layers of the same system.", + ] + ) + lines.extend( + [ + r"\subsection{Top Family Digest}", + r"\begin{longtable}{>{\raggedright\arraybackslash}p{0.09\linewidth} >{\raggedright\arraybackslash}p{0.39\linewidth} >{\raggedright\arraybackslash}p{0.15\linewidth} >{\raggedleft\arraybackslash}p{0.09\linewidth} >{\raggedleft\arraybackslash}p{0.09\linewidth} >{\raggedright\arraybackslash}p{0.13\linewidth}}", + r"\toprule", + r"ID & Basis & Type & Certs & CNs & Top stacks \\", + r"\midrule", + ] + ) + for row in report["group_digest"]: + lines.append( + rf"{tex_escape(row['group_id'])} & {tex_escape(row['basis'])} & {tex_escape(row['type'])} & {row['certificates']} & {row['subjects']} & {tex_escape(row['top_stacks'])} \\" + ) + lines.extend([r"\bottomrule", r"\end{longtable}"]) + + lines.append(r"\section{Confidence, Limits, and Claims}") + add_summary( + [ + "Strong claims in this report are the ones tied directly to certificate fields, DNS answers, and trust records.", + "Interpretive claims are constrained to repeated patterns and are stated as readings, not as internal-org certainties.", + "The exact meaning of internal abbreviations cannot be proven from CT and DNS alone.", + ] + ) + lines.append( + r"The report can prove which issuers are used, which EKU patterns exist, which DNS stacks are visible, and which naming families repeat. It cannot prove the exact internal org chart or the exact human expansion of every short token." 
+ ) + lines.append(r"\end{document}") + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def main() -> int: + args = parse_args() + report = summarize_for_report(args) + render_markdown(args.markdown_output, report) + render_latex(args.latex_output, report) + if not args.skip_pdf: + ct_scan.compile_latex_to_pdf(args.latex_output, args.pdf_output, args.pdf_engine) + if not args.quiet: + print( + f"[report] markdown={args.markdown_output} latex={args.latex_output}" + + ("" if args.skip_pdf else f" pdf={args.pdf_output}"), + file=__import__("sys").stderr, + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/ct_scan.py b/ct_scan.py new file mode 100644 index 0000000..f34e1f1 --- /dev/null +++ b/ct_scan.py @@ -0,0 +1,1445 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import base64 +import hashlib +import json +import re +import shutil +import subprocess +import sys +import time +from collections import Counter, defaultdict +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import psycopg +from cryptography import x509 +from cryptography.x509 import general_name +from cryptography.x509.oid import ExtensionOID +from cryptography.x509.oid import NameOID +from psycopg.rows import dict_row + + +QUERY_SQL = """ +WITH ci AS ( + SELECT + min(sub.certificate_id) AS id, + min(sub.issuer_ca_id) AS issuer_ca_id, + x509_commonName(sub.certificate) AS common_name, + x509_subjectName(sub.certificate) AS subject_dn, + x509_notBefore(sub.certificate) AS not_before, + x509_notAfter(sub.certificate) AS not_after, + encode(x509_serialNumber(sub.certificate), 'hex') AS serial_number, + sub.certificate AS certificate + FROM ( + SELECT cai.* + FROM certificate_and_identities cai + WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate) + AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\' + LIMIT 
%(max_candidates)s + ) sub + GROUP BY sub.certificate +) +SELECT + ci.id, + ci.issuer_ca_id, + ca.name AS issuer_name, + ci.common_name, + ci.subject_dn, + ci.not_before, + ci.not_after, + cl.first_seen, + ci.serial_number, + coalesce(cl.revoked, 0) AS revoked_count, + rev.revocation_date, + rev.reason_code, + rev.last_seen_check_date, + crl_state.active_crl_count, + crl_state.last_checked AS crl_last_checked, + ci.certificate +FROM ci +JOIN ca ON ca.id = ci.issuer_ca_id +JOIN certificate_lifecycle cl ON cl.certificate_id = ci.id +LEFT JOIN LATERAL ( + SELECT + cr.revocation_date, + cr.reason_code, + cr.last_seen_check_date + FROM crl_revoked cr + WHERE cr.ca_id = ci.issuer_ca_id + AND cr.serial_number = decode(ci.serial_number, 'hex') + ORDER BY cr.last_seen_check_date DESC NULLS LAST + LIMIT 1 +) rev ON TRUE +LEFT JOIN LATERAL ( + SELECT + count(*) FILTER ( + WHERE crl.error_message IS NULL + AND crl.next_update > now() AT TIME ZONE 'UTC' + ) AS active_crl_count, + max(crl.last_checked) AS last_checked + FROM crl + WHERE crl.ca_id = ci.issuer_ca_id +) crl_state ON TRUE +WHERE ci.not_before <= now() AT TIME ZONE 'UTC' + AND ci.not_after >= now() AT TIME ZONE 'UTC' + AND cl.certificate_type = 'Certificate' +ORDER BY cl.first_seen DESC NULLS LAST, ci.id DESC; +""" + + +RAW_MATCH_COUNT_SQL = """ +SELECT count(*) +FROM certificate_and_identities cai +WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate) + AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\' +""" + + +REVOCATION_REASONS = { + 1: "keyCompromise", + 2: "cACompromise", + 3: "affiliationChanged", + 4: "superseded", + 5: "cessationOfOperation", + 6: "certificateHold", + 8: "removeFromCRL", + 9: "privilegeWithdrawn", + 10: "aACompromise", +} + + +PRECERT_POISON_OID = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3") + + +@dataclass +class DatabaseRecord: + domain: str + certificate_id: int + issuer_ca_id: int + issuer_name: str + common_name: str | None + subject_dn: str | None + 
not_before: datetime + not_after: datetime + first_seen: datetime | None + serial_number: str + revoked_count: int + revocation_date: datetime | None + reason_code: int | None + last_seen_check_date: datetime | None + active_crl_count: int + crl_last_checked: datetime | None + certificate_der: bytes + + +@dataclass +class CertificateHit: + fingerprint_sha256: str + subject_cn: str + validity_not_before: datetime + validity_not_after: datetime + san_entries: list[str] + revocation_status: str + revocation_date: datetime | None + revocation_reason: str | None + revocation_note: str | None + crtsh_crl_timestamp: datetime | None + matched_domains: set[str] = field(default_factory=set) + first_seen: datetime | None = None + crtsh_certificate_ids: set[int] = field(default_factory=set) + serial_numbers: set[str] = field(default_factory=set) + issuer_names: set[str] = field(default_factory=set) + issuer_ca_ids: set[int] = field(default_factory=set) + + +@dataclass +class VerificationStats: + input_rows: int = 0 + unique_leaf_certificates: int = 0 + non_leaf_filtered: int = 0 + precertificate_poison_filtered: int = 0 + + +@dataclass +class CertificateGroup: + group_id: str + group_type: str + member_indices: list[int] + member_count: int + distinct_subject_cn_count: int + distinct_exact_content_count: int + numbered_cn_patterns: set[str] + matched_domains: set[str] + subject_cns: set[str] + first_seen_min: datetime | None + first_seen_max: datetime | None + valid_from_min: datetime + valid_to_max: datetime + revocation_counts: Counter + + +@dataclass +class ScanStats: + generated_at_utc: str + configured_domains: list[str] + unique_leaf_certificates: int + groups_total: int + groups_multi_member: int + groups_singleton: int + groups_by_type: dict[str, int] + verification: VerificationStats + + +@dataclass +class IssuerTrustInfo: + issuer_name: str + issuer_ca_ids: set[int] + server_auth_contexts: set[str] + major_webpki: bool + + +def load_domains(path: Path) -> list[str]: 
+    domains: list[str] = []
+    for raw_line in path.read_text(encoding="utf-8").splitlines():
+        line = raw_line.strip().lower()
+        # Blank lines and '#'-prefixed comment lines are skipped.
+        if not line or line.startswith("#"):
+            continue
+        # Fold wildcard entries ("*.example.com") to their base domain.
+        if line.startswith("*."):
+            line = line[2:]
+        domains.append(line)
+    # De-duplicate and sort so scan order is deterministic run to run.
+    unique_domains = sorted(set(domains))
+    if not unique_domains:
+        raise ValueError(f"No domains found in {path}")
+    return unique_domains
+
+
+def escape_like(value: str) -> str:
+    """Escape SQL LIKE/ILIKE metacharacters so *value* matches literally.
+
+    Backslash is escaped first so the '%' and '_' escapes added afterwards
+    are not themselves re-escaped; the SQL declares ESCAPE '\\' to match.
+    """
+    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
+
+
+def utc_iso(value: datetime | None) -> str:
+    """Render *value* as a Z-suffixed UTC ISO-8601 string ("n/a" for None).
+
+    Naive datetimes are treated as already being UTC; timezone-aware
+    datetimes are converted to UTC first.
+    """
+    if value is None:
+        return "n/a"
+    if value.tzinfo is None:
+        value = value.replace(tzinfo=UTC)
+    else:
+        value = value.astimezone(UTC)
+    return value.isoformat(timespec="seconds").replace("+00:00", "Z")
+
+
+def serialize_datetime(value: datetime | None) -> str | None:
+    """Like utc_iso, but preserves None instead of returning "n/a"."""
+    return utc_iso(value) if value is not None else None
+
+
+def parse_datetime(value: str | None) -> datetime | None:
+    """Inverse of serialize_datetime: ISO string -> naive UTC datetime.
+
+    The trailing "Z" is rewritten to "+00:00" for fromisoformat, then the
+    tzinfo is stripped so cached values stay naive-UTC like the rest of
+    the module's datetime handling.
+    """
+    if value is None:
+        return None
+    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
+
+
+def cache_path(cache_dir: Path, domain: str) -> Path:
+    """Map a domain to a filesystem-safe per-domain JSON cache file."""
+    # Replace anything outside [alnum, '-', '.', '_'] so a domain string
+    # cannot inject path separators into the cache filename.
+    safe_domain = "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in domain)
+    return cache_dir / f"{safe_domain}.json"
+
+
+def record_to_cache_payload(record: DatabaseRecord) -> dict[str, Any]:
+    """Serialize a DatabaseRecord into a JSON-safe dict for the disk cache."""
+    return {
+        "domain": record.domain,
+        "certificate_id": record.certificate_id,
+        "issuer_ca_id": record.issuer_ca_id,
+        "issuer_name": record.issuer_name,
+        "common_name": record.common_name,
+        "subject_dn": record.subject_dn,
+        "not_before": serialize_datetime(record.not_before),
+        "not_after": serialize_datetime(record.not_after),
+        "first_seen": serialize_datetime(record.first_seen),
+        "serial_number": record.serial_number,
+        "revoked_count": record.revoked_count,
+        "revocation_date": serialize_datetime(record.revocation_date),
+        "reason_code": record.reason_code,
+        "last_seen_check_date": serialize_datetime(record.last_seen_check_date),
+
"active_crl_count": record.active_crl_count, + "crl_last_checked": serialize_datetime(record.crl_last_checked), + "certificate_der_b64": base64.b64encode(record.certificate_der).decode("ascii"), + } + + +def record_from_cache_payload(payload: dict[str, Any]) -> DatabaseRecord: + return DatabaseRecord( + domain=payload["domain"], + certificate_id=int(payload["certificate_id"]), + issuer_ca_id=int(payload["issuer_ca_id"]), + issuer_name=payload["issuer_name"], + common_name=payload.get("common_name"), + subject_dn=payload.get("subject_dn"), + not_before=parse_datetime(payload["not_before"]) or datetime.min, + not_after=parse_datetime(payload["not_after"]) or datetime.min, + first_seen=parse_datetime(payload.get("first_seen")), + serial_number=payload["serial_number"], + revoked_count=int(payload["revoked_count"]), + revocation_date=parse_datetime(payload.get("revocation_date")), + reason_code=payload.get("reason_code"), + last_seen_check_date=parse_datetime(payload.get("last_seen_check_date")), + active_crl_count=int(payload["active_crl_count"]), + crl_last_checked=parse_datetime(payload.get("crl_last_checked")), + certificate_der=base64.b64decode(payload["certificate_der_b64"]), + ) + + +def load_cached_records(cache_dir: Path, domain: str, ttl_seconds: int, max_candidates: int) -> list[DatabaseRecord] | None: + path = cache_path(cache_dir, domain) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + if payload.get("version") != 1: + return None + if payload.get("max_candidates") != max_candidates: + return None + cached_at = parse_datetime(payload.get("cached_at")) + if cached_at is None: + return None + age = time.time() - cached_at.replace(tzinfo=UTC).timestamp() + if age > ttl_seconds: + return None + return [record_from_cache_payload(item) for item in payload.get("records", [])] + + +def store_cached_records(cache_dir: Path, domain: str, max_candidates: 
int, records: list[DatabaseRecord]) -> None:
+    """Write the per-domain query results to the JSON disk cache."""
+    cache_dir.mkdir(parents=True, exist_ok=True)
+    payload = {
+        "version": 1,
+        "cached_at": utc_iso(datetime.now(UTC)),
+        # Stored alongside the records: a later run with a different cap
+        # must not reuse this entry (load_cached_records rejects it).
+        "max_candidates": max_candidates,
+        "records": [record_to_cache_payload(record) for record in records],
+    }
+    cache_path(cache_dir, domain).write_text(
+        json.dumps(payload, indent=2, sort_keys=True),
+        encoding="utf-8",
+    )
+
+
+def connect() -> psycopg.Connection:
+    """Open a short-timeout autocommit connection to the public crt.sh DB.
+
+    NOTE(review): guest/guest credentials with sslmode="disable" — this is
+    the public read-only certwatch endpoint, so cleartext is presumably
+    acceptable; confirm crt.sh offers no TLS before tightening this.
+    """
+    return psycopg.connect(
+        host="crt.sh",
+        port=5432,
+        dbname="certwatch",
+        user="guest",
+        password="guest",
+        connect_timeout=5,
+        sslmode="disable",
+        autocommit=True,
+        application_name="ct_transparency_search",
+    )
+
+
+def query_domain(domain: str, max_candidates: int, attempts: int, verbose: bool) -> list[DatabaseRecord]:
+    """Fetch all candidate certificate rows for *domain* from crt.sh.
+
+    Fails fast with ValueError if the live raw identity-match count exceeds
+    *max_candidates*, so the LIMIT inside QUERY_SQL can never silently
+    truncate the result set.  Query failures are retried up to *attempts*
+    times with exponential backoff (capped at 10s); the last error is
+    re-raised once retries are exhausted.
+    """
+    params = {
+        "domain": domain,
+        "name_pattern": f"%{escape_like(domain)}%",
+        "max_candidates": max_candidates,
+    }
+    raw_match_count = query_raw_match_count(domain=domain, attempts=attempts, verbose=verbose)
+    if raw_match_count > max_candidates:
+        raise ValueError(
+            f"domain={domain} raw identity matches={raw_match_count} exceed max_candidates={max_candidates}; "
+            f"increase --max-candidates-per-domain to at least {raw_match_count} for a complete result set"
+        )
+    last_error: Exception | None = None
+    for attempt in range(1, attempts + 1):
+        try:
+            with connect() as conn, conn.cursor(row_factory=dict_row) as cur:
+                cur.execute(QUERY_SQL, params)
+                rows = cur.fetchall()
+                return [row_to_record(domain, row) for row in rows]
+        except Exception as exc:
+            last_error = exc
+            # No sleep after the final failed attempt; fall through to raise.
+            if attempt == attempts:
+                break
+            if verbose:
+                print(
+                    f"[warn] domain={domain} attempt={attempt}/{attempts} failed: {exc}",
+                    file=sys.stderr,
+                )
+            time.sleep(min(2 ** attempt, 10))
+    assert last_error is not None
+    raise last_error
+
+
+def query_raw_match_count(domain: str, attempts: int, verbose: bool) -> int:
+    """Count raw identity matches for *domain* (same retry policy as query_domain)."""
+    params = {
+        "domain": domain,
+        "name_pattern": f"%{escape_like(domain)}%",
+    }
+    last_error: Exception | None = None
+
for attempt in range(1, attempts + 1): + try: + with connect() as conn, conn.cursor() as cur: + cur.execute(RAW_MATCH_COUNT_SQL, params) + row = cur.fetchone() + return int(row[0]) + except Exception as exc: + last_error = exc + if attempt == attempts: + break + if verbose: + print( + f"[warn] domain={domain} raw-count attempt={attempt}/{attempts} failed: {exc}", + file=sys.stderr, + ) + time.sleep(min(2 ** attempt, 10)) + assert last_error is not None + raise last_error + + +def row_to_record(domain: str, row: dict[str, Any]) -> DatabaseRecord: + return DatabaseRecord( + domain=domain, + certificate_id=int(row["id"]), + issuer_ca_id=int(row["issuer_ca_id"]), + issuer_name=row["issuer_name"], + common_name=row["common_name"], + subject_dn=row["subject_dn"], + not_before=row["not_before"], + not_after=row["not_after"], + first_seen=row["first_seen"], + serial_number=row["serial_number"], + revoked_count=int(row["revoked_count"]), + revocation_date=row["revocation_date"], + reason_code=row["reason_code"], + last_seen_check_date=row["last_seen_check_date"], + active_crl_count=int(row["active_crl_count"] or 0), + crl_last_checked=row["crl_last_checked"], + certificate_der=bytes(row["certificate"]), + ) + + +def extract_san_entries(cert: x509.Certificate) -> list[str]: + try: + extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) + except x509.ExtensionNotFound: + return [] + entries: list[str] = [] + for name in extension.value: + entries.append(format_general_name(name)) + return sorted(set(entries), key=str.casefold) + + +def format_general_name(name: general_name.GeneralName) -> str: + if isinstance(name, x509.DNSName): + return f"DNS:{name.value}" + if isinstance(name, x509.RFC822Name): + return f"EMAIL:{name.value}" + if isinstance(name, x509.UniformResourceIdentifier): + return f"URI:{name.value}" + if isinstance(name, x509.IPAddress): + return f"IP:{name.value}" + if isinstance(name, x509.RegisteredID): + return 
f"RID:{name.value.dotted_string}"
+    if isinstance(name, x509.DirectoryName):
+        return f"DIR:{name.value.rfc4514_string()}"
+    if isinstance(name, x509.OtherName):
+        encoded = base64.b64encode(name.value).decode("ascii")
+        return f"OTHER:{name.type_id.dotted_string}:{encoded}"
+    # Fallback for any GeneralName variant not handled above.
+    return str(name)
+
+
+def extract_common_name(cert: x509.Certificate) -> str | None:
+    """Return the first Subject CN attribute, or None when the subject has no CN."""
+    attributes = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
+    if not attributes:
+        return None
+    return attributes[0].value
+
+
+def has_precertificate_poison(cert: x509.Certificate) -> bool:
+    """True if *cert* carries the CT precertificate poison extension."""
+    try:
+        cert.extensions.get_extension_for_oid(PRECERT_POISON_OID)
+    except x509.ExtensionNotFound:
+        return False
+    return True
+
+
+def is_leaf_certificate(cert: x509.Certificate) -> tuple[bool, str]:
+    """Classify *cert* as (is_leaf, reason).
+
+    Non-leaf reasons, checked in order: CT precertificate poison,
+    BasicConstraints with CA=true, and KeyUsage allowing keyCertSign.
+    A certificate missing these extensions is treated as a leaf.
+    """
+    if has_precertificate_poison(cert):
+        return (False, "precertificate_poison")
+    try:
+        basic_constraints = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
+        if basic_constraints.ca:
+            return (False, "basic_constraints_ca")
+    except x509.ExtensionNotFound:
+        pass
+    try:
+        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
+        if key_usage.key_cert_sign:
+            return (False, "key_cert_sign")
+    except x509.ExtensionNotFound:
+        pass
+    return (True, "leaf")
+
+
+def revocation_fields(record: DatabaseRecord) -> tuple[str, datetime | None, str | None, datetime | None, str | None]:
+    """Derive (status, revocation_date, reason, crl_timestamp, note) from crt.sh data.
+
+    Status is "revoked" when a CRL entry matched the serial, "not_revoked"
+    when the issuer has at least one fresh successfully-fetched CRL, and
+    "unknown" otherwise (no fresh CRL evidence either way).
+    """
+    if record.revoked_count > 0:
+        reason: str | None = None
+        if record.reason_code in REVOCATION_REASONS:
+            reason = REVOCATION_REASONS[record.reason_code]
+        # reasonCode 0 is "unspecified" (RFC 5280), so it is deliberately
+        # reported as no reason rather than "unknown(0)".
+        elif record.reason_code not in (None, 0):
+            reason = f"unknown({record.reason_code})"
+        return ("revoked", record.revocation_date, reason, record.last_seen_check_date, None)
+    if record.active_crl_count > 0:
+        return ("not_revoked", None, None, record.crl_last_checked, None)
+    return ("unknown", None, None, record.crl_last_checked, "no fresh crt.sh CRL data")
+
+
+def revocation_priority(status: str) -> int:
+    """Merge ordering for duplicate rows: revoked > not_revoked > unknown."""
+    return {
+        "unknown":
            0,
        "not_revoked": 1,
        "revoked": 2,
    }[status]


def build_hits(records: list[DatabaseRecord]) -> tuple[list[CertificateHit], VerificationStats]:
    """Collapse raw crt.sh rows into unique, verified leaf-certificate hits.

    Each record's DER is parsed locally; non-leaf artifacts are filtered and
    counted in VerificationStats.  Surviving certs are de-duplicated by
    SHA-256 fingerprint; rows for the same cert merge their domains, crt.sh
    ids, serials and issuers, keep the earliest first_seen, and resolve
    revocation state via revocation_priority (worst status wins, freshest
    CRL timestamp kept on ties).  Output is deterministically ordered.
    """
    verification = VerificationStats(input_rows=len(records))
    hits: dict[str, CertificateHit] = {}
    for record in records:
        cert = x509.load_der_x509_certificate(record.certificate_der)
        is_leaf, reason = is_leaf_certificate(cert)
        if not is_leaf:
            if reason == "precertificate_poison":
                verification.precertificate_poison_filtered += 1
            else:
                verification.non_leaf_filtered += 1
            continue
        fingerprint_hex = hashlib.sha256(record.certificate_der).hexdigest()
        # Prefer the database CN, then the parsed CN, then a "-" placeholder.
        subject_cn = record.common_name or extract_common_name(cert) or "-"
        revocation_status, revocation_date, revocation_reason, crtsh_crl_timestamp, revocation_note = revocation_fields(record)
        hit = hits.get(fingerprint_hex)
        if hit is None:
            hit = CertificateHit(
                fingerprint_sha256=fingerprint_hex,
                subject_cn=subject_cn,
                validity_not_before=record.not_before,
                validity_not_after=record.not_after,
                san_entries=extract_san_entries(cert),
                revocation_status=revocation_status,
                revocation_date=revocation_date,
                revocation_reason=revocation_reason,
                revocation_note=revocation_note,
                crtsh_crl_timestamp=crtsh_crl_timestamp,
                matched_domains={record.domain},
                first_seen=record.first_seen,
                crtsh_certificate_ids={record.certificate_id},
                serial_numbers={record.serial_number},
                issuer_names={record.issuer_name},
                issuer_ca_ids={record.issuer_ca_id},
            )
            hits[fingerprint_hex] = hit
            continue
        # Duplicate fingerprint: merge row metadata into the existing hit.
        hit.matched_domains.add(record.domain)
        hit.crtsh_certificate_ids.add(record.certificate_id)
        hit.serial_numbers.add(record.serial_number)
        hit.issuer_names.add(record.issuer_name)
        hit.issuer_ca_ids.add(record.issuer_ca_id)
        if hit.first_seen is None or (record.first_seen is not None and record.first_seen < hit.first_seen):
            hit.first_seen = record.first_seen
        if revocation_priority(revocation_status) > revocation_priority(hit.revocation_status):
            # Stronger evidence (e.g. revoked over unknown) replaces all fields.
            hit.revocation_status = revocation_status
            hit.revocation_date = revocation_date
            hit.revocation_reason = revocation_reason
            hit.revocation_note = revocation_note
            hit.crtsh_crl_timestamp = crtsh_crl_timestamp
        elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is not None and crtsh_crl_timestamp is not None:
            # Same status: keep the most recent CRL check timestamp.
            if crtsh_crl_timestamp > hit.crtsh_crl_timestamp:
                hit.crtsh_crl_timestamp = crtsh_crl_timestamp
        elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is None:
            hit.crtsh_crl_timestamp = crtsh_crl_timestamp
    ordered_hits = sorted(
        hits.values(),
        key=lambda hit: (
            sorted(hit.matched_domains),
            hit.subject_cn.casefold(),
            hit.validity_not_before,
            hit.fingerprint_sha256,
        ),
    )
    verification.unique_leaf_certificates = len(ordered_hits)
    return (ordered_hits, verification)


def canonicalize_subject_cn(subject_cn: str) -> str:
    """Lowercase a CN and fold a leading "www." into its base endpoint."""
    subject_cn = subject_cn.lower()
    if subject_cn.startswith("www."):
        return subject_cn[4:]
    return subject_cn


def normalize_counter_pattern(hostname: str) -> str | None:
    """Replace digit runs in the canonical CN with "#".

    Returns the schema string (e.g. "api#.example.com") or None when the
    hostname contains no digits and therefore has no counter pattern.
    """
    normalized = re.sub(r"\d+", "#", canonicalize_subject_cn(hostname))
    if normalized == canonicalize_subject_cn(hostname):
        return None
    return normalized


class UnionFind:
    """Disjoint-set forest with path halving and union by rank.

    NOTE(review): not referenced anywhere in this chunk — possibly dead code
    or used elsewhere in the file; confirm before removing.
    """

    def __init__(self, size: int) -> None:
        # parent[i] == i marks a root; rank bounds tree height.
        self.parent = list(range(size))
        self.rank = [0] * size

    def find(self, value: int) -> int:
        """Return the set representative, compressing by path halving."""
        while self.parent[value] != value:
            self.parent[value] = self.parent[self.parent[value]]
            value = self.parent[value]
        return value

    def union(self, left: int, right: int) -> None:
        """Merge the sets containing `left` and `right` (union by rank)."""
        left_root = self.find(left)
        right_root = self.find(right)
        if left_root == right_root:
            return
        if self.rank[left_root] < self.rank[right_root]:
            left_root, right_root = right_root, left_root
        self.parent[right_root] = left_root
        if self.rank[left_root] == self.rank[right_root]:
            self.rank[left_root] += 1


def build_groups(hits:
                 list[CertificateHit]) -> list[CertificateGroup]:
    """Partition hits into CN families.

    A hit joins a "numbered_cn_pattern" family when its digits-to-# schema is
    shared by more than one distinct canonical CN; otherwise it joins an
    "exact_endpoint_family" keyed by its canonical CN (www. folded).  Groups
    are sorted by size desc, then type, then smallest canonical CN, and get
    stable ids G0001, G0002, ...
    """
    if not hits:
        return []
    canonical_cns_by_pattern: dict[str, set[str]] = defaultdict(set)
    for hit in hits:
        pattern = normalize_counter_pattern(hit.subject_cn)
        if pattern is not None:
            canonical_cns_by_pattern[pattern].add(canonicalize_subject_cn(hit.subject_cn))

    # A schema only "qualifies" when at least two different CNs share it.
    qualifying_patterns = {
        pattern
        for pattern, canonical_cns in canonical_cns_by_pattern.items()
        if len(canonical_cns) > 1
    }
    components: dict[tuple[str, str], list[int]] = defaultdict(list)
    for index, hit in enumerate(hits):
        canonical_cn = canonicalize_subject_cn(hit.subject_cn)
        pattern = normalize_counter_pattern(hit.subject_cn)
        if pattern in qualifying_patterns:
            components[("pattern", pattern)].append(index)
        else:
            components[("exact", canonical_cn)].append(index)

    provisional_groups: list[CertificateGroup] = []
    for (family_kind, family_key), member_indices in components.items():
        member_hits = [hits[index] for index in member_indices]
        subject_cns = {hit.subject_cn for hit in member_hits}
        unique_san_profiles = {tuple(hit.san_entries) for hit in member_hits}
        numbered_patterns = {family_key} if family_kind == "pattern" else set()
        group_type = "numbered_cn_pattern" if family_kind == "pattern" else "exact_endpoint_family"
        first_seen_values = [hit.first_seen for hit in member_hits if hit.first_seen is not None]
        provisional_groups.append(
            CertificateGroup(
                group_id="",
                group_type=group_type,
                member_indices=sorted(member_indices),
                member_count=len(member_indices),
                distinct_subject_cn_count=len(subject_cns),
                distinct_exact_content_count=len(unique_san_profiles),
                numbered_cn_patterns=numbered_patterns,
                matched_domains={domain for hit in member_hits for domain in hit.matched_domains},
                subject_cns=subject_cns,
                first_seen_min=min(first_seen_values) if first_seen_values else None,
                first_seen_max=max(first_seen_values) if first_seen_values else None,
                valid_from_min=min(hit.validity_not_before for hit in member_hits),
                valid_to_max=max(hit.validity_not_after for hit in member_hits),
                revocation_counts=Counter(hit.revocation_status for hit in member_hits),
            )
        )

    provisional_groups.sort(
        key=lambda group: (
            -group.member_count,
            group.group_type,
            min(canonicalize_subject_cn(value) for value in group.subject_cns),
        )
    )
    # Assign ids only after the final ordering so they are stable run-to-run.
    for position, group in enumerate(provisional_groups, start=1):
        group.group_id = f"G{position:04d}"
    return provisional_groups


def describe_group_basis(group: CertificateGroup) -> str:
    """One-line human-readable explanation of why a family was grouped."""
    if group.group_type == "numbered_cn_pattern":
        pattern = next(iter(group.numbered_cn_patterns))
        return f"CN pattern with running-number slot: `{pattern}`"
    base = min(canonicalize_subject_cn(value) for value in group.subject_cns)
    return f"Same endpoint CN family (exact CN, with `www.` folded): `{base}`"


def primary_issuer_name(hit: CertificateHit) -> str:
    """Deterministic single issuer for a hit: alphabetically first name."""
    return sorted(hit.issuer_names)[0]


def query_issuer_trust(hits: list[CertificateHit]) -> dict[str, IssuerTrustInfo]:
    """Look up Server-Authentication trust contexts for each issuer.

    Collects the CA ids per primary issuer name, queries crt.sh's live trust
    tables once for all of them, and marks `major_webpki` only when all five
    major root stores (Mozilla, Chrome, Apple, Microsoft, Android) are among
    the observed contexts.
    """
    issuer_name_to_ca_ids: dict[str, set[int]] = defaultdict(set)
    for hit in hits:
        issuer_name_to_ca_ids[primary_issuer_name(hit)].update(hit.issuer_ca_ids)
    all_ca_ids = sorted({ca_id for ca_ids in issuer_name_to_ca_ids.values() for ca_id in ca_ids})
    contexts_by_ca_id: dict[int, set[str]] = defaultdict(set)
    if all_ca_ids:
        # Only currently valid, non-disabled Server Authentication trust rows.
        query = """
        SELECT ctp.ca_id, tc.ctx
        FROM ca_trust_purpose ctp
        JOIN trust_context tc ON tc.id = ctp.trust_context_id
        JOIN trust_purpose tp ON tp.id = ctp.trust_purpose_id
        WHERE ctp.ca_id = ANY(%s)
        AND tp.purpose = 'Server Authentication'
        AND ctp.is_time_valid = TRUE
        AND ctp.disabled_from IS NULL
        """
        with connect() as conn, conn.cursor() as cur:
            cur.execute(query, (all_ca_ids,))
            for ca_id, trust_context in cur.fetchall():
                contexts_by_ca_id[int(ca_id)].add(str(trust_context))
    major_contexts = {"Mozilla", "Chrome", "Apple", "Microsoft", "Android"}
    results: dict[str, IssuerTrustInfo] = {}
    for issuer_name, ca_ids in issuer_name_to_ca_ids.items():
        merged_contexts = {ctx for ca_id in ca_ids for ctx in contexts_by_ca_id.get(ca_id, set())}
        results[issuer_name] = IssuerTrustInfo(
            issuer_name=issuer_name,
            issuer_ca_ids=set(ca_ids),
            server_auth_contexts=merged_contexts,
            major_webpki=major_contexts.issubset(merged_contexts),
        )
    return results


def status_marker(status: str) -> str:
    """Fixed-width status badge for aligned list output ("OK " is padded)."""
    return {
        "not_revoked": "OK ",
        "revoked": "REV",
        "unknown": "UNK",
    }[status]


def one_line_revocation(hit: CertificateHit) -> str:
    """Single-line revocation summary for list entries."""
    if hit.revocation_status == "revoked":
        detail = f"revoked {utc_iso(hit.revocation_date)}" if hit.revocation_date else "revoked"
        if hit.revocation_reason:
            detail += f", reason={hit.revocation_reason}"
        return detail
    if hit.revocation_status == "unknown":
        if hit.revocation_note:
            return f"unknown, {hit.revocation_note}"
        return "unknown"
    return "not revoked"


def san_tail_split(domain: str) -> tuple[list[str], str]:
    """Heuristically split a hostname into (prefix labels, registrable tail).

    The tail is the last two labels, or three when the TLD is a two-letter
    ccTLD preceded by a common second-level label (co, com, ac, ...).  This
    is a display heuristic, not a Public Suffix List lookup.
    """
    labels = domain.split(".")
    common_second_level = {"ac", "co", "com", "edu", "gov", "net", "org"}
    suffix_len = 2
    if len(labels) >= 3 and len(labels[-1]) == 2 and labels[-2] in common_second_level:
        suffix_len = 3
    if len(labels) <= suffix_len:
        return ([], domain)
    return (labels[:-suffix_len], ".".join(labels[-suffix_len:]))


def build_san_tree_lines(san_entries: list[str]) -> list[str]:
    """Unicode-connector variant of the SAN tree renderer."""
    return build_san_tree_lines_with_style(san_entries, ascii_only=False)


def build_san_tree_lines_with_style(san_entries: list[str], ascii_only: bool) -> list[str]:
    """Render SAN entries as an indented tree of text lines.

    DNS SANs are nested label-by-label over their registrable tail (see
    san_tail_split); non-DNS SANs are appended as bullet lines.  `ascii_only`
    switches box-drawing connectors to plain ASCII for LaTeX Verbatim use.
    """
    dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")})
    other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")})
    tree: dict[str, Any] = {}
    for domain in dns_entries:
        prefix_labels, tail = san_tail_split(domain)
        cursor = tree
        for label in prefix_labels:
            cursor = cursor.setdefault(label, {})
        cursor.setdefault(tail, {})

    # NOTE(review): interior whitespace of the prefix/connector literals may
    # have been collapsed by the diff paste — confirm alignment in output.
    def render(node: dict[str, Any], prefix: str = "") -> list[str]:
        lines: list[str] = []
        keys = sorted(node.keys(), key=str.casefold)
        for index, key in enumerate(keys):
            is_last = index == len(keys) - 1
            if ascii_only:
                connector = "`- " if is_last else "|- "
            else:
                connector = "└─ " if is_last else "├─ "
            lines.append(prefix + connector + key)
            child = node[key]
            if ascii_only:
                child_prefix = prefix + (" " if is_last else "| ")
            else:
                child_prefix = prefix + (" " if is_last else "│ ")
            lines.extend(render(child, child_prefix))
        return lines

    lines = render(tree)
    for entry in other_entries:
        lines.append(f"{'*' if ascii_only else '•'} {entry}")
    if not lines:
        # Placeholder so an empty SAN set still produces one visible line.
        lines.append(f"{'*' if ascii_only else '•'} -")
    return lines


def group_hits_by_issuer(hits: list[CertificateHit]) -> tuple[dict[str, list[CertificateHit]], list[str]]:
    """Bucket hits by primary issuer; order issuers by count desc, then name."""
    issuer_hits: dict[str, list[CertificateHit]] = defaultdict(list)
    for hit in hits:
        issuer_hits[primary_issuer_name(hit)].append(hit)
    ordered_issuers = sorted(
        issuer_hits,
        key=lambda issuer_name: (-len(issuer_hits[issuer_name]), issuer_name.casefold()),
    )
    return issuer_hits, ordered_issuers


def latex_escape(value: str) -> str:
    """Escape LaTeX special characters.

    Escaping is done in a single character-by-character pass, so replacement
    text is never re-escaped (important for the backslash mapping).
    """
    replacements = {
        "\\": r"\textbackslash{}",
        "&": r"\&",
        "%": r"\%",
        "$": r"\$",
        "#": r"\#",
        "_": r"\_",
        "{": r"\{",
        "}": r"\}",
        "~": r"\textasciitilde{}",
        "^": r"\textasciicircum{}",
    }
    return "".join(replacements.get(char, char) for char in value)


def summarize_san_patterns(san_entries: list[str]) -> dict[str, Any]:
    """Aggregate SAN shape statistics for a subject CN section.

    Returns counts of DNS vs other SANs, wildcard and digit-bearing names,
    DNS zones (via san_tail_split), the top zones, and recurring
    digits-to-# host schemas (only those seen more than once).
    """
    dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")}, key=str.casefold)
    other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")}, key=str.casefold)
    zone_counts: Counter[str] = Counter()
    normalized_pattern_counts: Counter[str] = Counter()
    wildcard_count = 0
    numbered_count = 0
    for domain in dns_entries:
        # Strip a leading "*." so wildcards group with their base zone.
        normalized_domain = domain[2:] if domain.startswith("*.") else domain
        if domain.startswith("*."):
            wildcard_count += 1
        if re.search(r"\d", normalized_domain):
            numbered_count += 1
        prefix_labels, tail = san_tail_split(normalized_domain)
        zone_counts[tail] += 1
        normalized_prefix = ".".join(re.sub(r"\d+", "#", label) for label in prefix_labels if label)
        if normalized_prefix:
            normalized_pattern_counts[f"{normalized_prefix}.{tail}"] += 1
        else:
            normalized_pattern_counts[tail] += 1
    # Only schemas that repeat carry signal; singletons are dropped.
    repeating_patterns = [
        (pattern, count)
        for pattern, count in normalized_pattern_counts.most_common(6)
        if count > 1
    ]
    return {
        "dns_count": len(dns_entries),
        "other_count": len(other_entries),
        "wildcard_count": wildcard_count,
        "numbered_count": numbered_count,
        "zone_count": len(zone_counts),
        "top_zones": zone_counts.most_common(6),
        "repeating_patterns": repeating_patterns,
    }


def latex_status_badge(status: str) -> str:
    """Map a revocation status to its LaTeX badge macro."""
    return {
        "not_revoked": r"\StatusOK{}",
        "revoked": r"\StatusREV{}",
        "unknown": r"\StatusUNK{}",
    }[status]


def latex_webpki_badge(value: bool) -> str:
    """LaTeX badge for major-WebPKI trust (YES/NO)."""
    return r"\WebPKIYes{}" if value else r"\WebPKINo{}"


def render_markdown_report(
    path: Path,
    hits: list[CertificateHit],
    groups: list[CertificateGroup],
    stats: ScanStats,
    issuer_trust: dict[str, IssuerTrustInfo],
) -> None:
    """Write the issuer-first markdown inventory report to `path`.

    Structure: header/reading guide, issuer overview, leaf-assurance notes,
    then one chapter per issuer with CN families (recomputed per issuer via
    build_groups) and per-CN validity/SAN sections, then global statistics.
    NOTE(review): the `groups` parameter appears unused in this body —
    per-issuer groups are recomputed instead; confirm it is needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
    lines: list[str] = []
    lines.append("# Certificate CN Family Report")
    lines.append("")
    lines.append(f"Generated: {stats.generated_at_utc}")
    lines.append(f"Configured domains: {', '.join(stats.configured_domains)}")
    lines.append("")
    lines.append("## What This File Contains")
    lines.append("")
    lines.append("- Chapters are built from Subject CN construction only.")
    lines.append("- If multiple concrete CNs share the same numbered schema, they are grouped together.")
    lines.append("- Otherwise the chapter is one endpoint family, with `www.` folded into the same base endpoint.")
    lines.append("- SAN entries are shown only inside each Subject CN subsection.")
    lines.append("- All certificates shown here are verified leaf certificates.")
    lines.append("")
    lines.append("## Issuer Overview")
    lines.append("")
    for issuer_name in ordered_issuers:
        trust = issuer_trust[issuer_name]
        ca_ids = ", ".join(str(value) for value in sorted(trust.issuer_ca_ids))
        trust_label = "YES" if trust.major_webpki else "NO"
        lines.append(
            f"- {issuer_name} | certificates={len(issuer_hits[issuer_name])} | WebPKI server-auth in major stores={trust_label} | ca_id={ca_ids}"
        )
    lines.append("")
    lines.append("## Leaf-Certificate Assurance")
    lines.append("")
    lines.append("- SQL filter: `certificate_lifecycle.certificate_type = 'Certificate'`")
    lines.append("- Local filter: precertificate poison absent, `BasicConstraints.ca != true`, `KeyUsage.keyCertSign != true`")
    lines.append(f"- Verified leaf certificates kept: {stats.unique_leaf_certificates}")
    lines.append(f"- Non-leaf filtered after download: {stats.verification.non_leaf_filtered}")
    lines.append(f"- Precertificate poison filtered after download: {stats.verification.precertificate_poison_filtered}")
    lines.append("")
    for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
        trust = issuer_trust[issuer_name]
        issuer_title = f"Issuer {issuer_position:02d} {issuer_name}"
        lines.append(f"## {issuer_title}")
        lines.append("")
        lines.append(f"- Certificates under issuer: {len(issuer_hits[issuer_name])}")
        lines.append(
            f"- WebPKI server-auth in major stores (Mozilla, Chrome, Apple, Microsoft, Android): {'YES' if trust.major_webpki else 'NO'}"
        )
        lines.append(
            f"- Server-auth trust contexts seen in crt.sh live trust data: {', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none'}"
        )
        lines.append(f"- Issuer CA IDs: {', '.join(str(value) for value in sorted(trust.issuer_ca_ids))}")
        lines.append("")
        issuer_groups = build_groups(issuer_hits[issuer_name])
        for family_index, group in enumerate(issuer_groups, start=1):
            member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
            chapter_title = f"Family {family_index:02d} {describe_group_basis(group)}"
            lines.append(f"### {chapter_title}")
            lines.append("")
            lines.append(f"- Certificates in chapter: {group.member_count}")
            lines.append(f"- Concrete Subject CNs: {group.distinct_subject_cn_count}")
            lines.append(f"- Distinct SAN profiles in chapter: {group.distinct_exact_content_count}")
            lines.append(f"- Matched domains: {', '.join(sorted(group.matched_domains))}")
            lines.append(f"- Family validity span: {utc_iso(group.valid_from_min)} -> {utc_iso(group.valid_to_max)}")
            if group.first_seen_min and group.first_seen_max:
                lines.append(f"- First seen span: {utc_iso(group.first_seen_min)} -> {utc_iso(group.first_seen_max)}")
            lines.append(f"- Revocation mix: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown")
            lines.append("")

            hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
            for hit in member_hits:
                hits_by_subject[hit.subject_cn].append(hit)

            ordered_subjects = sorted(
                hits_by_subject.keys(),
                key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
            )
            for subject_cn in ordered_subjects:
                subject_hits = sorted(
                    hits_by_subject[subject_cn],
                    key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
                )
                lines.append(f"#### Subject CN: `{subject_cn}`")
                lines.append("")
                lines.append(f"- Certificates under this CN: {len(subject_hits)}")
                lines.append(f"- Validity span under this CN: {utc_iso(min(hit.validity_not_before for hit in subject_hits))} -> {utc_iso(max(hit.validity_not_after for hit in subject_hits))}")
                san_profiles: dict[tuple[str, ...], list[CertificateHit]] = defaultdict(list)
                for hit in subject_hits:
                    san_profiles[tuple(hit.san_entries)].append(hit)
                profile_size_counts = Counter(len(profile) for profile in san_profiles)
                unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
                lines.append(f"- Distinct SAN profiles under this CN: {len(san_profiles)}")
                lines.append(
                    "- SAN profile sizes seen: "
                    + ", ".join(
                        f"{size} SAN x {count}"
                        for size, count in sorted(profile_size_counts.items())
                    )
                )
                lines.append("")
                lines.append("Validity history")
                lines.append("")

                for hit in subject_hits:
                    crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
                    lines.append(
                        f"- [{status_marker(hit.revocation_status)}] {utc_iso(hit.validity_not_before)} -> {utc_iso(hit.validity_not_after)} | SANs={len(hit.san_entries)} | crt.sh={crtsh_ids} | {one_line_revocation(hit)}"
                    )
                lines.append("")
                lines.append("SAN structure")
                lines.append("")
                lines.append("```text")
                for tree_line in build_san_tree_lines(unique_san_entries):
                    lines.append(tree_line)
                lines.append("```")
                lines.append("")

            # NOTE(review): loop nesting reconstructed from a mangled diff —
            # the "---" separator is emitted once per family chapter; confirm.
            lines.append("---")
            lines.append("")

    lines.append("## Statistics")
    lines.append("")
    lines.append(f"- Unique leaf certificates: {stats.unique_leaf_certificates}")
    lines.append(f"- CN-family chapters: {stats.groups_total}")
    lines.append(f"- Chapters with more than one certificate: {stats.groups_multi_member}")
    lines.append(f"- Single-certificate chapters: {stats.groups_singleton}")
    lines.append(f"- Numbered CN pattern chapters: {stats.groups_by_type.get('numbered_cn_pattern', 0)}")
    lines.append(f"- Exact endpoint chapters: {stats.groups_by_type.get('exact_endpoint_family', 0)}")
    lines.append("")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def render_latex_report(
    path: Path,
    hits: list[CertificateHit],
    groups: list[CertificateGroup],
    stats: ScanStats,
    issuer_trust: dict[str, IssuerTrustInfo],
) -> None:
    """Write the full LaTeX report (title page, TOC, issuer chapters,
    CN-family panels, per-CN timelines, SAN trees, statistics) to `path`.

    Mirrors render_markdown_report's structure with a tcolorbox-based layout.
    Requires a fontspec-capable engine (xelatex/lualatex) since it sets
    Palatino/Avenir Next/Menlo.  Empty strings in `lines` (used for the
    optional first-seen row) are filtered out at the final join.
    NOTE(review): the `groups` parameter appears unused here as well —
    per-issuer groups are recomputed via build_groups; confirm.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
    revoked_total = sum(1 for hit in hits if hit.revocation_status == "revoked")
    unknown_total = sum(1 for hit in hits if hit.revocation_status == "unknown")
    not_revoked_total = sum(1 for hit in hits if hit.revocation_status == "not_revoked")

    # Preamble: fonts, palette, hyperref, page furniture, and the tcolorbox
    # styles / badge macros used throughout the body.
    lines: list[str] = [
        r"\documentclass[11pt]{article}",
        r"\usepackage[a4paper,margin=18mm]{geometry}",
        r"\usepackage{fontspec}",
        r"\usepackage[table]{xcolor}",
        r"\usepackage{microtype}",
        r"\usepackage{hyperref}",
        r"\usepackage{xurl}",
        r"\usepackage{array}",
        r"\usepackage{booktabs}",
        r"\usepackage{tabularx}",
        r"\usepackage{longtable}",
        r"\usepackage{enumitem}",
        r"\usepackage{fancyhdr}",
        r"\usepackage{titlesec}",
        r"\usepackage[most]{tcolorbox}",
        r"\usepackage{fancyvrb}",
        r"\usepackage{needspace}",
        r"\defaultfontfeatures{Ligatures=TeX,Scale=MatchLowercase}",
        r"\setmainfont{Palatino}",
        r"\setsansfont{Avenir Next}",
        r"\setmonofont{Menlo}",
        r"\definecolor{Ink}{HTML}{17202A}",
        r"\definecolor{Muted}{HTML}{667085}",
        r"\definecolor{Line}{HTML}{D0D5DD}",
        r"\definecolor{Panel}{HTML}{F8FAFC}",
        r"\definecolor{Accent}{HTML}{0F766E}",
        r"\definecolor{AccentSoft}{HTML}{E6F4F1}",
        r"\definecolor{AccentLine}{HTML}{74C4B8}",
        r"\definecolor{Warn}{HTML}{9A6700}",
        r"\definecolor{WarnSoft}{HTML}{FFF4DB}",
        r"\definecolor{Danger}{HTML}{B42318}",
        r"\definecolor{DangerSoft}{HTML}{FEE4E2}",
        r"\definecolor{OkText}{HTML}{065F46}",
        r"\definecolor{OkSoft}{HTML}{DCFCE7}",
        r"\definecolor{UnknownText}{HTML}{9A6700}",
        r"\definecolor{UnknownSoft}{HTML}{FEF3C7}",
        r"\hypersetup{colorlinks=true,linkcolor=Accent,urlcolor=Accent,pdfauthor={CertTransparencySearch},pdftitle={Certificate Transparency Endpoint Atlas}}",
        r"\setlength{\parindent}{0pt}",
        r"\setlength{\parskip}{6pt}",
        r"\setlength{\emergencystretch}{3em}",
        r"\setcounter{tocdepth}{2}",
        r"\pagestyle{fancy}",
        r"\fancyhf{}",
        r"\fancyhead[L]{\sffamily\footnotesize Certificate Transparency Endpoint Atlas}",
        r"\fancyhead[R]{\sffamily\footnotesize \nouppercase{\leftmark}}",
        r"\fancyfoot[C]{\sffamily\footnotesize \thepage}",
        r"\titleformat{\section}{\sffamily\bfseries\LARGE\color{Ink}}{\thesection}{0.8em}{}",
        r"\titleformat{\subsection}{\sffamily\bfseries\Large\color{Ink}}{\thesubsection}{0.8em}{}",
        r"\titleformat{\subsubsection}{\sffamily\bfseries\normalsize\color{Ink}}{\thesubsubsection}{0.8em}{}",
        r"\tcbset{",
        r" panel/.style={enhanced,breakable,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=white,colframe=Line},",
        r" hero/.style={panel,colback=Ink,colframe=Ink,left=14pt,right=14pt,top=14pt,bottom=14pt},",
        r" summary/.style={panel,colback=Panel,colframe=Line},",
        r" issuerpanel/.style={panel,colback=Panel,colframe=Ink!45},",
        r" familypanel/.style={panel,colback=AccentSoft,colframe=AccentLine},",
        r" subjectpanel/.style={panel,colback=white,colframe=Line},",
        r" treepanel/.style={panel,colback=Panel,colframe=AccentLine},",
        r"}",
        r"\newcommand{\DomainChip}[1]{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=AccentSoft]{\sffamily\footnotesize\texttt{#1}}}",
        r"\newcommand{\MetricChip}[2]{\tcbox[on line,boxrule=0pt,arc=3pt,left=6pt,right=6pt,top=3pt,bottom=3pt,colback=Panel]{\sffamily\footnotesize\textcolor{Muted}{#1}\hspace{0.45em}\textbf{#2}}}",
        r"\newcommand{\StatusOK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{OK}}}",
        r"\newcommand{\StatusREV}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{REV}}}",
        r"\newcommand{\StatusUNK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=UnknownSoft]{\sffamily\bfseries\footnotesize\textcolor{UnknownText}{UNK}}}",
        r"\newcommand{\WebPKIYes}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{WebPKI: YES}}}",
        r"\newcommand{\WebPKINo}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{WebPKI: NO}}}",
        r"\begin{document}",
        r"\begin{titlepage}",
        r"\thispagestyle{empty}",
        r"\vspace*{20mm}",
        r"\begin{tcolorbox}[hero]",
        r"{\color{white}\sffamily\bfseries\fontsize{24}{28}\selectfont Certificate Transparency Endpoint Atlas\par}",
        r"\vspace{4pt}",
        r"{\color{white}\Large Currently valid leaf certificates matching the configured domains\par}",
        r"\vspace{12pt}",
        r"{\color{white}\sffamily\small This artefact is optimized for review: issuer-first navigation, CN-family grouping, certificate timelines, and SAN structure blocks designed to be read rather than decoded.}",
        r"\end{tcolorbox}",
        r"\vspace{10mm}",
        r"\begin{tcolorbox}[summary]",
        rf"\textbf{{Generated}}: {latex_escape(stats.generated_at_utc)}\par",
        r"\textbf{Configured domains}: " + " ".join(
            rf"\DomainChip{{{latex_escape(domain)}}}" for domain in stats.configured_domains
        ),
        r"\par\medskip",
        r"\MetricChip{Leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
        + r"\MetricChip{CN families}{" + str(stats.groups_total) + r"}" + " "
        + r"\MetricChip{Numbered families}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
        + r"\MetricChip{Exact families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}",
        r"\par\medskip",
        r"\MetricChip{Not revoked}{" + str(not_revoked_total) + r"}" + " "
        + r"\MetricChip{Revoked}{" + str(revoked_total) + r"}" + " "
        + r"\MetricChip{Unknown}{" + str(unknown_total) + r"}",
        r"\end{tcolorbox}",
        r"\vfill",
        r"{\sffamily\small\textcolor{Muted}{Same scan, three outputs: Markdown for editor preview, LaTeX for source control, PDF for distribution.}}",
        r"\end{titlepage}",
        r"\tableofcontents",
        r"\clearpage",
        r"\section*{Executive Summary}",
        r"\addcontentsline{toc}{section}{Executive Summary}",
        r"\begin{tcolorbox}[summary]",
        r"\textbf{Reading guide}\par",
        r"Major chapters are exact issuer names. Inside each issuer, families are derived only from the construction of the Subject CN. Each concrete Subject CN then gets its own certificate timeline and a SAN structure panel.\par",
        r"\medskip",
        r"\textbf{Leaf-only assurance}\par",
        r"SQL excludes entries whose lifecycle type is not \texttt{Certificate}. Local parsing then rejects any artifact with precertificate poison, \texttt{BasicConstraints.ca = true}, or \texttt{KeyUsage.keyCertSign = true}.",
        r"\end{tcolorbox}",
        r"\begin{tcolorbox}[summary]",
        r"\textbf{Issuer landscape}\par",
        r"\medskip",
        r"\begin{tabularx}{\linewidth}{>{\raggedright\arraybackslash}X >{\raggedleft\arraybackslash}p{1.7cm} >{\raggedleft\arraybackslash}p{1.9cm} >{\raggedleft\arraybackslash}p{2.0cm}}",
        r"\toprule",
        r"Issuer & Certificates & Share & WebPKI \\",
        r"\midrule",
    ]

    # Guard against division by zero when the corpus is empty.
    total_hits = len(hits) if hits else 1
    for issuer_name in ordered_issuers:
        issuer_count = len(issuer_hits[issuer_name])
        share = f"{issuer_count / total_hits:.1%}"
        lines.append(
            rf"{latex_escape(issuer_name)} & {issuer_count} & {latex_escape(share)} & {latex_webpki_badge(issuer_trust[issuer_name].major_webpki)} \\"
        )
    lines.extend(
        [
            r"\bottomrule",
            r"\end{tabularx}",
            r"\end{tcolorbox}",
        ]
    )

    # One LaTeX section per issuer, recomputing CN families per issuer.
    for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
        trust = issuer_trust[issuer_name]
        issuer_groups = build_groups(issuer_hits[issuer_name])
        lines.extend(
            [
                r"\clearpage",
                rf"\section{{Issuer {issuer_position:02d}: {latex_escape(issuer_name)}}}",
                r"\begin{tcolorbox}[issuerpanel]",
                r"\MetricChip{Certificates}{" + str(len(issuer_hits[issuer_name])) + r"}" + " "
                + r"\MetricChip{Families}{" + str(len(issuer_groups)) + r"}" + " "
                + latex_webpki_badge(trust.major_webpki),
                r"\par\medskip",
                rf"\textbf{{Trust contexts seen in crt.sh live data}}: {latex_escape(', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none')}\par",
                rf"\textbf{{Issuer CA IDs}}: {latex_escape(', '.join(str(value) for value in sorted(trust.issuer_ca_ids)))}",
                r"\end{tcolorbox}",
            ]
        )
        for family_index, group in enumerate(issuer_groups, start=1):
            member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
            lines.extend(
                [
                    r"\Needspace{14\baselineskip}",
                    rf"\subsection{{Family {family_index:02d}: {latex_escape(describe_group_basis(group).replace('`', ''))}}}",
                    r"\begin{tcolorbox}[familypanel]",
                    r"\MetricChip{Certificates}{" + str(group.member_count) + r"}" + " "
                    + r"\MetricChip{Concrete CNs}{" + str(group.distinct_subject_cn_count) + r"}" + " "
                    + r"\MetricChip{Distinct SAN profiles}{" + str(group.distinct_exact_content_count) + r"}",
                    r"\par\medskip",
                    rf"\textbf{{Matched domains}}: {' '.join(rf'\DomainChip{{{latex_escape(domain)}}}' for domain in sorted(group.matched_domains))}\par",
                    rf"\textbf{{Family validity span}}: \texttt{{{latex_escape(utc_iso(group.valid_from_min))}}} to \texttt{{{latex_escape(utc_iso(group.valid_to_max))}}}\par",
                    # Optional row: collapses to "" (filtered at the final join).
                    (
                        rf"\textbf{{First seen span}}: \texttt{{{latex_escape(utc_iso(group.first_seen_min))}}} to \texttt{{{latex_escape(utc_iso(group.first_seen_max))}}}\par"
                        if group.first_seen_min and group.first_seen_max
                        else ""
                    ),
                    rf"\textbf{{Revocation mix}}: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown",
                    r"\end{tcolorbox}",
                ]
            )

            hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
            for hit in member_hits:
                hits_by_subject[hit.subject_cn].append(hit)
            ordered_subjects = sorted(
                hits_by_subject.keys(),
                key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
            )
            for subject_cn in ordered_subjects:
                subject_hits = sorted(
                    hits_by_subject[subject_cn],
                    key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
                )
                san_summary = summarize_san_patterns(sorted({entry for hit in subject_hits for entry in hit.san_entries}))
                unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
                lines.extend(
                    [
                        r"\Needspace{18\baselineskip}",
                        rf"\subsubsection{{Subject CN: {latex_escape(subject_cn)}}}",
                        r"\begin{tcolorbox}[subjectpanel]",
                        r"\MetricChip{Certificates under this CN}{" + str(len(subject_hits)) + r"}" + " "
                        + r"\MetricChip{Distinct SAN profiles}{" + str(len({tuple(hit.san_entries) for hit in subject_hits})) + r"}" + " "
                        + r"\MetricChip{Unique SAN entries}{" + str(len(unique_san_entries)) + r"}",
                        r"\par\medskip",
                        rf"\textbf{{Validity span under this CN}}: \texttt{{{latex_escape(utc_iso(min(hit.validity_not_before for hit in subject_hits)))}}} to \texttt{{{latex_escape(utc_iso(max(hit.validity_not_after for hit in subject_hits)))}}}",
                        r"\par\medskip",
                        r"\textbf{Certificate timeline}",
                        r"\begin{itemize}[leftmargin=1.4em,itemsep=0.55em,topsep=0.4em]",
                    ]
                )
                for hit in subject_hits:
                    crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
                    lines.extend(
                        [
                            r"\item "
                            + latex_status_badge(hit.revocation_status)
                            + " "
                            + rf"\texttt{{{latex_escape(utc_iso(hit.validity_not_before))}}} to \texttt{{{latex_escape(utc_iso(hit.validity_not_after))}}}",
                            rf"\newline \textcolor{{Muted}}{{SANs: {len(hit.san_entries)} \quad crt.sh: {latex_escape(crtsh_ids)} \quad {latex_escape(one_line_revocation(hit))}}}",
                        ]
                    )
                lines.extend(
                    [
                        r"\end{itemize}",
                        r"\medskip",
                        r"\textbf{SAN pattern snapshot}",
                        r"\par\medskip",
                        r"\MetricChip{DNS SANs}{" + str(san_summary["dns_count"]) + r"}" + " "
                        + r"\MetricChip{Other SANs}{" + str(san_summary["other_count"]) + r"}" + " "
                        + r"\MetricChip{Wildcard SANs}{" + str(san_summary["wildcard_count"]) + r"}" + " "
                        + r"\MetricChip{Numbered SANs}{" + str(san_summary["numbered_count"]) + r"}" + " "
                        + r"\MetricChip{DNS zones}{" + str(san_summary["zone_count"]) + r"}",
                        r"\par\medskip",
                        rf"\textbf{{Dominant zones}}: {latex_escape(', '.join(f'{zone} ({count})' for zone, count in san_summary['top_zones']) if san_summary['top_zones'] else 'none')}",
                        r"\par",
                        rf"\textbf{{Repeating host schemas}}: {latex_escape(', '.join(f'{pattern} ({count})' for pattern, count in san_summary['repeating_patterns']) if san_summary['repeating_patterns'] else 'mostly one-off SAN hostnames')}",
                        r"\end{tcolorbox}",
                        r"\begin{tcolorbox}[treepanel,title={SAN Structure}]",
                        r"\begin{Verbatim}[fontsize=\footnotesize]",
                    ]
                )
                # ASCII connectors: Verbatim + Menlo handles plain ASCII safely.
                lines.extend(build_san_tree_lines_with_style(unique_san_entries, ascii_only=True))
                lines.extend(
                    [
                        r"\end{Verbatim}",
                        r"\end{tcolorbox}",
                    ]
                )

    lines.extend(
        [
            r"\clearpage",
            r"\section*{Statistics}",
            r"\addcontentsline{toc}{section}{Statistics}",
            r"\begin{tcolorbox}[summary]",
            r"\MetricChip{Unique leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
            + r"\MetricChip{CN-family chapters}{" + str(stats.groups_total) + r"}" + " "
            + r"\MetricChip{Multi-certificate chapters}{" + str(stats.groups_multi_member) + r"}" + " "
            + r"\MetricChip{Singleton chapters}{" + str(stats.groups_singleton) + r"}",
            r"\par\medskip",
            r"\MetricChip{Numbered CN patterns}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
            + r"\MetricChip{Exact endpoint families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}" + " "
            + r"\MetricChip{Non-leaf filtered}{" + str(stats.verification.non_leaf_filtered) + r"}" + " "
            + r"\MetricChip{Precert poison filtered}{" + str(stats.verification.precertificate_poison_filtered) + r"}",
            r"\end{tcolorbox}",
            r"\end{document}",
        ]
    )
    # Empty strings (skipped optional rows) are dropped here on purpose.
    path.write_text("\n".join(line for line in lines if line != "") + "\n", encoding="utf-8")


def cleanup_latex_auxiliary_files(tex_path: Path, pdf_output: Path) -> None:
    """Delete .aux/.log/.out/.toc files left next to the generated PDF."""
    generated_base = pdf_output.parent / tex_path.stem
    for suffix in (".aux", ".log", ".out", ".toc"):
        candidate = generated_base.with_suffix(suffix)
        if candidate.exists():
            candidate.unlink()


def compile_latex_to_pdf(tex_path: Path, pdf_output: Path, engine: str) -> None:
    """Compile `tex_path` with `engine` and place the PDF at `pdf_output`.

    Runs the engine twice so the table of contents resolves.  On a non-zero
    exit status, raises RuntimeError carrying the last 40 lines of combined
    stdout/stderr.  Auxiliary files are cleaned up afterwards.

    Raises:
        RuntimeError: when the engine binary is missing or compilation fails.
    """
    engine_path = shutil.which(engine)
    if engine_path is None:
        raise RuntimeError(f"LaTeX engine not found: {engine}")
    tex_path = tex_path.resolve()
    pdf_output = pdf_output.resolve()
    pdf_output.parent.mkdir(parents=True, exist_ok=True)
    compile_cmd = [
        engine_path,
        "-interaction=nonstopmode",
        "-halt-on-error",
        "-output-directory",
        str(pdf_output.parent),
        str(tex_path),
    ]
    # Two passes: the first writes the .toc, the second typesets it.
    for _ in range(2):
        result = subprocess.run(
            compile_cmd,
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode != 0:
            message = (result.stdout + "\n" + result.stderr).strip()
            raise RuntimeError(
                "LaTeX compilation failed.\n"
                + "\n".join(message.splitlines()[-40:])
            )
    generated_pdf = pdf_output.parent / f"{tex_path.stem}.pdf"
    if generated_pdf != pdf_output:
        generated_pdf.replace(pdf_output)
    cleanup_latex_auxiliary_files(tex_path, pdf_output)


def parse_args() -> argparse.Namespace:
    """Parse the CLI arguments for the inventory-report scan.

    NOTE: definition continues past the end of this chunk.
    """
    parser = argparse.ArgumentParser(
        description="Search crt.sh for currently valid certificates matching configured domain fragments.",
    )
    parser.add_argument(
        "--domains-file",
        type=Path,
        default=Path("domains.local.txt"),
        help="Text file containing one domain fragment per line.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("output/current-valid-certificates.md"),
        help="Readable single-file markdown report to write.",
    )
    parser.add_argument(
        "--latex-output",
        type=Path,
        default=Path("output/current-valid-certificates.tex"),
        help="Readable single-file LaTeX report to write.",
    )
    parser.add_argument(
        "--pdf-output",
        type=Path,
        default=Path("output/current-valid-certificates.pdf"),
        help="Compiled PDF report to write.",
    )
    parser.add_argument(
        "--pdf-engine",
        default="xelatex",
        help="LaTeX engine used to compile the PDF report.",
    )
    parser.add_argument(
        "--skip-pdf",
        action="store_true",
        help="Write Markdown and LaTeX outputs but skip PDF compilation.",
    )
    parser.add_argument(
        "--cache-dir",
        type=Path,
        default=Path(".cache/ct-search"),
        help="Directory for cached per-domain query results.",
    )
    parser.add_argument(
        "--cache-ttl-seconds",
        type=int,
        default=900,
        help="Reuse cached database results younger than this many seconds.",
    )
    parser.add_argument(
        "--max-candidates-per-domain",
        type=int,
        default=10000,
        help="Maximum raw crt.sh identity rows to inspect per domain fragment.",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=3,
        help="Retry count for replica/recovery conflicts from crt.sh.",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress progress output.",
    )
    return parser.parse_args()


def main() -> int:
    """Run the CT scan: fetch or reuse cached records, then write reports."""
    args = parse_args()
    domains = load_domains(args.domains_file)
    all_records: list[DatabaseRecord] = []
    for domain in domains:
        # Cache-first: reuse per-domain results younger than the TTL so
        # repeated report runs do not re-query crt.sh.
        cached = load_cached_records(
            cache_dir=args.cache_dir,
            domain=domain,
            ttl_seconds=args.cache_ttl_seconds,
            max_candidates=args.max_candidates_per_domain,
        )
        if cached is not None:
            if not args.quiet:
                print(f"[cache] domain={domain} records={len(cached)}", file=sys.stderr)
            all_records.extend(cached)
            continue
        if not args.quiet:
            print(f"[query] domain={domain}", file=sys.stderr)
        records = query_domain(
            domain=domain,
            max_candidates=args.max_candidates_per_domain,
            attempts=args.retries,
            verbose=not args.quiet,
        )
        if not args.quiet:
            print(f"[done] domain={domain} records={len(records)}", file=sys.stderr)
        store_cached_records(args.cache_dir, domain, args.max_candidates_per_domain,
records)
        all_records.extend(records)
    # Deduplicate/verify the raw rows into hits, then group by CN family.
    hits, verification = build_hits(all_records)
    groups = build_groups(hits)
    scan_stats = ScanStats(
        generated_at_utc=utc_iso(datetime.now(UTC)),
        configured_domains=domains,
        unique_leaf_certificates=len(hits),
        groups_total=len(groups),
        groups_multi_member=sum(1 for group in groups if group.member_count > 1),
        groups_singleton=sum(1 for group in groups if group.member_count == 1),
        groups_by_type=dict(Counter(group.group_type for group in groups)),
        verification=verification,
    )
    issuer_trust = query_issuer_trust(hits)
    render_markdown_report(args.output, hits, groups, scan_stats, issuer_trust)
    render_latex_report(args.latex_output, hits, groups, scan_stats, issuer_trust)
    if not args.skip_pdf:
        compile_latex_to_pdf(args.latex_output, args.pdf_output, args.pdf_engine)
    if not args.quiet:
        print(
            f"[report] hits={len(hits)} groups={len(groups)} markdown={args.output} latex={args.latex_output}"
            + ("" if args.skip_pdf else f" pdf={args.pdf_output}"),
            file=sys.stderr,
        )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
diff --git a/ct_usage_assessment.py b/ct_usage_assessment.py
new file mode 100644
index 0000000..33915ed
--- /dev/null
+++ b/ct_usage_assessment.py
@@ -0,0 +1,453 @@
#!/usr/bin/env python3
"""Classify the CT certificate corpus by intended usage (EKU and KeyUsage)."""

from __future__ import annotations

import argparse
import hashlib
import json
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from pathlib import Path

from cryptography import x509
from cryptography.x509.oid import ExtensionOID

import ct_scan


# Extended Key Usage OIDs in dotted-string form; the label map below gives
# their conventional names as used in the rendered reports.
SERVER_AUTH_OID = "1.3.6.1.5.5.7.3.1"
CLIENT_AUTH_OID = "1.3.6.1.5.5.7.3.2"
CODE_SIGNING_OID = "1.3.6.1.5.5.7.3.3"
EMAIL_PROTECTION_OID = "1.3.6.1.5.5.7.3.4"
TIME_STAMPING_OID = "1.3.6.1.5.5.7.3.8"
OCSP_SIGNING_OID = "1.3.6.1.5.5.7.3.9"
ANY_EXTENDED_KEY_USAGE_OID = "2.5.29.37.0"

EKU_LABELS = {
    SERVER_AUTH_OID: "serverAuth",
    CLIENT_AUTH_OID: "clientAuth",
    CODE_SIGNING_OID: "codeSigning",
    EMAIL_PROTECTION_OID: "emailProtection",
    TIME_STAMPING_OID: "timeStamping",
    OCSP_SIGNING_OID: "OCSPSigning",
    ANY_EXTENDED_KEY_USAGE_OID: "anyExtendedKeyUsage",
}


@dataclass
class PurposeClassification:
    """Per-certificate usage classification derived from EKU and KeyUsage."""

    fingerprint_sha256: str  # SHA-256 of the DER certificate, hex encoded
    subject_cn: str
    issuer_name: str
    category: str  # one of the classify_purpose() buckets
    eku_oids: list[str]  # sorted dotted EKU OID strings ([] when extension absent)
    key_usage_flags: list[str]  # KeyUsage flag names that are set
    valid_from_utc: str  # UTC timestamp string (via ct_scan.utc_iso)
    valid_to_utc: str  # UTC timestamp string (via ct_scan.utc_iso)
    matched_domains: list[str]  # configured search terms this cert matched, sorted
    san_dns_names: list[str]  # DNS-type SAN values, "DNS:" prefix stripped, sorted


@dataclass
class AssessmentSummary:
    """Aggregated corpus-level view over all PurposeClassification rows."""

    generated_at_utc: str
    source_cache_domains: list[str]
    unique_leaf_certificates: int
    category_counts: dict[str, int]  # classification bucket -> count
    eku_templates: dict[str, int]  # rendered EKU template -> count
    key_usage_templates: dict[str, int]  # rendered KeyUsage template -> count
    issuer_breakdown: dict[str, dict[str, int]]  # category -> issuer -> count
    validity_start_years: dict[str, dict[str, int]]  # category -> start year -> count
    san_type_counts: dict[str, int]  # only "DNSName" is counted by summarize()
    subject_cn_in_dns_san_count: int
    subject_cn_not_in_dns_san_count: int
    dual_eku_subject_cns_with_server_only_sibling: list[str]
    dual_eku_subject_cns_without_server_only_sibling: list[str]


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a Z suffix."""
    return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")


def parse_args() -> argparse.Namespace:
    """Parse command-line options for the usage assessment."""
    parser = argparse.ArgumentParser(
        description="Assess certificate intended usage from EKU and KeyUsage."
+ ) + parser.add_argument( + "--domains-file", + type=Path, + default=Path("domains.local.txt"), + help="Configurable list of search domains, one per line.", + ) + parser.add_argument( + "--cache-dir", + type=Path, + default=Path(".cache/ct-search"), + help="Directory used by ct_scan.py for cached CT results.", + ) + parser.add_argument( + "--cache-ttl-seconds", + type=int, + default=86400, + help="Reuse cached CT results up to this age before refreshing from crt.sh.", + ) + parser.add_argument( + "--max-candidates", + type=int, + default=10000, + help="Maximum raw crt.sh identity rows to inspect per configured domain.", + ) + parser.add_argument( + "--attempts", + type=int, + default=3, + help="Retry attempts for live crt.sh database queries.", + ) + parser.add_argument( + "--markdown-output", + type=Path, + default=Path("output/certificate-purpose-assessment.md"), + help="Human-readable assessment output.", + ) + parser.add_argument( + "--json-output", + type=Path, + default=Path("output/certificate-purpose-assessment.json"), + help="Machine-readable assessment output.", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Print refresh activity to stderr.", + ) + return parser.parse_args() + + +def load_records( + domains: list[str], + cache_dir: Path, + cache_ttl_seconds: int, + max_candidates: int, + attempts: int, + verbose: bool, +) -> list[ct_scan.DatabaseRecord]: + all_records: list[ct_scan.DatabaseRecord] = [] + for domain in domains: + records = ct_scan.load_cached_records(cache_dir, domain, cache_ttl_seconds, max_candidates) + if records is None: + records = ct_scan.query_domain(domain, max_candidates=max_candidates, attempts=attempts, verbose=verbose) + ct_scan.store_cached_records(cache_dir, domain, max_candidates=max_candidates, records=records) + all_records.extend(records) + return all_records + + +def extract_eku_oids(cert: x509.Certificate) -> list[str]: + try: + extension = 
cert.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE) + except x509.ExtensionNotFound: + return [] + return sorted(oid.dotted_string for oid in extension.value) + + +def extract_key_usage_flags(cert: x509.Certificate) -> list[str]: + try: + key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value + except x509.ExtensionNotFound: + return [] + flags: list[str] = [] + for attribute in ( + "digital_signature", + "content_commitment", + "key_encipherment", + "data_encipherment", + "key_agreement", + "key_cert_sign", + "crl_sign", + ): + if getattr(key_usage, attribute): + flags.append(attribute) + if key_usage.key_agreement: + if key_usage.encipher_only: + flags.append("encipher_only") + if key_usage.decipher_only: + flags.append("decipher_only") + return flags + + +def classify_purpose(eku_oids: list[str]) -> str: + eku_set = set(eku_oids) + has_server = SERVER_AUTH_OID in eku_set or ANY_EXTENDED_KEY_USAGE_OID in eku_set + has_client = CLIENT_AUTH_OID in eku_set or ANY_EXTENDED_KEY_USAGE_OID in eku_set + has_code_signing = CODE_SIGNING_OID in eku_set + has_email = EMAIL_PROTECTION_OID in eku_set + + if not eku_oids: + return "no_eku" + if has_server and not has_client and not has_code_signing and not has_email: + return "tls_server_only" + if has_server and has_client and not has_code_signing and not has_email: + return "tls_server_and_client" + if has_client and not has_server and not has_code_signing and not has_email: + return "client_auth_only" + if has_email and not has_server and not has_client and not has_code_signing: + return "smime_only" + if has_code_signing and not has_server and not has_client and not has_email: + return "code_signing_only" + return "mixed_or_other" + + +def format_eku_template(eku_oids: list[str]) -> str: + if not eku_oids: + return "(none)" + return ", ".join(EKU_LABELS.get(oid, oid) for oid in eku_oids) + + +def format_key_usage_template(flags: list[str]) -> str: + if not flags: + return 
"(missing)" + return ", ".join(flags) + + +def build_classifications( + hits: list[ct_scan.CertificateHit], + records: list[ct_scan.DatabaseRecord], +) -> list[PurposeClassification]: + certificates_by_fingerprint: dict[str, x509.Certificate] = {} + for record in records: + cert = x509.load_der_x509_certificate(record.certificate_der) + is_leaf, _reason = ct_scan.is_leaf_certificate(cert) + if not is_leaf: + continue + fingerprint_sha256 = hashlib.sha256(record.certificate_der).hexdigest() + certificates_by_fingerprint.setdefault(fingerprint_sha256, cert) + + results: list[PurposeClassification] = [] + for hit in hits: + cert = certificates_by_fingerprint[hit.fingerprint_sha256] + san_dns_names = sorted(entry[4:] for entry in hit.san_entries if entry.startswith("DNS:")) + results.append( + PurposeClassification( + fingerprint_sha256=hit.fingerprint_sha256, + subject_cn=hit.subject_cn, + issuer_name=ct_scan.primary_issuer_name(hit), + category=classify_purpose(extract_eku_oids(cert)), + eku_oids=extract_eku_oids(cert), + key_usage_flags=extract_key_usage_flags(cert), + valid_from_utc=ct_scan.utc_iso(hit.validity_not_before), + valid_to_utc=ct_scan.utc_iso(hit.validity_not_after), + matched_domains=sorted(hit.matched_domains), + san_dns_names=san_dns_names, + ) + ) + results.sort( + key=lambda item: ( + item.category, + item.subject_cn.casefold(), + item.valid_from_utc, + item.fingerprint_sha256, + ) + ) + return results + + +def summarize(classifications: list[PurposeClassification], domains: list[str]) -> AssessmentSummary: + category_counts = Counter(item.category for item in classifications) + eku_templates = Counter(format_eku_template(item.eku_oids) for item in classifications) + key_usage_templates = Counter(format_key_usage_template(item.key_usage_flags) for item in classifications) + issuer_breakdown: dict[str, Counter[str]] = defaultdict(Counter) + validity_start_years: dict[str, Counter[str]] = defaultdict(Counter) + san_type_counts: Counter[str] = 
Counter() + subject_cn_in_dns_san_count = 0 + subject_cn_not_in_dns_san_count = 0 + categories_by_canonical_cn: dict[str, set[str]] = defaultdict(set) + + for item in classifications: + issuer_breakdown[item.category][item.issuer_name] += 1 + validity_start_years[item.category][item.valid_from_utc[:4]] += 1 + san_type_counts["DNSName"] += len(item.san_dns_names) + if item.subject_cn in set(item.san_dns_names): + subject_cn_in_dns_san_count += 1 + else: + subject_cn_not_in_dns_san_count += 1 + categories_by_canonical_cn[ct_scan.canonicalize_subject_cn(item.subject_cn)].add(item.category) + + dual_with_server_only = sorted( + canonical_cn + for canonical_cn, values in categories_by_canonical_cn.items() + if "tls_server_and_client" in values and "tls_server_only" in values + ) + dual_without_server_only = sorted( + canonical_cn + for canonical_cn, values in categories_by_canonical_cn.items() + if values == {"tls_server_and_client"} + ) + + return AssessmentSummary( + generated_at_utc=utc_now_iso(), + source_cache_domains=domains, + unique_leaf_certificates=len(classifications), + category_counts=dict(category_counts), + eku_templates=dict(eku_templates.most_common()), + key_usage_templates=dict(key_usage_templates.most_common()), + issuer_breakdown={category: dict(counter.most_common()) for category, counter in issuer_breakdown.items()}, + validity_start_years={ + category: dict(sorted(counter.items())) + for category, counter in validity_start_years.items() + }, + san_type_counts=dict(san_type_counts), + subject_cn_in_dns_san_count=subject_cn_in_dns_san_count, + subject_cn_not_in_dns_san_count=subject_cn_not_in_dns_san_count, + dual_eku_subject_cns_with_server_only_sibling=dual_with_server_only, + dual_eku_subject_cns_without_server_only_sibling=dual_without_server_only, + ) + + +def render_markdown(summary: AssessmentSummary, classifications: list[PurposeClassification]) -> str: + lines: list[str] = [] + lines.append("# Certificate Purpose Assessment") + 
lines.append("") + lines.append(f"Generated at: `{summary.generated_at_utc}`") + lines.append(f"Configured domains: `{', '.join(summary.source_cache_domains)}`") + lines.append("") + lines.append("## Headline Verdict") + lines.append("") + lines.append(f"- Unique current leaf certificates assessed: **{summary.unique_leaf_certificates}**") + lines.append(f"- TLS server only: **{summary.category_counts.get('tls_server_only', 0)}**") + lines.append(f"- TLS server and client auth: **{summary.category_counts.get('tls_server_and_client', 0)}**") + lines.append(f"- Client auth only: **{summary.category_counts.get('client_auth_only', 0)}**") + lines.append(f"- S/MIME only: **{summary.category_counts.get('smime_only', 0)}**") + lines.append(f"- Code signing only: **{summary.category_counts.get('code_signing_only', 0)}**") + lines.append(f"- Mixed or other: **{summary.category_counts.get('mixed_or_other', 0)}**") + lines.append(f"- No EKU: **{summary.category_counts.get('no_eku', 0)}**") + lines.append("") + lines.append("## What This Means") + lines.append("") + lines.append("- The corpus contains **only TLS-capable certificates**. 
There are no client-only, S/MIME, or code-signing certificates.") + lines.append("- All SAN entries seen in this corpus are DNS names.") + lines.append(f"- Subject CN appears literally in a DNS SAN for **{summary.subject_cn_in_dns_san_count} of {summary.unique_leaf_certificates}** certificates.") + lines.append("- The only ambiguity is whether to keep or set aside the certificates whose EKU allows both `serverAuth` and `clientAuth`.") + lines.append("") + lines.append("## Rework Options") + lines.append("") + lines.append(f"- Keep the full operational server corpus: **{summary.unique_leaf_certificates}** certificates.") + lines.append(f"- Keep only strict server-auth certificates: **{summary.category_counts.get('tls_server_only', 0)}** certificates.") + lines.append(f"- Create a review bucket for dual-EKU certificates: **{summary.category_counts.get('tls_server_and_client', 0)}** certificates.") + lines.append("") + lines.append("## EKU Templates") + lines.append("") + for template, count in summary.eku_templates.items(): + lines.append(f"- `{template}`: {count}") + lines.append("") + lines.append("## KeyUsage Templates") + lines.append("") + for template, count in summary.key_usage_templates.items(): + lines.append(f"- `{template}`: {count}") + lines.append("") + lines.append("## Issuer Breakdown") + lines.append("") + for category in sorted(summary.issuer_breakdown): + lines.append(f"### `{category}`") + lines.append("") + for issuer_name, count in summary.issuer_breakdown[category].items(): + lines.append(f"- `{issuer_name}`: {count}") + lines.append("") + lines.append("## Time Pattern") + lines.append("") + dual_years = set(summary.validity_start_years.get("tls_server_and_client", {})) + server_years = set(summary.validity_start_years.get("tls_server_only", {})) + if dual_years and len(dual_years) == 1: + lines.append( + f"- The dual-EKU bucket is entirely composed of certificates whose current validity starts in **{next(iter(sorted(dual_years)))}**." 
+ ) + if dual_years and server_years and dual_years != server_years: + lines.append("- The year split suggests at least some change in issuance policy over time.") + else: + lines.append("- Time alone does not prove a migration. The stronger signal is the template split by issuer and EKU.") + lines.append("") + for category in sorted(summary.validity_start_years): + year_counts = ", ".join(f"{year}: {count}" for year, count in summary.validity_start_years[category].items()) + lines.append(f"- `{category}`: {year_counts}") + lines.append("") + lines.append("## Interpretation") + lines.append("") + lines.append("- The `tls_server_and_client` certificates still look like hostname certificates, not user or robot identity certificates.") + lines.append("- Evidence: public DNS-style Subject CNs, DNS-only SANs, public WebPKI server-auth issuers, and no email or personal-name SAN material.") + lines.append("- The most plausible reading is **legacy or permissive server certificate templates** that also included `clientAuth`, not a separate client-certificate estate.") + lines.append("") + lines.append("## Dual-EKU Hostname Overlap") + lines.append("") + lines.append( + f"- Dual-EKU subject CN families that also have a strict server-only sibling: **{len(summary.dual_eku_subject_cns_with_server_only_sibling)}**" + ) + lines.append( + f"- Dual-EKU subject CN families that currently appear only in the dual-EKU bucket: **{len(summary.dual_eku_subject_cns_without_server_only_sibling)}**" + ) + lines.append("") + if summary.dual_eku_subject_cns_with_server_only_sibling: + lines.append("### Dual-EKU Families With Server-Only Siblings") + lines.append("") + for subject_cn in summary.dual_eku_subject_cns_with_server_only_sibling: + lines.append(f"- `{subject_cn}`") + lines.append("") + if summary.dual_eku_subject_cns_without_server_only_sibling: + lines.append("### Dual-EKU Families Without Server-Only Siblings") + lines.append("") + for subject_cn in 
summary.dual_eku_subject_cns_without_server_only_sibling: + lines.append(f"- `{subject_cn}`") + lines.append("") + lines.append("## Detailed Dual-EKU Certificates") + lines.append("") + dual_items = [item for item in classifications if item.category == "tls_server_and_client"] + if not dual_items: + lines.append("- None") + lines.append("") + else: + for item in dual_items: + dns_sample = ", ".join(item.san_dns_names[:8]) + if len(item.san_dns_names) > 8: + dns_sample += ", ..." + lines.append(f"### `{item.subject_cn}`") + lines.append("") + lines.append(f"- Issuer: `{item.issuer_name}`") + lines.append(f"- Validity: `{item.valid_from_utc}` to `{item.valid_to_utc}`") + lines.append(f"- Matched search domains: `{', '.join(item.matched_domains)}`") + lines.append(f"- EKU: `{format_eku_template(item.eku_oids)}`") + lines.append(f"- KeyUsage: `{format_key_usage_template(item.key_usage_flags)}`") + lines.append(f"- DNS SAN count: `{len(item.san_dns_names)}`") + lines.append(f"- DNS SAN sample: `{dns_sample}`") + lines.append("") + return "\n".join(lines) + "\n" + + +def main() -> int: + args = parse_args() + domains = ct_scan.load_domains(args.domains_file) + records = load_records( + domains=domains, + cache_dir=args.cache_dir, + cache_ttl_seconds=args.cache_ttl_seconds, + max_candidates=args.max_candidates, + attempts=args.attempts, + verbose=args.verbose, + ) + hits, verification = ct_scan.build_hits(records) + classifications = build_classifications(hits, records) + summary = summarize(classifications, domains) + + markdown_payload = render_markdown(summary, classifications) + json_payload = { + "summary": asdict(summary), + "verification": asdict(verification), + "classifications": [asdict(item) for item in classifications], + } + + args.markdown_output.parent.mkdir(parents=True, exist_ok=True) + args.json_output.parent.mkdir(parents=True, exist_ok=True) + args.markdown_output.write_text(markdown_payload, encoding="utf-8") + 
args.json_output.write_text(json.dumps(json_payload, indent=2, sort_keys=True), encoding="utf-8")  # machine-readable twin of the markdown report
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
diff --git a/domains.example.txt b/domains.example.txt
new file mode 100644
index 0000000..365252e
--- /dev/null
+++ b/domains.example.txt
@@ -0,0 +1,6 @@
+# Copy this file to domains.local.txt and replace the placeholders.
+# One search term per line.
+# Do not commit domains.local.txt or any real search terms.
+
+brand.example
+service.example
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..8c4715c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+cryptography>=46,<47
+psycopg[binary]>=3.3,<4