feat(ci): add GHCR vulnerability gate policy and audit traceability

This commit is contained in:
Chummy 2026-02-25 13:26:03 +00:00 committed by Chum Yin
parent 1189ff59b8
commit 7849d10a69
3 changed files with 512 additions and 35 deletions

View File

@ -0,0 +1,17 @@
{
"schema_version": "zeroclaw.ghcr-vulnerability-policy.v1",
"required_tag_classes": [
"release",
"sha",
"latest"
],
"blocking_severities": [
"HIGH",
"CRITICAL"
],
"max_blocking_findings_per_tag": 0,
"require_blocking_count_parity": true,
"require_artifact_id_parity": true,
"scan_artifact_retention_days": 14,
"audit_artifact_retention_days": 21
}

View File

@ -13,7 +13,9 @@ on:
- "dev/config.template.toml"
- ".github/workflows/pub-docker-img.yml"
- ".github/release/ghcr-tag-policy.json"
- ".github/release/ghcr-vulnerability-policy.json"
- "scripts/ci/ghcr_publish_contract_guard.py"
- "scripts/ci/ghcr_vulnerability_gate.py"
workflow_dispatch:
concurrency:
@ -216,14 +218,44 @@ jobs:
mkdir -p artifacts
TAG_NAME="${{ steps.meta.outputs.release_tag }}"
SHA_TAG="${{ steps.meta.outputs.sha_tag }}"
LATEST_TAG="${{ steps.meta.outputs.latest_tag }}"
IMAGE_BASE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}"
VERSION_REF="${IMAGE_BASE}:${TAG_NAME}"
SHA_REF="${IMAGE_BASE}:${{ steps.meta.outputs.sha_tag }}"
LATEST_REF="${IMAGE_BASE}:${{ steps.meta.outputs.latest_tag }}"
SHA_REF="${IMAGE_BASE}:${SHA_TAG}"
LATEST_REF="${IMAGE_BASE}:${LATEST_TAG}"
SARIF_OUT="artifacts/trivy-${TAG_NAME}.sarif"
TABLE_OUT="artifacts/trivy-${TAG_NAME}.txt"
SHA_TABLE_OUT="artifacts/trivy-sha-${GITHUB_SHA::12}.txt"
LATEST_TABLE_OUT="artifacts/trivy-latest.txt"
JSON_OUT="artifacts/trivy-${TAG_NAME}.json"
SHA_TABLE_OUT="artifacts/trivy-${SHA_TAG}.txt"
SHA_JSON_OUT="artifacts/trivy-${SHA_TAG}.json"
LATEST_TABLE_OUT="artifacts/trivy-${LATEST_TAG}.txt"
LATEST_JSON_OUT="artifacts/trivy-${LATEST_TAG}.json"
scan_trivy() {
local image_ref="$1"
local output_prefix="$2"
docker run --rm \
-v "$PWD/artifacts:/work" \
aquasec/trivy:0.58.2 image \
--quiet \
--ignore-unfixed \
--severity HIGH,CRITICAL \
--format json \
--output "/work/${output_prefix}.json" \
"${image_ref}"
docker run --rm \
-v "$PWD/artifacts:/work" \
aquasec/trivy:0.58.2 image \
--quiet \
--ignore-unfixed \
--severity HIGH,CRITICAL \
--format table \
--output "/work/${output_prefix}.txt" \
"${image_ref}"
}
docker run --rm \
-v "$PWD/artifacts:/work" \
@ -233,41 +265,65 @@ jobs:
--severity HIGH,CRITICAL \
--format sarif \
--output "/work/trivy-${TAG_NAME}.sarif" \
--exit-code 1 \
"${VERSION_REF}"
docker run --rm \
-v "$PWD/artifacts:/work" \
aquasec/trivy:0.58.2 image \
--quiet \
--ignore-unfixed \
--severity HIGH,CRITICAL \
--format table \
--output "/work/trivy-${TAG_NAME}.txt" \
"${VERSION_REF}"
docker run --rm \
-v "$PWD/artifacts:/work" \
aquasec/trivy:0.58.2 image \
--quiet \
--ignore-unfixed \
--severity HIGH,CRITICAL \
--format table \
--output "/work/trivy-sha-${GITHUB_SHA::12}.txt" \
"${SHA_REF}"
docker run --rm \
-v "$PWD/artifacts:/work" \
aquasec/trivy:0.58.2 image \
--quiet \
--ignore-unfixed \
--severity HIGH,CRITICAL \
--format table \
--output "/work/trivy-latest.txt" \
"${LATEST_REF}"
scan_trivy "${VERSION_REF}" "trivy-${TAG_NAME}"
scan_trivy "${SHA_REF}" "trivy-${SHA_TAG}"
scan_trivy "${LATEST_REF}" "trivy-${LATEST_TAG}"
echo "Generated Trivy reports:"
ls -1 "$SARIF_OUT" "$TABLE_OUT" "$SHA_TABLE_OUT" "$LATEST_TABLE_OUT"
ls -1 "$SARIF_OUT" "$TABLE_OUT" "$JSON_OUT" "$SHA_TABLE_OUT" "$SHA_JSON_OUT" "$LATEST_TABLE_OUT" "$LATEST_JSON_OUT"
- name: Validate GHCR vulnerability gate
shell: bash
run: |
set -euo pipefail
python3 scripts/ci/ghcr_vulnerability_gate.py \
--release-tag "${{ steps.meta.outputs.release_tag }}" \
--sha-tag "${{ steps.meta.outputs.sha_tag }}" \
--latest-tag "${{ steps.meta.outputs.latest_tag }}" \
--release-report-json "artifacts/trivy-${{ steps.meta.outputs.release_tag }}.json" \
--sha-report-json "artifacts/trivy-${{ steps.meta.outputs.sha_tag }}.json" \
--latest-report-json "artifacts/trivy-${{ steps.meta.outputs.latest_tag }}.json" \
--policy-file .github/release/ghcr-vulnerability-policy.json \
--output-json artifacts/ghcr-vulnerability-gate.json \
--output-md artifacts/ghcr-vulnerability-gate.md \
--fail-on-violation
- name: Emit GHCR vulnerability gate audit event
if: always()
shell: bash
run: |
set -euo pipefail
if [ -f artifacts/ghcr-vulnerability-gate.json ]; then
python3 scripts/ci/emit_audit_event.py \
--event-type ghcr_vulnerability_gate \
--input-json artifacts/ghcr-vulnerability-gate.json \
--output-json artifacts/audit-event-ghcr-vulnerability-gate.json \
--artifact-name ghcr-vulnerability-gate \
--retention-days 21
fi
- name: Publish GHCR vulnerability summary
if: always()
shell: bash
run: |
set -euo pipefail
if [ -f artifacts/ghcr-vulnerability-gate.md ]; then
cat artifacts/ghcr-vulnerability-gate.md >> "$GITHUB_STEP_SUMMARY"
fi
- name: Upload GHCR vulnerability gate artifacts
if: always()
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: ghcr-vulnerability-gate
path: |
artifacts/ghcr-vulnerability-gate.json
artifacts/ghcr-vulnerability-gate.md
artifacts/audit-event-ghcr-vulnerability-gate.json
if-no-files-found: ignore
retention-days: 21
- name: Upload Trivy SARIF
if: always()
@ -284,7 +340,10 @@ jobs:
path: |
artifacts/trivy-${{ github.ref_name }}.sarif
artifacts/trivy-${{ github.ref_name }}.txt
artifacts/trivy-${{ github.ref_name }}.json
artifacts/trivy-sha-*.txt
artifacts/trivy-sha-*.json
artifacts/trivy-latest.txt
artifacts/trivy-latest.json
if-no-files-found: ignore
retention-days: 14

