CertTransparencySearch/teachingNoobs/ct_caa_analysis.md

32 KiB

ct_caa_analysis.py

Source file: ct_caa_analysis.py

CAA analyzer. This file resolves live DNS issuance policy and compares it against the public CA families that are actually covering the names today.

Main flow in one line: DNS name -> effective CAA lookup -> allowed CA families -> compare with live cert families

How to read this page:

  • left side: the actual source code block
  • right side: a plain-English explanation for a beginner
  • read from top to bottom because later blocks depend on earlier ones

Module setup

#!/usr/bin/env python3

from future import annotations

from collections import Counter, defaultdict from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any

import ct_dns_utils import ct_scan

What this block is doing

Data structures and lookup logic for effective CAA policy analysis.

Flow arrows

Earlier blocks or operator input feed this block. → Module setup → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

CaaObservation

@dataclass
class CaaObservation:
    name: str
    effective_rr_owner: str | None
    source_kind: str
    source_label: str | None
    aliases_seen: list[str]
    caa_rows: list[tuple[int, str, str]]

What this block is doing

One resolved CAA result before it is merged with certificate coverage data.

Flow arrows

Earlier blocks or operator input feed this block. → CaaObservation → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

CaaNameRow

@dataclass
class CaaNameRow:
    name: str
    zone: str
    source_kind: str
    effective_rr_owner: str | None
    source_label: str | None
    aliases_seen: list[str]
    issue_values: list[str]
    issuewild_values: list[str]
    iodef_values: list[str]
    allowed_ca_families: list[str]
    current_covering_families: list[str]
    current_covering_subject_cns: list[str]
    current_covering_cert_count: int
    current_multi_family_overlap: bool
    current_policy_mismatch: bool
    mismatch_families: list[str]

What this block is doing

One final row that compares DNS policy with current live certificate families.

Flow arrows

Earlier blocks or operator input feed this block. → CaaNameRow → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

CaaAnalysis

@dataclass
class CaaAnalysis:
    generated_at_utc: str
    configured_domains: list[str]
    total_names: int
    rows: list[CaaNameRow]
    source_kind_counts: Counter[str]
    zone_counts: Counter[str]
    multi_family_overlap_names: list[str]
    policy_mismatch_names: list[str]

What this block is doing

The full CAA analysis bundle used by the monograph.

Flow arrows

Earlier blocks or operator input feed this block. → CaaAnalysis → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

normalize_dns_name

def normalize_dns_name(value: str) -> str:
    value = value.strip()
    if value.upper().startswith("DNS:"):
        return ct_dns_utils.normalize_name(value[4:])
    return ct_dns_utils.normalize_name(value)

What this block is doing

This block makes values consistent so matching and grouping do not get confused by superficial differences.

Flow arrows

Earlier blocks or operator input feed this block. → normalize_dns_name → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

issuer_family

def issuer_family(names: set[str]) -> str:
    lowered = " ".join(sorted(names)).lower()
    if "amazon" in lowered:
        return "Amazon"
    if "google trust services" in lowered or "cn=we1" in lowered:
        return "Google Trust Services"
    if "sectigo" in lowered or "comodo" in lowered:
        return "Sectigo/COMODO"
    if any(token in lowered for token in ["digicert", "quovadis", "thawte", "geotrust", "rapidssl", "symantec", "verisign"]):
        return "DigiCert/QuoVadis"
    return "Other"

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → issuer_family → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

classify_zone

def classify_zone(name: str, configured_domains: list[str]) -> str:
    for domain in sorted(configured_domains, key=len, reverse=True):
        lowered_domain = domain.lower()
        if name == lowered_domain or name.endswith(f".{lowered_domain}"):
            return lowered_domain
    return "other"

What this block is doing

This block applies rules and chooses a category label.

Flow arrows

Earlier blocks or operator input feed this block. → classify_zone → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

cache_path

def cache_path(cache_dir: Path, name: str) -> Path:
    return cache_dir / ct_dns_utils.cache_key(f"caa-{name}")

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → cache_path → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

serialize_observation

def serialize_observation(observation: CaaObservation) -> dict[str, Any]:
    return {
        "name": observation.name,
        "effective_rr_owner": observation.effective_rr_owner,
        "source_kind": observation.source_kind,
        "source_label": observation.source_label,
        "aliases_seen": observation.aliases_seen,
        "caa_rows": [list(row) for row in observation.caa_rows],
    }

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → serialize_observation → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

