agent-smith/packages/GLiNER2/benchmark_statistical.py
2026-03-06 12:59:32 +01:00

402 lines
15 KiB
Python

"""
Statistical benchmark with confidence intervals and p-values.
Micro-benchmarks: interleaved old/new in same process → paired t-test.
End-to-end: saves raw timings to JSON for cross-process Welch's t-test.
Usage:
# Baseline
git stash
python benchmark_statistical.py --tag baseline --n 300
git stash pop
# Optimized
python benchmark_statistical.py --tag optimized --n 300
# Compare
python benchmark_statistical.py --compare baseline optimized
"""
import argparse
import json
import math
import random
import time
import statistics
import sys
from collections import OrderedDict
import torch
from scipy import stats as sp_stats
# ─── Helpers ──────────────────────────────────────────────────────
def sync():
if torch.cuda.is_available():
torch.cuda.synchronize()
def ci95(data):
"""95% CI half-width using t-distribution."""
n = len(data)
if n < 2:
return 0.0
se = statistics.stdev(data) / math.sqrt(n)
t_crit = sp_stats.t.ppf(0.975, df=n - 1)
return t_crit * se
def collect(fn, n_warmup, n_iter):
"""Run fn with warmup, return list of times in ms."""
for _ in range(n_warmup):
fn()
sync()
times = []
for _ in range(n_iter):
sync()
t0 = time.perf_counter()
fn()
sync()
times.append((time.perf_counter() - t0) * 1000)
return times
def paired_test(old_times, new_times):
"""Paired t-test on matched samples. Returns (t_stat, p_value, mean_diff, ci95_diff)."""
diffs = [o - n for o, n in zip(old_times, new_times)]
n = len(diffs)
mean_d = statistics.mean(diffs)
se_d = statistics.stdev(diffs) / math.sqrt(n)
t_stat = mean_d / se_d if se_d > 0 else 0
p_val = 2 * sp_stats.t.sf(abs(t_stat), df=n - 1)
hw = ci95(diffs)
return t_stat, p_val, mean_d, hw
def welch_test(a, b):
"""Welch's t-test (unequal variance). Returns (t_stat, p_value)."""
t_stat, p_val = sp_stats.ttest_ind(a, b, equal_var=False)
return t_stat, p_val
def fmt_p(p):
if p < 0.001:
return f"{p:.2e}"
return f"{p:.4f}"
# ─── End-to-end benchmark ────────────────────────────────────────
def run_e2e(n_iter, n_warmup):
"""Run end-to-end scenarios, return dict of {name: [times]}."""
from gliner2 import GLiNER2
device = "cuda" if torch.cuda.is_available() else "cpu"
model = GLiNER2.from_pretrained("fastino/gliner2-base-v1")
model = model.to(device)
model.eval()
text1 = "Apple CEO Tim Cook announced the iPhone 15 launch in Cupertino on September 12, 2023."
ents = ["company", "person", "product", "location", "date"]
texts8 = [
"Apple CEO Tim Cook announced the iPhone 15 launch in Cupertino.",
"Google's Sundar Pichai spoke at the conference in Mountain View.",
"Microsoft released Windows 11 in Redmond last year.",
"Amazon founder Jeff Bezos invested in Blue Origin in Seattle.",
"Tesla CEO Elon Musk unveiled the Cybertruck at the Fremont factory.",
"Meta's Mark Zuckerberg presented Quest 3 in Menlo Park.",
"NVIDIA's Jensen Huang showcased the H100 GPU at GTC in San Jose.",
"OpenAI CEO Sam Altman launched GPT-4 in San Francisco.",
]
long_text = (
"Apple Inc., headquartered in Cupertino, California, is a multinational technology company "
"founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in April 1976. The company designs, "
"develops, and sells consumer electronics, computer software, and online services. Tim Cook "
"has served as CEO since August 2011. Apple's main products include the iPhone, iPad, Mac, "
"Apple Watch, and AirPods. The company also operates services including the App Store, "
"Apple Music, iCloud, and Apple TV Plus. In 2023, Apple reported annual revenue of $383 "
"billion, making it the world's largest technology company by revenue. The company employs "
"over 160,000 people worldwide."
)
ents6 = ["company", "person", "product", "location", "date", "monetary_value"]
text_struct = "John Smith, aged 35, is a software engineer at Google in Mountain View."
schema_struct = model.create_schema()
schema_struct.structure("person").field("name").field("age").field("job_title").field("company").field("location")
text_rel = "Apple CEO Tim Cook announced the iPhone 15 launch in Cupertino on September 12."
rels = ["CEO_of", "located_in", "announced_on"]
results = OrderedDict()
scenarios = [
("single_entity", lambda: model.extract_entities(text1, ents)),
("single_structure", lambda: model.extract(text_struct, schema_struct)),
("single_relation", lambda: model.extract_relations(text_rel, rels)),
("batch8_entity", lambda: model.batch_extract_entities(texts8, ents, batch_size=8)),
("long_text_entity", lambda: model.extract_entities(long_text, ents6)),
]
for name, fn in scenarios:
print(f" Running {name} (n={n_iter})...", end=" ", flush=True)
times = collect(fn, n_warmup, n_iter)
results[name] = times
m, hw = statistics.mean(times), ci95(times)
print(f"{m:.2f} ± {hw:.2f} ms")
return results
# ─── Micro-benchmarks (interleaved old/new) ──────────────────────
def run_micro(n_iter, n_warmup):
"""Run micro-benchmarks with interleaved old/new for paired comparison."""
import copy
from gliner2 import GLiNER2
from gliner2.training.trainer import ExtractorCollator
from torch.utils.data import DataLoader
device = "cuda" if torch.cuda.is_available() else "cpu"
model = GLiNER2.from_pretrained("fastino/gliner2-base-v1")
model = model.to(device)
model.eval()
tokenizer = model.processor.tokenizer
results = OrderedDict()
# --- OPT-1: Token ID lookup ---
special_set_str = {"[P]", "[C]", "[E]", "[R]", "[L]"}
special_ids = frozenset(tokenizer.convert_tokens_to_ids(t) for t in special_set_str)
dummy_ids = list(range(200))
def opt1_old():
for tid in dummy_ids:
tok = tokenizer.convert_ids_to_tokens(tid)
_ = tok in special_set_str
def opt1_new():
for tid in dummy_ids:
_ = tid in special_ids
print(" OPT-1 Token ID lookup...", end=" ", flush=True)
old_t, new_t = _interleaved(opt1_old, opt1_new, n_warmup, n_iter)
results["OPT-1 Token ID lookup"] = {"old": old_t, "new": new_t}
_print_paired(old_t, new_t)
# --- OPT-3: Avoid retokenization ---
test_text = "Apple CEO Tim Cook announced the iPhone 15 launch in Cupertino on September 12."
dummy_map = list(range(15))
def opt3_old():
return len(model.processor._tokenize_text(test_text))
def opt3_new():
return len(dummy_map)
print(" OPT-3 Avoid retokenization...", end=" ", flush=True)
old_t, new_t = _interleaved(opt3_old, opt3_new, n_warmup, n_iter)
results["OPT-3 Avoid retokenization"] = {"old": old_t, "new": new_t}
_print_paired(old_t, new_t)
# --- OPT-4: Deepcopy ---
schema_dict = {
"json_structures": [{"person": {"name": "", "age": "", "job": ""}}],
"entities": {"company": "", "location": ""},
"relations": [], "classifications": [],
}
record = {"text": "Apple CEO Tim Cook announced iPhone 15." * 3, "schema": schema_dict}
def opt4_old():
return copy.deepcopy(record)
def opt4_new():
return {"text": record["text"], "schema": copy.deepcopy(record["schema"])}
print(" OPT-4 Deepcopy...", end=" ", flush=True)
old_t, new_t = _interleaved(opt4_old, opt4_new, n_warmup, n_iter)
results["OPT-4 Deepcopy"] = {"old": old_t, "new": new_t}
_print_paired(old_t, new_t)
# --- OPT-6: Token cache ---
special_tokens = ["[SEP_STRUCT]", "[SEP_TEXT]", "[P]", "[C]", "[E]", "[R]", "[L]",
"[EXAMPLE]", "[OUTPUT]", "[DESCRIPTION]", "(", ")", ",", "|"]
cache = {tok: tokenizer.tokenize(tok) for tok in special_tokens}
test_tokens = special_tokens * 10
def opt6_old():
for tok in test_tokens:
tokenizer.tokenize(tok)
def opt6_new():
for tok in test_tokens:
if tok in cache:
_ = cache[tok]
else:
tokenizer.tokenize(tok)
print(" OPT-6 Token cache...", end=" ", flush=True)
old_t, new_t = _interleaved(opt6_old, opt6_new, n_warmup, n_iter)
results["OPT-6 Token cache"] = {"old": old_t, "new": new_t}
_print_paired(old_t, new_t)
# --- OPT-12: Skip DataLoader ---
collator = ExtractorCollator(model.processor, is_training=False)
text_norm = "Apple CEO Tim Cook announced the iPhone 15 launch in Cupertino on September 12, 2023."
schema_e = model.create_schema().entities(["company", "person", "product", "location", "date"])
sd = schema_e.build()
for c in sd.get("classifications", []):
c.setdefault("true_label", ["N/A"])
small_dataset = [(text_norm, sd)]
def opt12_old():
loader = DataLoader(small_dataset, batch_size=8, shuffle=False,
num_workers=0, collate_fn=collator)
return list(loader)
def opt12_new():
return [collator(small_dataset)]
print(" OPT-12 Skip DataLoader...", end=" ", flush=True)
old_t, new_t = _interleaved(opt12_old, opt12_new, n_warmup, n_iter)
results["OPT-12 Skip DataLoader"] = {"old": old_t, "new": new_t}
_print_paired(old_t, new_t)
return results
def _interleaved(old_fn, new_fn, n_warmup, n_iter):
"""Run old/new interleaved to eliminate ordering effects. Returns paired lists."""
# Warmup both
for _ in range(n_warmup):
old_fn()
new_fn()
sync()
old_times = []
new_times = []
for _ in range(n_iter):
# Randomize order each iteration to eliminate systematic bias
if random.random() < 0.5:
sync(); t0 = time.perf_counter(); old_fn(); sync()
old_times.append((time.perf_counter() - t0) * 1000)
sync(); t0 = time.perf_counter(); new_fn(); sync()
new_times.append((time.perf_counter() - t0) * 1000)
else:
sync(); t0 = time.perf_counter(); new_fn(); sync()
new_times.append((time.perf_counter() - t0) * 1000)
sync(); t0 = time.perf_counter(); old_fn(); sync()
old_times.append((time.perf_counter() - t0) * 1000)
return old_times, new_times
def _print_paired(old_t, new_t):
m_old, m_new = statistics.mean(old_t), statistics.mean(new_t)
t_stat, p_val, mean_diff, hw = paired_test(old_t, new_t)
speedup = m_old / m_new if m_new > 0 else float('inf')
print(f"{m_old:.4f} -> {m_new:.4f} ms ({speedup:.1f}x) "
f"diff={mean_diff:.4f}±{hw:.4f}ms p={fmt_p(p_val)}")
# ─── Compare mode ────────────────────────────────────────────────
def compare(baseline_path, optimized_path):
"""Compare two end-to-end result files with Welch's t-test."""
with open(baseline_path) as f:
baseline = json.load(f)
with open(optimized_path) as f:
optimized = json.load(f)
print(f"\nBaseline: {baseline_path} (device={baseline['device']}, n={baseline.get('n', '?')})")
print(f"Optimized: {optimized_path} (device={optimized['device']}, n={optimized.get('n', '?')})")
print(f"\n{'Scenario':<25} {'Baseline':>18} {'Optimized':>18} {'Diff':>14} {'Speedup':>8} {'p-value':>10}")
print("=" * 100)
for name in baseline["e2e"]:
b = baseline["e2e"][name]
o = optimized["e2e"][name]
m_b, ci_b = statistics.mean(b), ci95(b)
m_o, ci_o = statistics.mean(o), ci95(o)
diff = m_b - m_o
diff_ci = math.sqrt(ci_b**2 + ci_o**2) # approximate CI of difference
speedup = m_b / m_o if m_o > 0 else float('inf')
t_stat, p_val = welch_test(b, o)
sig = "*" if p_val < 0.05 else " "
if p_val < 0.01:
sig = "**"
if p_val < 0.001:
sig = "***"
print(f"{name:<25} {m_b:>7.2f}±{ci_b:>5.2f}ms {m_o:>7.2f}±{ci_o:>5.2f}ms "
f"{diff:>+6.2f}±{diff_ci:>4.2f}ms {speedup:>7.3f}x {fmt_p(p_val):>9}{sig}")
# Micro-benchmarks (if present in optimized)
if "micro" in optimized:
print(f"\n{'Component':<30} {'Old':>16} {'New':>16} {'Diff (paired)':>18} {'Speedup':>8} {'p-value':>10}")
print("=" * 105)
for name, data in optimized["micro"].items():
old_t = data["old"]
new_t = data["new"]
m_old, ci_old = statistics.mean(old_t), ci95(old_t)
m_new, ci_new = statistics.mean(new_t), ci95(new_t)
t_stat, p_val, mean_diff, hw = paired_test(old_t, new_t)
speedup = m_old / m_new if m_new > 0 else float('inf')
sig = "*" if p_val < 0.05 else " "
if p_val < 0.01: sig = "**"
if p_val < 0.001: sig = "***"
print(f"{name:<30} {m_old:>6.4f}±{ci_old:>6.4f}ms {m_new:>6.4f}±{ci_new:>6.4f}ms "
f"{mean_diff:>+7.4f}±{hw:>6.4f}ms {speedup:>7.1f}x {fmt_p(p_val):>9}{sig}")
# ─── Main ────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--tag", help="Tag for this run (baseline or optimized)")
parser.add_argument("--n", type=int, default=300, help="Iterations per scenario")
parser.add_argument("--warmup", type=int, default=10, help="Warmup iterations")
parser.add_argument("--compare", nargs=2, metavar=("BASELINE", "OPTIMIZED"),
help="Compare two result files")
args = parser.parse_args()
if args.compare:
compare(
f"bench_stats_{args.compare[0]}.json",
f"bench_stats_{args.compare[1]}.json"
)
return
if not args.tag:
parser.error("--tag is required (or use --compare)")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")
print(f"Iterations: {args.n}, Warmup: {args.warmup}\n")
output = {"tag": args.tag, "device": device, "n": args.n}
# End-to-end
print("END-TO-END BENCHMARKS")
print("-" * 60)
e2e = run_e2e(args.n, args.warmup)
output["e2e"] = e2e
# Micro-benchmarks (only meaningful for optimized run since we inline both versions)
print("\nCOMPONENT MICRO-BENCHMARKS (interleaved old/new)")
print("-" * 60)
micro = run_micro(args.n, args.warmup)
output["micro"] = {k: v for k, v in micro.items()}
out_path = f"bench_stats_{args.tag}.json"
with open(out_path, "w") as f:
json.dump(output, f)
print(f"\nRaw timings saved to {out_path}")
if __name__ == "__main__":
main()