View File

@ -0,0 +1,401 @@
#!/usr/bin/env python3
"""Validate GHCR Trivy vulnerability gate and emit publish traceability evidence."""
from __future__ import annotations
import argparse
import datetime as dt
import json
import sys
from collections import Counter
from pathlib import Path
# Expected `schema_version` value inside the policy JSON file.
POLICY_SCHEMA = "zeroclaw.ghcr-vulnerability-policy.v1"
# Tag classes the policy may require scans for (release / sha / latest refs).
ALLOWED_TAG_CLASSES = {"release", "sha", "latest"}
# Severity labels accepted in `blocking_severities` (Trivy's severity set).
ALLOWED_SEVERITIES = {"UNKNOWN", "LOW", "MEDIUM", "HIGH", "CRITICAL"}
def load_policy(path: Path) -> tuple[dict[str, object], list[str]]:
    """Load and validate the GHCR vulnerability gate policy JSON.

    Returns a ``(policy, violations)`` pair. ``policy`` is a normalized dict
    with every expected field present; invalid fields are replaced by neutral
    defaults ("" / [] / 0 / False) and described in ``violations`` so the
    caller can surface every problem at once instead of failing fast.
    """
    violations: list[str] = []
    raw = json.loads(path.read_text(encoding="utf-8"))

    def ensure_string(name: str) -> str:
        value = raw.get(name)
        if not isinstance(value, str) or not value.strip():
            violations.append(f"Policy field `{name}` must be a non-empty string.")
            return ""
        return value.strip()

    def ensure_bool(name: str) -> bool:
        value = raw.get(name)
        if not isinstance(value, bool):
            violations.append(f"Policy field `{name}` must be a boolean.")
            return False
        return value

    def ensure_non_negative_int(name: str) -> int:
        value = raw.get(name)
        # bool is a subclass of int, so reject True/False explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value < 0:
            violations.append(f"Policy field `{name}` must be a non-negative integer.")
            return 0
        return value

    def ensure_positive_int(name: str) -> int:
        value = raw.get(name)
        # bool is a subclass of int, so reject True/False explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
            violations.append(f"Policy field `{name}` must be a positive integer.")
            return 0
        return value

    def ensure_string_list(name: str, *, allowed: set[str]) -> list[str]:
        value = raw.get(name)
        if not isinstance(value, list) or not value:
            violations.append(f"Policy field `{name}` must be a non-empty array.")
            return []
        out: list[str] = []
        seen: set[str] = set()
        for item in value:
            if not isinstance(item, str) or not item.strip():
                violations.append(f"Policy field `{name}` contains invalid entry.")
                continue
            # Severities are case-insensitive; tag classes must match exactly.
            text = item.strip().upper() if name == "blocking_severities" else item.strip()
            if text in seen:
                violations.append(f"Policy field `{name}` contains duplicate entry `{text}`.")
                continue
            if text not in allowed:
                allowed_sorted = ", ".join(sorted(allowed))
                violations.append(
                    f"Policy field `{name}` contains unsupported value `{text}`. Allowed: {allowed_sorted}."
                )
                continue
            out.append(text)
            seen.add(text)
        return out

    schema_version = ensure_string("schema_version")
    if schema_version and schema_version != POLICY_SCHEMA:
        violations.append(f"Policy schema_version must be `{POLICY_SCHEMA}`, got `{schema_version}`.")
    policy = {
        "schema_version": schema_version,
        "required_tag_classes": ensure_string_list(
            "required_tag_classes",
            allowed=ALLOWED_TAG_CLASSES,
        ),
        "blocking_severities": ensure_string_list(
            "blocking_severities",
            allowed=ALLOWED_SEVERITIES,
        ),
        "max_blocking_findings_per_tag": ensure_non_negative_int("max_blocking_findings_per_tag"),
        "require_blocking_count_parity": ensure_bool("require_blocking_count_parity"),
        "require_artifact_id_parity": ensure_bool("require_artifact_id_parity"),
        "scan_artifact_retention_days": ensure_positive_int("scan_artifact_retention_days"),
        "audit_artifact_retention_days": ensure_positive_int("audit_artifact_retention_days"),
    }
    return policy, violations
