# zeroclaw/scripts/ci/ci_change_audit.py
#!/usr/bin/env python3
"""Generate a CI/CD change audit report for GitHub workflows and CI scripts.
The report is designed for change-control traceability and light policy checks:
- enumerate changed CI-related files
- summarize line churn
- capture newly introduced action references
- detect unpinned `uses:` action references
- detect risky pipe-to-shell commands (e.g. `curl ... | sh`)
- detect newly introduced `pull_request_target` triggers in supported YAML forms
- detect broad `permissions: write-all` grants
- detect unsafe JS execution patterns in workflow helper scripts
- detect newly introduced `${{ secrets.* }}` references
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
# Directory prefixes (repo-relative) whose contents count as CI/CD surface area.
AUDIT_PREFIXES = (
    ".github/workflows/",
    ".github/release/",
    ".github/actions/",
    ".github/codeql/",
    "scripts/ci/",
    ".githooks/",
)
# Individual files audited in addition to the prefixes above.
AUDIT_FILES = {
    ".github/dependabot.yml",
    "deny.toml",
    ".gitleaks.toml",
}
# Prefixes that may hold workflow/action YAML; paired with WORKFLOW_EXTENSIONS below.
WORKFLOW_PATH_PREFIXES = (
    ".github/workflows/",
    ".github/release/",
    ".github/actions/",
    ".github/codeql/",
)
WORKFLOW_EXTENSIONS = (".yml", ".yaml")
SHELL_EXTENSIONS = (".sh", ".bash")
JS_EXTENSIONS = (".js", ".cjs", ".mjs")
# Added diff line introducing a `uses:` reference; group 1 is the action ref
# (stops at whitespace or a trailing `#` comment).
USES_RE = re.compile(r"^\+\s*(?:-\s*)?uses:\s*([^\s#]+)")
# Captures the secret name inside a `${{ secrets.NAME }}` expression.
SECRETS_RE = re.compile(r"\$\{\{\s*secrets\.([A-Za-z0-9_]+)\s*}}")
# A full 40-char lowercase hex commit SHA — the only accepted action pin.
SHA_PIN_RE = re.compile(r"^[0-9a-f]{40}$")
# curl/wget output piped straight into a shell interpreter.
PIPE_TO_SHELL_RE = re.compile(r"\b(?:curl|wget)\b.*\|\s*(?:sh|bash)\b")
# Added `<scope>: write` permission line; group 1 is the permission scope.
PERMISSION_WRITE_RE = re.compile(r"^\+\s*([a-z-]+):\s*write\s*$")
# Added blanket `permissions: write-all` grant (any case).
PERMISSIONS_WRITE_ALL_RE = re.compile(r"^\+\s*permissions\s*:\s*write-all\s*$", re.IGNORECASE)
# (label, pattern) pairs flagging dynamic-execution idioms in workflow JS helpers.
UNSAFE_JS_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    ("eval()", re.compile(r"\beval\s*\(")),
    ("Function()", re.compile(r"\bFunction\s*\(")),
    (
        "vm.* execution",
        re.compile(r"\bvm\.(?:runInContext|runInNewContext|runInThisContext|Script)\b"),
    ),
    (
        "child_process dynamic execution",
        re.compile(
            r"\bchild_process\.(?:exec|execSync|spawn|spawnSync|execFile|execFileSync|fork)\s*\("
        ),
    ),
)
def line_adds_pull_request_target(added_text: str) -> bool:
    """Return True when an added YAML line introduces a `pull_request_target` trigger.

    Supports the three common YAML forms:
      1) ``pull_request_target:``        (block mapping key)
      2) ``- pull_request_target``       (block sequence item)
      3) ``on: [push, pull_request_target]``  (flow sequence)

    Trailing ``#`` comments are stripped before matching; matching is
    case-insensitive to mirror GitHub's YAML handling.
    """
    normalized = added_text.split("#", 1)[0].strip().lower()
    if not normalized:
        return False
    if normalized.startswith("pull_request_target:"):
        return True
    if normalized == "- pull_request_target":
        return True
    if normalized.startswith("on:") and "[" in normalized and "]" in normalized:
        # Parse the flow-sequence items exactly instead of a substring test,
        # so e.g. "on: [pull_request_targeted]" is not a false positive.
        inner = normalized.split("[", 1)[1].rsplit("]", 1)[0]
        items = {item.strip() for item in inner.split(",")}
        return "pull_request_target" in items
    return False
def run(cmd: list[str]) -> str:
    """Execute *cmd* and return its stdout.

    Raises RuntimeError (with exit code, command line, and stderr) when the
    command exits non-zero.
    """
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if result.returncode == 0:
        return result.stdout
    joined = " ".join(cmd)
    raise RuntimeError(f"Command failed ({result.returncode}): {joined}\n{result.stderr}")
def is_ci_path(path: str) -> bool:
    """Return True when *path* belongs to the audited CI/CD surface."""
    if path in AUDIT_FILES:
        return True
    return path.startswith(AUDIT_PREFIXES)
def is_workflow_yaml_path(path: str) -> bool:
    """Return True for YAML files under the workflow-related .github trees."""
    in_workflow_tree = path.startswith(WORKFLOW_PATH_PREFIXES)
    return in_workflow_tree and path.endswith(WORKFLOW_EXTENSIONS)
def is_shell_path(path: str) -> bool:
    """Return True for shell scripts (by extension) or anything under .githooks/."""
    if path.startswith(".githooks/"):
        return True
    return path.endswith(SHELL_EXTENSIONS)
def is_workflow_script_js_path(path: str) -> bool:
    """Return True for JS helper scripts stored alongside the workflows."""
    in_scripts_dir = path.startswith(".github/workflows/scripts/")
    return in_scripts_dir and path.endswith(JS_EXTENSIONS)
def detect_unsafe_js_patterns(added_text: str) -> list[str]:
    """Return the labels of unsafe JS execution patterns found in *added_text*.

    Lines that start with a JS comment marker (``//``, ``/*``, ``*``) are
    skipped so that documentation/comments don't trigger this policy check.
    """
    head = added_text.lstrip()
    if head.startswith(("//", "/*", "*")):
        return []
    hits: list[str] = []
    for label, pattern in UNSAFE_JS_PATTERNS:
        if pattern.search(added_text):
            hits.append(label)
    return hits
@dataclass
class FileAudit:
    """Per-file findings collected while scanning one changed CI/CD file."""

    path: str
    status: str
    added: int = 0
    deleted: int = 0
    added_actions: list[str] = field(default_factory=list)
    unpinned_actions: list[str] = field(default_factory=list)
    added_secret_refs: list[str] = field(default_factory=list)
    added_pipe_to_shell: list[str] = field(default_factory=list)
    added_write_permissions: list[str] = field(default_factory=list)
    added_pull_request_target: int = 0
    added_unsafe_js_patterns: list[str] = field(default_factory=list)

    @property
    def risk_level(self) -> str:
        """Classify this file as "low", "medium", or "high" risk.

        High: unpinned actions, pipe-to-shell, pull_request_target,
        unsafe JS patterns, or a blanket write-all grant.
        Medium: new secret refs, new actions, or scoped write permissions.
        """
        high_risk = (
            bool(self.unpinned_actions)
            or bool(self.added_pipe_to_shell)
            or bool(self.added_pull_request_target)
            or bool(self.added_unsafe_js_patterns)
            or "write-all" in self.added_write_permissions
        )
        if high_risk:
            return "high"
        medium_risk = bool(
            self.added_secret_refs or self.added_actions or self.added_write_permissions
        )
        return "medium" if medium_risk else "low"
def parse_changed_files(base_sha: str, head_sha: str) -> list[tuple[str, str]]:
    """Return (status, path) pairs for files changed between two commits.

    For renames/copies the status carries a similarity score (e.g. "R100")
    and the destination path — the last tab-separated field — is used.
    """
    output = run(["git", "diff", "--name-status", "--find-renames", base_sha, head_sha])
    pairs: list[tuple[str, str]] = []
    for raw in output.splitlines():
        if not raw.strip():
            continue
        fields = raw.split("\t")
        pairs.append((fields[0], fields[-1]))
    return pairs
def parse_numstat(base_sha: str, head_sha: str, path: str) -> tuple[int, int]:
    """Return (added, deleted) line counts for *path* between two commits.

    Binary files report "-" counts in numstat; those, and any malformed or
    empty output, are mapped to (0, 0).
    """
    output = run(["git", "diff", "--numstat", base_sha, head_sha, "--", path]).strip()
    if not output:
        return (0, 0)
    fields = output.split("\t")
    if len(fields) < 3:
        return (0, 0)
    try:
        added_count = 0 if fields[0] == "-" else int(fields[0])
        deleted_count = 0 if fields[1] == "-" else int(fields[1])
    except ValueError:
        return (0, 0)
    return (added_count, deleted_count)
def parse_patch_added_lines(base_sha: str, head_sha: str, path: str) -> Iterable[str]:
    """Yield added diff lines for *path*, still carrying the leading "+".

    The "+++" file header is excluded; -U0 keeps the patch to pure changes.
    """
    patch = run(["git", "diff", "-U0", base_sha, head_sha, "--", path])
    for patch_line in patch.splitlines():
        if patch_line.startswith("+") and not patch_line.startswith("+++"):
            yield patch_line
def action_is_pinned(action_ref: str) -> bool:
    """Return True when a `uses:` reference is acceptably pinned.

    Local actions ("./...") travel with the repository and are treated as
    pinned; anything else must be pinned to a full 40-hex commit SHA.
    """
    if action_ref.startswith("./"):
        return True
    _, sep, version = action_ref.rpartition("@")
    if not sep:
        # No version suffix at all — unpinned by definition.
        return False
    return re.fullmatch(r"[0-9a-f]{40}", version) is not None
def build_markdown(
    audits: list[FileAudit],
    *,
    base_sha: str,
    head_sha: str,
    violations: list[str],
) -> str:
    """Render the audit results as a Markdown report string.

    Layout: header/summary bullets, optional Violations section, a file
    summary table, and per-file detail sections for medium/high-risk files.
    """
    out: list[str] = [
        "# CI/CD Change Audit",
        "",
        f"- Base SHA: `{base_sha}`",
        f"- Head SHA: `{head_sha}`",
        f"- Audited files: `{len(audits)}`",
        (
            f"- Policy violations: `{len(violations)}` "
            "(currently: unpinned `uses:`, pipe-to-shell commands, broad "
            "`permissions: write-all`, unsafe workflow-script JS execution patterns, "
            "and new `pull_request_target` triggers)"
        ),
        "",
    ]
    if violations:
        out.append("## Violations")
        out.extend(f"- {entry}" for entry in violations)
        out.append("")
    if not audits:
        # Nothing CI-related changed: short-circuit with a stub report.
        out.append("No CI/CD files changed in this diff.")
        return "\n".join(out) + "\n"
    out.extend(
        [
            "## File Summary",
            "",
            (
                "| Path | Status | +Lines | -Lines | New Actions | New Secret Refs | "
                "Pipe-to-Shell | Unsafe JS Patterns | New `*: write` | New `pull_request_target` | Risk |"
            ),
            "| --- | --- | ---:| ---:| ---:| ---:| ---:| ---:| ---:| ---:| --- |",
        ]
    )
    for item in sorted(audits, key=lambda a: a.path):
        out.append(
            f"| `{item.path}` | `{item.status}` | {item.added} | {item.deleted} | "
            f"{len(item.added_actions)} | {len(item.added_secret_refs)} | "
            f"{len(item.added_pipe_to_shell)} | {len(set(item.added_unsafe_js_patterns))} | "
            f"{len(set(item.added_write_permissions))} | "
            f"{item.added_pull_request_target} | "
            f"`{item.risk_level}` |"
        )
    out.append("")
    review_targets = [a for a in audits if a.risk_level in {"medium", "high"}]
    if review_targets:
        out.append("## Detailed Review Targets")
        for item in sorted(review_targets, key=lambda a: a.path):
            out.append(f"### `{item.path}`")
            if item.added_actions:
                out.append("- Added `uses:` references:")
                for action in item.added_actions:
                    pin_state = "unpinned" if action in item.unpinned_actions else "pinned"
                    out.append(f" - `{action}` ({pin_state})")
            if item.added_secret_refs:
                out.append("- Added secret references:")
                for secret_key in sorted(set(item.added_secret_refs)):
                    out.append(f" - `secrets.{secret_key}`")
            if item.added_pipe_to_shell:
                out.append("- Added pipe-to-shell commands (high risk):")
                for cmd in item.added_pipe_to_shell:
                    out.append(f" - `{cmd}`")
            if item.added_unsafe_js_patterns:
                out.append("- Added unsafe workflow-script JS patterns (high risk):")
                for pattern_name in sorted(set(item.added_unsafe_js_patterns)):
                    out.append(f" - `{pattern_name}`")
            if item.added_write_permissions:
                out.append("- Added write permissions:")
                for permission_name in sorted(set(item.added_write_permissions)):
                    if permission_name == "write-all":
                        out.append(" - `permissions: write-all`")
                    else:
                        out.append(f" - `{permission_name}: write`")
            if item.added_pull_request_target:
                out.append("- Added `pull_request_target` trigger (high risk):")
                out.append(" - Review event payload usage and token scope carefully.")
            out.append("")
    return "\n".join(out).rstrip() + "\n"
def main() -> int:
    """CLI entry point: audit CI-related changes in a git diff.

    Writes a JSON report and a Markdown report to the given paths.

    Exit codes:
      0 - success (possibly with violations, unless --fail-on-violations)
      2 - `git diff` failed (bad SHAs, not a repo, ...)
      3 - violations found and --fail-on-violations was set
    """
    parser = argparse.ArgumentParser(description="Generate CI/CD change audit report.")
    parser.add_argument("--base-sha", required=True, help="Base commit SHA")
    parser.add_argument("--head-sha", default="HEAD", help="Head commit SHA (default: HEAD)")
    parser.add_argument("--output-json", required=True, help="Output JSON path")
    parser.add_argument("--output-md", required=True, help="Output Markdown path")
    parser.add_argument(
        "--fail-on-violations",
        action="store_true",
        help="Return non-zero when policy violations are found",
    )
    args = parser.parse_args()
    try:
        changed = parse_changed_files(args.base_sha, args.head_sha)
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    audits: list[FileAudit] = []
    violations: list[str] = []
    for status, path in changed:
        # Only CI/CD surface files are audited; everything else is skipped.
        if not is_ci_path(path):
            continue
        added, deleted = parse_numstat(args.base_sha, args.head_sha, path)
        audit = FileAudit(path=path, status=status, added=added, deleted=deleted)
        # Classify the file once so per-line checks below can be gated cheaply.
        workflow_yaml = is_workflow_yaml_path(path)
        shell_script = is_shell_path(path)
        workflow_script_js = is_workflow_script_js_path(path)
        for line in parse_patch_added_lines(args.base_sha, args.head_sha, path):
            # `line` still carries the diff "+" prefix; `added_text` is the payload.
            added_text = line[1:].strip()
            # Policy: new `uses:` references must be pinned (local path or 40-hex SHA).
            uses_match = USES_RE.search(line)
            if uses_match and workflow_yaml:
                action_ref = uses_match.group(1).strip()
                audit.added_actions.append(action_ref)
                if not action_is_pinned(action_ref):
                    audit.unpinned_actions.append(action_ref)
                    violations.append(
                        f"{path}: unpinned action reference introduced -> `{action_ref}`"
                    )
            # Track every new `${{ secrets.* }}` reference (informational, not a violation).
            for secret_name in SECRETS_RE.findall(line):
                audit.added_secret_refs.append(secret_name)
            # Policy: no curl/wget piped into a shell in YAML or shell scripts.
            if PIPE_TO_SHELL_RE.search(added_text) and (workflow_yaml or shell_script):
                command = added_text[:220]
                audit.added_pipe_to_shell.append(command)
                violations.append(
                    f"{path}: pipe-to-shell command introduced -> `{command}`"
                )
            # Policy: no dynamic-execution idioms in workflow helper JS.
            if workflow_script_js:
                unsafe_matches = detect_unsafe_js_patterns(added_text)
                for pattern_name in unsafe_matches:
                    audit.added_unsafe_js_patterns.append(pattern_name)
                    violations.append(
                        f"{path}: unsafe workflow-script JS pattern introduced -> `{pattern_name}`"
                    )
            # Scoped `<perm>: write` lines are tracked; blanket write-all is a violation.
            permission_match = PERMISSION_WRITE_RE.match(line)
            if permission_match and workflow_yaml:
                audit.added_write_permissions.append(permission_match.group(1))
            if PERMISSIONS_WRITE_ALL_RE.match(line) and workflow_yaml:
                audit.added_write_permissions.append("write-all")
                violations.append(
                    f"{path}: `permissions: write-all` introduced; scope permissions minimally."
                )
            # Policy: any newly added pull_request_target trigger needs manual review.
            if line_adds_pull_request_target(added_text) and workflow_yaml:
                audit.added_pull_request_target += 1
                violations.append(
                    f"{path}: `pull_request_target` trigger introduced -> `{added_text[:180]}`; "
                    "manual security review required."
                )
        audits.append(audit)
    # Aggregate counters for the machine-readable summary (sets dedupe per file).
    summary = {
        "total_changed_files": len(changed),
        "audited_files": len(audits),
        "added_lines": sum(a.added for a in audits),
        "deleted_lines": sum(a.deleted for a in audits),
        "new_actions": sum(len(a.added_actions) for a in audits),
        "new_unpinned_actions": sum(len(a.unpinned_actions) for a in audits),
        "new_secret_references": sum(len(a.added_secret_refs) for a in audits),
        "new_pipe_to_shell_commands": sum(len(a.added_pipe_to_shell) for a in audits),
        "new_unsafe_js_patterns": sum(len(set(a.added_unsafe_js_patterns)) for a in audits),
        "new_write_permissions": sum(len(set(a.added_write_permissions)) for a in audits),
        "new_pull_request_target_triggers": sum(a.added_pull_request_target for a in audits),
        "violations": len(violations),
    }
    payload = {
        "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        "base_sha": args.base_sha,
        "head_sha": args.head_sha,
        "summary": summary,
        "files": [
            {
                "path": a.path,
                "status": a.status,
                "added": a.added,
                "deleted": a.deleted,
                "added_actions": a.added_actions,
                "unpinned_actions": a.unpinned_actions,
                "added_secret_refs": sorted(set(a.added_secret_refs)),
                "added_pipe_to_shell": a.added_pipe_to_shell,
                "added_unsafe_js_patterns": sorted(set(a.added_unsafe_js_patterns)),
                "added_write_permissions": sorted(set(a.added_write_permissions)),
                "added_pull_request_target": a.added_pull_request_target,
                "risk_level": a.risk_level,
            }
            for a in sorted(audits, key=lambda x: x.path)
        ],
        "violations": violations,
    }
    # Emit both reports, creating parent directories as needed.
    json_path = Path(args.output_json)
    md_path = Path(args.output_md)
    json_path.parent.mkdir(parents=True, exist_ok=True)
    md_path.parent.mkdir(parents=True, exist_ok=True)
    json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    md_path.write_text(
        build_markdown(audits, base_sha=args.base_sha, head_sha=args.head_sha, violations=violations),
        encoding="utf-8",
    )
    if args.fail_on_violations and violations:
        print("CI/CD change audit violations found:", file=sys.stderr)
        for item in violations:
            print(f"- {item}", file=sys.stderr)
        return 3
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())