# EVOLVE-BLOCK-START
"""
Optimized scaling law: loss = a * N^b + c / (1 + d * P)
Uses hyperbolic decay for parallel benefits with streamlined fitting
Key: Simpler code with better parameter estimation
"""
import numpy as np
from scipy.optimize import minimize, differential_evolution
def scaling_law_func(data_points, params):
"""
Scaling law: loss = a * (N/1e9)^b + c / (1 + d * P)
params[0] = a: base scale for power law
    params[1] = b: power exponent (typically negative)
params[2] = c: parallel benefit scale
params[3] = d: parallel saturation rate
"""
data_points = np.atleast_2d(np.asarray(data_points))
params = np.asarray(params).ravel()
    # Normalize model size to billions of parameters and clamp away from
    # zero so the power term stays well-behaved for tiny N.
    N_norm = np.maximum(data_points[:, 0] / 1e9, 0.1)
    # Parallelism degree, clamped to at least one worker.
    P = np.maximum(data_points[:, 1], 1.0)
a, b, c, d = params
# Power law + hyperbolic parallel benefit
loss = a * np.power(N_norm, b) + c / (1.0 + d * P)
return loss
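# Worked example of the functional form (illustrative parameter values, not
# fitted ones): with params = [2.0, -0.1, 0.5, 0.4] and a point (N=1e9, P=4),
# N_norm = 1.0, so loss = 2.0 * 1.0**-0.1 + 0.5 / (1 + 0.4 * 4)
#                       = 2.0 + 0.5 / 2.6 ≈ 2.192.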
def fit_scaling_law(data_points, loss_values):
"""
Streamlined hybrid optimization
"""
data_points = np.atleast_2d(np.asarray(data_points))
loss_values = np.asarray(loss_values).ravel()
P = data_points[:, 1]
    # Data-driven estimates: high-parallelism points approximate the
    # saturated baseline loss, and the low-parallelism excess over that
    # baseline gauges the magnitude of the parallel benefit term c.
    high_p = P >= np.percentile(P, 70)
    low_p = P <= np.percentile(P, 30)
    baseline = np.mean(loss_values[high_p]) if np.any(high_p) else np.min(loss_values)
    if np.any(low_p) and np.any(high_p):
        parallel_gain = np.mean(loss_values[low_p]) - baseline
    else:
        parallel_gain = 0.5 * np.std(loss_values)
    parallel_gain = max(parallel_gain, 0.01)  # keep the bound on c strictly positive
mean_loss = np.mean(loss_values)
max_loss = np.max(loss_values)
def objective(params):
pred = scaling_law_func(data_points, params)
mse = np.mean((pred - loss_values) ** 2)
        reg = 1e-8 * (params[0] ** 2 + params[2] ** 2)  # mild ridge penalty on the scale terms
return mse + reg
    # Bounds informed by the data-driven estimates above
    bounds = [
        (baseline * 0.3, max_loss * 2),  # a: power-law scale
        (-0.4, 0.05),                    # b: exponent, with slight positive slack
        (0.001, parallel_gain * 3),      # c: parallel benefit scale
        (0.001, 5.0),                    # d: parallel saturation rate
    ]
best_params = None
best_score = float('inf')
# Global search
try:
res = differential_evolution(
objective, bounds, seed=42,
maxiter=400, popsize=15,
atol=1e-10, tol=1e-10,
workers=1, polish=True
)
best_params = res.x
best_score = res.fun
    except Exception:
        pass
    # Local refinement from several heuristic starts (the global result,
    # if any, is tried first)
inits = [
[baseline * 1.05, -0.06, parallel_gain * 0.9, 0.35],
[baseline * 0.95, -0.08, parallel_gain * 1.1, 0.45],
[mean_loss * 0.9, -0.05, parallel_gain * 0.7, 0.25],
[baseline * 1.2, -0.09, parallel_gain * 1.3, 0.6],
]
if best_params is not None:
inits.insert(0, best_params)
for init in inits:
try:
res = minimize(
objective, init,
method='L-BFGS-B', bounds=bounds,
options={'maxiter': 2000, 'ftol': 1e-12, 'gtol': 1e-9}
)
if res.fun < best_score:
best_score = res.fun
best_params = res.x
        except Exception:
            continue
# Final polish
if best_params is not None:
try:
res = minimize(
objective, best_params,
method='L-BFGS-B', bounds=bounds,
options={'maxiter': 1500, 'ftol': 1e-13}
)
            if res.fun < best_score:
                best_score = res.fun
                best_params = res.x
        except Exception:
            pass
    # Fallback: return the heuristic initialization if every optimizer failed
if best_params is None:
best_params = np.array([baseline * 1.05, -0.06, parallel_gain * 0.9, 0.35])
return best_params
# EVOLVE-BLOCK-END
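# Minimal smoke test for the block above. Everything here is illustrative:
# the synthetic sizes, parallelism degrees, "true" parameters, and noise
# level are assumptions for demonstration, not part of the evolved code.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n_pts = 64
    N = rng.uniform(1e8, 1e11, size=n_pts)             # model sizes (parameter counts)
    P = rng.integers(1, 65, size=n_pts).astype(float)  # parallelism degrees
    X = np.column_stack([N, P])
    true_params = [2.0, -0.08, 0.6, 0.4]
    y = scaling_law_func(X, true_params) + rng.normal(0.0, 1e-3, size=n_pts)
    est = fit_scaling_law(X, y)
    resid = scaling_law_func(X, est) - y
    print("true params:", true_params)
    print("fitted params:", np.round(est, 4))
    print("fit RMSE:", float(np.sqrt(np.mean(resid ** 2))))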