CertTransparencySearch/teachingNoobs/ct_focus_subjects.md

54 KiB

ct_focus_subjects.py

Source file: ct_focus_subjects.py

Focused-cohort analyzer. This file takes your special hand-picked Subject CN list and compares it against the wider certificate and DNS estate.

Main flow in one line: focus-subject file -> cohort entries -> compare against current and historical estate -> bucketed cohort explanation

How to read this page:

  • left side: the actual source code block
  • right side: a plain-English explanation for a beginner
  • read from top to bottom because later blocks depend on earlier ones

Module setup

#!/usr/bin/env python3

from future import annotations

import re from collections import Counter from dataclasses import dataclass from pathlib import Path from statistics import median

import ct_dns_utils import ct_lineage_report import ct_master_report import ct_scan

ENVIRONMENT_HINTS = { "alpha", "beta", "dev", "qa", "uat", "sit", "stage", "stg", "preprod", "prod", "release", "squads", "sandbox", }

VENDOR_HINTS = { "vendor", "external", "hoster", "product", "mitek", "scrive", "pega", }

IDENTITY_HINTS = { "id", "idp", "identity", "auth", "sso", "online", "mail", "email", "secmail", "chat", "appointment", "appointments", }

CUSTOMER_HINTS = { "brand", "branding", "campaign", "experience", "welcome", "thankyou", "gifts", "investment", "client", "customers", "information", "club", "risk", }

What this block is doing

Rules and data shapes for analyzing the special hand-picked Subject-CN cohort.

Flow arrows

Earlier blocks or operator input feed this block. → Module setup → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

FocusSubject

@dataclass
class FocusSubject:
    subject_cn: str
    analyst_note: str

What this block is doing

One line from the local focus-subject file.

Flow arrows

Earlier blocks or operator input feed this block. → FocusSubject → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

FocusSubjectDetail

@dataclass
class FocusSubjectDetail:
    subject_cn: str
    analyst_note: str
    analyst_theme: str
    taxonomy_bucket: str
    taxonomy_reason: str
    observed_role: str
    basket_status: str
    current_direct_certificates: int
    historical_direct_certificates: int
    current_non_focus_san_carriers: int
    historical_non_focus_san_carriers: int
    current_revoked_certificates: int
    current_not_revoked_certificates: int
    current_dns_outcome: str
    current_dns_classification: str
    current_issuer_families: str
    historical_issuer_families: str
    current_san_size_span: str
    historical_san_size_span: str
    max_direct_to_carrier_overlap_days: int
    carrier_subjects: str
    current_red_flags: str
    past_red_flags: str

What this block is doing

One detailed analytical row for one focused Subject CN.

Flow arrows

Earlier blocks or operator input feed this block. → FocusSubjectDetail → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

FocusCohortAnalysis

@dataclass
class FocusCohortAnalysis:
    focus_subjects: list[FocusSubject]
    details: list[FocusSubjectDetail]
    provided_subjects_count: int
    historically_seen_subjects_count: int
    current_direct_subjects_count: int
    current_carried_only_subjects_count: int
    historical_non_focus_carried_subjects_count: int
    unseen_subjects: list[str]
    current_focus_certificate_count: int
    current_rest_certificate_count: int
    focus_revoked_current_count: int
    focus_not_revoked_current_count: int
    rest_revoked_current_count: int
    rest_not_revoked_current_count: int
    focus_revoked_share: str
    rest_revoked_share: str
    focus_median_san_entries: int
    focus_average_san_entries: str
    rest_median_san_entries: int
    rest_average_san_entries: str
    focus_multi_zone_certificate_count: int
    rest_multi_zone_certificate_count: int
    focus_current_subject_dns_classes: Counter[str]
    rest_current_subject_dns_classes: Counter[str]
    focus_current_subject_dns_stacks: Counter[str]
    rest_current_subject_dns_stacks: Counter[str]
    focus_current_issuer_families: Counter[str]
    rest_current_issuer_families: Counter[str]
    focus_current_red_flag_subjects: int
    focus_past_red_flag_subjects: int
    focus_any_red_flag_subjects: int
    bucket_counts: Counter[str]
    notables: list[FocusSubjectDetail]
    transition_rows: list[FocusSubjectDetail]

