# EVOLVE-BLOCK-START
"""
Robust Scaling Law Discovery using Shifted Power Law: L(D) = A * (D + D0)^(-alpha) + E.
Optimization Pipeline:
1. Vectorized Variable Projection:
- Performs a dense grid search over non-linear parameters (alpha, D0).
- Analytically solves for linear parameters (A, E) using constrained least squares (A>=0, E>=0).
- Identifies the global basin of attraction and the best "pure" power law (D0=0) basin.
2. Dual-Model Refinement:
- Refines the best 4-parameter candidate (A, alpha, E, D0).
- Refines the best 3-parameter candidate (A, alpha, E, D0=0).
- Uses Trust Region Reflective (TRF) optimization with physical bounds.
3. Statistical Model Selection:
- Uses AICc (Corrected Akaike Information Criterion) to select the most parsimonious model.
- Includes a bias against negligible D0 parameters to prevent over-parameterization.
"""
import numpy as np
from scipy.optimize import curve_fit
def scaling_law_func(data_points, params):
    # data_points: (N, 1) array of input sizes
    # params: 1-D [A, alpha, E, D0] (or [A, alpha, E], zero-padded so D0 = 0),
    #         or a (P, 4) array of parameter rows
X = np.atleast_2d(np.asarray(data_points))
x = X[:, 0]
params = np.asarray(params)
if params.ndim == 1:
params = params[None, :]
# Pad to 4 params if needed
if params.shape[1] < 4:
p_new = np.zeros((params.shape[0], 4))
p_new[:, :params.shape[1]] = params
params = p_new
A, alpha, E, D0 = params[:, 0], params[:, 1], params[:, 2], params[:, 3]
# Model: L = A * (x + D0)^-alpha + E
# Safe computation for base
base = np.maximum(x[:, None] + D0[None, :], 1e-10)
pred = A[None, :] * np.power(base, -alpha[None, :]) + E[None, :]
if pred.shape[1] == 1:
return pred[:, 0]
return pred
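# Note on broadcasting: with data_points of shape (N, 1) and params of shape (P, 4),
# scaling_law_func returns an (N, P) prediction matrix; a single 1-D parameter
# vector yields a length-N vector.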
def fit_scaling_law(data_points, loss_values):
X = np.atleast_2d(np.asarray(data_points))
x = X[:, 0].astype(float)
y_in = np.asarray(loss_values)
y_2d = y_in[:, None] if y_in.ndim == 1 else y_in
results = np.zeros((y_2d.shape[1], 4))
    # Normalize inputs for numerical stability (guard against a degenerate x_max)
    x_max = np.max(x) if x.size > 0 and np.max(x) > 0 else 1.0
    x_n = x / x_max
N = x_n.shape[0]
# --- Stage 1: Vectorized Grid Search (Variable Projection) ---
# We search over (alpha, D0) and solve for (A, E) analytically.
    # Alpha grid: dense over the common scaling range (0, 1], sparser up to 3.0
alphas = np.concatenate([
np.linspace(0.005, 1.0, 40),
np.linspace(1.1, 3.0, 10)
])
    # D0 grid: 0.0 plus log-spaced values (in normalized x units).
    # For D0 >> max(x) the term (x + D0)^(-alpha) is nearly constant and degenerate
    # with E, so large D0 is avoided unless the data demand it.
d0s = np.concatenate([
[0.0],
np.logspace(-5, 0.5, 24) # up to ~3.16 * x_max
])
# Create meshgrid: (G_alpha, G_d0)
aa, dd = np.meshgrid(alphas, d0s, indexing='ij')
aa_flat = aa.ravel()
dd_flat = dd.ravel()
# Compute Basis Functions H: (G, N)
# H[g, i] = (x_i + d0_g)^(-alpha_g)
base = x_n[None, :] + dd_flat[:, None]
base = np.maximum(base, 1e-10)
H = np.power(base, -aa_flat[:, None])
# Precompute Determinant terms for Linear Regression
Sum_H = np.sum(H, axis=1) # (G,)
Sum_H2 = np.sum(H**2, axis=1) # (G,)
Det = N * Sum_H2 - Sum_H**2 # Determinant of X^T X
valid_det = Det > 1e-10
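    # H, Sum_H, Sum_H2 and Det depend only on the (alpha, D0) grid, not on the
    # targets, so they are computed once and reused for every column of loss_values.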
    # Total number of grid points searched per target column
    G = len(aa_flat)
for i in range(y_2d.shape[1]):
y = y_2d[:, i]
y_max = np.max(y) if np.max(y) > 0 else 1.0
y_n = y / y_max
min_y = np.min(y_n)
# Target specific sums
Sum_Y = np.sum(y_n)
Sum_HY = np.sum(H * y_n[None, :], axis=1)
# Initialize storage for grid results
RSS = np.full(G, np.inf)
A_est = np.zeros(G)
E_est = np.zeros(G)
# --- Vectorized Solve ---
if np.any(valid_det):
idx = np.where(valid_det)[0]
# 1. Unconstrained OLS solution
# Cramer's rule for system [[Sum_H2, Sum_H], [Sum_H, N]] * [A, E]^T = [Sum_HY, Sum_Y]^T
# A = (N * Sum_HY - Sum_H * Sum_Y) / Det
A_u = (N * Sum_HY[idx] - Sum_H[idx] * Sum_Y) / Det[idx]
E_u = (Sum_Y - A_u * Sum_H[idx]) / N
# 2. Apply Constraints (A >= 0, E >= 0)
# Mask where unconstrained is valid
mask_valid = (A_u >= 0) & (E_u >= 0)
# Mask where E < 0 (Active constraint E=0)
# Re-solve A = Sum_HY / Sum_H2
mask_neg_E = (E_u < 0) & (A_u >= 0)
# Initialize temp arrays
A_temp = np.zeros_like(A_u)
E_temp = np.zeros_like(E_u)
# Fill valid
A_temp[mask_valid] = A_u[mask_valid]
E_temp[mask_valid] = E_u[mask_valid]
# Fill E=0 case
if np.any(mask_neg_E):
A_re = Sum_HY[idx][mask_neg_E] / (Sum_H2[idx][mask_neg_E] + 1e-12)
A_temp[mask_neg_E] = np.maximum(A_re, 0)
E_temp[mask_neg_E] = 0.0
            # Remaining cases: A_u < 0 (the loss does not decrease with x, so a
            # flat line is the best non-negative fit) or a degenerate all-zero
            # solution. Set A = 0 and E = mean(y_n).
            mask_neg_A = (A_u < 0) | ((A_temp == 0) & (E_temp == 0) & (~mask_neg_E))
if np.any(mask_neg_A):
A_temp[mask_neg_A] = 0.0
E_temp[mask_neg_A] = Sum_Y / N
# Store estimates
A_est[idx] = A_temp
E_est[idx] = E_temp
# Compute RSS
Pred = A_temp[:, None] * H[idx] + E_temp[:, None]
RSS[idx] = np.sum((Pred - y_n[None, :])**2, axis=1)
# --- Select Initial Points ---
# 1. Best global point (for 4-param model)
best_idx = np.argmin(RSS)
p_init_4 = [A_est[best_idx], aa_flat[best_idx], E_est[best_idx], dd_flat[best_idx]]
rss_grid_4 = RSS[best_idx]
# 2. Best point with D0 = 0 (for 3-param model)
mask_d0_0 = (dd_flat == 0.0)
if np.any(mask_d0_0):
idx_3 = np.where(mask_d0_0)[0]
best_local_3 = np.argmin(RSS[idx_3])
best_idx_3 = idx_3[best_local_3]
p_init_3 = [A_est[best_idx_3], aa_flat[best_idx_3], E_est[best_idx_3]]
else:
p_init_3 = [p_init_4[0], p_init_4[1], p_init_4[2]] # Fallback
# --- Stage 2: Dual-Model Refinement ---
def model_4(x_v, a, alf, e, d0):
return a * np.power(np.maximum(x_v + d0, 1e-10), -alf) + e
def model_3(x_v, a, alf, e):
return a * np.power(np.maximum(x_v, 1e-10), -alf) + e
        # Fit 4-Parameter Model
        # Bounds: A >= 0, alpha in [0, 5], E capped near the minimum observed loss,
        # D0 up to 10x the normalized data range
        bounds_4 = ([0, 0, 0, 0], [np.inf, 5.0, min_y + 0.1, 10.0])
        # Clip the grid-search seed into the feasible box so TRF does not reject it
        p_init_4 = np.clip(p_init_4, bounds_4[0], bounds_4[1])
        try:
            popt_4, _ = curve_fit(model_4, x_n, y_n, p0=p_init_4, bounds=bounds_4, method='trf', maxfev=1000)
            rss_4 = np.sum((model_4(x_n, *popt_4) - y_n)**2)
        except (RuntimeError, ValueError):
            popt_4 = p_init_4
            rss_4 = rss_grid_4
        # Fit 3-Parameter Model (D0 = 0)
        bounds_3 = ([0, 0, 0], [np.inf, 5.0, min_y + 0.1])
        p_init_3 = np.clip(p_init_3, bounds_3[0], bounds_3[1])
        try:
            popt_3, _ = curve_fit(model_3, x_n, y_n, p0=p_init_3, bounds=bounds_3, method='trf', maxfev=1000)
            rss_3 = np.sum((model_3(x_n, *popt_3) - y_n)**2)
        except (RuntimeError, ValueError):
            popt_3 = p_init_3
            rss_3 = np.inf
# --- Stage 3: AICc Model Selection ---
def get_aicc(rss, k, n_samples):
if rss <= 1e-20: return -np.inf
if n_samples <= k + 1: return np.inf
aic = n_samples * np.log(rss / n_samples) + 2 * k
correction = (2 * k * (k + 1)) / (n_samples - k - 1)
return aic + correction
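        # AICc = AIC + 2k(k+1)/(n - k - 1): the small-sample correction grows as n
        # approaches k, penalizing the extra D0 parameter when few points are available.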
aicc_4 = get_aicc(rss_4, 4, N)
aicc_3 = get_aicc(rss_3, 3, N)
# Penalize 4-param model if D0 is effectively zero
if popt_4[3] < 1e-4:
aicc_4 += 10.0
if aicc_4 < aicc_3:
final_p = popt_4
else:
final_p = list(popt_3) + [0.0]
# Denormalize
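        # With x_n = x / x_max and y_n = y / y_max, the normalized fit
        # y_n = A_n*(x_n + D0_n)^(-alpha) + E_n maps back to the original scale via
        # A = A_n*y_max*x_max^alpha, E = E_n*y_max, D0 = D0_n*x_max.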
A_n, alf, E_n, D0_n = final_p
results[i] = [
A_n * y_max * np.power(x_max, alf),
alf,
E_n * y_max,
D0_n * x_max
]
return results[0] if y_in.ndim == 1 else results
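
# Minimal usage sketch (illustrative only): the constants below are made up to
# smoke-test the pipeline on synthetic data, not taken from any real experiment.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    D = np.logspace(6, 9, 12)[:, None]          # hypothetical dataset sizes
    true_params = [2.0e3, 0.35, 1.8, 5.0e5]     # hypothetical [A, alpha, E, D0]
    loss = scaling_law_func(D, true_params)
    loss_noisy = loss * (1.0 + 0.01 * rng.standard_normal(loss.shape))
    fitted = fit_scaling_law(D, loss_noisy)
    print("fitted [A, alpha, E, D0]:", fitted)
    print("true   [A, alpha, E, D0]:", true_params)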
# EVOLVE-BLOCK-END