CertTransparencySearch/ct_usage_assessment.py
2026-03-29 11:40:06 +02:00

453 lines
18 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import hashlib
import json
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from pathlib import Path
from cryptography import x509
from cryptography.x509.oid import ExtensionOID
import ct_scan
# Extended Key Usage (EKU) purpose identifiers, as dotted-decimal OID strings.
SERVER_AUTH_OID = "1.3.6.1.5.5.7.3.1"
CLIENT_AUTH_OID = "1.3.6.1.5.5.7.3.2"
CODE_SIGNING_OID = "1.3.6.1.5.5.7.3.3"
EMAIL_PROTECTION_OID = "1.3.6.1.5.5.7.3.4"
TIME_STAMPING_OID = "1.3.6.1.5.5.7.3.8"
OCSP_SIGNING_OID = "1.3.6.1.5.5.7.3.9"
# Wildcard purpose: classify_purpose() treats it as both server and client auth.
ANY_EXTENDED_KEY_USAGE_OID = "2.5.29.37.0"

# Display labels for known EKU OIDs. format_eku_template() falls back to the
# raw dotted string for any OID that is not listed here.
EKU_LABELS = {
    SERVER_AUTH_OID: "serverAuth",
    CLIENT_AUTH_OID: "clientAuth",
    CODE_SIGNING_OID: "codeSigning",
    EMAIL_PROTECTION_OID: "emailProtection",
    TIME_STAMPING_OID: "timeStamping",
    OCSP_SIGNING_OID: "OCSPSigning",
    ANY_EXTENDED_KEY_USAGE_OID: "anyExtendedKeyUsage",
}
@dataclass
class PurposeClassification:
    """Purpose verdict for one leaf certificate plus the evidence behind it.

    Instances are produced by ``build_classifications`` and serialized with
    ``dataclasses.asdict`` for the JSON report, so the field order is part of
    the positional constructor contract.
    """

    # Hex SHA-256 fingerprint used to match the hit to its parsed certificate.
    fingerprint_sha256: str
    # Subject common name as reported by the certificate hit.
    subject_cn: str
    # Issuer display name obtained from ct_scan.primary_issuer_name().
    issuer_name: str
    # Category label returned by classify_purpose(), e.g. "tls_server_only".
    category: str
    # Sorted dotted-string OIDs from the Extended Key Usage extension.
    eku_oids: list[str]
    # Names of the KeyUsage bits that are set on the certificate.
    key_usage_flags: list[str]
    # Validity window bounds rendered by ct_scan.utc_iso().
    valid_from_utc: str
    valid_to_utc: str
    # Sorted configured search domains that matched this certificate.
    matched_domains: list[str]
    # Sorted DNS SAN values with the "DNS:" prefix removed.
    san_dns_names: list[str]
@dataclass
class AssessmentSummary:
    """Aggregate statistics over all classified certificates.

    Built by ``summarize`` and consumed by ``render_markdown`` and the JSON
    report. The field order is part of the positional constructor contract.
    """

    # Timestamp of summary creation, as produced by utc_now_iso().
    generated_at_utc: str
    # The configured search domains the assessment was run for.
    source_cache_domains: list[str]
    # Number of classified certificates.
    unique_leaf_certificates: int
    # Category label -> number of certificates in that category.
    category_counts: dict[str, int]
    # Rendered EKU template -> count, most common first.
    eku_templates: dict[str, int]
    # Rendered KeyUsage template -> count, most common first.
    key_usage_templates: dict[str, int]
    # Category label -> issuer name -> count, most common issuer first.
    issuer_breakdown: dict[str, dict[str, int]]
    # Category label -> validity start year (4-character string) -> count.
    validity_start_years: dict[str, dict[str, int]]
    # SAN type name -> number of entries; summarize() only tallies "DNSName".
    san_type_counts: dict[str, int]
    # Certificates whose subject CN does / does not literally equal a DNS SAN.
    subject_cn_in_dns_san_count: int
    subject_cn_not_in_dns_san_count: int
    # Canonical subject CNs seen in both the dual-EKU and server-only buckets.
    dual_eku_subject_cns_with_server_only_sibling: list[str]
    # Canonical subject CNs seen exclusively in the dual-EKU bucket.
    dual_eku_subject_cns_without_server_only_sibling: list[str]