What this block is doing

The full cohort comparison bundle used in the monograph.

Flow arrows

Earlier blocks or operator input feed this block. → FocusCohortAnalysis → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

load_focus_subjects

def load_focus_subjects(path: Path) -> list[FocusSubject]:
    if not path.exists():
        return []
    subjects: list[FocusSubject] = []
    seen: set[str] = set()
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        match = re.match(r"^(?P<cn>[^()]+?)(?:\s*\((?P<meta>.*)\))?$", line)
        if not match:
            continue
        subject_cn = match.group("cn").strip().lower()
        if subject_cn in seen:
            continue
        seen.add(subject_cn)
        subjects.append(
            FocusSubject(
                subject_cn=subject_cn,
                analyst_note=(match.group("meta") or "").strip(),
            )
        )
    return subjects

What this block is doing

Reads the local focus-subject list and any analyst notes attached to it.

Flow arrows

The local focus-subject file. → load_focus_subjects → `build_analysis` uses these parsed cohort entries.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

dns_names

def dns_names(san_entries: list[str]) -> set[str]:
    return {entry[4:].lower() for entry in san_entries if entry.startswith("DNS:")}

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → dns_names → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

overlap_days

def overlap_days(
    left_start,
    left_end,
    right_start,
    right_end,
) -> int:
    start = max(left_start, right_start)
    end = min(left_end, right_end)
    if end <= start:
        return 0
    return max(1, (end - start).days)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → overlap_days → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

pct

def pct(count: int, total: int) -> str:
    if total <= 0:
        return "0.0%"
    return f"{(count / total) * 100:.1f}%"

What this block is doing

This is a small helper that keeps the larger analytical code cleaner and easier to reuse.

Flow arrows

Earlier blocks or operator input feed this block. → pct → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

short_issuer_family

def short_issuer_family(issuer_name: str) -> str:
    lowered = issuer_name.lower()
    if "amazon" in lowered:
        return "Amazon"
    if "sectigo" in lowered or "comodo" in lowered:
        return "Sectigo/COMODO"
    if "google trust services" in lowered or "cn=we1" in lowered:
        return "Google Trust Services"
    return "Other"

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → short_issuer_family → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

median_int

def median_int(values: list[int]) -> int:
    if not values:
        return 0
    return int(median(values))

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → median_int → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

average_text

def average_text(values: list[int]) -> str:
    if not values:
        return "0.0"
    return f"{(sum(values) / len(values)):.1f}"

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → average_text → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

san_size_span

def san_size_span(current_hits: list[ct_scan.CertificateHit]) -> str:
    sizes = sorted({len(hit.san_entries) for hit in current_hits})
    if not sizes:
        return "-"
    if len(sizes) == 1:
        return str(sizes[0])
    return ", ".join(str(value) for value in sizes[:4]) + ("" if len(sizes) <= 4 else f", ... (+{len(sizes) - 4} more)")

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → san_size_span → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

historical_san_size_span

def historical_san_size_span(certificates: list[ct_lineage_report.HistoricalCertificate]) -> str:
    sizes = sorted({len(certificate.san_entries) for certificate in certificates})
    if not sizes:
        return "-"
    if len(sizes) == 1:
        return str(sizes[0])
    return ", ".join(str(value) for value in sizes[:4]) + ("" if len(sizes) <= 4 else f", ... (+{len(sizes) - 4} more)")

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → historical_san_size_span → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

summarize_names

def summarize_names(values: set[str], limit: int = 4) -> str:
    if not values:
        return "-"
    ordered = sorted(values, key=str.casefold)
    if len(ordered) <= limit:
        return ", ".join(ordered)
    return ", ".join(ordered[:limit]) + f", ... (+{len(ordered) - limit} more)"

What this block is doing

This block compresses many detailed rows into a smaller, easier-to-read summary.

Flow arrows

Earlier blocks or operator input feed this block. → summarize_names → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

