# EVOLVE-BLOCK-START
"""
Scaling law discovery for LLM finetuning scenarios
Each domain follows its own power law over an effective data size derived
from a low-rank cross-domain transfer matrix. Fitting uses a weighted MSE
objective to balance per-domain errors and L-BFGS-B with parameter bounds
for numerical stability.
"""
import numpy as np
from scipy.optimize import minimize
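# Model implemented below (restating the formulas used in the function bodies):
#   pred_i(x) = A_i * D_eff_i(x)^(-alpha_i) + E_i
#   D_eff_i(x) = x_i + sum_{j != i} W_ij * x_j,  W_ij = sigmoid(b_i + s_j + u_i * v_j)
# Here x is a row of domain proportions; i indexes the target domain being
# evaluated and j the source domains contributing transferable data.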
def scaling_law_func(data_points, params):
    # data_points: (N, 5) array of domain mixture proportions (one row per mixture)
    # params: flat array of 35 parameters (7 blocks of 5, one value per domain)
    # returns: (N, 5) array of predicted per-domain losses
# Structure:
# 0-4: log A (Amplitude)
# 5-9: log alpha (Scaling exponent)
# 10-14: log E (Irreducible loss)
# 15-19: b (Target receptivity bias)
# 20-24: s (Source quality bias)
    # 25-29: u (Interaction factor 1)
    # 30-34: v (Interaction factor 2)
X = np.atleast_2d(np.asarray(data_points))
params = np.asarray(params)
# Unpack parameters
log_A = params[0:5]
log_alpha = params[5:10]
log_E = params[10:15]
b = params[15:20]
s = params[20:25]
u = params[25:30]
v = params[30:35]
A = np.exp(log_A)
alpha = np.exp(log_alpha)
E = np.exp(log_E)
# Calculate transfer matrix W (5, 5)
# W_ij = sigmoid(b_i + s_j + u_i * v_j)
# models transfer efficiency from domain j to domain i
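    # The biases b_i and s_j set a baseline transfer level per target and per
    # source domain, while the rank-1 term u_i * v_j adds a pairwise
    # interaction, keeping the 5x5 matrix parameterized by only 20 values.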
W_logits = b[:, None] + s[None, :] + (u[:, None] * v[None, :])
W = 1.0 / (1.0 + np.exp(-W_logits))
# Calculate effective data size for each domain i
# D_eff_i = x_i + sum_{j!=i} W_ij * x_j
# We treat in-domain data x_i as having weight 1.0 (identity).
# Computation:
# 1. Weighted sum of all sources: X @ W.T
# 2. Remove diagonal contribution: X * diag(W)
# 3. Add identity contribution: X
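    # Step 1 includes the j = i term W_ii * x_i, which step 2 subtracts off;
    # step 3 then re-adds the in-domain data x_i with weight exactly 1.0.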
term_interaction = X @ W.T
W_diag = np.diag(W)
term_correction = X * W_diag[None, :]
effective_data = X + term_interaction - term_correction
effective_data = np.maximum(effective_data, 1e-9)
# Power law prediction
# pred_i = A_i * (D_eff_i)^(-alpha_i) + E_i
pred = A[None, :] * (effective_data ** -alpha[None, :]) + E[None, :]
return pred
def fit_scaling_law(data_points, loss_values):
X = np.atleast_2d(np.asarray(data_points))
Y = np.atleast_2d(np.asarray(loss_values))
N, D = X.shape
# Calculate weights for objective function to balance domain importance
# We weight by inverse variance to normalize the error contribution of each domain
y_var = np.var(Y, axis=0)
y_var = np.maximum(y_var, 1e-6) # Safety floor
weights = 1.0 / y_var
weights = weights / np.sum(weights) * D # Normalize to sum to D
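    # Equivalently, weights_i = (1 / var_i) / mean_j(1 / var_j), so the
    # weights average to 1 across the D domains.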
# --- Step 1: Robust Independent Fits ---
# Fit each domain independently to get baseline A, alpha, E and beta (avg transfer)
# Model: y = A * (x + beta * (1-x))^-alpha + E
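    # Here beta is a single pooled cross-domain transfer coefficient; the full
    # per-pair matrix W refines it in Step 3, and its logit seeds b_i below.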
init_params_list = []
for i in range(D):
x_i = X[:, i]
y_i = Y[:, i]
def obj_indep(p):
A_val = np.exp(p[0])
beta_val = 1.0 / (1.0 + np.exp(-p[1]))
alpha_val = np.exp(p[2])
E_val = np.exp(p[3])
eff = x_i + beta_val * (1.0 - x_i) + 1e-9
pred = A_val * (eff ** -alpha_val) + E_val
return np.mean((pred - y_i)**2)
# Grid search initialization candidates
y_min, y_max = np.min(y_i), np.max(y_i)
log_A_est = np.log(max(y_max - y_min, 0.01))
log_E_est = np.log(max(y_min - 0.1, 1e-5))
candidates = [
[log_A_est, -2.0, -0.5, log_E_est], # Default
[log_A_est, -0.5, -0.5, log_E_est], # High transfer
[log_A_est + 1.0, -2.0, -0.5, log_E_est - 1.0], # High A, Low E
[log_A_est, -4.0, -0.3, log_E_est], # Low transfer
]
        # Bounds for [log A, beta logit, log alpha, log E] used during init
        indep_bounds = [(-5, 5), (-5, 5), (-3, 2), (-10, 5)]
        lo = np.array([bd[0] for bd in indep_bounds])
        hi = np.array([bd[1] for bd in indep_bounds])
        best_p = np.clip(candidates[0], lo, hi)
        best_err = np.inf
        for p0 in candidates:
            try:
                # Use L-BFGS-B with bounds for stability during init.
                # Clip the starting point into the bounds first so that an
                # extreme log_E estimate cannot start out of range.
                res = minimize(obj_indep, np.clip(p0, lo, hi),
                               method='L-BFGS-B', bounds=indep_bounds)
                if res.fun < best_err:
                    best_err = res.fun
                    best_p = res.x
            except Exception:
                continue
init_params_list.append(best_p)
initial_params_indep = np.array(init_params_list)
# --- Step 2: Initialize Global Parameters ---
log_A_init = initial_params_indep[:, 0]
beta_logits_init = initial_params_indep[:, 1]
log_alpha_init = initial_params_indep[:, 2]
log_E_init = initial_params_indep[:, 3]
# Initialize Transfer Matrix components
# b: initialized to beta_logits (assuming source bias s=0 initially)
b_init = beta_logits_init
s_init = np.zeros(D)
    # u, v: initialized with small noise to break symmetry. With u = v = 0 the
    # gradient of the rank-1 term u_i * v_j vanishes identically, so the
    # optimizer would never move either factor away from zero.
rng = np.random.RandomState(42)
u_init = rng.uniform(-0.01, 0.01, D)
v_init = rng.uniform(-0.01, 0.01, D)
p_global_0 = np.concatenate([
log_A_init, log_alpha_init, log_E_init,
b_init, s_init, u_init, v_init
])
# --- Step 3: Global Optimization with Weighted MSE ---
def global_objective(p):
pred = scaling_law_func(X, p)
# Weighted MSE across domains
mse_per_dim = np.mean((pred - Y)**2, axis=0)
weighted_mse = np.sum(mse_per_dim * weights)
# Regularization
# L2 on interaction params (s, u, v) to prevent overfitting
reg_interact = 1e-5 * np.sum(p[20:35]**2)
# Very weak L2 on others to prevent drift
reg_base = 1e-8 * np.sum(p[:20]**2)
return weighted_mse + reg_interact + reg_base
# Bounds for all 35 parameters
    # log A: (-5, 5), log alpha: (-3, 2), log E: (-10, 5), others: (-10, 10)
    bounds = []
    bounds.extend([(-5, 5)] * 5)    # log A
    bounds.extend([(-3, 2)] * 5)    # log alpha
    bounds.extend([(-10, 5)] * 5)   # log E
    bounds.extend([(-10, 10)] * 20) # b, s, u, v
try:
# L-BFGS-B handles bounds and is robust
res = minimize(global_objective, p_global_0, method='L-BFGS-B',
bounds=bounds,
options={'maxiter': 5000, 'gtol': 1e-7})
best_p = res.x
    except Exception:
best_p = p_global_0
return best_p
# EVOLVE-BLOCK-END
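# Minimal usage sketch (kept outside the evolve block). The data below is
# synthetic and purely illustrative; real callers would supply measured
# (N, 5) domain mixture proportions and the corresponding per-domain losses.
if __name__ == "__main__":
    rng_demo = np.random.RandomState(0)
    # 40 random mixtures over 5 domains, each row summing to 1.
    props = rng_demo.dirichlet(np.ones(5), size=40)
    # Synthetic losses from an arbitrary ground-truth power law plus noise.
    clean = 1.5 * (props + 0.2 * (1.0 - props)) ** -0.3 + 0.8
    losses = clean + rng_demo.normal(0.0, 0.01, size=clean.shape)
    fitted = fit_scaling_law(props, losses)
    preds = scaling_law_func(props, fitted)
    print("fitted parameter vector shape:", fitted.shape)  # expected (35,)
    print("training MSE:", float(np.mean((preds - losses) ** 2)))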