def _pick_artifact_id(raw: dict[str, object]) -> str:
artifact_id = str(raw.get("ArtifactID", "") or "").strip()
if artifact_id:
return artifact_id
metadata = raw.get("Metadata")
if isinstance(metadata, dict):
return str(metadata.get("ImageID", "") or "").strip()
return ""
def _pick_vulnerability_id(vuln: dict[str, object]) -> str:
candidate = vuln.get("VulnerabilityID")
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
candidate = vuln.get("VulnID")
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
return "unknown"
def _unparsed_summary(report_class: str, tag: str, report_file: Path) -> dict[str, object]:
    """Return the empty summary skeleton used when a report cannot be parsed."""
    return {
        "class": report_class,
        "tag": tag,
        "report_file": str(report_file),
        "parsed": False,
        "artifact_id": "",
        "result_entries": 0,
        "total_vulnerabilities": 0,
        "blocking_vulnerabilities": 0,
        "severity_counts": {},
        "blocking_sample_ids": [],
    }


def summarize_trivy_report(
    *,
    report_class: str,
    tag: str,
    report_file: Path,
    blocking_severities: set[str],
) -> tuple[dict[str, object], list[str], list[str]]:
    """Summarize one Trivy JSON report for a tag class.

    Returns ``(summary, violations, warnings)``. Unreadable JSON or a missing
    `Results` array becomes a violation; malformed individual entries are
    downgraded to warnings and skipped so one bad record cannot fail the
    whole gate by accident.
    """
    violations: list[str] = []
    warnings: list[str] = []
    try:
        raw = json.loads(report_file.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001
        violations.append(f"Failed to parse Trivy report for `{report_class}` (`{tag}`): {exc}")
        return _unparsed_summary(report_class, tag, report_file), violations, warnings
    if not isinstance(raw, dict):
        violations.append(f"Trivy report for `{report_class}` (`{tag}`) must be a JSON object.")
        return _unparsed_summary(report_class, tag, report_file), violations, warnings
    results = raw.get("Results")
    if not isinstance(results, list):
        results = []
        violations.append(
            f"Trivy report for `{report_class}` (`{tag}`) is missing `Results` array."
        )
    severity_counts: Counter[str] = Counter()
    blocking_ids: list[str] = []
    blocking_count = 0
    for result in results:
        if not isinstance(result, dict):
            warnings.append(f"Report `{report_class}` contains non-object result entry; skipped.")
            continue
        vulnerabilities = result.get("Vulnerabilities")
        if vulnerabilities is None:
            # A result without a Vulnerabilities key is a clean target, not an error.
            continue
        if not isinstance(vulnerabilities, list):
            warnings.append(
                f"Report `{report_class}` result `{result.get('Target', 'unknown')}` has non-array `Vulnerabilities`; skipped."
            )
            continue
        for vuln in vulnerabilities:
            if not isinstance(vuln, dict):
                warnings.append(f"Report `{report_class}` contains non-object vulnerability entry; skipped.")
                continue
            # Normalize severity; missing/empty values count as UNKNOWN.
            severity = str(vuln.get("Severity", "UNKNOWN") or "UNKNOWN").strip().upper() or "UNKNOWN"
            severity_counts[severity] += 1
            if severity in blocking_severities:
                blocking_count += 1
                if len(blocking_ids) < 10:  # cap the sample list to keep reports small
                    blocking_ids.append(_pick_vulnerability_id(vuln))
    summary = {
        "class": report_class,
        "tag": tag,
        "report_file": str(report_file),
        "parsed": True,
        "artifact_id": _pick_artifact_id(raw),
        "result_entries": len(results),
        "total_vulnerabilities": int(sum(severity_counts.values())),
        "blocking_vulnerabilities": int(blocking_count),
        "severity_counts": dict(sorted(severity_counts.items())),
        "blocking_sample_ids": blocking_ids,
    }
    return summary, violations, warnings
def build_markdown(report: dict[str, object]) -> str:
    """Render the gate report dict as a human-readable Markdown document."""
    out: list[str] = []
    emit = out.append

    emit("# GHCR Vulnerability Gate Report")
    emit("")
    emit(f"- Generated at: `{report['generated_at']}`")
    emit(f"- Release tag: `{report['release_tag']}`")
    emit(f"- Ready: `{report['ready']}`")
    emit("")

    resolved: dict[str, str] = report["resolved_tags"]
    emit("## Resolved Tags")
    emit(f"- Release: `{resolved['release']}`")
    emit(f"- SHA: `{resolved['sha']}`")
    emit(f"- Latest: `{resolved['latest']}`")
    emit("")

    emit("## Scan Summary")
    emit("| Class | Tag | Blocking | Total | Artifact ID |")
    emit("| --- | --- | ---: | ---: | --- |")
    for cls in ("release", "sha", "latest"):
        summary = report["reports"].get(cls)
        if not isinstance(summary, dict):
            continue
        emit(
            f"| `{cls}` | `{summary.get('tag', '')}` "
            f"| {summary.get('blocking_vulnerabilities', 0)} "
            f"| {summary.get('total_vulnerabilities', 0)} "
            f"| `{summary.get('artifact_id', '')}` |"
        )
    emit("")

    emit("## Severity Counts")
    for cls in ("release", "sha", "latest"):
        summary = report["reports"].get(cls)
        if not isinstance(summary, dict):
            continue
        emit(f"### `{cls}`")
        counts = summary.get("severity_counts", {})
        if isinstance(counts, dict) and counts:
            for severity, count in counts.items():
                emit(f"- `{severity}`: {count}")
        else:
            emit("- none")
        if summary.get("blocking_sample_ids"):
            joined = ", ".join(f"`{vuln_id}`" for vuln_id in summary["blocking_sample_ids"])
            emit(f"- blocking sample IDs: {joined}")
        emit("")

    # Warnings and violations sections only appear when non-empty.
    for heading, key in (("## Warnings", "warnings"), ("## Violations", "violations")):
        if report[key]:
            emit(heading)
            for message in report[key]:
                emit(f"- {message}")
            emit("")

    return "\n".join(out).strip() + "\n"
def main() -> int:
    """CLI entry point.

    Exit codes: 0 = gate passed (or violations found but not enforced),
    2 = missing input files, 3 = policy violations with --fail-on-violation.
    """
    parser = argparse.ArgumentParser(description="Validate GHCR Trivy vulnerability gate report contract.")
    for flag in (
        "--release-tag",
        "--sha-tag",
        "--latest-tag",
        "--release-report-json",
        "--sha-report-json",
        "--latest-report-json",
        "--policy-file",
        "--output-json",
        "--output-md",
    ):
        parser.add_argument(flag, required=True)
    parser.add_argument("--fail-on-violation", action="store_true")
    args = parser.parse_args()

    policy_path = Path(args.policy_file).resolve()
    if not (policy_path.exists() and policy_path.is_file()):
        print(f"policy file does not exist: {policy_path}", file=sys.stderr)
        return 2

    report_paths = {
        "release": Path(args.release_report_json).resolve(),
        "sha": Path(args.sha_report_json).resolve(),
        "latest": Path(args.latest_report_json).resolve(),
    }
    absent = [name for name, path in report_paths.items() if not path.exists()]
    if absent:
        print(
            "missing trivy reports: " + ", ".join(f"{name} ({report_paths[name]})" for name in absent),
            file=sys.stderr,
        )
        return 2

    policy, violations = load_policy(policy_path)
    warnings: list[str] = []
    resolved_tags = {
        "release": args.release_tag,
        "sha": args.sha_tag,
        "latest": args.latest_tag,
    }
    blocking_severities = {str(sev) for sev in policy.get("blocking_severities", [])}
    required_classes = [str(cls) for cls in policy.get("required_tag_classes", [])]

    # Summarize each required tag class, accumulating violations/warnings.
    reports: dict[str, dict[str, object]] = {}
    for cls in required_classes:
        summary, cls_violations, cls_warnings = summarize_trivy_report(
            report_class=cls,
            tag=resolved_tags[cls],
            report_file=report_paths[cls],
            blocking_severities=blocking_severities,
        )
        reports[cls] = summary
        violations.extend(cls_violations)
        warnings.extend(cls_warnings)

    # Per-tag blocking-count ceiling.
    threshold = int(policy.get("max_blocking_findings_per_tag", 0))
    for cls in required_classes:
        summary = reports.get(cls)
        if not isinstance(summary, dict):
            violations.append(f"Missing parsed report summary for required class `{cls}`.")
            continue
        found = int(summary.get("blocking_vulnerabilities", 0))
        if found > threshold:
            violations.append(
                f"Blocking vulnerabilities for `{cls}` tag `{summary.get('tag', '')}` "
                f"exceed policy: {found} > {threshold}."
            )

    # Cross-tag parity checks only make sense with more than one tag class.
    multi_class = len(required_classes) > 1
    if multi_class and bool(policy.get("require_blocking_count_parity")):
        counts = {
            cls: int(reports.get(cls, {}).get("blocking_vulnerabilities", 0))
            for cls in required_classes
        }
        if len(set(counts.values())) > 1:
            detail = ", ".join(f"{name}={count}" for name, count in counts.items())
            violations.append(f"Blocking vulnerability count parity violation across tags: {detail}.")
    if multi_class and bool(policy.get("require_artifact_id_parity")):
        artifact_ids = {
            cls: str(reports.get(cls, {}).get("artifact_id", "") or "")
            for cls in required_classes
        }
        # Empty IDs are ignored; only conflicting non-empty IDs are a violation.
        if len({value for value in artifact_ids.values() if value}) > 1:
            detail = ", ".join(f"{name}={value or '<empty>'}" for name, value in artifact_ids.items())
            violations.append(f"Artifact ID parity violation across tags: {detail}.")

    gate_report = {
        "schema_version": "zeroclaw.ghcr-vulnerability-gate.v1",
        "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        "release_tag": args.release_tag,
        "resolved_tags": resolved_tags,
        "policy_file": str(policy_path),
        "policy_schema_version": policy.get("schema_version"),
        "policy": policy,
        "reports": reports,
        "violations": violations,
        "warnings": warnings,
        "ready": not violations,
    }
    json_destination = Path(args.output_json).resolve()
    json_destination.parent.mkdir(parents=True, exist_ok=True)
    json_destination.write_text(json.dumps(gate_report, indent=2) + "\n", encoding="utf-8")
    md_destination = Path(args.output_md).resolve()
    md_destination.parent.mkdir(parents=True, exist_ok=True)
    md_destination.write_text(build_markdown(gate_report), encoding="utf-8")

    if args.fail_on_violation and violations:
        print("ghcr vulnerability gate violations found:", file=sys.stderr)
        for message in violations:
            print(f"- {message}", file=sys.stderr)
        return 3
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell when run as a script.
    raise SystemExit(main())