zone_count_from_sans

def zone_count_from_sans(san_entries: list[str]) -> int:
    return len(
        {
            ct_scan.san_tail_split(entry[4:])[1]
            for entry in san_entries
            if entry.startswith("DNS:")
        }
    )

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → zone_count_from_sans → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

max_san_count_current

def max_san_count_current(hits: list[ct_scan.CertificateHit]) -> int:
    return max((len(hit.san_entries) for hit in hits), default=0)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → max_san_count_current → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

max_san_count_historical

def max_san_count_historical(certificates: list[ct_lineage_report.HistoricalCertificate]) -> int:
    return max((len(certificate.san_entries) for certificate in certificates), default=0)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → max_san_count_historical → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

max_zone_count_current

def max_zone_count_current(hits: list[ct_scan.CertificateHit]) -> int:
    return max((zone_count_from_sans(hit.san_entries) for hit in hits), default=0)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → max_zone_count_current → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

bucket_sort_key

def bucket_sort_key(value: str) -> tuple[int, str]:
    order = {
        "direct_front_door": 0,
        "platform_matrix_anchor": 1,
        "ambiguous_legacy": 2,
    }
    return (order.get(value, 99), value)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → bucket_sort_key → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

taxonomy_bucket_label

def taxonomy_bucket_label(bucket: str) -> str:
    return {
        "direct_front_door": "Front-door direct name",
        "platform_matrix_anchor": "Platform-anchor matrix name",
        "ambiguous_legacy": "Ambiguous or legacy residue",
    }.get(bucket, bucket)

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → taxonomy_bucket_label → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

analyst_theme

def analyst_theme(subject: FocusSubject) -> str:
    tokens = set(re.findall(r"[a-z0-9]+", f"{subject.subject_cn} {subject.analyst_note}".lower()))
    if ENVIRONMENT_HINTS & tokens:
        return "environment or platform anchor"
    if VENDOR_HINTS & tokens:
        return "vendor or product integration"
    if IDENTITY_HINTS & tokens:
        return "identity, messaging, or service front"
    if CUSTOMER_HINTS & tokens:
        return "customer proposition or campaign front"
    left_label = subject.subject_cn.split(".")[0].lower()
    if re.fullmatch(r"\d+", left_label) or re.fullmatch(r"[a-z]{2,6}\d{1,4}", left_label):
        return "opaque or legacy label"
    return "human-named branded or service endpoint"

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → analyst_theme → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

classify_taxonomy_bucket

def classify_taxonomy_bucket(
    subject: FocusSubject,
    current_hits: list[ct_scan.CertificateHit],
    historical_hits: list[ct_lineage_report.HistoricalCertificate],
    current_carriers: list[ct_scan.CertificateHit],
    historical_carriers: list[ct_lineage_report.HistoricalCertificate],
) -> tuple[str, str]:
    tokens = set(re.findall(r"[a-z0-9]+", f"{subject.subject_cn} {subject.analyst_note}".lower()))
    left_label = subject.subject_cn.split(".")[0].lower()
    opaque_label = bool(
        re.fullmatch(r"\d+", left_label)
        or re.fullmatch(r"[a-z]{1,4}\d{1,4}", left_label)
    )
    current_direct_exists = bool(current_hits)
    historical_direct_exists = bool(historical_hits)
    max_current_sans = max_san_count_current(current_hits)
    max_historical_sans = max_san_count_historical(historical_hits)
    max_any_sans = max(max_current_sans, max_historical_sans)
    max_current_zones = max_zone_count_current(current_hits)
    carrier_only_today = not current_direct_exists and bool(current_carriers)
    carrier_only_history = (not current_direct_exists and not historical_direct_exists and bool(historical_carriers))
    environment_signal = bool(ENVIRONMENT_HINTS & tokens)
if max_any_sans &gt;= 20:
    return (
        &quot;platform_matrix_anchor&quot;,
        &quot;Large SAN matrix coverage indicates an umbrella certificate for a managed platform slice rather than one standalone public front door.&quot;,
    )
