class AdvancedNeuralAgent:
def __init__(self, input_size, hidden_layers=[64, 32], output_size=1, learning_rate=0.001):
"""Advanced AI Agent with stable training and decision making capabilities"""
self.lr = learning_rate
self.initial_lr = learning_rate
self.layers = []
self.memory = []
self.performance_history = []
self.epsilon = 1e-8
layer_sizes = [input_size] + hidden_layers + [output_size]
for i in range(len(layer_sizes) - 1):
fan_in, fan_out = layer_sizes[i], layer_sizes[i+1]
limit = np.sqrt(6.0 / (fan_in + fan_out))
layer = {
'weights': np.random.uniform(-limit, limit, (layer_sizes[i], layer_sizes[i+1])),
'bias': np.zeros((1, layer_sizes[i+1])),
'momentum_w': np.zeros((layer_sizes[i], layer_sizes[i+1])),
'momentum_b': np.zeros((1, layer_sizes[i+1]))
}
self.layers.append(layer)
def activation(self, x, func="relu"):
"""Stable activation functions with clipping"""
x = np.clip(x, -50, 50)
if func == 'relu':
return np.maximum(0, x)
elif func == 'sigmoid':
return 1 / (1 + np.exp(-x))
elif func == 'tanh':
return np.tanh(x)
elif func == 'leaky_relu':
return np.where(x > 0, x, x * 0.01)
elif func == 'linear':
return x
def activation_derivative(self, x, func="relu"):
"""Stable derivatives"""
x = np.clip(x, -50, 50)
if func == 'relu':
return (x > 0).astype(float)
elif func == 'sigmoid':
s = self.activation(x, 'sigmoid')
return s * (1 - s)
elif func == 'tanh':
return 1 - np.tanh(x)**2
elif func == 'leaky_relu':
return np.where(x > 0, 1, 0.01)
elif func == 'linear':
return np.ones_like(x)
def forward(self, X):
"""Forward pass with gradient clipping"""
self.activations = [X]
self.z_values = []
current_input = X
for i, layer in enumerate(self.layers):
z = np.dot(current_input, layer['weights']) + layer['bias']
z = np.clip(z, -50, 50)
self.z_values.append(z)
if i < len(self.layers) - 1:
a = self.activation(z, 'leaky_relu')
else:
a = self.activation(z, 'linear')
self.activations.append(a)
current_input = a
return current_input
def clip_gradients(self, gradients, max_norm=1.0):
"""Gradient clipping to prevent explosion"""
grad_norm = np.linalg.norm(gradients)
if grad_norm > max_norm:
gradients = gradients * (max_norm / (grad_norm + self.epsilon))
return gradients
def backward(self, X, y, output):
"""Stable backpropagation with gradient clipping"""
m = X.shape[0]
dz = (output - y.reshape(-1, 1)) / m
dz = np.clip(dz, -10, 10)
for i in reversed(range(len(self.layers))):
layer = self.layers[i]
dw = np.dot(self.activations[i].T, dz)
db = np.sum(dz, axis=0, keepdims=True)
dw = self.clip_gradients(dw, max_norm=1.0)
db = self.clip_gradients(db, max_norm=1.0)
momentum = 0.9
layer['momentum_w'] = momentum * layer['momentum_w'] + (1 - momentum) * dw
layer['momentum_b'] = momentum * layer['momentum_b'] + (1 - momentum) * db
weight_decay = 0.0001
layer['weights'] -= self.lr * (layer['momentum_w'] + weight_decay * layer['weights'])
layer['bias'] -= self.lr * layer['momentum_b']
if i > 0:
activation_func="leaky_relu" if i > 1 else 'leaky_relu'
dz = np.dot(dz, layer['weights'].T) * self.activation_derivative(
self.z_values[i-1], activation_func)
dz = np.clip(dz, -10, 10)
def adapt_learning_rate(self, epoch, performance_history):
"""Adaptive learning rate with performance-based adjustment"""
if epoch > 10:
recent_performance = performance_history[-10:]
if len(recent_performance) >= 5:
if recent_performance[-1] >= recent_performance[-5]:
self.lr = max(self.lr * 0.95, self.initial_lr * 0.01)
elif recent_performance[-1] < recent_performance[-5] * 0.98:
self.lr = min(self.lr * 1.02, self.initial_lr * 2)
def calculate_loss(self, y_true, y_pred):
"""Stable loss calculation"""
y_true = y_true.reshape(-1, 1)
y_pred = np.clip(y_pred, -1e6, 1e6)
mse = np.mean((y_true - y_pred) ** 2)
mae = np.mean(np.abs(y_true - y_pred))
if not np.isfinite(mse):
mse = 1e6
if not np.isfinite(mae):
mae = 1e6
return mse, mae
def store_experience(self, state, action, reward, next_state):
"""Experience replay for RL aspects"""
experience = {
'state': state,
'action': action,
'reward': reward,
'next_state': next_state,
'timestamp': len(self.memory)
}
self.memory.append(experience)
if len(self.memory) > 1000:
self.memory.pop(0)
def make_decision(self, X, exploration_rate=0.1):
"""Stable decision making"""
prediction = self.forward(X)
if np.random.random() < exploration_rate:
noise_scale = np.std(prediction) * 0.1 if np.std(prediction) > 0 else 0.1
noise = np.random.normal(0, noise_scale, prediction.shape)
prediction += noise
return np.clip(prediction, -1e6, 1e6)
def reset_if_unstable(self):
"""Reset network if training becomes unstable"""
print("🔄 Resetting network due to instability...")
for i, layer in enumerate(self.layers):
fan_in, fan_out = layer['weights'].shape
limit = np.sqrt(6.0 / (fan_in + fan_out))
layer['weights'] = np.random.uniform(-limit, limit, (fan_in, fan_out))
layer['bias'] = np.zeros((1, fan_out))
layer['momentum_w'] = np.zeros((fan_in, fan_out))
layer['momentum_b'] = np.zeros((1, fan_out))
self.lr = self.initial_lr
def train(self, X, y, epochs=500, batch_size=32, validation_split=0.2, verbose=True):
"""Robust training with stability checks"""
y_mean, y_std = np.mean(y), np.std(y)
y_normalized = (y - y_mean) / (y_std + self.epsilon)
X_trn, X_val, y_trn, y_val = train_test_split(
X, y_normalized, test_size=validation_split, random_state=42)
best_val_loss = float('inf')
patience = 30
patience_counter = 0
train_losses, val_losses = [], []
reset_count = 0
for epoch in range(epochs):
if epoch > 0 and (not np.isfinite(train_losses[-1]) or train_losses[-1] > 1e6):
if reset_count < 2:
self.reset_if_unstable()
reset_count += 1
continue
else:
print("🚫 Training unstable, stopping...")
break
indices = np.random.permutation(len(X_train))
X_train_shuffled = X_train[indices]
y_train_shuffled = y_train[indices]
epoch_loss = 0
batches = 0
for i in range(0, len(X_trn), batch_size):
batch_X = X_train_shuffled[i:i+batch_size]
batch_y = y_train_shuffled[i:i+batch_size]
if len(batch_X) == 0:
continue
output = self.forward(batch_X)
self.backward(batch_X, batch_y, output)
loss, _ = self.calculate_loss(batch_y, output)
epoch_loss += loss
batches += 1
avg_train_loss = epoch_loss / max(batches, 1)
val_output = self.forward(X_val)
val_loss, val_mae = self.calculate_loss(y_val, val_output)
train_losses.append(avg_train_loss)
val_losses.append(val_loss)
self.performance_history.append(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
if verbose:
print(f"✋ Early stopping at epoch {epoch}")
break
if epoch > 0:
self.adapt_learning_rate(epoch, self.performance_history)
if verbose and (epoch % 50 == 0 or epoch < 10):
print(f"Epoch {epoch:3d}: Train Loss = {avg_train_loss:.4f}, "
f"Val Loss = {val_loss:.4f}, LR = {self.lr:.6f}")
self.y_mean, self.y_std = y_mean, y_std
return train_losses, val_losses
def predict(self, X):
"""Make predictions with denormalization"""
normalized_pred = self.forward(X)
if hasattr(self, 'y_mean') and hasattr(self, 'y_std'):
return normalized_pred * self.y_std + self.y_mean
return normalized_pred
def evaluate_performance(self, X, y):
"""Comprehensive performance evaluation"""
predictions = self.predict(X)
mse, mae = self.calculate_loss(y, predictions)
y_mean = np.mean(y)
ss_tot = np.sum((y - y_mean) ** 2)
ss_res = np.sum((y.reshape(-1, 1) - predictions) ** 2)
r2 = 1 - (ss_res / (ss_tot + self.epsilon))
return {
'mse': float(mse) if np.isfinite(mse) else float('inf'),
'mae': float(mae) if np.isfinite(mae) else float('inf'),
'r2': float(r2) if np.isfinite(r2) else -float('inf'),
'predictions': predictions.flatten()
}
def visualize_training(self, train_losses, val_losses):
"""Visualize training progress"""
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.plot(train_losses, label="Training Loss", alpha=0.8)
plt.plot(val_losses, label="Validation Loss", alpha=0.8)
plt.title('Training Progress')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True, alpha=0.3)
plt.yscale('log')
plt.subplot(1, 3, 2)
if len(self.performance_history) > 0:
plt.plot(self.performance_history)
plt.title('Performance History')
plt.xlabel('Epoch')
plt.ylabel('Validation Loss')
plt.grid(True, alpha=0.3)
plt.yscale('log')
plt.subplot(1, 3, 3)
if hasattr(self, 'lr_history'):
plt.plot(self.lr_history)
plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()