
Data-Constrained Scaling Law

Agent: aider
Model: GPT-5
Best R²: 0.963475
Mean R²: 0.717744
Min R²: 0.103650
Runs: 5

All Runs (sorted by R²)

#1 Run 1 (Best) R² = 0.963475
Python
from __future__ import annotations

from typing import Dict, List


def law(input_data: List[Dict[str, float]], group: str) -> List[Dict[str, float]]:
    """
    Predicts output variables based on input variables according to a discovered scaling law.

    Args:
        input_data: A list of dictionaries, where each dictionary is a single data
                    point containing input variable names as keys and their
                    corresponding values.
        group: The name of the experimental group for which to make predictions.
               The functional form of the law is the same for all groups,
               but the constant parameters/coefficients can differ per group.

    Returns:
        A list of dictionaries, corresponding to the input_data list, with each
        dictionary containing the predicted output variable(s).
    """
    # We keep the module limited to a single public function by placing helpers inside.
    import numpy as np

    # Fit-on-first-use and cache learned parameters on the function object
    if not hasattr(law, "_fitted"):

        def _safe_array(x):
            arr = np.asarray(x, dtype=float)
            # Avoid zeros/negatives that could cause under/overflow in power transforms
            return np.maximum(arr, 1e-12)

        def _as_dataset_array(ds, key: str) -> np.ndarray:
            return _safe_array(ds[key] if isinstance(ds[key], list) else list(ds[key]))

        def _kfold_indices(n: int, k: int = 5, rng: np.random.Generator | None = None):
            if n < k:
                # Degenerate: use leave-one-out if very small
                idx = np.arange(n)
                for i in range(n):
                    test_idx = idx[i : i + 1]
                    train_idx = np.delete(idx, i)
                    yield train_idx, test_idx
                return
            if rng is None:
                rng = np.random.default_rng(42)
            idx = np.arange(n)
            rng.shuffle(idx)
            folds = np.array_split(idx, k)
            for i in range(k):
                test_idx = folds[i]
                train_idx = np.concatenate([folds[j] for j in range(k) if j != i])
                yield train_idx, test_idx

        def _fit_group(y: np.ndarray, p: np.ndarray, t: np.ndarray, u: np.ndarray):
            # Grid over exponents for the three inverse power-law terms
            exp_grid = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0])
            best = {
                "rmse": np.inf,
                "alpha_p": 0.5,
                "alpha_t": 0.5,
                "alpha_u": 0.5,
                "coef": np.zeros(4),
            }
            n = y.shape[0]
            # Precompute CV folds once so every exponent combination is
            # scored on identical train/test splits
            folds = list(_kfold_indices(n, k=5, rng=np.random.default_rng(123)))
            for ap in exp_grid:
                fp = np.power(p, -ap)
                for at in exp_grid:
                    ft = np.power(t, -at)
                    for au in exp_grid:
                        fu = np.power(u, -au)
                        # K-fold CV (over the shared folds) to pick exponents
                        rmses = []
                        for tr, te in folds:
                            Xtr = np.column_stack(
                                [np.ones(tr.shape[0]), fp[tr], ft[tr], fu[tr]]
                            )
                            ytr = y[tr]
                            # OLS with small ridge to improve stability
                            XtX = Xtr.T @ Xtr
                            ridge = 1e-8 * np.eye(XtX.shape[0])
                            coef = np.linalg.solve(XtX + ridge, Xtr.T @ ytr)
                            Xte = np.column_stack(
                                [np.ones(te.shape[0]), fp[te], ft[te], fu[te]]
                            )
                            yhat = Xte @ coef
                            rmse = float(np.sqrt(np.mean((yhat - y[te]) ** 2)))
                            rmses.append(rmse)
                        mean_rmse = float(np.mean(rmses))
                        if mean_rmse < best["rmse"]:
                            # Refit on all data with chosen exponents
                            X = np.column_stack([np.ones(n), fp, ft, fu])
                            XtX = X.T @ X
                            ridge = 1e-8 * np.eye(XtX.shape[0])
                            coef = np.linalg.solve(XtX + ridge, X.T @ y)
                            best = {
                                "rmse": mean_rmse,
                                "alpha_p": float(ap),
                                "alpha_t": float(at),
                                "alpha_u": float(au),
                                "coef": coef,
                            }
            # Enforce non-negativity on contribution coefficients (except intercept)
            coef = best["coef"].copy()
            coef[1:] = np.maximum(coef[1:], 0.0)
            best["coef"] = coef
            return best

        def _load_training():
            try:
                from datasets import load_from_disk  # type: ignore
            except Exception:
                return None

            try:
                ds = load_from_disk("/app/data")
            except Exception:
                return None

            # Support both Dataset and DatasetDict
            records = []
            if hasattr(ds, "select"):  # Dataset
                records = [row for row in ds]
            elif isinstance(ds, dict) or hasattr(ds, "keys"):
                # Concatenate all splits
                for key in ds.keys():
                    split = ds[key]
                    records.extend([row for row in split])
            else:
                return None

            # Extract to simple arrays
            def _get_col(name: str, default=None):
                vals = [r.get(name, default) for r in records]
                return vals

            params = _get_col("params")
            tokens = _get_col("tokens")
            uniq = _get_col("unique_tokens")
            loss = _get_col("loss")
            grp = _get_col("group", "GLOBAL")

            # Validate essential fields: reject the dataset if any record is
            # missing one of the required values
            if any(v is None for col in (params, tokens, uniq, loss) for v in col):
                return None

            return {
                "params": np.asarray(params, dtype=float),
                "tokens": np.asarray(tokens, dtype=float),
                "unique_tokens": np.asarray(uniq, dtype=float),
                "loss": np.asarray(loss, dtype=float),
                "group": np.asarray(grp),
            }

        # Default/fallback parameters
        law._params_by_group = {}  # type: ignore[attr-defined]
        data = _load_training()
        if data is not None:
            P = np.maximum(data["params"], 1e-12)
            T = np.maximum(data["tokens"], 1e-12)
            U = np.maximum(data["unique_tokens"], 1e-12)
            Y = np.asarray(data["loss"], dtype=float)
            G = data["group"].astype(str)

            # Fit per group
            unique_groups = sorted(list({g for g in G}))
            for g in unique_groups:
                mask = (G == g)
                if not np.any(mask):
                    continue
                best = _fit_group(Y[mask], P[mask], T[mask], U[mask])
                law._params_by_group[g] = {  # type: ignore[attr-defined]
                    "c": float(best["coef"][0]),
                    "b_p": float(best["coef"][1]),
                    "b_t": float(best["coef"][2]),
                    "b_u": float(best["coef"][3]),
                    "alpha_p": float(best["alpha_p"]),
                    "alpha_t": float(best["alpha_t"]),
                    "alpha_u": float(best["alpha_u"]),
                }

            # Also fit a GLOBAL model over all data for fallback
            best_global = _fit_group(Y, P, T, U)
            law._params_by_group["GLOBAL"] = {  # type: ignore[attr-defined]
                "c": float(best_global["coef"][0]),
                "b_p": float(best_global["coef"][1]),
                "b_t": float(best_global["coef"][2]),
                "b_u": float(best_global["coef"][3]),
                "alpha_p": float(best_global["alpha_p"]),
                "alpha_t": float(best_global["alpha_t"]),
                "alpha_u": float(best_global["alpha_u"]),
            }
        else:
            # If dataset is unavailable, fall back to a plausible generic prior.
            # Typical cross-entropy losses range ~1-5; choose a conservative baseline.
            law._params_by_group = {  # type: ignore[attr-defined]
                "GLOBAL": {
                    "c": 2.5,
                    "b_p": 1.0,
                    "b_t": 1.0,
                    "b_u": 0.5,
                    "alpha_p": 0.5,
                    "alpha_t": 0.5,
                    "alpha_u": 0.3,
                }
            }

        law._fitted = True  # type: ignore[attr-defined]

    # Retrieve parameters for the requested group; fall back to GLOBAL then any available group
    params_by_group = getattr(law, "_params_by_group", {})  # type: ignore[attr-defined]
    if group in params_by_group:
        gkey = group
    elif "GLOBAL" in params_by_group:
        gkey = "GLOBAL"
    elif params_by_group:
        gkey = next(iter(params_by_group))
    else:
        gkey = None

    if gkey is None:
        # Absolute fallback if nothing is available
        model = {"c": 2.5, "b_p": 1.0, "b_t": 1.0, "b_u": 0.5, "alpha_p": 0.5, "alpha_t": 0.5, "alpha_u": 0.3}
    else:
        model = params_by_group[gkey]

    def _predict_one(x: Dict[str, float]) -> float:
        p = float(x.get("params", 1.0))
        t = float(x.get("tokens", 1.0))
        u = float(x.get("unique_tokens", 1.0))
        # Numerical guards
        p = max(p, 1e-12)
        t = max(t, 1e-12)
        u = max(u, 1e-12)

        # Inverse power-law contributions with group-specific exponents and weights:
        # loss = c + b_p * params^{-alpha_p} + b_t * tokens^{-alpha_t} + b_u * unique_tokens^{-alpha_u}
        val = (
            float(model["c"])
            + float(model["b_p"]) * (p ** (-float(model["alpha_p"])))
            + float(model["b_t"]) * (t ** (-float(model["alpha_t"])))
            + float(model["b_u"]) * (u ** (-float(model["alpha_u"])))
        )
        # Loss should be non-negative
        return max(0.0, float(val))

    return [{"loss": _predict_one(x)} for x in input_data]
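
A minimal usage sketch for the function above, assuming it is in scope; the group name "GLOBAL" is the fallback key the code itself fits, and the parameter/token counts below are hypothetical illustrative values, not figures from the evaluation data.

Python
# Hypothetical inputs; values are illustrative only.
points = [
    {"params": 1.0e8, "tokens": 2.0e10, "unique_tokens": 5.0e9},
    {"params": 1.0e9, "tokens": 2.0e11, "unique_tokens": 5.0e9},
]
predictions = law(points, group="GLOBAL")
for point, pred in zip(points, predictions):
    print(point["params"], pred["loss"])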
#2 Run 2 R² = 0.884699
#3 Run 3 R² = 0.841468
#4 Run 4 R² = 0.795429
#5 Run 5 R² = 0.103650