if carrier_only_today or carrier_only_history:
    return (
        &quot;ambiguous_legacy&quot;,
        &quot;This name now appears mainly as a carried SAN passenger or as historical residue, so it no longer behaves like a stable standalone certificate front.&quot;,
    )
if current_direct_exists and max_any_sans &lt;= 4 and max_current_zones &lt;= 1 and not opaque_label and not environment_signal:
    return (
        &quot;direct_front_door&quot;,
        &quot;Small direct certificates, single-zone scope, and a human-readable service label fit the pattern of a branded or service-facing public entry point.&quot;,
    )
if historical_direct_exists and not current_direct_exists and max_any_sans &lt;= 4 and not opaque_label:
    return (
        &quot;ambiguous_legacy&quot;,
        &quot;The historical certificates look like a simple direct front, but there is no current direct certificate anymore, which makes this mostly migration residue rather than a live front-door pattern.&quot;,
    )
if max_any_sans &lt;= 4 and opaque_label:
    return (
        &quot;ambiguous_legacy&quot;,
        &quot;The direct certificate shape is small and simple, but the left-most label is too opaque to treat as a clear branded or service-front naming pattern.&quot;,
    )
if environment_signal and max_any_sans &lt;= 19:
    return (
        &quot;ambiguous_legacy&quot;,
        &quot;Environment-style wording is present, but the SAN coverage is not broad enough to prove a full platform-matrix certificate role.&quot;,
    )
if max_any_sans &gt; 4:
    return (
        &quot;ambiguous_legacy&quot;,
        &quot;Direct issuance exists, but the SAN set is broader or more variable than a simple one-service front, which leaves the role mixed.&quot;,
    )
return (
    &quot;ambiguous_legacy&quot;,
    &quot;The evidence is mixed or too thin to place this name cleanly in one of the stronger bucket patterns.&quot;,
)</code></pre>

What this block is doing

Places a name into the direct-front, platform-anchor, or ambiguous bucket.

Flow arrows

One focused Subject CN plus surrounding evidence. → classify_taxonomy_bucket → `build_analysis` uses the bucket label in the focused-cohort chapter.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

observed_role

def observed_role(
    subject: FocusSubject,
    current_hits: list[ct_scan.CertificateHit],
    current_carriers: list[ct_scan.CertificateHit],
    historical_carriers: list[ct_lineage_report.HistoricalCertificate],
    observation: ct_dns_utils.DnsObservation,
) -> str:
    tokens = set(re.findall(r"[a-z0-9]+", f"{subject.subject_cn} {subject.analyst_note}".lower()))
    if not current_hits and current_carriers:
        return "carried today inside another certificate"
    if not current_hits and historical_carriers:
        return "historical carried alias or retired passenger"
    if not current_hits:
        return "not seen in the CT corpus"
    max_san_entries = max(len(hit.san_entries) for hit in current_hits)
    if max_san_entries >= 20 or (ENVIRONMENT_HINTS & tokens):
        return "platform matrix or environment anchor"
    revoked = sum(1 for hit in current_hits if hit.revocation_status == "revoked")
    if revoked >= 3:
        return "high-churn direct service front"
    if VENDOR_HINTS & tokens:
        return "direct vendor or product integration front"
    if IDENTITY_HINTS & tokens:
        return "direct service or identity front"
    if CUSTOMER_HINTS & tokens:
        return "direct branded or customer proposition front"
    if observation.classification in {"direct_address", "cname_to_address"}:
        return "direct standalone service front"
    return "standalone branded or service endpoint"

What this block is doing

Tries to describe what role the name appears to play in the public estate.

Flow arrows

One focused Subject CN plus public evidence. → observed_role → `build_analysis` stores the plain-English role description.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

basket_status

def basket_status(
    current_hits: list[ct_scan.CertificateHit],
    current_carriers: list[ct_scan.CertificateHit],
    historical_hits: list[ct_lineage_report.HistoricalCertificate],
    historical_carriers: list[ct_lineage_report.HistoricalCertificate],
) -> str:
    if current_hits and current_carriers:
        return "current direct-and-carried overlap"
    if current_hits:
        return "current direct subject certificate"
    if current_carriers:
        return "current SAN passenger only"
    if historical_hits and historical_carriers:
        return "historical direct-and-carried only"
    if historical_hits:
        return "historical direct only"
    if historical_carriers:
        return "historical SAN passenger only"
    return "not seen"

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → basket_status → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