def utc_now_iso() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the assessment script.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``, with
        attributes ``domains_file``, ``cache_dir``, ``cache_ttl_seconds``,
        ``max_candidates``, ``attempts``, ``markdown_output``, ``json_output``
        and ``verbose``.
    """
    cli = argparse.ArgumentParser(
        description="Assess certificate intended usage from EKU and KeyUsage."
    )
    # (flag, converter, default, help) for every option that takes a value.
    # Registration order is kept because it determines the --help listing.
    valued_options = (
        (
            "--domains-file",
            Path,
            Path("domains.local.txt"),
            "Configurable list of search domains, one per line.",
        ),
        (
            "--cache-dir",
            Path,
            Path(".cache/ct-search"),
            "Directory used by ct_scan.py for cached CT results.",
        ),
        (
            "--cache-ttl-seconds",
            int,
            86400,
            "Reuse cached CT results up to this age before refreshing from crt.sh.",
        ),
        (
            "--max-candidates",
            int,
            10000,
            "Maximum raw crt.sh identity rows to inspect per configured domain.",
        ),
        (
            "--attempts",
            int,
            3,
            "Retry attempts for live crt.sh database queries.",
        ),
        (
            "--markdown-output",
            Path,
            Path("output/certificate-purpose-assessment.md"),
            "Human-readable assessment output.",
        ),
        (
            "--json-output",
            Path,
            Path("output/certificate-purpose-assessment.json"),
            "Machine-readable assessment output.",
        ),
    )
    for flag, converter, fallback, help_text in valued_options:
        cli.add_argument(flag, type=converter, default=fallback, help=help_text)
    cli.add_argument(
        "--verbose",
        action="store_true",
        help="Print refresh activity to stderr.",
    )
    return cli.parse_args()
def load_records(
    domains: list[str],
    cache_dir: Path,
    cache_ttl_seconds: int,
    max_candidates: int,
    attempts: int,
    verbose: bool,
) -> list[ct_scan.DatabaseRecord]:
    """Collect the CT database records for every configured domain.

    Each domain is first looked up through ``ct_scan.load_cached_records``.
    When that returns ``None`` the domain is queried live with
    ``ct_scan.query_domain`` and the result is written back with
    ``ct_scan.store_cached_records``.

    Args:
        domains: Search domains, processed in the given order.
        cache_dir: Directory handed to the ``ct_scan`` cache helpers.
        cache_ttl_seconds: Age limit forwarded to the cache lookup.
        max_candidates: Row limit forwarded to cache lookup, query and store.
        attempts: Retry count forwarded to the live query.
        verbose: Verbosity flag forwarded to the live query.

    Returns:
        The records of all domains concatenated in domain order.
    """
    collected: list[ct_scan.DatabaseRecord] = []
    for domain in domains:
        cached = ct_scan.load_cached_records(cache_dir, domain, cache_ttl_seconds, max_candidates)
        if cached is not None:
            collected.extend(cached)
            continue
        # No usable cache entry: fetch live and refresh the cache.
        fresh = ct_scan.query_domain(
            domain,
            max_candidates=max_candidates,
            attempts=attempts,
            verbose=verbose,
        )
        ct_scan.store_cached_records(cache_dir, domain, max_candidates=max_candidates, records=fresh)
        collected.extend(fresh)
    return collected
def extract_eku_oids(cert: x509.Certificate) -> list[str]:
    """Return the certificate's Extended Key Usage OIDs as sorted dotted strings.

    Args:
        cert: Parsed certificate to inspect.

    Returns:
        The sorted dotted-string OIDs, or an empty list when the certificate
        has no Extended Key Usage extension.
    """
    try:
        eku_extension = cert.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE)
    except x509.ExtensionNotFound:
        return []
    dotted = [purpose.dotted_string for purpose in eku_extension.value]
    dotted.sort()
    return dotted
def extract_key_usage_flags(cert: x509.Certificate) -> list[str]:
    """Return the names of the KeyUsage bits that are set on the certificate.

    Args:
        cert: Parsed certificate to inspect.

    Returns:
        Attribute names of the asserted KeyUsage bits in a fixed order, or an
        empty list when the certificate has no KeyUsage extension.
    """
    try:
        usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
    except x509.ExtensionNotFound:
        return []
    unconditional_bits = (
        "digital_signature",
        "content_commitment",
        "key_encipherment",
        "data_encipherment",
        "key_agreement",
        "key_cert_sign",
        "crl_sign",
    )
    asserted = [name for name in unconditional_bits if getattr(usage, name)]
    # encipher_only / decipher_only are only read when key_agreement is set;
    # the cryptography KeyUsage object raises ValueError for them otherwise.
    if usage.key_agreement:
        asserted.extend(
            name for name in ("encipher_only", "decipher_only") if getattr(usage, name)
        )
    return asserted
def classify_purpose(eku_oids: list[str]) -> str:
    """Map a certificate's EKU OIDs to a coarse purpose category.

    Only serverAuth, clientAuth, codeSigning and emailProtection take part in
    the decision; ``anyExtendedKeyUsage`` counts as both serverAuth and
    clientAuth. Any other OID is ignored, so e.g. serverAuth combined with
    timeStamping is still reported as ``tls_server_only``.

    Args:
        eku_oids: Dotted-string EKU OIDs of one certificate.

    Returns:
        One of ``no_eku``, ``tls_server_only``, ``tls_server_and_client``,
        ``client_auth_only``, ``smime_only``, ``code_signing_only`` or
        ``mixed_or_other``.
    """
    if not eku_oids:
        return "no_eku"
    present = set(eku_oids)
    wildcard = ANY_EXTENDED_KEY_USAGE_OID in present
    # Profile order: (server auth, client auth, code signing, email protection).
    profile = (
        wildcard or SERVER_AUTH_OID in present,
        wildcard or CLIENT_AUTH_OID in present,
        CODE_SIGNING_OID in present,
        EMAIL_PROTECTION_OID in present,
    )
    verdicts = {
        (True, False, False, False): "tls_server_only",
        (True, True, False, False): "tls_server_and_client",
        (False, True, False, False): "client_auth_only",
        (False, False, False, True): "smime_only",
        (False, False, True, False): "code_signing_only",
    }
    # Every other combination, including "none of the four", is a mix.
    return verdicts.get(profile, "mixed_or_other")
def format_eku_template(eku_oids: list[str]) -> str:
    """Render EKU OIDs as a comma-separated template string.

    Known OIDs are shown by their ``EKU_LABELS`` name and unknown ones by
    their dotted string. An empty list renders as ``(none)``.
    """
    if eku_oids:
        labels = [EKU_LABELS.get(oid, oid) for oid in eku_oids]
        return ", ".join(labels)
    return "(none)"
def format_key_usage_template(flags: list[str]) -> str:
    """Render KeyUsage flag names as a comma-separated template string.

    An empty list renders as ``(missing)``.
    """
    return ", ".join(flags) if flags else "(missing)"
def build_classifications(
    hits: list[ct_scan.CertificateHit],
    records: list[ct_scan.DatabaseRecord],
) -> list[PurposeClassification]:
    """Classify every certificate hit by its EKU and KeyUsage extensions.

    The raw records are parsed once per distinct DER encoding to recover the
    X.509 extensions; each hit is then joined to its parsed certificate
    through the SHA-256 fingerprint of the DER bytes.

    Args:
        hits: Certificate hits to classify.
        records: Raw database records carrying ``certificate_der`` bytes.

    Returns:
        One classification per hit, sorted by category, case-folded subject
        CN, validity start and fingerprint.

    Raises:
        KeyError: If a hit's fingerprint matches no leaf certificate found in
            ``records``.
    """
    leaf_certs: dict[str, x509.Certificate] = {}
    seen_fingerprints: set[str] = set()
    for record in records:
        # Hash before parsing: a record whose DER bytes were already handled
        # would give the same outcome, so the repeated parse is skipped.
        fingerprint = hashlib.sha256(record.certificate_der).hexdigest()
        if fingerprint in seen_fingerprints:
            continue
        seen_fingerprints.add(fingerprint)
        cert = x509.load_der_x509_certificate(record.certificate_der)
        is_leaf, _reason = ct_scan.is_leaf_certificate(cert)
        if is_leaf:
            leaf_certs[fingerprint] = cert

    results: list[PurposeClassification] = []
    for hit in hits:
        cert = leaf_certs[hit.fingerprint_sha256]
        # Extract once and reuse for both the category and the stored field.
        eku_oids = extract_eku_oids(cert)
        san_dns_names = sorted(
            entry[4:] for entry in hit.san_entries if entry.startswith("DNS:")
        )
        results.append(
            PurposeClassification(
                fingerprint_sha256=hit.fingerprint_sha256,
                subject_cn=hit.subject_cn,
                issuer_name=ct_scan.primary_issuer_name(hit),
                category=classify_purpose(eku_oids),
                eku_oids=eku_oids,
                key_usage_flags=extract_key_usage_flags(cert),
                valid_from_utc=ct_scan.utc_iso(hit.validity_not_before),
                valid_to_utc=ct_scan.utc_iso(hit.validity_not_after),
                matched_domains=sorted(hit.matched_domains),
                san_dns_names=san_dns_names,
            )
        )
    results.sort(
        key=lambda item: (
            item.category,
            item.subject_cn.casefold(),
            item.valid_from_utc,
            item.fingerprint_sha256,
        )
    )
    return results
def summarize(classifications: list[PurposeClassification], domains: list[str]) -> AssessmentSummary:
    """Aggregate per-certificate classifications into report statistics.

    Args:
        classifications: Classified certificates, one entry per certificate.
        domains: Configured search domains, stored unchanged in the summary.

    Returns:
        Counts per category, EKU template, KeyUsage template, issuer and
        validity start year, the subject-CN-in-SAN tallies, and the canonical
        subject CNs of dual-EKU certificates split by whether the same CN
        also occurs in the server-only category.
    """
    dual_label = "tls_server_and_client"
    strict_label = "tls_server_only"

    per_category: Counter[str] = Counter()
    per_eku_template: Counter[str] = Counter()
    per_key_usage_template: Counter[str] = Counter()
    issuers_by_category: dict[str, Counter[str]] = defaultdict(Counter)
    start_years_by_category: dict[str, Counter[str]] = defaultdict(Counter)
    san_types: Counter[str] = Counter()
    categories_by_cn: dict[str, set[str]] = defaultdict(set)
    cn_in_san = 0

    for item in classifications:
        per_category[item.category] += 1
        per_eku_template[format_eku_template(item.eku_oids)] += 1
        per_key_usage_template[format_key_usage_template(item.key_usage_flags)] += 1
        issuers_by_category[item.category][item.issuer_name] += 1
        # The first four characters of the validity start are used as the year.
        start_years_by_category[item.category][item.valid_from_utc[:4]] += 1
        san_types["DNSName"] += len(item.san_dns_names)
        # Literal comparison: no case folding or wildcard matching is applied.
        if item.subject_cn in item.san_dns_names:
            cn_in_san += 1
        canonical_cn = ct_scan.canonicalize_subject_cn(item.subject_cn)
        categories_by_cn[canonical_cn].add(item.category)

    with_strict_sibling = sorted(
        cn for cn, seen in categories_by_cn.items() if {dual_label, strict_label} <= seen
    )
    # "Without sibling" means the CN occurs in the dual-EKU category only.
    dual_only = sorted(cn for cn, seen in categories_by_cn.items() if seen == {dual_label})

    return AssessmentSummary(
        generated_at_utc=utc_now_iso(),
        source_cache_domains=domains,
        unique_leaf_certificates=len(classifications),
        category_counts=dict(per_category),
        eku_templates=dict(per_eku_template.most_common()),
        key_usage_templates=dict(per_key_usage_template.most_common()),
        issuer_breakdown={
            category: dict(tally.most_common())
            for category, tally in issuers_by_category.items()
        },
        validity_start_years={
            category: dict(sorted(tally.items()))
            for category, tally in start_years_by_category.items()
        },
        san_type_counts=dict(san_types),
        subject_cn_in_dns_san_count=cn_in_san,
        subject_cn_not_in_dns_san_count=len(classifications) - cn_in_san,
        dual_eku_subject_cns_with_server_only_sibling=with_strict_sibling,
        dual_eku_subject_cns_without_server_only_sibling=dual_only,
    )
def _what_this_means_bullets(summary: AssessmentSummary) -> list[str]:
    """Build the "What This Means" bullets from the actual category counts."""
    outside_tls_server = {
        category: count
        for category, count in sorted(summary.category_counts.items())
        if count and category not in ("tls_server_only", "tls_server_and_client")
    }
    # NOTE(review): this SAN statement is fixed text. The summary produced in
    # this file only tallies DNS names, so it cannot be checked against data.
    san_line = "- All SAN entries seen in this corpus are DNS names."
    cn_line = (
        f"- Subject CN appears literally in a DNS SAN for "
        f"**{summary.subject_cn_in_dns_san_count} of {summary.unique_leaf_certificates}** certificates."
    )
    if not outside_tls_server:
        return [
            "- The corpus contains **only TLS-capable certificates**. There are no client-only, S/MIME, or code-signing certificates.",
            san_line,
            cn_line,
            "- The only ambiguity is whether to keep or set aside the certificates whose EKU allows both `serverAuth` and `clientAuth`.",
        ]
    # Some certificates fall outside the two TLS server categories, so the
    # "only TLS-capable" verdict would contradict the headline counts.
    detail = ", ".join(f"`{category}`: {count}" for category, count in outside_tls_server.items())
    return [
        f"- The corpus is **not limited to TLS server certificates**: "
        f"**{sum(outside_tls_server.values())}** certificates fall outside the TLS server categories ({detail}).",
        san_line,
        cn_line,
        "- Separately, decide whether to keep or set aside the certificates whose EKU allows both `serverAuth` and `clientAuth`.",
    ]


def _time_pattern_bullets(validity_start_years: dict[str, dict[str, int]]) -> list[str]:
    """Build the "Time Pattern" bullets comparing dual-EKU and server-only start years."""
    dual_years = set(validity_start_years.get("tls_server_and_client", {}))
    server_years = set(validity_start_years.get("tls_server_only", {}))
    bullets: list[str] = []
    if len(dual_years) == 1:
        (only_year,) = dual_years
        bullets.append(
            f"- The dual-EKU bucket is entirely composed of certificates whose current validity starts in **{only_year}**."
        )
    if dual_years and server_years and dual_years != server_years:
        bullets.append("- The year split suggests at least some change in issuance policy over time.")
    else:
        bullets.append("- Time alone does not prove a migration. The stronger signal is the template split by issuer and EKU.")
    return bullets


def render_markdown(summary: AssessmentSummary, classifications: list[PurposeClassification]) -> str:
    """Render the human-readable Markdown assessment report.

    Args:
        summary: Aggregate statistics for the report.
        classifications: Per-certificate results; only the
            ``tls_server_and_client`` entries are listed in detail.

    Returns:
        The complete Markdown document, terminated by a newline. The
        "What This Means" verdict is derived from ``summary.category_counts``
        and reports certificates outside the TLS server categories when any
        exist.
    """
    counts = summary.category_counts
    total = summary.unique_leaf_certificates
    headline_rows = (
        ("TLS server only", "tls_server_only"),
        ("TLS server and client auth", "tls_server_and_client"),
        ("Client auth only", "client_auth_only"),
        ("S/MIME only", "smime_only"),
        ("Code signing only", "code_signing_only"),
        ("Mixed or other", "mixed_or_other"),
        ("No EKU", "no_eku"),
    )

    lines: list[str] = [
        "# Certificate Purpose Assessment",
        "",
        f"Generated at: `{summary.generated_at_utc}`",
        f"Configured domains: `{', '.join(summary.source_cache_domains)}`",
        "",
        "## Headline Verdict",
        "",
        f"- Unique current leaf certificates assessed: **{total}**",
    ]
    lines += [f"- {label}: **{counts.get(category, 0)}**" for label, category in headline_rows]

    lines += ["", "## What This Means", ""]
    lines += _what_this_means_bullets(summary)

    lines += [
        "",
        "## Rework Options",
        "",
        f"- Keep the full operational server corpus: **{total}** certificates.",
        f"- Keep only strict server-auth certificates: **{counts.get('tls_server_only', 0)}** certificates.",
        f"- Create a review bucket for dual-EKU certificates: **{counts.get('tls_server_and_client', 0)}** certificates.",
        "",
    ]

    template_sections = (
        ("## EKU Templates", summary.eku_templates),
        ("## KeyUsage Templates", summary.key_usage_templates),
    )
    for heading, templates in template_sections:
        lines += [heading, ""]
        lines += [f"- `{template}`: {count}" for template, count in templates.items()]
        lines.append("")

    lines += ["## Issuer Breakdown", ""]
    for category in sorted(summary.issuer_breakdown):
        lines += [f"### `{category}`", ""]
        lines += [
            f"- `{issuer_name}`: {count}"
            for issuer_name, count in summary.issuer_breakdown[category].items()
        ]
        lines.append("")

    lines += ["## Time Pattern", ""]
    lines += _time_pattern_bullets(summary.validity_start_years)
    lines.append("")
    for category in sorted(summary.validity_start_years):
        year_counts = ", ".join(
            f"{year}: {count}" for year, count in summary.validity_start_years[category].items()
        )
        lines.append(f"- `{category}`: {year_counts}")
    lines.append("")

    # NOTE(review): the interpretation below is fixed narrative text; it is
    # emitted unchanged even when no dual-EKU certificates exist.
    lines += [
        "## Interpretation",
        "",
        "- The `tls_server_and_client` certificates still look like hostname certificates, not user or robot identity certificates.",
        "- Evidence: public DNS-style Subject CNs, DNS-only SANs, public WebPKI server-auth issuers, and no email or personal-name SAN material.",
        "- The most plausible reading is **legacy or permissive server certificate templates** that also included `clientAuth`, not a separate client-certificate estate.",
        "",
    ]

    with_sibling = summary.dual_eku_subject_cns_with_server_only_sibling
    without_sibling = summary.dual_eku_subject_cns_without_server_only_sibling
    lines += [
        "## Dual-EKU Hostname Overlap",
        "",
        f"- Dual-EKU subject CN families that also have a strict server-only sibling: **{len(with_sibling)}**",
        f"- Dual-EKU subject CN families that currently appear only in the dual-EKU bucket: **{len(without_sibling)}**",
        "",
    ]
    family_sections = (
        ("### Dual-EKU Families With Server-Only Siblings", with_sibling),
        ("### Dual-EKU Families Without Server-Only Siblings", without_sibling),
    )
    for heading, families in family_sections:
        if families:
            lines += [heading, ""]
            lines += [f"- `{subject_cn}`" for subject_cn in families]
            lines.append("")

    lines += ["## Detailed Dual-EKU Certificates", ""]
    dual_items = [item for item in classifications if item.category == "tls_server_and_client"]
    if not dual_items:
        lines += ["- None", ""]
    for item in dual_items:
        # Show at most eight SAN names and mark the truncation.
        dns_sample = ", ".join(item.san_dns_names[:8])
        if len(item.san_dns_names) > 8:
            dns_sample += ", ..."
        lines += [
            f"### `{item.subject_cn}`",
            "",
            f"- Issuer: `{item.issuer_name}`",
            f"- Validity: `{item.valid_from_utc}` to `{item.valid_to_utc}`",
            f"- Matched search domains: `{', '.join(item.matched_domains)}`",
            f"- EKU: `{format_eku_template(item.eku_oids)}`",
            f"- KeyUsage: `{format_key_usage_template(item.key_usage_flags)}`",
            f"- DNS SAN count: `{len(item.san_dns_names)}`",
            f"- DNS SAN sample: `{dns_sample}`",
            "",
        ]
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the assessment end to end and write both report files.

    Loads the configured domains and their CT records, classifies the
    certificate hits, and writes the Markdown and JSON reports to the paths
    given on the command line, creating parent directories as needed.

    Returns:
        ``0``, used as the process exit status.
    """
    args = parse_args()
    domains = ct_scan.load_domains(args.domains_file)
    records = load_records(
        domains=domains,
        cache_dir=args.cache_dir,
        cache_ttl_seconds=args.cache_ttl_seconds,
        max_candidates=args.max_candidates,
        attempts=args.attempts,
        verbose=args.verbose,
    )
    hits, verification = ct_scan.build_hits(records)
    classifications = build_classifications(hits, records)
    summary = summarize(classifications, domains)

    markdown_text = render_markdown(summary, classifications)
    report = {
        "summary": asdict(summary),
        "verification": asdict(verification),
        "classifications": [asdict(entry) for entry in classifications],
    }

    # Create both output directories before writing either file.
    for destination in (args.markdown_output, args.json_output):
        destination.parent.mkdir(parents=True, exist_ok=True)
    args.markdown_output.write_text(markdown_text, encoding="utf-8")
    args.json_output.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())