deserialize_observation

def deserialize_observation(payload: dict[str, Any]) -> CaaObservation:
    return CaaObservation(
        name=payload["name"],
        effective_rr_owner=payload.get("effective_rr_owner"),
        source_kind=payload["source_kind"],
        source_label=payload.get("source_label"),
        aliases_seen=list(payload.get("aliases_seen", [])),
        caa_rows=[(int(flag), str(tag), str(value)) for flag, tag, value in payload.get("caa_rows", [])],
    )

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → deserialize_observation → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

parse_caa_response

def parse_caa_response(lines: list[str]) -> tuple[list[tuple[int, str, str]], list[str]]:
    rows: list[tuple[int, str, str]] = []
    aliases: list[str] = []
    for line in lines:
        parts = line.split(maxsplit=2)
        if len(parts) == 3 and parts[0].isdigit():
            flag, tag, value = parts
            rows.append((int(flag), tag.lower(), value.strip().strip('"').lower()))
        elif line.endswith("."):
            aliases.append(ct_dns_utils.normalize_name(line))
    return rows, aliases

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → parse_caa_response → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

query_caa_lines

def query_caa_lines(name: str) -> list[str]:
    output = ct_dns_utils.run_dig(name, "CAA", short=True)
    return [line.strip() for line in output.splitlines() if line.strip()]

What this block is doing

This block asks an external source for data and returns it in a shape the rest of the file can use.

Flow arrows

Earlier blocks or operator input feed this block. → query_caa_lines → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

relevant_caa_live

def relevant_caa_live(name: str) -> CaaObservation:
    labels = name.rstrip(".").lower().split(".")
    for index in range(len(labels)):
        candidate = ".".join(labels[index:])
        rows, aliases = parse_caa_response(query_caa_lines(candidate))
        if rows:
            if index == 0:
                source_kind = "alias_target" if aliases else "exact"
            else:
                source_kind = "parent_alias_target" if aliases else "parent"
            return CaaObservation(
                name=name,
                effective_rr_owner=candidate,
                source_kind=source_kind,
                source_label=aliases[-1] if aliases else candidate,
                aliases_seen=aliases,
                caa_rows=rows,
            )
    return CaaObservation(
        name=name,
        effective_rr_owner=None,
        source_kind="none",
        source_label=None,
        aliases_seen=[],
        caa_rows=[],
    )

What this block is doing

Finds the effective live CAA for one name, including inheritance and alias behavior.

Flow arrows

One DNS name from the SAN universe. → relevant_caa_live → `build_analysis` uses this to learn the effective issuance policy per name.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

scan_name_cached

def scan_name_cached(name: str, cache_dir: Path, ttl_seconds: int) -> CaaObservation:
    key = cache_path(cache_dir, name).name
    cached = ct_dns_utils.load_json_cache(cache_dir, key, ttl_seconds)
    if cached is not None:
        cached.pop("cached_at", None)
        return deserialize_observation(cached)
    observation = relevant_caa_live(name)
    ct_dns_utils.store_json_cache(cache_dir, key, serialize_observation(observation))
    return observation

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → scan_name_cached → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

allowed_ca_families

def allowed_ca_families(caa_rows: list[tuple[int, str, str]]) -> list[str]:
    families: set[str] = set()
    for _flag, tag, value in caa_rows:
        if tag != "issue":
            continue
        normalized = value[:-1] if value.endswith(".") else value
        if any(token in normalized for token in ["amazon.com", "amazontrust.com", "awstrust.com", "amazonaws.com", "aws.amazon.com"]):
            families.add("Amazon")
        if any(token in normalized for token in ["sectigo.com", "comodoca.com", "comodo.com"]):
            families.add("Sectigo/COMODO")
        if any(token in normalized for token in ["digicert.com", "digicert.ne.jp", "thawte.com", "geotrust.com", "rapidssl.com", "symantec.com", "quovadisglobal.com", "digitalcertvalidation.com"]):
            families.add("DigiCert/QuoVadis")
        if "pki.goog" in normalized:
            families.add("Google Trust Services")
        if "letsencrypt.org" in normalized:
            families.add("Let's Encrypt")
        if any(token in normalized for token in ["telia.com", "telia.fi", "telia.se"]):
            families.add("Telia")
    return sorted(families)

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Raw CAA rows for one effective policy. → allowed_ca_families → `build_analysis` uses the normalized families for policy-vs-live comparison.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