red_flag_text

def red_flag_text(row_lookup: dict[str, str], subject_cn: str) -> str:
    return row_lookup.get(subject_cn.lower(), "-")

What this block is doing

This function is one of the building blocks inside `ct_focus_subjects.py`. It exists so the file can do one narrow job at a time instead of one giant unreadable routine.

Flow arrows

Earlier blocks or operator input feed this block. → red_flag_text → Later blocks in the same file or in the next analytical stage consume its output.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?

build_analysis

def build_analysis(
    subjects: list[FocusSubject],
    report: dict[str, object],
    assessment: ct_lineage_report.HistoricalAssessment,
    dns_cache_dir: Path,
    dns_cache_ttl_seconds: int,
) -> FocusCohortAnalysis | None:
    if not subjects:
        return None
    focus_set = {subject.subject_cn for subject in subjects}
current_hits = report[&quot;hits&quot;]
current_by_cn: dict[str, list[ct_scan.CertificateHit]] = {}
for hit in current_hits:
    current_by_cn.setdefault(hit.subject_cn.lower(), []).append(hit)

historical_by_cn: dict[str, list[ct_lineage_report.HistoricalCertificate]] = {}
for certificate in assessment.certificates:
    historical_by_cn.setdefault(certificate.subject_cn.lower(), []).append(certificate)

non_focus_current = [hit for hit in current_hits if hit.subject_cn.lower() not in focus_set]
non_focus_historical = [certificate for certificate in assessment.certificates if certificate.subject_cn.lower() not in focus_set]

observation_by_name = report[&quot;observation_by_name&quot;]
detail_rows: list[FocusSubjectDetail] = []
transition_rows: list[FocusSubjectDetail] = []

current_red_flag_lookup = {row.subject_cn.lower(): row.flags for row in assessment.current_red_flag_rows}
past_red_flag_lookup = {row.subject_cn.lower(): row.flags for row in assessment.past_red_flag_rows}

for subject in subjects:
    current_direct = current_by_cn.get(subject.subject_cn, [])
    historical_direct = historical_by_cn.get(subject.subject_cn, [])
    current_carriers = [hit for hit in non_focus_current if subject.subject_cn in dns_names(hit.san_entries)]
    historical_carriers = [
        certificate
        for certificate in non_focus_historical
        if subject.subject_cn in dns_names(certificate.san_entries)
    ]
    observation = observation_by_name.get(subject.subject_cn) or ct_dns_utils.scan_name_cached(
        subject.subject_cn,
        dns_cache_dir,
        dns_cache_ttl_seconds,
    )
    current_issuer_families = Counter(
        short_issuer_family(ct_scan.primary_issuer_name(hit))
        for hit in current_direct
    )
    historical_issuer_families = Counter(
        certificate.issuer_family
        for certificate in historical_direct
    )
    max_overlap = 0
    for direct_certificate in historical_direct:
        for carrier_certificate in historical_carriers:
            max_overlap = max(
                max_overlap,
                overlap_days(
                    direct_certificate.validity_not_before,
                    direct_certificate.effective_not_after,
                    carrier_certificate.validity_not_before,
                    carrier_certificate.effective_not_after,
                ),
            )
    taxonomy_bucket, taxonomy_reason = classify_taxonomy_bucket(
        subject,
        current_direct,
        historical_direct,
        current_carriers,
        historical_carriers,
    )
    detail = FocusSubjectDetail(
        subject_cn=subject.subject_cn,
        analyst_note=subject.analyst_note or &quot;-&quot;,
        analyst_theme=analyst_theme(subject),
        taxonomy_bucket=taxonomy_bucket,
        taxonomy_reason=taxonomy_reason,
        observed_role=observed_role(subject, current_direct, current_carriers, historical_carriers, observation),
        basket_status=basket_status(current_direct, current_carriers, historical_direct, historical_carriers),
        current_direct_certificates=len(current_direct),
        historical_direct_certificates=len(historical_direct),
        current_non_focus_san_carriers=len(current_carriers),
        historical_non_focus_san_carriers=len(historical_carriers),
        current_revoked_certificates=sum(1 for hit in current_direct if hit.revocation_status == &quot;revoked&quot;),
        current_not_revoked_certificates=sum(1 for hit in current_direct if hit.revocation_status == &quot;not_revoked&quot;),
        current_dns_outcome=observation.stack_signature,
        current_dns_classification=observation.classification,
        current_issuer_families=&quot;, &quot;.join(
            f&quot;{name} ({count})&quot;
            for name, count in current_issuer_families.most_common()
        ) or &quot;-&quot;,
        historical_issuer_families=&quot;, &quot;.join(
            f&quot;{name} ({count})&quot;
            for name, count in historical_issuer_families.most_common()
        ) or &quot;-&quot;,
        current_san_size_span=san_size_span(current_direct),
        historical_san_size_span=historical_san_size_span(historical_direct),
        max_direct_to_carrier_overlap_days=max_overlap,
        carrier_subjects=summarize_names({hit.subject_cn for hit in current_carriers} | {certificate.subject_cn for certificate in historical_carriers}),
        current_red_flags=red_flag_text(current_red_flag_lookup, subject.subject_cn),
        past_red_flags=red_flag_text(past_red_flag_lookup, subject.subject_cn),
    )
    detail_rows.append(detail)
    if detail.current_non_focus_san_carriers or detail.historical_non_focus_san_carriers:
        transition_rows.append(detail)

