# EVOLVE-BLOCK-START
"""
Evolved scaling law for multi-domain LLM finetuning
Simplified architecture with improved interaction modeling
Uses 32 parameters for better expressivity and fit
"""
import numpy as np
from scipy.optimize import minimize, differential_evolution
def scaling_law_func(data_points, params):
    """
    Predict per-domain losses from domain mixture proportions.

    Parameter layout (flattened vector, first 32 entries are used):
        [0:5]    base_coeff     linear coefficient on each domain's own proportion
        [5:10]   exp_coeff      coefficient on log(proportion), clipped to [-2, 2]
        [10:20]  inter_coeff    one coefficient per unordered domain pair (i < j),
                                ordered (0,1), (0,2), ..., (3,4)
        [20:25]  global_bias    additive bias per output domain
        [25:27]  cross_scale    weights on the concentration and max-proportion
                                terms, clipped to [-2, 2]
        [27:32]  entropy_coeff  per-domain weight on the normalized mixture
                                entropy, clipped to [-1, 1]
    Total: 5 + 5 + 10 + 5 + 2 + 5 = 32 parameters.

    Functional form for output domain d:
        loss_d = base_d * X_d + exp_d * log(clip(X_d, 1e-8, 1))
                 + X_d * sum_{j != d} inter_{dj} * X_j
                 + bias_d
                 + cross_0 * (1 - H / log 5) + cross_1 * max_j X_j
                 + entropy_d * H / log 5
    with H = -sum_j X_j * log(clip(X_j, 1e-8, 1)).
    The result is clipped to [0.5, 5.0].

    Args:
        data_points: array-like of shape (N, 5) or (5,); one column per domain.
        params: array-like with at least 32 values (flattened before use);
            entries past index 31 are ignored.

    Returns:
        np.ndarray of shape (N, 5) with the predicted loss per domain.

    Raises:
        ValueError: if data_points does not have exactly 5 columns or params
            holds fewer than 32 values.
    """
    n_domains = 5
    n_params = 32

    X = np.atleast_2d(np.asarray(data_points, dtype=np.float64))
    if X.ndim != 2 or X.shape[1] != n_domains:
        raise ValueError(
            f"data_points must have shape (N, {n_domains}); got {X.shape}"
        )

    params = np.asarray(params, dtype=np.float64).ravel()
    if params.size < n_params:
        # Without this check a short vector either fails with an opaque
        # broadcasting error or (e.g. 28 values) silently broadcasts a single
        # entropy coefficient over every domain.
        raise ValueError(
            f"params must contain at least {n_params} values; got {params.size}"
        )

    # Parse parameters; the clipped groups are clamped for numerical stability.
    base_coeff = params[0:5]
    exp_coeff = np.clip(params[5:10], -2.0, 2.0)
    inter_coeff = params[10:20]
    global_bias = params[20:25]
    cross_scale = np.clip(params[25:27], -2.0, 2.0)
    entropy_coeff = np.clip(params[27:32], -1.0, 1.0)

    # Clipped log of the proportions, shared by the log term and the entropy.
    log_X = np.log(np.clip(X, 1e-8, 1.0))

    # 1. Linear, 2. log-linear and 4. bias terms.
    pred = base_coeff * X + exp_coeff * log_X + global_bias

    # 3. Pairwise interactions: the coefficient of pair (i, j) scales
    # X_i * X_j and is added to both domain i and domain j. With a symmetric,
    # zero-diagonal coupling matrix this is X * (X @ coupling).
    # triu_indices(k=1) enumerates the pairs in the order (0,1), (0,2), ..., (3,4).
    rows, cols = np.triu_indices(n_domains, k=1)
    coupling = np.zeros((n_domains, n_domains), dtype=np.float64)
    coupling[rows, cols] = inter_coeff
    coupling[cols, rows] = inter_coeff
    pred += X * (X @ coupling)

    # 5. Cross-domain effects shared by every output domain.
    # Entropy is normalized by log(5), the entropy of a uniform 5-way mixture.
    norm_entropy = -np.sum(X * log_X, axis=1, keepdims=True) / np.log(float(n_domains))
    pred += cross_scale[0] * (1.0 - norm_entropy)
    pred += cross_scale[1] * np.max(X, axis=1, keepdims=True)

    # 6. Per-domain entropy term.
    pred += entropy_coeff * norm_entropy

    # Keep predictions inside the fixed loss range.
    return np.clip(pred, 0.5, 5.0)