issue_values

def issue_values(caa_rows: list[tuple[int, str, str]], tag: str) -> list[str]:
    return sorted({value for _flag, row_tag, value in caa_rows if row_tag == tag})

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → issue_values → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

build_analysis

def build_analysis(
    hits: list[ct_scan.CertificateHit],
    configured_domains: list[str],
    cache_dir: Path,
    ttl_seconds: int,
) -> CaaAnalysis:
    names = sorted(
        {
            normalize_dns_name(entry)
            for hit in hits
            for entry in hit.san_entries
            if normalize_dns_name(entry)
        }
    )
    coverage: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for hit in hits:
        family = issuer_family(hit.issuer_names)
        subject_cn = normalize_dns_name(hit.subject_cn)
        for entry in hit.san_entries:
            coverage[normalize_dns_name(entry)].append((subject_cn, family))
rows: list[CaaNameRow] = []
for name in names:
    observation = scan_name_cached(name, cache_dir, ttl_seconds)
    allowed_families = allowed_ca_families(observation.caa_rows)
    current_families = sorted({family for _subject, family in coverage[name]})
    mismatch_families = sorted(family for family in current_families if allowed_families and family not in allowed_families)
    rows.append(
        CaaNameRow(
            name=name,
            zone=classify_zone(name, configured_domains),
            source_kind=observation.source_kind,
            effective_rr_owner=observation.effective_rr_owner,
            source_label=observation.source_label,
            aliases_seen=observation.aliases_seen,
            issue_values=issue_values(observation.caa_rows, "issue"),
            issuewild_values=issue_values(observation.caa_rows, "issuewild"),
            iodef_values=issue_values(observation.caa_rows, "iodef"),
            allowed_ca_families=allowed_families,
            current_covering_families=current_families,
            current_covering_subject_cns=sorted({subject for subject, _family in coverage[name]}),
            current_covering_cert_count=len(coverage[name]),
            current_multi_family_overlap=len(current_families) > 1,
            current_policy_mismatch=bool(mismatch_families),
            mismatch_families=mismatch_families,
        )
    )

return CaaAnalysis(
    generated_at_utc=ct_scan.utc_iso(datetime.now(UTC)),
    configured_domains=sorted(configured_domains),
    total_names=len(rows),
    rows=rows,
    source_kind_counts=Counter(row.source_kind for row in rows),
    zone_counts=Counter(row.zone for row in rows),
    multi_family_overlap_names=sorted(row.name for row in rows if row.current_multi_family_overlap),
    policy_mismatch_names=sorted(row.name for row in rows if row.current_policy_mismatch),
)</code></pre>

What this block is doing

Runs CAA across the whole SAN namespace and compares policy with live issuance.

Flow arrows

Current certificate hits and the configured zones. → build_analysis → The monograph uses this for the CAA chapter and appendix.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

rows_for_zone

def rows_for_zone(analysis: CaaAnalysis, zone: str) -> list[CaaNameRow]:
    return [row for row in analysis.rows if row.zone == zone]

What this block is doing

Filters the full analysis down to one configured DNS zone.

Flow arrows

The full CAA analysis bundle. → rows_for_zone → The monograph uses zone-filtered rows for per-zone policy tables.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

policy_counter

def policy_counter(rows: list[CaaNameRow]) -> Counter[tuple[str, ...]]:
    counter: Counter[tuple[str, ...]] = Counter()
    for row in rows:
        key = tuple(row.allowed_ca_families) if row.allowed_ca_families else ("UNRESTRICTED",)
        counter[key] += 1
    return counter

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → policy_counter → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

serialize_analysis

def serialize_analysis(analysis: CaaAnalysis) -> dict[str, Any]:
    return {
        "generated_at_utc": analysis.generated_at_utc,
        "configured_domains": analysis.configured_domains,
        "total_names": analysis.total_names,
        "rows": [asdict(row) for row in analysis.rows],
        "source_kind_counts": dict(analysis.source_kind_counts),
        "zone_counts": dict(analysis.zone_counts),
        "multi_family_overlap_names": analysis.multi_family_overlap_names,
        "policy_mismatch_names": analysis.policy_mismatch_names,
    }

What this block is doing

This function is one of the building blocks inside `ct_caa_analysis.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → serialize_analysis → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?