focus_current_hits = [hit for hit in current_hits if hit.subject_cn.lower() in focus_set]
rest_current_hits = [hit for hit in current_hits if hit.subject_cn.lower() not in focus_set]

def zone_count(hit: ct_scan.CertificateHit) -&gt; int:
    return len({ct_scan.san_tail_split(entry[4:])[1] for entry in hit.san_entries if entry.startswith(&quot;DNS:&quot;)})

focus_current_subject_names = sorted({hit.subject_cn.lower() for hit in focus_current_hits})
rest_current_subject_names = sorted({hit.subject_cn.lower() for hit in rest_current_hits})

def observation_for_subject(name: str) -&gt; ct_dns_utils.DnsObservation:
    return observation_by_name.get(name) or ct_dns_utils.scan_name_cached(name, dns_cache_dir, dns_cache_ttl_seconds)

focus_current_subject_observations = [observation_for_subject(name) for name in focus_current_subject_names]
rest_current_subject_observations = [observation_for_subject(name) for name in rest_current_subject_names]

focus_current_issuer_families = Counter(
    short_issuer_family(ct_scan.primary_issuer_name(hit))
    for hit in focus_current_hits
)
rest_current_issuer_families = Counter(
    short_issuer_family(ct_scan.primary_issuer_name(hit))
    for hit in rest_current_hits
)

current_red_flag_subjects = {row.subject_cn.lower() for row in assessment.current_red_flag_rows}
past_red_flag_subjects = {row.subject_cn.lower() for row in assessment.past_red_flag_rows}

notables = sorted(
    detail_rows,
    key=lambda item: (
        bucket_sort_key(item.taxonomy_bucket),
        -(
            (item.current_revoked_certificates &gt; 0)
            + (item.current_non_focus_san_carriers &gt; 0)
            + (item.historical_non_focus_san_carriers &gt; 0)
            + (item.current_red_flags != &quot;-&quot;)
            + (item.past_red_flags != &quot;-&quot;)
        ),
        -item.current_direct_certificates,
        item.subject_cn,
    ),
)[:10]

