mirror of
https://github.com/saymrwulf/CertTransparencySearch.git
synced 2026-05-14 20:37:52 +00:00
2168 lines
125 KiB
Markdown
2168 lines
125 KiB
Markdown
# ct_scan.py
|
|
|
|
Source file: [`ct_scan.py`](../ct_scan.py)
|
|
|
|
Core Certificate Transparency scanner. This file talks to crt.sh's public database, downloads the real certificate bytes, verifies that they are real leaf certificates, groups them into readable families, and can render the full inventory appendix.
|
|
|
|
Main flow in one line: `domains file -> raw CT query -> parsed leaf certificates -> CN families -> issuer trust -> appendix reports`
|
|
|
|
How to read this page:
|
|
|
|
- left side: the actual source code block
|
|
- right side: a plain-English explanation for a beginner
|
|
- read from top to bottom because later blocks depend on earlier ones
|
|
|
|
## Module setup
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from collections import Counter, defaultdict
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import psycopg
|
|
from cryptography import x509
|
|
from cryptography.x509 import general_name
|
|
from cryptography.x509.oid import ExtensionOID
|
|
from cryptography.x509.oid import NameOID
|
|
from psycopg.rows import dict_row
|
|
|
|
|
|
QUERY_SQL = """
|
|
WITH ci AS (
|
|
SELECT
|
|
min(sub.certificate_id) AS id,
|
|
min(sub.issuer_ca_id) AS issuer_ca_id,
|
|
x509_commonName(sub.certificate) AS common_name,
|
|
x509_subjectName(sub.certificate) AS subject_dn,
|
|
x509_notBefore(sub.certificate) AS not_before,
|
|
x509_notAfter(sub.certificate) AS not_after,
|
|
encode(x509_serialNumber(sub.certificate), 'hex') AS serial_number,
|
|
sub.certificate AS certificate
|
|
FROM (
|
|
SELECT cai.*
|
|
FROM certificate_and_identities cai
|
|
WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate)
|
|
AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\'
|
|
LIMIT %(max_candidates)s
|
|
) sub
|
|
GROUP BY sub.certificate
|
|
)
|
|
SELECT
|
|
ci.id,
|
|
ci.issuer_ca_id,
|
|
ca.name AS issuer_name,
|
|
ci.common_name,
|
|
ci.subject_dn,
|
|
ci.not_before,
|
|
ci.not_after,
|
|
cl.first_seen,
|
|
ci.serial_number,
|
|
coalesce(cl.revoked, 0) AS revoked_count,
|
|
rev.revocation_date,
|
|
rev.reason_code,
|
|
rev.last_seen_check_date,
|
|
crl_state.active_crl_count,
|
|
crl_state.last_checked AS crl_last_checked,
|
|
ci.certificate
|
|
FROM ci
|
|
JOIN ca ON ca.id = ci.issuer_ca_id
|
|
JOIN certificate_lifecycle cl ON cl.certificate_id = ci.id
|
|
LEFT JOIN LATERAL (
|
|
SELECT
|
|
cr.revocation_date,
|
|
cr.reason_code,
|
|
cr.last_seen_check_date
|
|
FROM crl_revoked cr
|
|
WHERE cr.ca_id = ci.issuer_ca_id
|
|
AND cr.serial_number = decode(ci.serial_number, 'hex')
|
|
ORDER BY cr.last_seen_check_date DESC NULLS LAST
|
|
LIMIT 1
|
|
) rev ON TRUE
|
|
LEFT JOIN LATERAL (
|
|
SELECT
|
|
count(*) FILTER (
|
|
WHERE crl.error_message IS NULL
|
|
AND crl.next_update > now() AT TIME ZONE 'UTC'
|
|
) AS active_crl_count,
|
|
max(crl.last_checked) AS last_checked
|
|
FROM crl
|
|
WHERE crl.ca_id = ci.issuer_ca_id
|
|
) crl_state ON TRUE
|
|
WHERE ci.not_before <= now() AT TIME ZONE 'UTC'
|
|
AND ci.not_after >= now() AT TIME ZONE 'UTC'
|
|
AND cl.certificate_type = 'Certificate'
|
|
ORDER BY cl.first_seen DESC NULLS LAST, ci.id DESC;
|
|
"""
|
|
|
|
|
|
RAW_MATCH_COUNT_SQL = """
|
|
SELECT count(*)
|
|
FROM certificate_and_identities cai
|
|
WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.certificate)
|
|
AND cai.name_value ILIKE %(name_pattern)s ESCAPE '\\'
|
|
"""
|
|
|
|
|
|
REVOCATION_REASONS = {
|
|
1: "keyCompromise",
|
|
2: "cACompromise",
|
|
3: "affiliationChanged",
|
|
4: "superseded",
|
|
5: "cessationOfOperation",
|
|
6: "certificateHold",
|
|
8: "removeFromCRL",
|
|
9: "privilegeWithdrawn",
|
|
10: "aACompromise",
|
|
}
|
|
|
|
|
|
PRECERT_POISON_OID = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Imports, SQL, constants, and shared data shapes for the core CT scanner.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Nothing yet; this is the starting point. → <strong>Module setup</strong> → `connect`, `query_domain`, `build_hits`, and the report renderers use these shared definitions.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## DatabaseRecord
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class DatabaseRecord:
|
|
domain: str
|
|
certificate_id: int
|
|
issuer_ca_id: int
|
|
issuer_name: str
|
|
common_name: str | None
|
|
subject_dn: str | None
|
|
not_before: datetime
|
|
not_after: datetime
|
|
first_seen: datetime | None
|
|
serial_number: str
|
|
revoked_count: int
|
|
revocation_date: datetime | None
|
|
reason_code: int | None
|
|
last_seen_check_date: datetime | None
|
|
active_crl_count: int
|
|
crl_last_checked: datetime | None
|
|
certificate_der: bytes</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>A raw row as it comes back from the crt.sh database before local cleanup.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>DatabaseRecord</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## CertificateHit
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class CertificateHit:
|
|
fingerprint_sha256: str
|
|
subject_cn: str
|
|
validity_not_before: datetime
|
|
validity_not_after: datetime
|
|
san_entries: list[str]
|
|
revocation_status: str
|
|
revocation_date: datetime | None
|
|
revocation_reason: str | None
|
|
revocation_note: str | None
|
|
crtsh_crl_timestamp: datetime | None
|
|
matched_domains: set[str] = field(default_factory=set)
|
|
first_seen: datetime | None = None
|
|
crtsh_certificate_ids: set[int] = field(default_factory=set)
|
|
serial_numbers: set[str] = field(default_factory=set)
|
|
issuer_names: set[str] = field(default_factory=set)
|
|
issuer_ca_ids: set[int] = field(default_factory=set)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>The cleaned working object used by the rest of the analytics pipeline.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>CertificateHit</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## VerificationStats
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class VerificationStats:
|
|
input_rows: int = 0
|
|
unique_leaf_certificates: int = 0
|
|
non_leaf_filtered: int = 0
|
|
precertificate_poison_filtered: int = 0</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>A tiny running counter that proves how many rows were kept or rejected.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>VerificationStats</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## CertificateGroup
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class CertificateGroup:
|
|
group_id: str
|
|
group_type: str
|
|
member_indices: list[int]
|
|
member_count: int
|
|
distinct_subject_cn_count: int
|
|
distinct_exact_content_count: int
|
|
numbered_cn_patterns: set[str]
|
|
matched_domains: set[str]
|
|
subject_cns: set[str]
|
|
first_seen_min: datetime | None
|
|
first_seen_max: datetime | None
|
|
valid_from_min: datetime
|
|
valid_to_max: datetime
|
|
revocation_counts: Counter</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>One readable family of related certificates after grouping logic runs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>CertificateGroup</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## ScanStats
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class ScanStats:
|
|
generated_at_utc: str
|
|
configured_domains: list[str]
|
|
unique_leaf_certificates: int
|
|
groups_total: int
|
|
groups_multi_member: int
|
|
groups_singleton: int
|
|
groups_by_type: dict[str, int]
|
|
verification: VerificationStats</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Top-level summary numbers used in reports.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>ScanStats</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## IssuerTrustInfo
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">@dataclass
|
|
class IssuerTrustInfo:
|
|
issuer_name: str
|
|
issuer_ca_ids: set[int]
|
|
server_auth_contexts: set[str]
|
|
major_webpki: bool</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Stores the public-trust picture for one issuer family.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>IssuerTrustInfo</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## load_domains
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def load_domains(path: Path) -> list[str]:
|
|
domains: list[str] = []
|
|
for raw_line in path.read_text(encoding="utf-8").splitlines():
|
|
line = raw_line.strip().lower()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if line.startswith("*."):
|
|
line = line[2:]
|
|
domains.append(line)
|
|
unique_domains = sorted(set(domains))
|
|
if not unique_domains:
|
|
raise ValueError(f"No domains found in {path}")
|
|
return unique_domains</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block loads data from disk, cache, or an earlier stage so later code can work with it.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Operator's local config file. → <strong>load_domains</strong> → `query_domain` and the higher-level loaders use this cleaned domain list.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## escape_like
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def escape_like(value: str) -> str:
|
|
return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>escape_like</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## utc_iso
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def utc_iso(value: datetime | None) -> str:
|
|
if value is None:
|
|
return "n/a"
|
|
if value.tzinfo is None:
|
|
value = value.replace(tzinfo=UTC)
|
|
else:
|
|
value = value.astimezone(UTC)
|
|
return value.isoformat(timespec="seconds").replace("+00:00", "Z")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This is a small helper that keeps the larger analytical code cleaner and easier to reuse.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>utc_iso</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## serialize_datetime
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def serialize_datetime(value: datetime | None) -> str | None:
|
|
return utc_iso(value) if value is not None else None</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>serialize_datetime</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## parse_datetime
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def parse_datetime(value: str | None) -> datetime | None:
|
|
if value is None:
|
|
return None
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>parse_datetime</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## cache_path
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def cache_path(cache_dir: Path, domain: str) -> Path:
|
|
safe_domain = "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in domain)
|
|
return cache_dir / f"{safe_domain}.json"</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>cache_path</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## record_to_cache_payload
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def record_to_cache_payload(record: DatabaseRecord) -> dict[str, Any]:
|
|
return {
|
|
"domain": record.domain,
|
|
"certificate_id": record.certificate_id,
|
|
"issuer_ca_id": record.issuer_ca_id,
|
|
"issuer_name": record.issuer_name,
|
|
"common_name": record.common_name,
|
|
"subject_dn": record.subject_dn,
|
|
"not_before": serialize_datetime(record.not_before),
|
|
"not_after": serialize_datetime(record.not_after),
|
|
"first_seen": serialize_datetime(record.first_seen),
|
|
"serial_number": record.serial_number,
|
|
"revoked_count": record.revoked_count,
|
|
"revocation_date": serialize_datetime(record.revocation_date),
|
|
"reason_code": record.reason_code,
|
|
"last_seen_check_date": serialize_datetime(record.last_seen_check_date),
|
|
"active_crl_count": record.active_crl_count,
|
|
"crl_last_checked": serialize_datetime(record.crl_last_checked),
|
|
"certificate_der_b64": base64.b64encode(record.certificate_der).decode("ascii"),
|
|
}</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>record_to_cache_payload</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## record_from_cache_payload
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def record_from_cache_payload(payload: dict[str, Any]) -> DatabaseRecord:
|
|
return DatabaseRecord(
|
|
domain=payload["domain"],
|
|
certificate_id=int(payload["certificate_id"]),
|
|
issuer_ca_id=int(payload["issuer_ca_id"]),
|
|
issuer_name=payload["issuer_name"],
|
|
common_name=payload.get("common_name"),
|
|
subject_dn=payload.get("subject_dn"),
|
|
not_before=parse_datetime(payload["not_before"]) or datetime.min,
|
|
not_after=parse_datetime(payload["not_after"]) or datetime.min,
|
|
first_seen=parse_datetime(payload.get("first_seen")),
|
|
serial_number=payload["serial_number"],
|
|
revoked_count=int(payload["revoked_count"]),
|
|
revocation_date=parse_datetime(payload.get("revocation_date")),
|
|
reason_code=payload.get("reason_code"),
|
|
last_seen_check_date=parse_datetime(payload.get("last_seen_check_date")),
|
|
active_crl_count=int(payload["active_crl_count"]),
|
|
crl_last_checked=parse_datetime(payload.get("crl_last_checked")),
|
|
certificate_der=base64.b64decode(payload["certificate_der_b64"]),
|
|
)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>record_from_cache_payload</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## load_cached_records
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def load_cached_records(cache_dir: Path, domain: str, ttl_seconds: int, max_candidates: int) -> list[DatabaseRecord] | None:
|
|
path = cache_path(cache_dir, domain)
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
if payload.get("version") != 1:
|
|
return None
|
|
if payload.get("max_candidates") != max_candidates:
|
|
return None
|
|
cached_at = parse_datetime(payload.get("cached_at"))
|
|
if cached_at is None:
|
|
return None
|
|
age = time.time() - cached_at.replace(tzinfo=UTC).timestamp()
|
|
if age > ttl_seconds:
|
|
return None
|
|
return [record_from_cache_payload(item) for item in payload.get("records", [])]</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block loads data from disk, cache, or an earlier stage so later code can work with it.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>load_cached_records</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## store_cached_records
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def store_cached_records(cache_dir: Path, domain: str, max_candidates: int, records: list[DatabaseRecord]) -> None:
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
payload = {
|
|
"version": 1,
|
|
"cached_at": utc_iso(datetime.now(UTC)),
|
|
"max_candidates": max_candidates,
|
|
"records": [record_to_cache_payload(record) for record in records],
|
|
}
|
|
cache_path(cache_dir, domain).write_text(
|
|
json.dumps(payload, indent=2, sort_keys=True),
|
|
encoding="utf-8",
|
|
)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block saves an intermediate result so the next run can reuse it instead of recomputing everything.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>store_cached_records</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## connect
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def connect() -> psycopg.Connection:
|
|
return psycopg.connect(
|
|
host="crt.sh",
|
|
port=5432,
|
|
dbname="certwatch",
|
|
user="guest",
|
|
password="guest",
|
|
connect_timeout=5,
|
|
sslmode="disable",
|
|
autocommit=True,
|
|
application_name="ct_transparency_search",
|
|
)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Opens the direct guest PostgreSQL connection to crt.sh's certwatch backend.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Called by query functions that need live crt.sh data. → <strong>connect</strong> → `query_domain`, `query_raw_match_count`, and issuer-trust lookups all depend on this connection.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## query_domain
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def query_domain(domain: str, max_candidates: int, attempts: int, verbose: bool) -> list[DatabaseRecord]:
|
|
params = {
|
|
"domain": domain,
|
|
"name_pattern": f"%{escape_like(domain)}%",
|
|
"max_candidates": max_candidates,
|
|
}
|
|
raw_match_count = query_raw_match_count(domain=domain, attempts=attempts, verbose=verbose)
|
|
if raw_match_count > max_candidates:
|
|
raise ValueError(
|
|
f"domain={domain} raw identity matches={raw_match_count} exceed max_candidates={max_candidates}; "
|
|
f"increase --max-candidates-per-domain to at least {raw_match_count} for a complete result set"
|
|
)
|
|
last_error: Exception | None = None
|
|
for attempt in range(1, attempts + 1):
|
|
try:
|
|
with connect() as conn, conn.cursor(row_factory=dict_row) as cur:
|
|
cur.execute(QUERY_SQL, params)
|
|
rows = cur.fetchall()
|
|
return [row_to_record(domain, row) for row in rows]
|
|
except Exception as exc:
|
|
last_error = exc
|
|
if attempt == attempts:
|
|
break
|
|
if verbose:
|
|
print(
|
|
f"[warn] domain={domain} attempt={attempt}/{attempts} failed: {exc}",
|
|
file=sys.stderr,
|
|
)
|
|
time.sleep(min(2 ** attempt, 10))
|
|
assert last_error is not None
|
|
raise last_error</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Runs the main certificate query for one search term and refuses silent undercounting.</p>
|
|
<p><strong>Flow arrows</strong></p><p>A domain plus the safety cap and retry settings. → <strong>query_domain</strong> → `build_hits` receives the raw records returned here.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## query_raw_match_count
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def query_raw_match_count(domain: str, attempts: int, verbose: bool) -> int:
|
|
params = {
|
|
"domain": domain,
|
|
"name_pattern": f"%{escape_like(domain)}%",
|
|
}
|
|
last_error: Exception | None = None
|
|
for attempt in range(1, attempts + 1):
|
|
try:
|
|
with connect() as conn, conn.cursor() as cur:
|
|
cur.execute(RAW_MATCH_COUNT_SQL, params)
|
|
row = cur.fetchone()
|
|
return int(row[0])
|
|
except Exception as exc:
|
|
last_error = exc
|
|
if attempt == attempts:
|
|
break
|
|
if verbose:
|
|
print(
|
|
f"[warn] domain={domain} raw-count attempt={attempt}/{attempts} failed: {exc}",
|
|
file=sys.stderr,
|
|
)
|
|
time.sleep(min(2 ** attempt, 10))
|
|
assert last_error is not None
|
|
raise last_error</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Counts how many raw hits exist before the capped query runs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>A domain string from the local config. → <strong>query_raw_match_count</strong> → `query_domain` uses this count to refuse silent undercounting.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## row_to_record
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def row_to_record(domain: str, row: dict[str, Any]) -> DatabaseRecord:
|
|
return DatabaseRecord(
|
|
domain=domain,
|
|
certificate_id=int(row["id"]),
|
|
issuer_ca_id=int(row["issuer_ca_id"]),
|
|
issuer_name=row["issuer_name"],
|
|
common_name=row["common_name"],
|
|
subject_dn=row["subject_dn"],
|
|
not_before=row["not_before"],
|
|
not_after=row["not_after"],
|
|
first_seen=row["first_seen"],
|
|
serial_number=row["serial_number"],
|
|
revoked_count=int(row["revoked_count"]),
|
|
revocation_date=row["revocation_date"],
|
|
reason_code=row["reason_code"],
|
|
last_seen_check_date=row["last_seen_check_date"],
|
|
active_crl_count=int(row["active_crl_count"] or 0),
|
|
crl_last_checked=row["crl_last_checked"],
|
|
certificate_der=bytes(row["certificate"]),
|
|
)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>row_to_record</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## extract_san_entries
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def extract_san_entries(cert: x509.Certificate) -> list[str]:
|
|
try:
|
|
extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
|
except x509.ExtensionNotFound:
|
|
return []
|
|
entries: list[str] = []
|
|
for name in extension.value:
|
|
entries.append(format_general_name(name))
|
|
return sorted(set(entries), key=str.casefold)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block pulls one specific piece of information out of a larger object.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>extract_san_entries</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## format_general_name
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def format_general_name(name: general_name.GeneralName) -> str:
|
|
if isinstance(name, x509.DNSName):
|
|
return f"DNS:{name.value}"
|
|
if isinstance(name, x509.RFC822Name):
|
|
return f"EMAIL:{name.value}"
|
|
if isinstance(name, x509.UniformResourceIdentifier):
|
|
return f"URI:{name.value}"
|
|
if isinstance(name, x509.IPAddress):
|
|
return f"IP:{name.value}"
|
|
if isinstance(name, x509.RegisteredID):
|
|
return f"RID:{name.value.dotted_string}"
|
|
if isinstance(name, x509.DirectoryName):
|
|
return f"DIR:{name.value.rfc4514_string()}"
|
|
if isinstance(name, x509.OtherName):
|
|
encoded = base64.b64encode(name.value).decode("ascii")
|
|
return f"OTHER:{name.type_id.dotted_string}:{encoded}"
|
|
return str(name)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>format_general_name</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## extract_common_name
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def extract_common_name(cert: x509.Certificate) -> str | None:
|
|
attributes = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
|
|
if not attributes:
|
|
return None
|
|
return attributes[0].value</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block pulls one specific piece of information out of a larger object.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>extract_common_name</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## has_precertificate_poison
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def has_precertificate_poison(cert: x509.Certificate) -> bool:
|
|
try:
|
|
cert.extensions.get_extension_for_oid(PRECERT_POISON_OID)
|
|
except x509.ExtensionNotFound:
|
|
return False
|
|
return True</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>has_precertificate_poison</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## is_leaf_certificate
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def is_leaf_certificate(cert: x509.Certificate) -> tuple[bool, str]:
|
|
if has_precertificate_poison(cert):
|
|
return (False, "precertificate_poison")
|
|
try:
|
|
basic_constraints = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
|
|
if basic_constraints.ca:
|
|
return (False, "basic_constraints_ca")
|
|
except x509.ExtensionNotFound:
|
|
pass
|
|
try:
|
|
key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
|
|
if key_usage.key_cert_sign:
|
|
return (False, "key_cert_sign")
|
|
except x509.ExtensionNotFound:
|
|
pass
|
|
return (True, "leaf")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>is_leaf_certificate</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## revocation_fields
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def revocation_fields(record: DatabaseRecord) -> tuple[str, datetime | None, str | None, datetime | None, str | None]:
|
|
if record.revoked_count > 0:
|
|
reason: str | None = None
|
|
if record.reason_code in REVOCATION_REASONS:
|
|
reason = REVOCATION_REASONS[record.reason_code]
|
|
elif record.reason_code not in (None, 0):
|
|
reason = f"unknown({record.reason_code})"
|
|
return ("revoked", record.revocation_date, reason, record.last_seen_check_date, None)
|
|
if record.active_crl_count > 0:
|
|
return ("not_revoked", None, None, record.crl_last_checked, None)
|
|
return ("unknown", None, None, record.crl_last_checked, "no fresh crt.sh CRL data")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>revocation_fields</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## revocation_priority
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def revocation_priority(status: str) -> int:
|
|
return {
|
|
"unknown": 0,
|
|
"not_revoked": 1,
|
|
"revoked": 2,
|
|
}[status]</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>revocation_priority</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_hits
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_hits(records: list[DatabaseRecord]) -> tuple[list[CertificateHit], VerificationStats]:
|
|
verification = VerificationStats(input_rows=len(records))
|
|
hits: dict[str, CertificateHit] = {}
|
|
for record in records:
|
|
cert = x509.load_der_x509_certificate(record.certificate_der)
|
|
is_leaf, reason = is_leaf_certificate(cert)
|
|
if not is_leaf:
|
|
if reason == "precertificate_poison":
|
|
verification.precertificate_poison_filtered += 1
|
|
else:
|
|
verification.non_leaf_filtered += 1
|
|
continue
|
|
fingerprint_hex = hashlib.sha256(record.certificate_der).hexdigest()
|
|
subject_cn = record.common_name or extract_common_name(cert) or "-"
|
|
revocation_status, revocation_date, revocation_reason, crtsh_crl_timestamp, revocation_note = revocation_fields(record)
|
|
hit = hits.get(fingerprint_hex)
|
|
if hit is None:
|
|
hit = CertificateHit(
|
|
fingerprint_sha256=fingerprint_hex,
|
|
subject_cn=subject_cn,
|
|
validity_not_before=record.not_before,
|
|
validity_not_after=record.not_after,
|
|
san_entries=extract_san_entries(cert),
|
|
revocation_status=revocation_status,
|
|
revocation_date=revocation_date,
|
|
revocation_reason=revocation_reason,
|
|
revocation_note=revocation_note,
|
|
crtsh_crl_timestamp=crtsh_crl_timestamp,
|
|
matched_domains={record.domain},
|
|
first_seen=record.first_seen,
|
|
crtsh_certificate_ids={record.certificate_id},
|
|
serial_numbers={record.serial_number},
|
|
issuer_names={record.issuer_name},
|
|
issuer_ca_ids={record.issuer_ca_id},
|
|
)
|
|
hits[fingerprint_hex] = hit
|
|
continue
|
|
hit.matched_domains.add(record.domain)
|
|
hit.crtsh_certificate_ids.add(record.certificate_id)
|
|
hit.serial_numbers.add(record.serial_number)
|
|
hit.issuer_names.add(record.issuer_name)
|
|
hit.issuer_ca_ids.add(record.issuer_ca_id)
|
|
if hit.first_seen is None or (record.first_seen is not None and record.first_seen < hit.first_seen):
|
|
hit.first_seen = record.first_seen
|
|
if revocation_priority(revocation_status) > revocation_priority(hit.revocation_status):
|
|
hit.revocation_status = revocation_status
|
|
hit.revocation_date = revocation_date
|
|
hit.revocation_reason = revocation_reason
|
|
hit.revocation_note = revocation_note
|
|
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
|
|
elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is not None and crtsh_crl_timestamp is not None:
|
|
if crtsh_crl_timestamp > hit.crtsh_crl_timestamp:
|
|
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
|
|
elif revocation_status == hit.revocation_status and hit.crtsh_crl_timestamp is None:
|
|
hit.crtsh_crl_timestamp = crtsh_crl_timestamp
|
|
ordered_hits = sorted(
|
|
hits.values(),
|
|
key=lambda hit: (
|
|
sorted(hit.matched_domains),
|
|
hit.subject_cn.casefold(),
|
|
hit.validity_not_before,
|
|
hit.fingerprint_sha256,
|
|
),
|
|
)
|
|
verification.unique_leaf_certificates = len(ordered_hits)
|
|
return (ordered_hits, verification)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Parses certificate bytes, rejects bad objects, and merges duplicate views of the same cert.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Raw `DatabaseRecord` rows from crt.sh. → <strong>build_hits</strong> → `build_groups`, purpose analysis, DNS analysis, and CAA analysis all consume these cleaned hits.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## canonicalize_subject_cn
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def canonicalize_subject_cn(subject_cn: str) -> str:
|
|
subject_cn = subject_cn.lower()
|
|
if subject_cn.startswith("www."):
|
|
return subject_cn[4:]
|
|
return subject_cn</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block makes values consistent so matching and grouping do not get confused by superficial differences.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>canonicalize_subject_cn</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## normalize_counter_pattern
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def normalize_counter_pattern(hostname: str) -> str | None:
|
|
normalized = re.sub(r"\d+", "#", canonicalize_subject_cn(hostname))
|
|
if normalized == canonicalize_subject_cn(hostname):
|
|
return None
|
|
return normalized</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block makes values consistent so matching and grouping do not get confused by superficial differences.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>normalize_counter_pattern</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## UnionFind
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">class UnionFind:
|
|
def __init__(self, size: int) -> None:
|
|
self.parent = list(range(size))
|
|
self.rank = [0] * size
|
|
|
|
def find(self, value: int) -> int:
|
|
while self.parent[value] != value:
|
|
self.parent[value] = self.parent[self.parent[value]]
|
|
value = self.parent[value]
|
|
return value
|
|
|
|
def union(self, left: int, right: int) -> None:
|
|
left_root = self.find(left)
|
|
right_root = self.find(right)
|
|
if left_root == right_root:
|
|
return
|
|
if self.rank[left_root] < self.rank[right_root]:
|
|
left_root, right_root = right_root, left_root
|
|
self.parent[right_root] = left_root
|
|
if self.rank[left_root] == self.rank[right_root]:
|
|
self.rank[left_root] += 1</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This class is a structured container for one piece of data that later code passes around instead of juggling many loose variables.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>UnionFind</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_groups
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_groups(hits: list[CertificateHit]) -> list[CertificateGroup]:
|
|
if not hits:
|
|
return []
|
|
canonical_cns_by_pattern: dict[str, set[str]] = defaultdict(set)
|
|
for hit in hits:
|
|
pattern = normalize_counter_pattern(hit.subject_cn)
|
|
if pattern is not None:
|
|
canonical_cns_by_pattern[pattern].add(canonicalize_subject_cn(hit.subject_cn))
|
|
|
|
qualifying_patterns = {
|
|
pattern
|
|
for pattern, canonical_cns in canonical_cns_by_pattern.items()
|
|
if len(canonical_cns) > 1
|
|
}
|
|
components: dict[tuple[str, str], list[int]] = defaultdict(list)
|
|
for index, hit in enumerate(hits):
|
|
canonical_cn = canonicalize_subject_cn(hit.subject_cn)
|
|
pattern = normalize_counter_pattern(hit.subject_cn)
|
|
if pattern in qualifying_patterns:
|
|
components[("pattern", pattern)].append(index)
|
|
else:
|
|
components[("exact", canonical_cn)].append(index)
|
|
|
|
provisional_groups: list[CertificateGroup] = []
|
|
for (family_kind, family_key), member_indices in components.items():
|
|
member_hits = [hits[index] for index in member_indices]
|
|
subject_cns = {hit.subject_cn for hit in member_hits}
|
|
unique_san_profiles = {tuple(hit.san_entries) for hit in member_hits}
|
|
numbered_patterns = {family_key} if family_kind == "pattern" else set()
|
|
group_type = "numbered_cn_pattern" if family_kind == "pattern" else "exact_endpoint_family"
|
|
first_seen_values = [hit.first_seen for hit in member_hits if hit.first_seen is not None]
|
|
provisional_groups.append(
|
|
CertificateGroup(
|
|
group_id="",
|
|
group_type=group_type,
|
|
member_indices=sorted(member_indices),
|
|
member_count=len(member_indices),
|
|
distinct_subject_cn_count=len(subject_cns),
|
|
distinct_exact_content_count=len(unique_san_profiles),
|
|
numbered_cn_patterns=numbered_patterns,
|
|
matched_domains={domain for hit in member_hits for domain in hit.matched_domains},
|
|
subject_cns=subject_cns,
|
|
first_seen_min=min(first_seen_values) if first_seen_values else None,
|
|
first_seen_max=max(first_seen_values) if first_seen_values else None,
|
|
valid_from_min=min(hit.validity_not_before for hit in member_hits),
|
|
valid_to_max=max(hit.validity_not_after for hit in member_hits),
|
|
revocation_counts=Counter(hit.revocation_status for hit in member_hits),
|
|
)
|
|
)
|
|
|
|
provisional_groups.sort(
|
|
key=lambda group: (
|
|
-group.member_count,
|
|
group.group_type,
|
|
min(canonicalize_subject_cn(value) for value in group.subject_cns),
|
|
)
|
|
)
|
|
for position, group in enumerate(provisional_groups, start=1):
|
|
group.group_id = f"G{position:04d}"
|
|
return provisional_groups</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Turns a flat certificate list into CN-based families such as exact endpoints or numbered rails.</p>
|
|
<p><strong>Flow arrows</strong></p><p>The flat list of `CertificateHit` objects. → <strong>build_groups</strong> → The report builders use these groups to turn raw certificate clutter into readable families.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## describe_group_basis
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def describe_group_basis(group: CertificateGroup) -> str:
|
|
if group.group_type == "numbered_cn_pattern":
|
|
pattern = next(iter(group.numbered_cn_patterns))
|
|
return f"CN pattern with running-number slot: `{pattern}`"
|
|
base = min(canonicalize_subject_cn(value) for value in group.subject_cns)
|
|
return f"Same endpoint CN family (exact CN; `www.` grouped with base name): `{base}`"</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>describe_group_basis</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## primary_issuer_name
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def primary_issuer_name(hit: CertificateHit) -> str:
|
|
return sorted(hit.issuer_names)[0]</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>primary_issuer_name</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## query_issuer_trust
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def query_issuer_trust(hits: list[CertificateHit]) -> dict[str, IssuerTrustInfo]:
|
|
issuer_name_to_ca_ids: dict[str, set[int]] = defaultdict(set)
|
|
for hit in hits:
|
|
issuer_name_to_ca_ids[primary_issuer_name(hit)].update(hit.issuer_ca_ids)
|
|
all_ca_ids = sorted({ca_id for ca_ids in issuer_name_to_ca_ids.values() for ca_id in ca_ids})
|
|
contexts_by_ca_id: dict[int, set[str]] = defaultdict(set)
|
|
if all_ca_ids:
|
|
query = """
|
|
SELECT ctp.ca_id, tc.ctx
|
|
FROM ca_trust_purpose ctp
|
|
JOIN trust_context tc ON tc.id = ctp.trust_context_id
|
|
JOIN trust_purpose tp ON tp.id = ctp.trust_purpose_id
|
|
WHERE ctp.ca_id = ANY(%s)
|
|
AND tp.purpose = 'Server Authentication'
|
|
AND ctp.is_time_valid = TRUE
|
|
AND ctp.disabled_from IS NULL
|
|
"""
|
|
with connect() as conn, conn.cursor() as cur:
|
|
cur.execute(query, (all_ca_ids,))
|
|
for ca_id, trust_context in cur.fetchall():
|
|
contexts_by_ca_id[int(ca_id)].add(str(trust_context))
|
|
major_contexts = {"Mozilla", "Chrome", "Apple", "Microsoft", "Android"}
|
|
results: dict[str, IssuerTrustInfo] = {}
|
|
for issuer_name, ca_ids in issuer_name_to_ca_ids.items():
|
|
merged_contexts = {ctx for ca_id in ca_ids for ctx in contexts_by_ca_id.get(ca_id, set())}
|
|
results[issuer_name] = IssuerTrustInfo(
|
|
issuer_name=issuer_name,
|
|
issuer_ca_ids=set(ca_ids),
|
|
server_auth_contexts=merged_contexts,
|
|
major_webpki=major_contexts.issubset(merged_contexts),
|
|
)
|
|
return results</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Checks which issuers are currently trusted for public TLS in the major WebPKI contexts.</p>
|
|
<p><strong>Flow arrows</strong></p><p>The cleaned current certificate hits. → <strong>query_issuer_trust</strong> → Report builders use this trust view in the certificate chapters and appendix tables.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## status_marker
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def status_marker(status: str) -> str:
|
|
return {
|
|
"not_revoked": "OK ",
|
|
"revoked": "REV",
|
|
"unknown": "UNK",
|
|
}[status]</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>status_marker</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## one_line_revocation
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def one_line_revocation(hit: CertificateHit) -> str:
|
|
if hit.revocation_status == "revoked":
|
|
detail = f"revoked {utc_iso(hit.revocation_date)}" if hit.revocation_date else "revoked"
|
|
if hit.revocation_reason:
|
|
detail += f", reason={hit.revocation_reason}"
|
|
return detail
|
|
if hit.revocation_status == "unknown":
|
|
if hit.revocation_note:
|
|
return f"unknown, {hit.revocation_note}"
|
|
return "unknown"
|
|
return "not revoked"</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>one_line_revocation</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## san_tail_split
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def san_tail_split(domain: str) -> tuple[list[str], str]:
|
|
labels = domain.split(".")
|
|
common_second_level = {"ac", "co", "com", "edu", "gov", "net", "org"}
|
|
suffix_len = 2
|
|
if len(labels) >= 3 and len(labels[-1]) == 2 and labels[-2] in common_second_level:
|
|
suffix_len = 3
|
|
if len(labels) <= suffix_len:
|
|
return ([], domain)
|
|
return (labels[:-suffix_len], ".".join(labels[-suffix_len:]))</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>san_tail_split</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_san_tree_lines
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_san_tree_lines(san_entries: list[str]) -> list[str]:
|
|
return build_san_tree_lines_with_style(san_entries, ascii_only=False)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block constructs a richer higher-level result from simpler inputs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>build_san_tree_lines</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_san_tree_units_with_style
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_san_tree_units_with_style(san_entries: list[str], ascii_only: bool) -> list[list[str]]:
|
|
dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")})
|
|
other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")})
|
|
tree: dict[str, Any] = {}
|
|
for domain in dns_entries:
|
|
prefix_labels, tail = san_tail_split(domain)
|
|
cursor = tree
|
|
for label in prefix_labels:
|
|
cursor = cursor.setdefault(label, {})
|
|
cursor.setdefault(tail, {})
|
|
|
|
def render(node: dict[str, Any], prefix: str = "") -> list[str]:
|
|
lines: list[str] = []
|
|
keys = sorted(node.keys(), key=str.casefold)
|
|
for index, key in enumerate(keys):
|
|
is_last = index == len(keys) - 1
|
|
if ascii_only:
|
|
connector = "`- " if is_last else "|- "
|
|
else:
|
|
connector = "└─ " if is_last else "├─ "
|
|
lines.append(prefix + connector + key)
|
|
child = node[key]
|
|
if ascii_only:
|
|
child_prefix = prefix + (" " if is_last else "| ")
|
|
else:
|
|
child_prefix = prefix + (" " if is_last else "│ ")
|
|
lines.extend(render(child, child_prefix))
|
|
return lines
|
|
|
|
units: list[list[str]] = []
|
|
for key in sorted(tree.keys(), key=str.casefold):
|
|
units.append(render({key: tree[key]}))
|
|
for entry in other_entries:
|
|
units.append([f"{'*' if ascii_only else '•'} {entry}"])
|
|
if not units:
|
|
units.append([f"{'*' if ascii_only else '•'} -"])
|
|
return units</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block constructs a richer higher-level result from simpler inputs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>build_san_tree_units_with_style</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_san_tree_chunks_with_style
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_san_tree_chunks_with_style(
|
|
san_entries: list[str],
|
|
ascii_only: bool,
|
|
max_lines_per_chunk: int = 24,
|
|
) -> list[list[str]]:
|
|
chunks: list[list[str]] = []
|
|
current_chunk: list[str] = []
|
|
current_lines = 0
|
|
|
|
def flush_current_chunk() -> None:
|
|
nonlocal current_chunk, current_lines
|
|
if current_chunk:
|
|
chunks.append(current_chunk)
|
|
current_chunk = []
|
|
current_lines = 0
|
|
|
|
for unit in build_san_tree_units_with_style(san_entries, ascii_only=ascii_only):
|
|
if len(unit) > max_lines_per_chunk:
|
|
flush_current_chunk()
|
|
for start in range(0, len(unit), max_lines_per_chunk):
|
|
chunks.append(unit[start : start + max_lines_per_chunk])
|
|
continue
|
|
if current_chunk and current_lines + len(unit) > max_lines_per_chunk:
|
|
flush_current_chunk()
|
|
current_chunk.extend(unit)
|
|
current_lines += len(unit)
|
|
|
|
flush_current_chunk()
|
|
return chunks</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block constructs a richer higher-level result from simpler inputs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>build_san_tree_chunks_with_style</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## build_san_tree_lines_with_style
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def build_san_tree_lines_with_style(san_entries: list[str], ascii_only: bool) -> list[str]:
|
|
lines: list[str] = []
|
|
for chunk in build_san_tree_chunks_with_style(
|
|
san_entries,
|
|
ascii_only=ascii_only,
|
|
max_lines_per_chunk=10_000,
|
|
):
|
|
lines.extend(chunk)
|
|
return lines</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block constructs a richer higher-level result from simpler inputs.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>build_san_tree_lines_with_style</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## group_hits_by_issuer
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def group_hits_by_issuer(hits: list[CertificateHit]) -> tuple[dict[str, list[CertificateHit]], list[str]]:
|
|
issuer_hits: dict[str, list[CertificateHit]] = defaultdict(list)
|
|
for hit in hits:
|
|
issuer_hits[primary_issuer_name(hit)].append(hit)
|
|
ordered_issuers = sorted(
|
|
issuer_hits,
|
|
key=lambda issuer_name: (-len(issuer_hits[issuer_name]), issuer_name.casefold()),
|
|
)
|
|
return issuer_hits, ordered_issuers</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block clusters related items together so later code can analyze them as families instead of as isolated rows.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>group_hits_by_issuer</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## latex_escape
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def latex_escape(value: str) -> str:
|
|
replacements = {
|
|
"\\": r"\textbackslash{}",
|
|
"&": r"\&",
|
|
"%": r"\%",
|
|
"$": r"\$",
|
|
"#": r"\#",
|
|
"_": r"\_",
|
|
"{": r"\{",
|
|
"}": r"\}",
|
|
"~": r"\textasciitilde{}",
|
|
"^": r"\textasciicircum{}",
|
|
}
|
|
return "".join(replacements.get(char, char) for char in value)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>latex_escape</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## summarize_san_patterns
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def summarize_san_patterns(san_entries: list[str]) -> dict[str, Any]:
|
|
dns_entries = sorted({entry[4:] for entry in san_entries if entry.startswith("DNS:")}, key=str.casefold)
|
|
other_entries = sorted({entry for entry in san_entries if not entry.startswith("DNS:")}, key=str.casefold)
|
|
zone_counts: Counter[str] = Counter()
|
|
normalized_pattern_counts: Counter[str] = Counter()
|
|
wildcard_count = 0
|
|
numbered_count = 0
|
|
for domain in dns_entries:
|
|
normalized_domain = domain[2:] if domain.startswith("*.") else domain
|
|
if domain.startswith("*."):
|
|
wildcard_count += 1
|
|
if re.search(r"\d", normalized_domain):
|
|
numbered_count += 1
|
|
prefix_labels, tail = san_tail_split(normalized_domain)
|
|
zone_counts[tail] += 1
|
|
normalized_prefix = ".".join(re.sub(r"\d+", "#", label) for label in prefix_labels if label)
|
|
if normalized_prefix:
|
|
normalized_pattern_counts[f"{normalized_prefix}.{tail}"] += 1
|
|
else:
|
|
normalized_pattern_counts[tail] += 1
|
|
repeating_patterns = [
|
|
(pattern, count)
|
|
for pattern, count in normalized_pattern_counts.most_common(6)
|
|
if count > 1
|
|
]
|
|
return {
|
|
"dns_count": len(dns_entries),
|
|
"other_count": len(other_entries),
|
|
"wildcard_count": wildcard_count,
|
|
"numbered_count": numbered_count,
|
|
"zone_count": len(zone_counts),
|
|
"top_zones": zone_counts.most_common(6),
|
|
"repeating_patterns": repeating_patterns,
|
|
}</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block compresses many detailed rows into a smaller, easier-to-read summary.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>summarize_san_patterns</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## latex_status_badge
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def latex_status_badge(status: str) -> str:
|
|
return {
|
|
"not_revoked": r"\StatusOK{}",
|
|
"revoked": r"\StatusREV{}",
|
|
"unknown": r"\StatusUNK{}",
|
|
}[status]</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>latex_status_badge</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## latex_webpki_badge
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def latex_webpki_badge(value: bool) -> str:
|
|
return r"\WebPKIYes{}" if value else r"\WebPKINo{}"</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>latex_webpki_badge</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## render_markdown_report
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def render_markdown_report(
|
|
path: Path,
|
|
hits: list[CertificateHit],
|
|
groups: list[CertificateGroup],
|
|
stats: ScanStats,
|
|
issuer_trust: dict[str, IssuerTrustInfo],
|
|
) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
|
|
lines: list[str] = []
|
|
lines.append("# Certificate CN Family Report")
|
|
lines.append("")
|
|
lines.append(f"Generated: {stats.generated_at_utc}")
|
|
lines.append(f"Configured domains: {', '.join(stats.configured_domains)}")
|
|
lines.append("")
|
|
lines.append("## What This File Contains")
|
|
lines.append("")
|
|
lines.append("- Chapters are built from Subject CN construction only.")
|
|
lines.append("- If multiple concrete CNs share the same numbered schema, they are grouped together.")
|
|
lines.append("- Otherwise the chapter is one endpoint family; `www.` is grouped with the base name as a low-signal convenience.")
|
|
lines.append("- SAN entries are shown only inside each Subject CN subsection.")
|
|
lines.append("- All certificates shown here are verified leaf certificates.")
|
|
lines.append("")
|
|
lines.append("## Issuer Overview")
|
|
lines.append("")
|
|
for issuer_name in ordered_issuers:
|
|
trust = issuer_trust[issuer_name]
|
|
ca_ids = ", ".join(str(value) for value in sorted(trust.issuer_ca_ids))
|
|
trust_label = "YES" if trust.major_webpki else "NO"
|
|
lines.append(
|
|
f"- {issuer_name} | certificates={len(issuer_hits[issuer_name])} | WebPKI server-auth in major stores={trust_label} | ca_id={ca_ids}"
|
|
)
|
|
lines.append("")
|
|
lines.append("## Leaf-Certificate Assurance")
|
|
lines.append("")
|
|
lines.append("- SQL filter: `certificate_lifecycle.certificate_type = 'Certificate'`")
|
|
lines.append("- Local filter: precertificate poison absent, `BasicConstraints.ca != true`, `KeyUsage.keyCertSign != true`")
|
|
lines.append(f"- Verified leaf certificates kept: {stats.unique_leaf_certificates}")
|
|
lines.append(f"- Non-leaf filtered after download: {stats.verification.non_leaf_filtered}")
|
|
lines.append(f"- Precertificate poison filtered after download: {stats.verification.precertificate_poison_filtered}")
|
|
lines.append("")
|
|
for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
|
|
trust = issuer_trust[issuer_name]
|
|
issuer_title = f"Issuer {issuer_position:02d} {issuer_name}"
|
|
lines.append(f"## {issuer_title}")
|
|
lines.append("")
|
|
lines.append(f"- Certificates under issuer: {len(issuer_hits[issuer_name])}")
|
|
lines.append(
|
|
f"- WebPKI server-auth in major stores (Mozilla, Chrome, Apple, Microsoft, Android): {'YES' if trust.major_webpki else 'NO'}"
|
|
)
|
|
lines.append(
|
|
f"- Server-auth trust contexts seen in crt.sh live trust data: {', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none'}"
|
|
)
|
|
lines.append(f"- Issuer CA IDs: {', '.join(str(value) for value in sorted(trust.issuer_ca_ids))}")
|
|
lines.append("")
|
|
issuer_groups = build_groups(issuer_hits[issuer_name])
|
|
for family_index, group in enumerate(issuer_groups, start=1):
|
|
member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
|
|
chapter_title = f"Family {family_index:02d} {describe_group_basis(group)}"
|
|
lines.append(f"### {chapter_title}")
|
|
lines.append("")
|
|
lines.append(f"- Certificates in chapter: {group.member_count}")
|
|
lines.append(f"- Concrete Subject CNs: {group.distinct_subject_cn_count}")
|
|
lines.append(f"- Distinct SAN profiles in chapter: {group.distinct_exact_content_count}")
|
|
lines.append(f"- Matched domains: {', '.join(sorted(group.matched_domains))}")
|
|
lines.append(f"- Family validity span: {utc_iso(group.valid_from_min)} -> {utc_iso(group.valid_to_max)}")
|
|
if group.first_seen_min and group.first_seen_max:
|
|
lines.append(f"- First seen span: {utc_iso(group.first_seen_min)} -> {utc_iso(group.first_seen_max)}")
|
|
lines.append(f"- Revocation mix: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown")
|
|
lines.append("")
|
|
|
|
hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
|
|
for hit in member_hits:
|
|
hits_by_subject[hit.subject_cn].append(hit)
|
|
|
|
ordered_subjects = sorted(
|
|
hits_by_subject.keys(),
|
|
key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
|
|
)
|
|
for subject_cn in ordered_subjects:
|
|
subject_hits = sorted(
|
|
hits_by_subject[subject_cn],
|
|
key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
|
|
)
|
|
lines.append(f"#### Subject CN: `{subject_cn}`")
|
|
lines.append("")
|
|
lines.append(f"- Certificates under this CN: {len(subject_hits)}")
|
|
lines.append(f"- Validity span under this CN: {utc_iso(min(hit.validity_not_before for hit in subject_hits))} -> {utc_iso(max(hit.validity_not_after for hit in subject_hits))}")
|
|
san_profiles: dict[tuple[str, ...], list[CertificateHit]] = defaultdict(list)
|
|
for hit in subject_hits:
|
|
san_profiles[tuple(hit.san_entries)].append(hit)
|
|
profile_size_counts = Counter(len(profile) for profile in san_profiles)
|
|
unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
|
|
lines.append(f"- Distinct SAN profiles under this CN: {len(san_profiles)}")
|
|
lines.append(
|
|
"- SAN profile sizes seen: "
|
|
+ ", ".join(
|
|
f"{size} SAN x {count}"
|
|
for size, count in sorted(profile_size_counts.items())
|
|
)
|
|
)
|
|
lines.append("")
|
|
lines.append("Validity history")
|
|
lines.append("")
|
|
|
|
for hit in subject_hits:
|
|
crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
|
|
lines.append(
|
|
f"- [{status_marker(hit.revocation_status)}] {utc_iso(hit.validity_not_before)} -> {utc_iso(hit.validity_not_after)} | SANs={len(hit.san_entries)} | crt.sh={crtsh_ids} | {one_line_revocation(hit)}"
|
|
)
|
|
lines.append("")
|
|
lines.append("SAN structure")
|
|
lines.append("")
|
|
lines.append("```text")
|
|
for tree_line in build_san_tree_lines(unique_san_entries):
|
|
lines.append(tree_line)
|
|
lines.append("```")
|
|
lines.append("")
|
|
|
|
lines.append("---")
|
|
lines.append("")
|
|
|
|
lines.append("## Statistics")
|
|
lines.append("")
|
|
lines.append(f"- Unique leaf certificates: {stats.unique_leaf_certificates}")
|
|
lines.append(f"- CN-family chapters: {stats.groups_total}")
|
|
lines.append(f"- Chapters with more than one certificate: {stats.groups_multi_member}")
|
|
lines.append(f"- Single-certificate chapters: {stats.groups_singleton}")
|
|
lines.append(f"- Numbered CN pattern chapters: {stats.groups_by_type.get('numbered_cn_pattern', 0)}")
|
|
lines.append(f"- Exact endpoint chapters: {stats.groups_by_type.get('exact_endpoint_family', 0)}")
|
|
lines.append("")
|
|
path.write_text("\n".join(lines) + "\n", encoding="utf-8")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Writes the raw inventory appendix as readable Markdown.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Current hits, groups, and trust data. → <strong>render_markdown_report</strong> → Produces the Markdown inventory appendix.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## render_latex_report
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def render_latex_report(
|
|
path: Path,
|
|
hits: list[CertificateHit],
|
|
groups: list[CertificateGroup],
|
|
stats: ScanStats,
|
|
issuer_trust: dict[str, IssuerTrustInfo],
|
|
show_page_numbers: bool = True,
|
|
) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
issuer_hits, ordered_issuers = group_hits_by_issuer(hits)
|
|
revoked_total = sum(1 for hit in hits if hit.revocation_status == "revoked")
|
|
unknown_total = sum(1 for hit in hits if hit.revocation_status == "unknown")
|
|
not_revoked_total = sum(1 for hit in hits if hit.revocation_status == "not_revoked")
|
|
|
|
lines: list[str] = [
|
|
r"\documentclass[11pt]{article}",
|
|
r"\usepackage[a4paper,margin=18mm]{geometry}",
|
|
r"\usepackage{fontspec}",
|
|
r"\usepackage[table]{xcolor}",
|
|
r"\usepackage{microtype}",
|
|
r"\usepackage{hyperref}",
|
|
r"\usepackage{xurl}",
|
|
r"\usepackage{array}",
|
|
r"\usepackage{booktabs}",
|
|
r"\usepackage{tabularx}",
|
|
r"\usepackage{longtable}",
|
|
r"\usepackage{enumitem}",
|
|
r"\usepackage{titlesec}",
|
|
r"\usepackage[most]{tcolorbox}",
|
|
r"\usepackage{fancyvrb}",
|
|
r"\usepackage{needspace}",
|
|
r"\defaultfontfeatures{Ligatures=TeX,Scale=MatchLowercase}",
|
|
r"\definecolor{Ink}{HTML}{17202A}",
|
|
r"\definecolor{Muted}{HTML}{667085}",
|
|
r"\definecolor{Line}{HTML}{D0D5DD}",
|
|
r"\definecolor{Panel}{HTML}{F8FAFC}",
|
|
r"\definecolor{Accent}{HTML}{0F766E}",
|
|
r"\definecolor{AccentSoft}{HTML}{E6F4F1}",
|
|
r"\definecolor{AccentLine}{HTML}{74C4B8}",
|
|
r"\definecolor{Warn}{HTML}{9A6700}",
|
|
r"\definecolor{WarnSoft}{HTML}{FFF4DB}",
|
|
r"\definecolor{Danger}{HTML}{B42318}",
|
|
r"\definecolor{DangerSoft}{HTML}{FEE4E2}",
|
|
r"\definecolor{OkText}{HTML}{065F46}",
|
|
r"\definecolor{OkSoft}{HTML}{DCFCE7}",
|
|
r"\definecolor{UnknownText}{HTML}{9A6700}",
|
|
r"\definecolor{UnknownSoft}{HTML}{FEF3C7}",
|
|
r"\hypersetup{colorlinks=true,linkcolor=Accent,urlcolor=Accent,pdfauthor={CertTransparencySearch},pdftitle={Certificate Transparency Endpoint Atlas}}",
|
|
r"\setlength{\parindent}{0pt}",
|
|
r"\setlength{\parskip}{6pt}",
|
|
r"\setlength{\emergencystretch}{3em}",
|
|
r"\setlength{\footskip}{24pt}",
|
|
r"\setlength{\tabcolsep}{4.2pt}",
|
|
r"\renewcommand{\arraystretch}{1.12}",
|
|
r"\raggedbottom",
|
|
r"\setcounter{tocdepth}{2}",
|
|
rf"\pagestyle{{{'plain' if show_page_numbers else 'empty'}}}",
|
|
r"\titleformat{\section}{\sffamily\bfseries\LARGE\color{Ink}\raggedright}{\thesection}{0.8em}{}",
|
|
r"\titleformat{\subsection}{\sffamily\bfseries\Large\color{Ink}\raggedright}{\thesubsection}{0.8em}{}",
|
|
r"\titleformat{\subsubsection}{\sffamily\bfseries\normalsize\color{Ink}\raggedright}{\thesubsubsection}{0.8em}{}",
|
|
r"\tcbset{",
|
|
r" panel/.style={enhanced,breakable,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=white,colframe=Line},",
|
|
r" hero/.style={panel,colback=Ink,colframe=Ink,left=14pt,right=14pt,top=14pt,bottom=14pt},",
|
|
r" summary/.style={panel,colback=Panel,colframe=Line},",
|
|
r" issuerpanel/.style={panel,colback=Panel,colframe=Ink!45},",
|
|
r" familypanel/.style={panel,colback=AccentSoft,colframe=AccentLine},",
|
|
r" subjectpanel/.style={panel,colback=white,colframe=Line},",
|
|
r" treepanel/.style={enhanced,boxrule=0.55pt,arc=3pt,left=9pt,right=9pt,top=8pt,bottom=8pt,colback=Panel,colframe=AccentLine},",
|
|
r"}",
|
|
r"\newcommand{\DomainChip}[1]{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=AccentSoft]{\sffamily\footnotesize\texttt{#1}}}",
|
|
r"\newcommand{\MetricChip}[2]{\tcbox[on line,boxrule=0pt,arc=3pt,left=6pt,right=6pt,top=3pt,bottom=3pt,colback=Panel]{\sffamily\footnotesize\textcolor{Muted}{#1}\hspace{0.45em}\textbf{#2}}}",
|
|
r"\newcommand{\StatusOK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{OK}}}",
|
|
r"\newcommand{\StatusREV}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{REV}}}",
|
|
r"\newcommand{\StatusUNK}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=UnknownSoft]{\sffamily\bfseries\footnotesize\textcolor{UnknownText}{UNK}}}",
|
|
r"\newcommand{\WebPKIYes}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=OkSoft]{\sffamily\bfseries\footnotesize\textcolor{OkText}{WebPKI: YES}}}",
|
|
r"\newcommand{\WebPKINo}{\tcbox[on line,boxrule=0pt,arc=3pt,left=5pt,right=5pt,top=2pt,bottom=2pt,colback=DangerSoft]{\sffamily\bfseries\footnotesize\textcolor{Danger}{WebPKI: NO}}}",
|
|
r"\begin{document}",
|
|
r"\begin{titlepage}",
|
|
r"\thispagestyle{empty}",
|
|
r"\vspace*{20mm}",
|
|
r"\begin{tcolorbox}[hero]",
|
|
r"{\color{white}\sffamily\bfseries\fontsize{24}{28}\selectfont Certificate Transparency Endpoint Atlas\par}",
|
|
r"\vspace{4pt}",
|
|
r"{\color{white}\Large Currently valid leaf certificates matching the configured domains\par}",
|
|
r"\vspace{12pt}",
|
|
r"{\color{white}\sffamily\small This artefact is optimized for review: issuer-first navigation, CN-family grouping, certificate timelines, and SAN structure blocks designed to be read rather than decoded.}",
|
|
r"\end{tcolorbox}",
|
|
r"\vspace{10mm}",
|
|
r"\begin{tcolorbox}[summary]",
|
|
rf"\textbf{{Generated}}: {latex_escape(stats.generated_at_utc)}\par",
|
|
r"\textbf{Configured domains}: " + " ".join(
|
|
rf"\DomainChip{{{latex_escape(domain)}}}" for domain in stats.configured_domains
|
|
),
|
|
r"\par\medskip",
|
|
r"\MetricChip{Leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
|
|
+ r"\MetricChip{CN families}{" + str(stats.groups_total) + r"}" + " "
|
|
+ r"\MetricChip{Numbered families}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
|
|
+ r"\MetricChip{Exact families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}",
|
|
r"\par\medskip",
|
|
r"\MetricChip{Not revoked}{" + str(not_revoked_total) + r"}" + " "
|
|
+ r"\MetricChip{Revoked}{" + str(revoked_total) + r"}" + " "
|
|
+ r"\MetricChip{Unknown}{" + str(unknown_total) + r"}",
|
|
r"\end{tcolorbox}",
|
|
r"\vfill",
|
|
r"{\sffamily\small\textcolor{Muted}{Same scan, three outputs: Markdown for editor preview, LaTeX for source control, PDF for distribution.}}",
|
|
r"\end{titlepage}",
|
|
r"\tableofcontents",
|
|
r"\clearpage",
|
|
r"\section*{Executive Summary}",
|
|
r"\addcontentsline{toc}{section}{Executive Summary}",
|
|
r"\begin{tcolorbox}[summary]",
|
|
r"\textbf{Reading guide}\par",
|
|
r"Major chapters are exact issuer names. Inside each issuer, families are derived only from the construction of the Subject CN. Each concrete Subject CN then gets its own certificate timeline and a SAN structure panel.\par",
|
|
r"\medskip",
|
|
r"\textbf{Leaf-only assurance}\par",
|
|
r"SQL excludes entries whose lifecycle type is not \texttt{Certificate}. Local parsing then rejects any artifact with precertificate poison, \texttt{BasicConstraints.ca = true}, or \texttt{KeyUsage.keyCertSign = true}.",
|
|
r"\end{tcolorbox}",
|
|
r"\begin{tcolorbox}[summary]",
|
|
r"\textbf{Issuer landscape}\par",
|
|
r"\medskip",
|
|
r"\begin{tabularx}{\linewidth}{>{\raggedright\arraybackslash}X >{\raggedleft\arraybackslash}p{1.7cm} >{\raggedleft\arraybackslash}p{1.9cm} >{\raggedleft\arraybackslash}p{2.0cm}}",
|
|
r"\toprule",
|
|
r"Issuer & Certificates & Share & WebPKI \\",
|
|
r"\midrule",
|
|
]
|
|
|
|
total_hits = len(hits) if hits else 1
|
|
for issuer_name in ordered_issuers:
|
|
issuer_count = len(issuer_hits[issuer_name])
|
|
share = f"{issuer_count / total_hits:.1%}"
|
|
lines.append(
|
|
rf"{latex_escape(issuer_name)} & {issuer_count} & {latex_escape(share)} & {latex_webpki_badge(issuer_trust[issuer_name].major_webpki)} \\"
|
|
)
|
|
lines.extend(
|
|
[
|
|
r"\bottomrule",
|
|
r"\end{tabularx}",
|
|
r"\end{tcolorbox}",
|
|
]
|
|
)
|
|
|
|
for issuer_position, issuer_name in enumerate(ordered_issuers, start=1):
|
|
trust = issuer_trust[issuer_name]
|
|
issuer_groups = build_groups(issuer_hits[issuer_name])
|
|
lines.extend(
|
|
[
|
|
r"\clearpage",
|
|
rf"\section{{Issuer {issuer_position:02d}: {latex_escape(issuer_name)}}}",
|
|
r"\begin{tcolorbox}[issuerpanel]",
|
|
r"\MetricChip{Certificates}{" + str(len(issuer_hits[issuer_name])) + r"}" + " "
|
|
+ r"\MetricChip{Families}{" + str(len(issuer_groups)) + r"}" + " "
|
|
+ latex_webpki_badge(trust.major_webpki),
|
|
r"\par\medskip",
|
|
rf"\textbf{{Trust contexts seen in crt.sh live data}}: {latex_escape(', '.join(sorted(trust.server_auth_contexts)) if trust.server_auth_contexts else 'none')}\par",
|
|
rf"\textbf{{Issuer CA IDs}}: {latex_escape(', '.join(str(value) for value in sorted(trust.issuer_ca_ids)))}",
|
|
r"\end{tcolorbox}",
|
|
]
|
|
)
|
|
for family_index, group in enumerate(issuer_groups, start=1):
|
|
member_hits = [issuer_hits[issuer_name][index] for index in group.member_indices]
|
|
lines.extend(
|
|
[
|
|
r"\Needspace{14\baselineskip}",
|
|
rf"\subsection{{Family {family_index:02d}: {latex_escape(describe_group_basis(group).replace('`', ''))}}}",
|
|
r"\begin{tcolorbox}[familypanel]",
|
|
r"\MetricChip{Certificates}{" + str(group.member_count) + r"}" + " "
|
|
+ r"\MetricChip{Concrete CNs}{" + str(group.distinct_subject_cn_count) + r"}" + " "
|
|
+ r"\MetricChip{Distinct SAN profiles}{" + str(group.distinct_exact_content_count) + r"}",
|
|
r"\par\medskip",
|
|
rf"\textbf{{Matched domains}}: {' '.join(rf'\DomainChip{{{latex_escape(domain)}}}' for domain in sorted(group.matched_domains))}\par",
|
|
rf"\textbf{{Family validity span}}: \texttt{{{latex_escape(utc_iso(group.valid_from_min))}}} to \texttt{{{latex_escape(utc_iso(group.valid_to_max))}}}\par",
|
|
(
|
|
rf"\textbf{{First seen span}}: \texttt{{{latex_escape(utc_iso(group.first_seen_min))}}} to \texttt{{{latex_escape(utc_iso(group.first_seen_max))}}}\par"
|
|
if group.first_seen_min and group.first_seen_max
|
|
else ""
|
|
),
|
|
rf"\textbf{{Revocation mix}}: {group.revocation_counts.get('revoked', 0)} revoked, {group.revocation_counts.get('not_revoked', 0)} not revoked, {group.revocation_counts.get('unknown', 0)} unknown",
|
|
r"\end{tcolorbox}",
|
|
]
|
|
)
|
|
|
|
hits_by_subject: dict[str, list[CertificateHit]] = defaultdict(list)
|
|
for hit in member_hits:
|
|
hits_by_subject[hit.subject_cn].append(hit)
|
|
ordered_subjects = sorted(
|
|
hits_by_subject.keys(),
|
|
key=lambda value: (canonicalize_subject_cn(value), value.casefold()),
|
|
)
|
|
for subject_cn in ordered_subjects:
|
|
subject_hits = sorted(
|
|
hits_by_subject[subject_cn],
|
|
key=lambda hit: (hit.validity_not_before, hit.validity_not_after, hit.fingerprint_sha256),
|
|
)
|
|
san_summary = summarize_san_patterns(sorted({entry for hit in subject_hits for entry in hit.san_entries}))
|
|
unique_san_entries = sorted({entry for hit in subject_hits for entry in hit.san_entries})
|
|
lines.extend(
|
|
[
|
|
r"\Needspace{18\baselineskip}",
|
|
rf"\subsubsection{{Subject CN: {latex_escape(subject_cn)}}}",
|
|
r"\begin{tcolorbox}[subjectpanel]",
|
|
r"\MetricChip{Certificates under this CN}{" + str(len(subject_hits)) + r"}" + " "
|
|
+ r"\MetricChip{Distinct SAN profiles}{" + str(len({tuple(hit.san_entries) for hit in subject_hits})) + r"}" + " "
|
|
+ r"\MetricChip{Unique SAN entries}{" + str(len(unique_san_entries)) + r"}",
|
|
r"\par\medskip",
|
|
rf"\textbf{{Validity span under this CN}}: \texttt{{{latex_escape(utc_iso(min(hit.validity_not_before for hit in subject_hits)))}}} to \texttt{{{latex_escape(utc_iso(max(hit.validity_not_after for hit in subject_hits)))}}}",
|
|
r"\par\medskip",
|
|
r"\textbf{Certificate timeline}",
|
|
r"\begin{itemize}[leftmargin=1.4em,itemsep=0.55em,topsep=0.4em]",
|
|
]
|
|
)
|
|
for hit in subject_hits:
|
|
crtsh_ids = ", ".join(str(value) for value in sorted(hit.crtsh_certificate_ids))
|
|
lines.extend(
|
|
[
|
|
r"\item "
|
|
+ latex_status_badge(hit.revocation_status)
|
|
+ " "
|
|
+ rf"\texttt{{{latex_escape(utc_iso(hit.validity_not_before))}}} to \texttt{{{latex_escape(utc_iso(hit.validity_not_after))}}}",
|
|
rf"\newline \textcolor{{Muted}}{{SANs: {len(hit.san_entries)} \quad crt.sh: {latex_escape(crtsh_ids)} \quad {latex_escape(one_line_revocation(hit))}}}",
|
|
]
|
|
)
|
|
tree_chunks = build_san_tree_chunks_with_style(
|
|
unique_san_entries,
|
|
ascii_only=True,
|
|
max_lines_per_chunk=24,
|
|
)
|
|
lines.extend(
|
|
[
|
|
r"\end{itemize}",
|
|
r"\medskip",
|
|
r"\textbf{SAN pattern snapshot}",
|
|
r"\par\medskip",
|
|
r"\MetricChip{DNS SANs}{" + str(san_summary["dns_count"]) + r"}" + " "
|
|
+ r"\MetricChip{Other SANs}{" + str(san_summary["other_count"]) + r"}" + " "
|
|
+ r"\MetricChip{Wildcard SANs}{" + str(san_summary["wildcard_count"]) + r"}" + " "
|
|
+ r"\MetricChip{Numbered SANs}{" + str(san_summary["numbered_count"]) + r"}" + " "
|
|
+ r"\MetricChip{DNS zones}{" + str(san_summary["zone_count"]) + r"}",
|
|
r"\par\medskip",
|
|
rf"\textbf{{Dominant zones}}: {latex_escape(', '.join(f'{zone} ({count})' for zone, count in san_summary['top_zones']) if san_summary['top_zones'] else 'none')}",
|
|
r"\par",
|
|
rf"\textbf{{Repeating host schemas}}: {latex_escape(', '.join(f'{pattern} ({count})' for pattern, count in san_summary['repeating_patterns']) if san_summary['repeating_patterns'] else 'mostly one-off SAN hostnames')}",
|
|
(
|
|
rf"\par\medskip\textcolor{{Muted}}{{The SAN structure below is shown in {len(tree_chunks)} intact panels so the visual grouping is not broken across a page.}}"
|
|
if len(tree_chunks) > 1
|
|
else ""
|
|
),
|
|
r"\end{tcolorbox}",
|
|
]
|
|
)
|
|
for tree_chunk_index, tree_lines in enumerate(tree_chunks, start=1):
|
|
tree_title = (
|
|
"SAN Structure"
|
|
if len(tree_chunks) == 1
|
|
else f"SAN Structure ({tree_chunk_index}/{len(tree_chunks)})"
|
|
)
|
|
tree_needspace = max(12, min(len(tree_lines) + 7, 32))
|
|
lines.extend(
|
|
[
|
|
rf"\Needspace{{{tree_needspace}\baselineskip}}",
|
|
rf"\begin{{tcolorbox}}[treepanel,title={{{latex_escape(tree_title)}}}]",
|
|
r"\begin{Verbatim}[fontsize=\footnotesize]",
|
|
]
|
|
)
|
|
lines.extend(tree_lines)
|
|
lines.extend(
|
|
[
|
|
r"\end{Verbatim}",
|
|
r"\end{tcolorbox}",
|
|
]
|
|
)
|
|
|
|
lines.extend(
|
|
[
|
|
r"\clearpage",
|
|
r"\section*{Statistics}",
|
|
r"\addcontentsline{toc}{section}{Statistics}",
|
|
r"\begin{tcolorbox}[summary]",
|
|
r"\MetricChip{Unique leaf certificates}{" + str(stats.unique_leaf_certificates) + r"}" + " "
|
|
+ r"\MetricChip{CN-family chapters}{" + str(stats.groups_total) + r"}" + " "
|
|
+ r"\MetricChip{Multi-certificate chapters}{" + str(stats.groups_multi_member) + r"}" + " "
|
|
+ r"\MetricChip{Singleton chapters}{" + str(stats.groups_singleton) + r"}",
|
|
r"\par\medskip",
|
|
r"\MetricChip{Numbered CN patterns}{" + str(stats.groups_by_type.get("numbered_cn_pattern", 0)) + r"}" + " "
|
|
+ r"\MetricChip{Exact endpoint families}{" + str(stats.groups_by_type.get("exact_endpoint_family", 0)) + r"}" + " "
|
|
+ r"\MetricChip{Non-leaf filtered}{" + str(stats.verification.non_leaf_filtered) + r"}" + " "
|
|
+ r"\MetricChip{Precert poison filtered}{" + str(stats.verification.precertificate_poison_filtered) + r"}",
|
|
r"\end{tcolorbox}",
|
|
r"\end{document}",
|
|
]
|
|
)
|
|
path.write_text("\n".join(line for line in lines if line != "") + "\n", encoding="utf-8")</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Writes the raw inventory appendix as LaTeX for PDF assembly.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Current hits, groups, and trust data. → <strong>render_latex_report</strong> → Produces the LaTeX appendix source that later becomes PDF.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## cleanup_latex_auxiliary_files
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def cleanup_latex_auxiliary_files(tex_path: Path, pdf_output: Path) -> None:
|
|
generated_base = pdf_output.parent / tex_path.stem
|
|
for suffix in (".aux", ".log", ".out", ".toc"):
|
|
candidate = generated_base.with_suffix(suffix)
|
|
if candidate.exists():
|
|
candidate.unlink()</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This function is one of the building blocks inside `ct_scan.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>cleanup_latex_auxiliary_files</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## compile_latex_to_pdf
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def compile_latex_to_pdf(tex_path: Path, pdf_output: Path, engine: str) -> None:
|
|
engine_path = shutil.which(engine)
|
|
if engine_path is None:
|
|
raise RuntimeError(f"LaTeX engine not found: {engine}")
|
|
tex_path = tex_path.resolve()
|
|
pdf_output = pdf_output.resolve()
|
|
pdf_output.parent.mkdir(parents=True, exist_ok=True)
|
|
compile_cmd = [
|
|
engine_path,
|
|
"-interaction=nonstopmode",
|
|
"-halt-on-error",
|
|
"-output-directory",
|
|
str(pdf_output.parent),
|
|
str(tex_path),
|
|
]
|
|
for _ in range(2):
|
|
result = subprocess.run(
|
|
compile_cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
if result.returncode != 0:
|
|
message = (result.stdout + "\n" + result.stderr).strip()
|
|
raise RuntimeError(
|
|
"LaTeX compilation failed.\n"
|
|
+ "\n".join(message.splitlines()[-40:])
|
|
)
|
|
generated_pdf = pdf_output.parent / f"{tex_path.stem}.pdf"
|
|
if generated_pdf != pdf_output:
|
|
generated_pdf.replace(pdf_output)
|
|
cleanup_latex_auxiliary_files(tex_path, pdf_output)</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>Hands LaTeX to XeLaTeX and turns it into a finished PDF file.</p>
|
|
<p><strong>Flow arrows</strong></p><p>A finished `.tex` file. → <strong>compile_latex_to_pdf</strong> → Produces the human-readable PDF artifact.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## parse_args
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Search crt.sh for currently valid certificates matching configured domain fragments.",
|
|
)
|
|
parser.add_argument(
|
|
"--domains-file",
|
|
type=Path,
|
|
default=Path("domains.local.txt"),
|
|
help="Text file containing one domain fragment per line.",
|
|
)
|
|
parser.add_argument(
|
|
"--output",
|
|
type=Path,
|
|
default=Path("output/current-valid-certificates.md"),
|
|
help="Readable single-file markdown report to write.",
|
|
)
|
|
parser.add_argument(
|
|
"--latex-output",
|
|
type=Path,
|
|
default=Path("output/current-valid-certificates.tex"),
|
|
help="Readable single-file LaTeX report to write.",
|
|
)
|
|
parser.add_argument(
|
|
"--pdf-output",
|
|
type=Path,
|
|
default=Path("output/current-valid-certificates.pdf"),
|
|
help="Compiled PDF report to write.",
|
|
)
|
|
parser.add_argument(
|
|
"--pdf-engine",
|
|
default="xelatex",
|
|
help="LaTeX engine used to compile the PDF report.",
|
|
)
|
|
parser.add_argument(
|
|
"--skip-pdf",
|
|
action="store_true",
|
|
help="Write Markdown and LaTeX outputs but skip PDF compilation.",
|
|
)
|
|
parser.add_argument(
|
|
"--cache-dir",
|
|
type=Path,
|
|
default=Path(".cache/ct-search"),
|
|
help="Directory for cached per-domain query results.",
|
|
)
|
|
parser.add_argument(
|
|
"--cache-ttl-seconds",
|
|
type=int,
|
|
default=900,
|
|
help="Reuse cached database results younger than this many seconds.",
|
|
)
|
|
parser.add_argument(
|
|
"--max-candidates-per-domain",
|
|
type=int,
|
|
default=10000,
|
|
help="Maximum raw crt.sh identity rows to inspect per domain fragment.",
|
|
)
|
|
parser.add_argument(
|
|
"--retries",
|
|
type=int,
|
|
default=3,
|
|
help="Retry count for replica/recovery conflicts from crt.sh.",
|
|
)
|
|
parser.add_argument(
|
|
"--quiet",
|
|
action="store_true",
|
|
help="Suppress progress output.",
|
|
)
|
|
return parser.parse_args()</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>This block defines the command-line knobs for the file: input paths, cache settings, output paths, and other runtime switches.</p>
|
|
<p><strong>Flow arrows</strong></p><p>Earlier blocks or operator input feed this block. → <strong>parse_args</strong> → Later blocks in the same file or in the next analytical stage consume its output.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|
|
## main
|
|
|
|
<table style="width:100%; table-layout:fixed; border-collapse:collapse;">
|
|
<tr>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<pre style="margin:0; padding:14px; overflow-x:auto; background:#111827; color:#e5e7eb; border-radius:10px; border:1px solid #374151; font-size:12px; line-height:1.45;"><code class="language-python">def main() -> int:
|
|
args = parse_args()
|
|
domains = load_domains(args.domains_file)
|
|
all_records: list[DatabaseRecord] = []
|
|
for domain in domains:
|
|
cached = load_cached_records(
|
|
cache_dir=args.cache_dir,
|
|
domain=domain,
|
|
ttl_seconds=args.cache_ttl_seconds,
|
|
max_candidates=args.max_candidates_per_domain,
|
|
)
|
|
if cached is not None:
|
|
if not args.quiet:
|
|
print(f"[cache] domain={domain} records={len(cached)}", file=sys.stderr)
|
|
all_records.extend(cached)
|
|
continue
|
|
if not args.quiet:
|
|
print(f"[query] domain={domain}", file=sys.stderr)
|
|
records = query_domain(
|
|
domain=domain,
|
|
max_candidates=args.max_candidates_per_domain,
|
|
attempts=args.retries,
|
|
verbose=not args.quiet,
|
|
)
|
|
if not args.quiet:
|
|
print(f"[done] domain={domain} records={len(records)}", file=sys.stderr)
|
|
store_cached_records(args.cache_dir, domain, args.max_candidates_per_domain, records)
|
|
all_records.extend(records)
|
|
hits, verification = build_hits(all_records)
|
|
groups = build_groups(hits)
|
|
scan_stats = ScanStats(
|
|
generated_at_utc=utc_iso(datetime.now(UTC)),
|
|
configured_domains=domains,
|
|
unique_leaf_certificates=len(hits),
|
|
groups_total=len(groups),
|
|
groups_multi_member=sum(1 for group in groups if group.member_count > 1),
|
|
groups_singleton=sum(1 for group in groups if group.member_count == 1),
|
|
groups_by_type=dict(Counter(group.group_type for group in groups)),
|
|
verification=verification,
|
|
)
|
|
issuer_trust = query_issuer_trust(hits)
|
|
render_markdown_report(args.output, hits, groups, scan_stats, issuer_trust)
|
|
render_latex_report(args.latex_output, hits, groups, scan_stats, issuer_trust)
|
|
if not args.skip_pdf:
|
|
compile_latex_to_pdf(args.latex_output, args.pdf_output, args.pdf_engine)
|
|
if not args.quiet:
|
|
print(
|
|
f"[report] hits={len(hits)} groups={len(groups)} markdown={args.output} latex={args.latex_output}"
|
|
+ ("" if args.skip_pdf else f" pdf={args.pdf_output}"),
|
|
file=sys.stderr,
|
|
)
|
|
return 0</code></pre>
|
|
</td>
|
|
<td style="width:50%; vertical-align:top; padding:8px;">
|
|
<p><strong>What this block is doing</strong></p><p>The standalone command-line entrypoint for the inventory scanner.</p>
|
|
<p><strong>Flow arrows</strong></p><p>CLI arguments from the operator. → <strong>main</strong> → Runs the whole scanner end to end.</p>
|
|
<p><strong>How to think about it</strong></p><p>Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?</p>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
|