# EVOLVE-BLOCK-START
"""
Scaling law for LLM parallel training with enhanced optimization
Uses 4-parameter model: loss = a * N^b / P^c + d
- N = num_params, P = parallel_size
- Captures both parameter scaling and parallel benefit with superior fitting
"""
import numpy as np
from scipy.optimize import minimize, differential_evolution
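
# Worked example of the functional form (illustrative parameter values, not fitted):
# with a = 2.0, b = -0.05, c = 0.15, d = 1.5, N = 1e9 parameters, P = 8:
#   loss = 2.0 * (1e9)**-0.05 / 8**0.15 + 1.5
#        ≈ 2.0 * 0.355 / 1.366 + 1.5
#        ≈ 2.02

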
def scaling_law_func(data_points, params):
"""
Scaling law: loss = a * num_params^b / parallel_size^c + d
params: [a, b, c, d] (exactly 4 parameters)
- a: coefficient (> 0)
- b: exponent for num_params (typically -0.1 to 0.2)
- c: exponent for parallel_size (typically 0 to 0.5)
- d: baseline loss offset
"""
X = np.atleast_2d(np.asarray(data_points, dtype=np.float64))
params = np.asarray(params, dtype=np.float64)
num_params = X[:, 0]
parallel_size = X[:, 1]
a = params[0]
b = params[1]
c = params[2]
d = params[3]
# Numerically stable computation with epsilon guards
eps = 1e-10
num_params_safe = np.maximum(num_params, eps)
parallel_size_safe = np.maximum(parallel_size, eps)
# Compute: a * N^b / P^c + d
numerator = a * np.power(num_params_safe, b)
denominator = np.power(parallel_size_safe, c)
loss = numerator / (denominator + eps) + d
return loss
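

# Example call (illustrative values, not a fitted model):
#   X = np.array([[1e8, 4], [1e9, 16]])           # columns: num_params, parallel_size
#   scaling_law_func(X, [2.0, -0.05, 0.15, 1.5])  # -> array of 2 predicted losses

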
def fit_scaling_law(data_points, loss_values):
"""
Fit 4-parameter scaling law using three-phase optimization:
Phase 1: Smart local optimization from data-driven initialization
Phase 2: Global optimization if needed with fine convergence
Phase 3: Local refinement on best solution found
"""
X = np.atleast_2d(np.asarray(data_points, dtype=np.float64))
y = np.asarray(loss_values, dtype=np.float64).ravel()
num_params = X[:, 0]
parallel_size = X[:, 1]
# Data statistics
n_min, n_max = num_params.min(), num_params.max()
p_min, p_max = parallel_size.min(), parallel_size.max()
y_min, y_max = y.min(), y.max()
y_range = y_max - y_min
def objective(params):
"""Objective function with robustness checks"""
try:
pred = scaling_law_func(X, params)
if np.any(np.isnan(pred)) or np.any(np.isinf(pred)):
return 1e10
mse = np.mean((pred - y) ** 2)
return max(float(mse), 0)
        except Exception:
            return 1e10
    # Parameter bounds; the bound on d is derived from the observed loss range
bounds = [
(1e-6, 1e3), # a: coefficient (positive)
(-0.2, 0.2), # b: num_params exponent (small)
(0.0, 0.5), # c: parallel_size exponent (small positive)
(y_min - 0.5, y_max + 0.5) # d: baseline offset
]
    # Data-driven initial guess based on the observed loss range
y_span = y_range if y_range > 1e-6 else 1.0
a_init = y_span / np.power(np.maximum(n_max, 1), 0.05)
b_init = -0.05
c_init = 0.15
d_init = y_min - 0.05 * y_span
x0 = np.array([a_init, b_init, c_init, d_init])
x0 = np.clip(x0, [b[0] for b in bounds], [b[1] for b in bounds])
    # Phase 1: Local optimization from the data-driven initialization with tight tolerances
result_local = minimize(
objective,
x0,
method='L-BFGS-B',
bounds=bounds,
options={'maxiter': 1000, 'ftol': 1e-11, 'gtol': 1e-9}
)
best_params = result_local.x if result_local.success else x0
best_loss = objective(best_params)
    # Phase 2: Global optimization if the local result is suboptimal
    if best_loss > 0.005:  # MSE threshold for accepting the Phase 1 fit as-is
result_global = differential_evolution(
objective,
bounds,
maxiter=500,
popsize=20,
seed=42,
atol=1e-11,
tol=1e-11,
workers=1,
updating='deferred',
strategy='best1bin'
)
if result_global.fun < best_loss:
best_params = result_global.x
best_loss = result_global.fun
        # Phase 3: Local refinement of the global solution
result_local2 = minimize(
objective,
best_params,
method='L-BFGS-B',
bounds=bounds,
options={'maxiter': 1000, 'ftol': 1e-11, 'gtol': 1e-9}
)
if result_local2.fun < best_loss:
best_params = result_local2.x
else:
        # Fine-tune the already-good local result
result_local_refined = minimize(
objective,
best_params,
method='L-BFGS-B',
bounds=bounds,
options={'maxiter': 1000, 'ftol': 1e-12, 'gtol': 1e-10}
)
if result_local_refined.fun < best_loss:
best_params = result_local_refined.x
return best_params
# EVOLVE-BLOCK-END
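

# Minimal usage sketch (outside the evolve block): fits the scaling law to synthetic data
# generated from assumed ground-truth parameters and reports the resulting MSE. The
# parameter values and data ranges below are illustrative assumptions, not benchmark settings.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    true_params = [2.0, -0.05, 0.15, 1.5]            # assumed ground truth for the demo
    N = rng.uniform(1e7, 1e10, size=64)              # model sizes (num_params)
    P = rng.integers(1, 65, size=64).astype(float)   # parallel sizes
    X_demo = np.column_stack([N, P])
    y_demo = scaling_law_func(X_demo, true_params) + rng.normal(0.0, 0.01, size=64)
    fitted = fit_scaling_law(X_demo, y_demo)
    preds = scaling_law_func(X_demo, fitted)
    print("fitted params:", np.round(fitted, 4))
    print("demo MSE:", float(np.mean((preds - y_demo) ** 2)))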