return FocusCohortAnalysis(
    focus_subjects=subjects,
    details=sorted(detail_rows, key=lambda item: (bucket_sort_key(item.taxonomy_bucket), item.subject_cn.casefold())),
    provided_subjects_count=len(subjects),
    historically_seen_subjects_count=sum(
        1
        for item in detail_rows
        if item.historical_direct_certificates &gt; 0 or item.historical_non_focus_san_carriers &gt; 0
    ),
    current_direct_subjects_count=sum(1 for item in detail_rows if item.current_direct_certificates &gt; 0),
    current_carried_only_subjects_count=sum(
        1
        for item in detail_rows
        if item.current_direct_certificates == 0 and item.current_non_focus_san_carriers &gt; 0
    ),
    historical_non_focus_carried_subjects_count=sum(
        1
        for item in detail_rows
        if item.historical_non_focus_san_carriers &gt; 0
    ),
    unseen_subjects=[item.subject_cn for item in detail_rows if item.basket_status == &quot;not seen&quot;],
    current_focus_certificate_count=len(focus_current_hits),
    current_rest_certificate_count=len(rest_current_hits),
    focus_revoked_current_count=sum(1 for hit in focus_current_hits if hit.revocation_status == &quot;revoked&quot;),
    focus_not_revoked_current_count=sum(1 for hit in focus_current_hits if hit.revocation_status == &quot;not_revoked&quot;),
    rest_revoked_current_count=sum(1 for hit in rest_current_hits if hit.revocation_status == &quot;revoked&quot;),
    rest_not_revoked_current_count=sum(1 for hit in rest_current_hits if hit.revocation_status == &quot;not_revoked&quot;),
    focus_revoked_share=pct(
        sum(1 for hit in focus_current_hits if hit.revocation_status == &quot;revoked&quot;),
        len(focus_current_hits),
    ),
    rest_revoked_share=pct(
        sum(1 for hit in rest_current_hits if hit.revocation_status == &quot;revoked&quot;),
        len(rest_current_hits),
    ),
    focus_median_san_entries=median_int([len(hit.san_entries) for hit in focus_current_hits]),
    focus_average_san_entries=average_text([len(hit.san_entries) for hit in focus_current_hits]),
    rest_median_san_entries=median_int([len(hit.san_entries) for hit in rest_current_hits]),
    rest_average_san_entries=average_text([len(hit.san_entries) for hit in rest_current_hits]),
    focus_multi_zone_certificate_count=sum(1 for hit in focus_current_hits if zone_count(hit) &gt; 1),
    rest_multi_zone_certificate_count=sum(1 for hit in rest_current_hits if zone_count(hit) &gt; 1),
    focus_current_subject_dns_classes=Counter(observation.classification for observation in focus_current_subject_observations),
    rest_current_subject_dns_classes=Counter(observation.classification for observation in rest_current_subject_observations),
    focus_current_subject_dns_stacks=Counter(observation.stack_signature for observation in focus_current_subject_observations),
    rest_current_subject_dns_stacks=Counter(observation.stack_signature for observation in rest_current_subject_observations),
    focus_current_issuer_families=focus_current_issuer_families,
    rest_current_issuer_families=rest_current_issuer_families,
    focus_current_red_flag_subjects=sum(1 for subject in subjects if subject.subject_cn in current_red_flag_subjects),
    focus_past_red_flag_subjects=sum(1 for subject in subjects if subject.subject_cn in past_red_flag_subjects),
    focus_any_red_flag_subjects=sum(
        1
        for subject in subjects
        if subject.subject_cn in current_red_flag_subjects or subject.subject_cn in past_red_flag_subjects
    ),
    bucket_counts=Counter(item.taxonomy_bucket for item in detail_rows),
    notables=notables,
    transition_rows=sorted(
        transition_rows,
        key=lambda item: (
            -(item.current_non_focus_san_carriers + item.historical_non_focus_san_carriers),
            -item.max_direct_to_carrier_overlap_days,
            item.subject_cn.casefold(),
        ),
    ),
)</code></pre>

What this block is doing

Runs the full comparison between the focused cohort and the rest of the estate.

Flow arrows

The focus-subject list, current-state report, and historical assessment. → build_analysis → The monograph uses the resulting bundle for Chapter 8 and Appendix D.

How to think about it

Treat this block as one small station in a pipeline. Ask: what comes in here, what gets changed here, and what comes out for the next block?