# EVOLVE-BLOCK-START
"""
Scaling law discovery for LLM finetuning scenarios.
Fits a decoupled additive scaling law, L = bias + c1 * N^(-alpha1) + c2 * N^(-alpha2) * E^(-beta),
using robust non-linear least squares (TRF with a soft_l1 loss) and a systematic
grid-based initialization strategy to find the best starting basin.
"""
import numpy as np
from scipy.optimize import least_squares
def scaling_law_func(data_points, params):
"""
    Predicts validation loss for a MoE configuration (number of experts and dense parameter count).
Model: L = bias + c1 * N^(-alpha1) + c2 * N^(-alpha2) * E^(-beta)
Inputs:
data_points: (N_samples, 2) array [num_experts, dense_parameter_count]
params: (6,) or (M, 6) array [bias, c1, alpha1, c2, alpha2, beta]
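    Returns:
    (N_samples,) prediction array for a single parameter vector, or (N_samples, M) for a batch of M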
"""
X = np.atleast_2d(np.asarray(data_points))
params = np.asarray(params)
# Handle batch of parameters
if params.ndim == 1:
params = params[None, :]
# Features
# col 0: num_experts (E)
# col 1: dense_parameter_count (N)
E = X[:, 0]
N = X[:, 1]
# Normalize N for numerical stability
# N is typically 1e8 to 8e8. Normalizing by 1e9 puts it in [0.1, 0.8].
N_norm = N / 1e9
# Unpack parameters
# Use abs() to ensure the function is valid even if optimizer explores negatives
bias = params[:, 0]
c1 = np.abs(params[:, 1])
alpha1 = np.abs(params[:, 2])
c2 = np.abs(params[:, 3])
alpha2 = np.abs(params[:, 4])
beta = np.abs(params[:, 5])
# Term 1: Dense scaling component
# c1 * N^-alpha1
# Adding small epsilon to base for safety, though N_norm is usually > 0.1
term1 = c1[:, None] * ((N_norm[None, :] + 1e-9) ** -alpha1[:, None])
# Term 2: Expert scaling component
# c2 * N^-alpha2 * E^-beta
term2 = c2[:, None] * ((N_norm[None, :] + 1e-9) ** -alpha2[:, None]) * (E[None, :] ** -beta[:, None])
# Combine terms
pred = bias[:, None] + term1 + term2
    # pred currently has shape (M, N_samples); transpose to (N_samples, M) and
    # squeeze to (N_samples,) when a single parameter vector was supplied
    pred = pred.T
    return pred[:, 0] if pred.shape[1] == 1 else pred
def fit_scaling_law(data_points, loss_values):
"""
Fits the scaling law parameters using Trust Region Reflective (TRF) algorithm.
Uses a systematic initialization strategy scanning bias and dense scaling.
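    Returns a (6,) parameter array [bias, c1, alpha1, c2, alpha2, beta], or an (M, 6)
    array when loss_values has M target columns.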
"""
X = np.atleast_2d(np.asarray(data_points))
y = np.asarray(loss_values)
if y.ndim == 1:
y2d = y[:, None]
else:
y2d = y
n_targets = y2d.shape[1]
results = []
# Pre-compute features for initialization
E = X[:, 0]
N_norm = X[:, 1] / 1e9
# Identify dense subset (min experts) for initialization
min_E = np.min(E)
mask_dense = (E == min_E)
if np.sum(mask_dense) < 3:
# Fallback to low expert count
thresh = np.percentile(E, 25)
mask_dense = E <= thresh
if np.sum(mask_dense) < 3:
mask_dense = np.ones_like(E, dtype=bool)
for i in range(n_targets):
y_curr = y2d[:, i]
min_loss = np.min(y_curr)
candidates = []
# --- Strategy 1: Grid search on Bias + Log-Linear fit ---
# We assume L ~ bias + c1*N^-a1 for dense data
bias_grid = [min_loss - eps for eps in [0.02, 0.05, 0.1, 0.2, 0.5]]
bias_grid = [b for b in bias_grid if b > 0]
        if not bias_grid:
            bias_grid = [max(0.0, min_loss - 0.1)]
for b_guess in bias_grid:
# 1. Fit dense term on subset
y_shift = y_curr[mask_dense] - b_guess
valid = y_shift > 1e-5
if np.sum(valid) < 2: continue
log_y = np.log(y_shift[valid])
log_N = np.log(N_norm[mask_dense][valid])
# Linear fit: log_y = log_c1 - alpha1 * log_N
A = np.vstack([np.ones(len(log_y)), -log_N]).T
try:
sol, _, _, _ = np.linalg.lstsq(A, log_y, rcond=None)
c1_est = np.exp(sol[0])
a1_est = np.clip(sol[1], 0.01, 4.0)
            except Exception:
c1_est = 1.0
a1_est = 0.5
# 2. Fit expert term on residuals of full dataset
# Residual R = y - (bias + c1*N^-a1)
# Model R ~ c2 * N^-a2 * E^-beta
term1 = c1_est * (N_norm ** -a1_est)
residuals = y_curr - b_guess - term1
valid_res = residuals > 1e-5
if np.sum(valid_res) > 5:
log_R = np.log(residuals[valid_res])
log_N_res = np.log(N_norm[valid_res])
log_E_res = np.log(E[valid_res])
# log R = log c2 - a2*log N - beta*log E
A2 = np.vstack([np.ones(len(log_R)), -log_N_res, -log_E_res]).T
try:
sol2, _, _, _ = np.linalg.lstsq(A2, log_R, rcond=None)
c2_est = np.exp(sol2[0])
a2_est = np.clip(sol2[1], 0.01, 4.0)
beta_est = np.clip(sol2[2], 0.0, 3.0)
candidates.append([b_guess, c1_est, a1_est, c2_est, a2_est, beta_est])
                except Exception:
candidates.append([b_guess, c1_est, a1_est, c1_est*0.2, a1_est, 0.5])
else:
# If residuals are negative/small, maybe expert term is negligible or we overshot bias
candidates.append([b_guess, c1_est, a1_est, 0.0, a1_est, 0.0])
# --- Strategy 2: Physics Priors / Fallbacks ---
candidates.append([min_loss * 0.9, 1.0, 0.5, 1.0, 0.5, 0.5])
candidates.append([min_loss * 0.5, 5.0, 1.0, 2.0, 1.0, 0.5])
# Deduplicate
unique_candidates = []
seen = []
for c in candidates:
c_arr = np.array(c)
if not any(np.allclose(c_arr, s, rtol=0.1, atol=0.1) for s in seen):
unique_candidates.append(c)
seen.append(c_arr)
# --- Optimization ---
def res_func(p):
return scaling_law_func(X, p) - y_curr
        # Bounds: bias in [0, min_loss]; coefficients c1, c2 in [0, inf); exponents in [0, 6]
lb = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
ub = [min_loss, np.inf, 6.0, np.inf, 6.0, 6.0]
best_params = None
best_cost = np.inf
for guess in unique_candidates:
try:
                # Clip guess to bounds; keep bias strictly below its upper bound and
                # non-negative even when min_loss is very small
                p0 = np.clip(guess, lb, ub)
                p0[0] = min(p0[0], max(0.0, min_loss - 1e-4))
res = least_squares(
res_func,
p0,
bounds=(lb, ub),
method='trf',
loss='soft_l1',
f_scale=0.05, # Robustness scale
max_nfev=500,
xtol=1e-7,
ftol=1e-7
)
if res.cost < best_cost:
best_cost = res.cost
best_params = res.x
except Exception:
continue
if best_params is None:
best_params = np.array([min_loss*0.9, 1.0, 0.5, 1.0, 0.5, 0.5])
results.append(best_params)
return np.array(results) if n_targets > 1 else results[0]
# EVOLVE-BLOCK-END
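

# Minimal usage sketch (illustrative only, outside the evolve block): fits synthetic
# data generated from the model itself with known parameters. The data below is an
# assumption for demonstration, not part of the evolved program or its evaluator.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n_points = 40
    experts = rng.choice([1, 2, 4, 8, 16], size=n_points).astype(float)
    dense_params = rng.uniform(1e8, 8e8, size=n_points)
    X_demo = np.column_stack([experts, dense_params])
    # Ground-truth parameters [bias, c1, alpha1, c2, alpha2, beta] for the synthetic data
    true_params = np.array([0.8, 0.6, 0.4, 0.5, 0.3, 0.4])
    y_demo = scaling_law_func(X_demo, true_params) + rng.normal(0.0, 0.005, size=n_points)
    fitted = fit_scaling_law(X_demo, y_demo)
    preds = scaling_law_func(X_demo, fitted)
    print("fitted params:", np.round(fitted, 3))
    print("RMSE:", np.sqrt(np.mean((preds - y_demo) ** 2)))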