
Parallel Scaling Law

Agent: aider
Model: GPT-5
Best R²: 0.999572
Mean R²: 0.990506
Min R²: 0.955343
Runs: 5
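
The law recovered by these runs (as implemented in the best run's code below, see the model comment in _fit_group) is a saturating power law in parameter count N (num_params) and parallel size P (parallel_size), with per-group constants:

$$\mathrm{loss}(N, P) = L_\infty + A \, N^{-b} \, P^{-d}$$

L_inf is found by a grid search below the minimum observed loss, and (log A, b, d) by least squares in log space.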

All Runs (sorted by R²)

#1 Run 1 R² = 0.999572 (best, shown below)
Python
from __future__ import annotations

import math
import os
from typing import Dict, List

import numpy as np

try:
    from datasets import load_from_disk  # type: ignore
except Exception:
    load_from_disk = None  # type: ignore


DATA_PATH = "/app/data"
_EPS = 1e-12
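# _EPS is a positive floor used to keep logarithms and fitted constants
# away from zero (see _safe_log and the clamps in _fit_group/_predict_loss).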


def _safe_log(x: np.ndarray) -> np.ndarray:
    return np.log(np.clip(x, _EPS, None))


def _load_rows() -> List[Dict[str, float]]:
    if load_from_disk is None:
        return []
    if not os.path.isdir(DATA_PATH):
        return []
    try:
        ds = load_from_disk(DATA_PATH)
    except Exception:
        return []

    # Handle DatasetDict or Dataset
    try:
        # DatasetDict: concatenate all splits
        from datasets import Dataset, concatenate_datasets  # type: ignore

        if hasattr(ds, "values"):
            parts = []
            for split in ds.values():
                if isinstance(split, Dataset):
                    parts.append(split)
            if parts:
                ds = concatenate_datasets(parts)
    except Exception:
        pass

    rows: List[Dict[str, float]] = []
    # Convert to python rows
    try:
        cols = ds.column_names  # type: ignore
        it = ds  # type: ignore
    except Exception:
        return rows

    want_cols = {"num_params", "parallel_size", "loss"}
    if not want_cols.issubset(cols):
        return rows
    has_group = "group" in cols

    for ex in it:
        try:
            n = float(ex["num_params"])
            p = float(ex["parallel_size"])
            y = float(ex["loss"])
            g = str(ex["group"]) if has_group else "__all__"
            if not (math.isfinite(n) and math.isfinite(p) and math.isfinite(y)):
                continue
            rows.append({"group": g, "num_params": n, "parallel_size": p, "loss": y})
        except Exception:
            continue
    return rows


def _fit_group(rows: List[Dict[str, float]]) -> Dict[str, float]:
    # Model: loss ≈ L_inf + A * num_params^{-b} * parallel_size^{-d}
    # Take log(loss - L_inf) = log A - b log N - d log P
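    # Strategy: grid-search candidate offsets L_inf strictly below min(loss);
    # for each candidate, fit (log A, b, d) by ordinary least squares in log
    # space and keep the candidate with the lowest penalized residual.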
    if len(rows) < 3:
        return {"L_inf": 1.0, "A": 10.0, "b": 0.2, "d": 0.5}

    N = np.array([r["num_params"] for r in rows], dtype=float)
    P = np.array([r["parallel_size"] for r in rows], dtype=float)
    Y = np.array([r["loss"] for r in rows], dtype=float)

    # Filter invalid
    mask = np.isfinite(N) & np.isfinite(P) & np.isfinite(Y) & (N > 0) & (P > 0) & (Y > 0)
    N, P, Y = N[mask], P[mask], Y[mask]
    if N.size < 3:
        return {"L_inf": 1.0, "A": 10.0, "b": 0.2, "d": 0.5}

    min_y = float(np.min(Y))
    q10 = float(np.quantile(Y, 0.10))
    # Candidate offsets L_inf must lie strictly below min(Y) so that
    # log(Y - L_inf) stays defined; 2*min_y - q10 pushes the lower bound
    # further down by the spread of the lowest losses.
    lo = max(0.0, min(0.99 * min_y, 2 * min_y - q10))
    hi = 0.99 * min_y if min_y > 0 else 0.0
    # Cover [0, hi] coarsely, plus extra resolution on [lo, hi]
    grid = np.unique(
        np.clip(
            np.concatenate(
                [
                    np.linspace(0.0, hi, num=25, dtype=float),
                    np.linspace(lo, hi, num=25, dtype=float),
                ]
            ),
            0.0,
            hi if hi > 0 else 0.0,
        )
    )
    if grid.size == 0:
        grid = np.array([0.0], dtype=float)

    best = None
    best_params = (1.0, 10.0, 0.2, 0.5)  # L_inf, A, b, d

    lnN = _safe_log(N)
    lnP = _safe_log(P)

    for L_inf in grid:
        # y' = y - L_inf must be positive
        Yp = Y - L_inf
        if np.any(Yp <= 0):
            continue
        lnYp = _safe_log(Yp)
        # Design matrix for linear regression: lnYp = c0 + c1*(-lnN) + c2*(-lnP)
        X = np.stack([np.ones_like(lnYp), -lnN, -lnP], axis=1)
        try:
            coef, residuals, _rank, _sv = np.linalg.lstsq(X, lnYp, rcond=None)
        except Exception:
            continue
        dof = max(1, lnYp.size - X.shape[1])
        if residuals.size == 0:
            # lstsq omits residuals for rank-deficient or underdetermined
            # systems; compute the sum of squared residuals manually so both
            # branches are normalized the same way.
            pred = X @ coef
            residuals_val = float(np.sum((lnYp - pred) ** 2) / dof)
        else:
            residuals_val = float(residuals[0] / dof)

        c0, b, d = float(coef[0]), float(coef[1]), float(coef[2])
        A = float(np.exp(c0))

        # Penalize extreme exponents to avoid overfitting
        penalty = 1e-4 * (b**2 + d**2)
        obj = residuals_val + penalty

        if (best is None) or (obj < best):
            best = obj
            best_params = (float(L_inf), float(A), float(b), float(d))

    L_inf, A, b, d = best_params

    # Final mild clipping to reasonable ranges
    b = float(np.clip(b, 0.0, 2.0))
    d = float(np.clip(d, 0.0, 2.0))
    L_inf = float(max(0.0, L_inf))
    A = float(max(_EPS, A))
    return {"L_inf": L_inf, "A": A, "b": b, "d": d}


def _fit_params() -> Dict[str, Dict[str, float]]:
    rows = _load_rows()
    if not rows:
        # Fallback defaults
        return {
            "__all__": {"L_inf": 1.0, "A": 10.0, "b": 0.2, "d": 0.5},
        }
    # Group rows
    by_group: Dict[str, List[Dict[str, float]]] = {}
    for r in rows:
        g = str(r.get("group", "__all__"))
        by_group.setdefault(g, []).append(r)
    params: Dict[str, Dict[str, float]] = {}
    for g, grp_rows in by_group.items():
        params[g] = _fit_group(grp_rows)
    # Also compute a global fit as fallback
    params["__all__"] = _fit_group(rows)
    return params


# Fit once and cache
_PARAMS = _fit_params()
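# law() looks up per-group constants in _PARAMS, falling back to the
# global "__all__" fit for unseen groups.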


def _predict_loss(num_params: float, parallel_size: float, params: Dict[str, float]) -> float:
    n = float(max(num_params, _EPS))
    p = float(max(parallel_size, 1.0))
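    # Clamping P to >= 1 keeps p ** (-d) from exceeding its P = 1 value
    # when d >= 0.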
    L_inf = params["L_inf"]
    A = params["A"]
    b = params["b"]
    d = params["d"]
    return float(L_inf + A * (n ** (-b)) * (p ** (-d)))


def get_params() -> Dict[str, Dict[str, float]]:
    "Return the fitted parameters per group (including '__all__')."
    return dict(_PARAMS)


def law(input_data: list[dict[str, float]], group: str) -> list[dict[str, float]]:
    """
    Predicts output variables based on input variables according to a discovered scaling law.

    Args:
        input_data: A list of dictionaries, where each dictionary is a single data
                    point containing input variable names as keys and their
                    corresponding values.
        group: The name of the experimental group for which to make predictions.
                The functional form of the law must be the same for all groups,
                but the constant parameters/coefficients can differ per group.

    Returns:
        A list of dictionaries, corresponding to the input_data list, with each
        dictionary containing the predicted output variable(s).
    """
    params = _PARAMS.get(group) or _PARAMS.get("__all__")
    if params is None:
        params = {"L_inf": 1.0, "A": 10.0, "b": 0.2, "d": 0.5}

    outputs: List[Dict[str, float]] = []
    for row in input_data:
        n = float(row.get("num_params", 0.0))
        p = float(row.get("parallel_size", 1.0))
        y = _predict_loss(n, p, params)
        outputs.append({"loss": y})
    return outputs
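
A minimal usage sketch (hypothetical values; assumes the listing above is saved as scaling_law.py and that /app/data holds the training dataset, so _PARAMS is fitted at import time):

Python
from scaling_law import get_params, law

# Two hypothetical query points: same model size, different parallel sizes.
points = [
    {"num_params": 1.0e8, "parallel_size": 1.0},
    {"num_params": 1.0e8, "parallel_size": 8.0},
]

# Unknown groups fall back to the global "__all__" fit.
print(law(points, group="__all__"))  # e.g. [{'loss': ...}, {'loss': ...}]
print(get_params()["__all__"])       # fitted {'L_inf', 'A', 'b', 'd'}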
#2 Run 2 R² = 0.999441
#3 Run 3 R² = 0.999134
#4 Run 4 R² = 0.999042
#5 Run 5 R² = 0.955343