def fit_scaling_law(data_points, loss_values):
    """
    Fit the 32 parameters of ``scaling_law_func`` to observed losses.

    Strategy:
        1. Multi-start L-BFGS-B from three kinds of starting points
           (least-squares based, conservative baseline, diverse random).
        2. Seeded differential evolution over the same box bounds.
        3. A final L-BFGS-B refinement from the best point found so far.
    Each optimizer phase is best-effort: a failure in one phase is ignored and
    the best result of the remaining phases is returned.

    Args:
        data_points: array-like of shape (N, 5); one column per domain.
        loss_values: array-like of observed losses. The objective compares it
            against the (N, 5) prediction of ``scaling_law_func``, so an
            (N, 5) layout is expected; other layouts are not validated here.

    Returns:
        np.ndarray of shape (32,): the parameter vector with the lowest
        regularized MSE found, or a clipped initial guess if every phase failed.
    """
    X = np.atleast_2d(np.asarray(data_points, dtype=np.float64))
    y = np.atleast_2d(np.asarray(loss_values, dtype=np.float64))
    # After atleast_2d, y is never 1-D, so only the single-column check remains.
    if y.shape[1] == 1:
        y = y.reshape(-1, 1)

    N, _ = X.shape
    n_domains = y.shape[1]
    y_mean = np.mean(y, axis=0)

    # Box bounds, in the same order as the parameter layout.
    bounds = (
        [(-5.0, 5.0)] * 5       # base_coeff
        + [(-2.0, 2.0)] * 5     # exp_coeff
        + [(-1.5, 1.5)] * 10    # pairwise interaction coefficients
        + [(-3.0, 3.0)] * 5     # global_bias
        + [(-2.0, 2.0)] * 2     # cross_scale
        + [(-1.0, 1.0)] * 5     # entropy_coeff
    )
    lower = np.array([lo for lo, _ in bounds])
    upper = np.array([hi for _, hi in bounds])

    def initialize_params(strategy=0):
        """Return a starting point for the given strategy, clipped to the bounds."""
        params = np.zeros(32)
        if strategy == 0:
            # Strategy 0: data-driven start from a joint least-squares fit
            # y ≈ [X, 1] @ W.
            design = np.hstack([X, np.ones((N, 1))])
            linear_sol = np.linalg.pinv(design) @ y
            if linear_sol.shape == (6, 5):
                # base_coeff[d] multiplies X_d in the prediction of domain d,
                # so take each domain's own slope (the diagonal) and its own
                # intercept rather than reusing the fit of output column 0.
                slopes = np.diag(linear_sol[:5, :])
                intercepts = linear_sol[5, :]
            else:
                # Unexpected target layout: fall back to the first output column.
                slopes = linear_sol[:5, 0]
                intercepts = linear_sol[5:, 0]
            params[0:5] = np.clip(slopes, -5.0, 5.0)
            params[20:25] = np.clip(intercepts, -3.0, 3.0)
            # Log coefficients: small values with a slight negative offset.
            params[5:10] = np.random.randn(5) * 0.15 - 0.05
            # Interactions: small random values.
            params[10:20] = np.random.randn(10) * 0.1
            # Cross-domain: medium magnitude.
            params[25:27] = np.random.randn(2) * 0.3
            # Entropy coefficients: small.
            params[27:32] = np.random.randn(5) * 0.05
        elif strategy == 1:
            # Strategy 1: conservative baseline (interactions stay at zero).
            params[0:5] = 1.8 + np.random.randn(5) * 0.3
            params[5:10] = -0.15 + np.random.randn(5) * 0.1
            params[20:25] = (y_mean - 2.0) / 2.0
            params[25:27] = np.random.randn(2) * 0.2
            params[27:32] = np.random.randn(5) * 0.05
        else:
            # Strategy 2: diverse random start.
            params[0:5] = np.random.uniform(0.5, 3.5, 5)
            params[5:10] = np.random.uniform(-1.0, 1.0, 5)
            params[10:20] = np.random.randn(10) * 0.25
            params[20:25] = np.random.randn(5) * 0.5
            params[25:27] = np.random.randn(2) * 0.5
            params[27:32] = np.random.randn(5) * 0.1
        # Keep the start inside the box so it is a valid point for the bounded
        # optimizers and a valid result if it is returned as the fallback.
        return np.clip(params, lower, upper)

    def objective(params_flat):
        """Regularized MSE; returns a large penalty if evaluation fails."""
        try:
            pred = np.atleast_2d(scaling_law_func(X, params_flat))
            if pred.shape[1] != n_domains:
                pred = pred.T
            mse = np.mean((pred - y) ** 2)
        except Exception:
            # Shape or numerical failure: treat the point as very bad. Not a
            # bare except, so KeyboardInterrupt/SystemExit still propagate.
            return 1e10

        # L2 regularization with a separate weight per parameter group.
        penalty = (
            0.0002 * np.sum(params_flat[0:5] ** 2)
            + 0.0003 * np.sum(params_flat[5:10] ** 2)
            + 0.0001 * np.sum(params_flat[10:20] ** 2)
            + 0.0002 * np.sum(params_flat[25:27] ** 2)
        )
        total = mse + penalty
        # A NaN/inf objective would derail the optimizers' comparisons.
        return total if np.isfinite(total) else 1e10

    best_params = None
    best_loss = float('inf')

    # Phase 1: multi-start local optimization, cycling through the strategies.
    for attempt in range(5):
        init_params = initialize_params(strategy=attempt % 3)
        try:
            result = minimize(
                objective,
                init_params,
                method='L-BFGS-B',
                bounds=bounds,
                options={'maxiter': 1000, 'ftol': 1e-10, 'gtol': 1e-8}
            )
            if result.fun < best_loss:
                best_loss = result.fun
                best_params = result.x
        except Exception:
            # Best-effort: a failed start is skipped, the others still count.
            pass

    # Phase 2: global optimization (seeded, so this phase is reproducible).
    try:
        result_de = differential_evolution(
            objective,
            bounds,
            maxiter=400,
            popsize=30,
            seed=42,
            atol=1e-10,
            tol=1e-10,
            workers=1,
            polish=True
        )
        if result_de.fun < best_loss:
            best_loss = result_de.fun
            best_params = result_de.x
    except Exception:
        # Best-effort: keep whatever phase 1 found.
        pass

    # Phase 3: final local refinement from the best point so far.
    if best_params is not None:
        try:
            result_final = minimize(
                objective,
                best_params,
                method='L-BFGS-B',
                bounds=bounds,
                options={'maxiter': 800, 'ftol': 1e-11}
            )
            if result_final.fun < best_loss:
                best_loss = result_final.fun
                best_params = result_final.x
        except Exception:
            # Best-effort: keep the unrefined best point.
            pass

    # Every phase failed: fall back to the data-driven initial guess.
    if best_params is None:
        best_params = initialize_params(strategy=0)
    return best_params
# EVOLVE-BLOCK-END