How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning

byrn
By byrn
11 Min Read


In this tutorial, we explore Online Process Reward Learning (OPRL) and demonstrate how we can learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through each component, from the maze environment and reward-model network to preference generation, training loops, and evaluation, while observing how the agent gradually improves its behaviour through online preference-driven shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL enables better credit assignment, faster learning, and more stable policy optimization in challenging environments where the agent would otherwise struggle to discover meaningful rewards. Check out the FULL CODE NOTEBOOK.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random


torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


class MazeEnv:
   def __init__(self, size=8):
       self.size = size
       self.start = (0, 0)
       self.goal = (size-1, size-1)
       self.obstacles = set([(i, size//2) for i in range(1, size-2)])
       self.reset()
  
   def reset(self):
       self.pos = self.start
       self.steps = 0
       return self._get_state()
  
   def _get_state(self):
       state = np.zeros(self.size * self.size)
       state[self.pos[0] * self.size + self.pos[1]] = 1
       return state
  
   def step(self, action):
       moves = [(-1,0), (0,1), (1,0), (0,-1)]
       new_pos = (self.pos[0] + moves[action][0],
                  self.pos[1] + moves[action][1])
       if (0 <= new_pos[0] < self.size and
           0 <= new_pos[1] < self.size and
           new_pos not in self.obstacles):
           self.pos = new_pos
       self.steps += 1
       done = self.pos == self.goal or self.steps >= 60
       reward = 10.0 if self.pos == self.goal else 0.0
       return self._get_state(), reward, done
  
   def render(self):
       grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
       for obs in self.obstacles:
           grid[obs[0]][obs[1]] = '█'
       grid[self.goal[0]][self.goal[1]] = 'G'
       grid[self.pos[0]][self.pos[1]] = 'A'
       return '\n'.join([''.join(row) for row in grid])


class ProcessRewardModel(nn.Module):
   def __init__(self, state_dim, hidden=128):
       super().__init__()
       self.net = nn.Sequential(
           nn.Linear(state_dim, hidden),
           nn.LayerNorm(hidden),
           nn.ReLU(),
           nn.Linear(hidden, hidden),
           nn.LayerNorm(hidden),
           nn.ReLU(),
           nn.Linear(hidden, 1),
           nn.Tanh()
       )
   def forward(self, states):
       return self.net(states)
   def trajectory_reward(self, states):
       return self.forward(states).sum()


class PolicyNetwork(nn.Module):
   def __init__(self, state_dim, action_dim, hidden=128):
       super().__init__()
       self.backbone = nn.Sequential(
           nn.Linear(state_dim, hidden),
           nn.ReLU(),
           nn.Linear(hidden, hidden),
           nn.ReLU()
       )
       self.actor = nn.Linear(hidden, action_dim)
       self.critic = nn.Linear(hidden, 1)
   def forward(self, state):
       features = self.backbone(state)
       return self.actor(features), self.critic(features)

We set up the entire foundation of our OPRL system by importing libraries, defining the maze environment, and building the reward and policy networks. We establish how states are represented, how obstacles block movement, and how the sparse reward structure works. We also design the core neural models that will later learn process rewards and drive the policy’s decisions. Check out the FULL CODE NOTEBOOK.

class OPRLAgent:
   def __init__(self, state_dim, action_dim, lr=3e-4):
       self.policy = PolicyNetwork(state_dim, action_dim)
       self.reward_model = ProcessRewardModel(state_dim)
       self.policy_opt = Adam(self.policy.parameters(), lr=lr)
       self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
       self.trajectories = deque(maxlen=200)
       self.preferences = deque(maxlen=500)
       self.action_dim = action_dim
  
   def select_action(self, state, epsilon=0.1):
       if random.random() < epsilon:
           return random.randint(0, self.action_dim - 1)
       state_t = torch.FloatTensor(state).unsqueeze(0)
       with torch.no_grad():
           logits, _ = self.policy(state_t)
           probs = F.softmax(logits, dim=-1)
           return torch.multinomial(probs, 1).item()
  
   def collect_trajectory(self, env, epsilon=0.1):
       states, actions, rewards = [], [], []
       state = env.reset()
       done = False
       while not done:
           action = self.select_action(state, epsilon)
           next_state, reward, done = env.step(action)
           states.append(state)
           actions.append(action)
           rewards.append(reward)
           state = next_state
       traj = {
           'states': torch.FloatTensor(np.array(states)),
           'actions': torch.LongTensor(actions),
           'rewards': torch.FloatTensor(rewards),
           'return': float(sum(rewards))
       }
       self.trajectories.append(traj)
       return traj

We begin constructing the OPRL agent by implementing action selection and trajectory collection. We use an ε-greedy strategy to ensure exploration and gather sequences of states, actions, and returns. As we run the agent through the maze, we store entire trajectories that will later serve as preference data for shaping the reward model. Check out the FULL CODE NOTEBOOK.

  def generate_preference(self):
       if len(self.trajectories) < 2:
           return
       t1, t2 = random.sample(list(self.trajectories), 2)
       label = 1.0 if t1['return'] > t2['return'] else 0.0
       self.preferences.append({'t1': t1, 't2': t2, 'label': label})
  
   def train_reward_model(self, n_updates=5):
       if len(self.preferences) < 32:
           return 0.0
       total_loss = 0.0
       for _ in range(n_updates):
           batch = random.sample(list(self.preferences), 32)
           loss = 0.0
           for item in batch:
               r1 = self.reward_model.trajectory_reward(item['t1']['states'])
               r2 = self.reward_model.trajectory_reward(item['t2']['states'])
               logit = r1 - r2
               pred_prob = torch.sigmoid(logit)
               label = item['label']
               loss += -(label * torch.log(pred_prob + 1e-8) +
                        (1-label) * torch.log(1 - pred_prob + 1e-8))
           loss = loss / len(batch)
           self.reward_opt.zero_grad()
           loss.backward()
           torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
           self.reward_opt.step()
           total_loss += loss.item()
       return total_loss / n_updates

We generate preference pairs from collected trajectories and train the process reward model using the Bradley–Terry formulation. We compare trajectory-level scores, compute probabilities, and update the reward model to reflect which behaviours appear better. This allows us to learn dense, differentiable, step-level rewards that guide the agent even when the environment itself is sparse. Check out the FULL CODE NOTEBOOK.

 def train_policy(self, n_updates=3, gamma=0.98):
       if len(self.trajectories) < 5:
           return 0.0
       total_loss = 0.0
       for _ in range(n_updates):
           traj = random.choice(list(self.trajectories))
           with torch.no_grad():
               process_rewards = self.reward_model(traj['states']).squeeze()
           shaped_rewards = traj['rewards'] + 0.1 * process_rewards
           returns = []
           G = 0
           for r in reversed(shaped_rewards.tolist()):
               G = r + gamma * G
               returns.insert(0, G)
           returns = torch.FloatTensor(returns)
           returns = (returns - returns.mean()) / (returns.std() + 1e-8)
           logits, values = self.policy(traj['states'])
           log_probs = F.log_softmax(logits, dim=-1)
           action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
           advantages = returns - values.squeeze().detach()
           policy_loss = -(action_log_probs.squeeze() * advantages).mean()
           value_loss = F.mse_loss(values.squeeze(), returns)
           entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
           loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
           self.policy_opt.zero_grad()
           loss.backward()
           torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
           self.policy_opt.step()
           total_loss += loss.item()
       return total_loss / n_updates


def train_oprl(episodes=500, render_interval=100):
   env = MazeEnv(size=8)
   agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
   returns, reward_losses, policy_losses = [], [], []
   success_rate = []
   for ep in range(episodes):
       epsilon = max(0.05, 0.5 - ep / 1000)
       traj = agent.collect_trajectory(env, epsilon)
       returns.append(traj['return'])
       if ep % 2 == 0 and ep > 10:
           agent.generate_preference()
       if ep > 20 and ep % 2 == 0:
           rew_loss = agent.train_reward_model(n_updates=3)
           reward_losses.append(rew_loss)
       if ep > 10:
           pol_loss = agent.train_policy(n_updates=2)
           policy_losses.append(pol_loss)
       success = 1 if traj['return'] > 5 else 0
       success_rate.append(success)
       if ep % render_interval == 0 and ep > 0:
           test_env = MazeEnv(size=8)
           agent.collect_trajectory(test_env, epsilon=0)
           print(test_env.render())
   return returns, reward_losses, policy_losses, success_rate

We train the policy using shaped rewards produced by the learned process reward model. We compute returns, advantages, value estimates, and entropy bonuses, enabling the agent to improve its strategy over time. We then build a full training loop in which exploration decays, preferences accumulate, and both the reward model and the policy are updated continuously. Check out the FULL CODE NOTEBOOK.

print("Training OPRL Agent on Sparse Reward Maze...\n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)


fig, axes = plt.subplots(2, 2, figsize=(14, 10))


axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode="valid"), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Performance')
axes[0,0].grid(alpha=0.3)


success_smooth = np.convolve(success, np.ones(20)/20, mode="valid")
axes[0,1].plot(success_smooth, linewidth=2, color="green")
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Rate')
axes[0,1].set_title('Goal Success Rate')
axes[0,1].grid(alpha=0.3)


axes[1,0].plot(rew_losses, linewidth=2, color="orange")
axes[1,0].set_xlabel('Update Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Model Loss')
axes[1,0].grid(alpha=0.3)


axes[1,1].plot(pol_losses, linewidth=2, color="red")
axes[1,1].set_xlabel('Update Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Policy Loss')
axes[1,1].grid(alpha=0.3)


plt.tight_layout()
plt.show()


print("OPRL Training Complete!")
print("Process rewards, preference learning, reward shaping, and online updates demonstrated.")

We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. We monitor how the agent’s performance evolves as OPRL shapes the reward landscape. By the end of the visualization, we clearly see the impact of process rewards on solving a challenging, sparse-reward maze.

In conclusion, we see how OPRL transforms sparse terminal outcomes into rich online feedback that continuously guides the agent’s behaviour. We watch the process reward model learn preferences, shape the return signal, and accelerate the policy’s ability to reach the goal. With larger mazes, varying shaping strengths, or even real human preference feedback, we appreciate how OPRL provides a flexible and powerful framework for credit assignment in complex decision-making tasks. We finish with a clear, hands-on understanding of how OPRL operates and how we can extend it to more advanced agentic RL settings.


Check out the FULL CODE NOTEBOOK and Paper. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.


Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts of over 2 million monthly views, illustrating its popularity among audiences.



Source link

Share This Article
Leave a Comment

Leave a Reply

Your email address will not be published. Required fields are marked *