
Reinforcement Learning for Robot Control

This chapter explores the application of reinforcement learning (RL) to train complex behaviors for humanoid robots using NVIDIA Isaac's RL capabilities. We'll learn how to create training environments, design reward functions, and transfer learned behaviors from simulation to real robots.

Learning Objectives

By the end of this chapter, you will be able to:

  • Set up reinforcement learning environments in Isaac Sim
  • Design effective reward functions for humanoid robot tasks
  • Implement various RL algorithms for robot control
  • Train humanoid robots to perform complex behaviors
  • Transfer learned policies from simulation to reality (sim-to-real)
  • Optimize RL training for computational efficiency
  • Validate and deploy trained policies on real hardware

Introduction to RL for Robotics

Why RL for Humanoid Robotics?

Reinforcement learning is particularly well-suited for humanoid robotics because:

  • Complex behaviors: Learn sophisticated locomotion and manipulation skills
  • Adaptive control: Adapt to different terrains and environments
  • Generalization: Transfer learned skills to new scenarios
  • Autonomous learning: Learn without explicit programming for every situation

RL Framework for Robots

Robot State → Policy → Action → Environment → Reward → Next State
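
Formally, the policy π maps the robot state s_t to an action a_t, and training adjusts the policy to maximize the expected discounted return, where γ ∈ (0, 1) is the discount factor and r is the reward signal designed later in this chapter:

J(\pi) = \mathbb{E}_{\pi}\left[ \sum_{t=0}^{T} \gamma^{t}\, r(s_t, a_t) \right]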

Isaac's RL Ecosystem

NVIDIA Isaac provides several RL tools:

  • Isaac Gym: GPU-accelerated RL environment
  • Isaac Lab: Advanced robotics RL framework
  • TensorRT Integration: Optimized inference for trained policies
  • Simulation-to-Reality Transfer: Techniques for real-world deployment

Setting Up RL Environment in Isaac Sim

Isaac Gym vs Isaac Lab

# Isaac Gym - High-level API for RL
from isaacgym import gymapi, gymtorch
import torch
import numpy as np

class IsaacGymEnvironment:
    def __init__(self, cfg):
        self.gym = gymapi.acquire_gym()
        self.sim = None
        self.envs = []
        self.cfg = cfg
        self.setup_simulation()

    def setup_simulation(self):
        """Set up the physics simulation."""
        # Configure simulation parameters
        self.sim_params = gymapi.SimParams()
        self.sim_params.up_axis = gymapi.UP_AXIS_Z
        self.sim_params.gravity = gymapi.Vec3(0.0, 0.0, -9.81)

        # Physics parameters
        self.sim_params.physx.num_position_iterations = 8
        self.sim_params.physx.num_velocity_iterations = 1
        self.sim_params.physx.max_gpu_contact_pairs = 2**23

        # Create simulation
        self.sim = self.gym.create_sim(0, 0, gymapi.SIM_PHYSX, self.sim_params)

    def create_env(self, env_idx, env_pos):
        """Create a single training environment."""
        env_spacing = self.cfg['env_spacing']
        env_lower = gymapi.Vec3(-env_spacing, -env_spacing, -env_spacing)
        env_upper = gymapi.Vec3(env_spacing, env_spacing, env_spacing)

        # Create environment
        env = self.gym.create_env(self.sim, env_lower, env_upper, 1)

        # Load robot asset
        asset_options = gymapi.AssetOptions()
        asset_options.fix_base_link = False
        asset_options.flip_visual_attachments = False
        asset_options.collapse_fixed_joints = True
        asset_options.replace_cylinder_with_capsule = True

        asset_root = self.cfg['asset_root']
        asset_file = self.cfg['asset_file']
        robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)

        # Define start pose
        start_pose = gymapi.Transform()
        start_pose.p = gymapi.Vec3(0.0, 0.0, 1.0)
        start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)

        # Create actor
        robot = self.gym.create_actor(env, robot_asset, start_pose, "robot", env_idx, 0, 0)

        # Configure DOF properties and drive mode for this actor
        self.setup_dof_drives(env, robot_asset, robot)

        self.envs.append(env)
        return env

    def setup_dof_properties(self, asset):
        """Configure degree-of-freedom drive properties."""
        # Get DOF properties
        dof_props = self.gym.get_asset_dof_properties(asset)

        # Position-drive every joint with fixed PD gains
        for i in range(len(dof_props)):
            dof_props["driveMode"][i] = gymapi.DOF_MODE_POS
            dof_props["stiffness"][i] = 800.0
            dof_props["damping"][i] = 50.0

        return dof_props

    def setup_dof_drives(self, env, asset, actor):
        """Apply the configured joint drives to an actor."""
        dof_props = self.setup_dof_properties(asset)
        self.gym.set_actor_dof_properties(env, actor, dof_props)

Isaac Lab Environment (Modern Approach)

# Isaac Lab - Modern robotics RL framework
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg
from omni.isaac.orbit.envs import RLTaskCfg
from omni.isaac.orbit.utils import configclass
import math

@configclass
class HumanoidRLEnvCfg(RLTaskCfg):
    def __post_init__(self):
        # Environment configuration
        self.scene.num_envs = 4096        # Large number of parallel environments
        self.scene.env_spacing = 2.5
        self.episode_length_s = 20.0      # Episode length in seconds

        # Simulation configuration
        self.sim.dt = 1.0 / 60.0          # 60 Hz physics update
        self.sim.render_interval = 2      # Render every 2nd step
        self.sim.device = "cuda:0"
        self.sim.use_gpu_pipeline = True

        # Robot configuration
        self.robot = ArticulationCfg(
            prim_path="{ENV_REGEX_NS}/Robot",
            spawn_func_name="spawn_humanoid_robot",
            spawn_func_args={
                "usd_path": "/path/to/humanoid_robot.usd",
                "position": (0.0, 0.0, 0.5),
                "orientation": (1.0, 0.0, 0.0, 0.0),
            },
            init_state={
                "joint_pos": {
                    ".*Hip.*": 0.0,
                    ".*Knee.*": 0.0,
                    ".*Ankle.*": 0.0,
                },
                "joint_vel": {".*": 0.0},
            },
            actuators={
                "legs": {
                    "joint_names_expr": [".*Hip.*", ".*Knee.*", ".*Ankle.*"],
                    "control_type": "position",
                    "stiffness": 800.0,
                    "damping": 50.0,
                    "actuator_range": (-math.pi, math.pi),
                },
            },
        )

        # Scene configuration
        self.scene.terrain = AssetBaseCfg(
            prim_path="{ENV_REGEX_NS}/Terrain",
            spawn_func_name="spawn_rough_terrain",
            spawn_func_args={"size": (8.0, 8.0)},
        )

        # Sensor configuration (SensorCfg stands in for the contact-sensor
        # config class provided by your Isaac Lab version)
        self.scene.contact_sensor = SensorCfg(
            prim_path="{ENV_REGEX_NS}/Robot/torso",
            update_period=0.005,
            history_length=3,
            track_contact_states=True,
        )
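
Once defined, a configuration like this is handed to the environment class used later in this chapter. The snippet below is a minimal usage sketch: it assumes an Isaac Sim application is already running, and the attribute names num_envs, device, and the Gym-style action_space/step interface are assumptions that may differ across Isaac Lab versions.

# Minimal usage sketch for the config above (API details are assumptions)
from omni.isaac.orbit.envs import ManagerBasedRLEnv
import torch

env = ManagerBasedRLEnv(cfg=HumanoidRLEnvCfg())
obs, _ = env.reset()

for _ in range(100):
    # Placeholder zero actions; a trained policy would produce these instead
    actions = torch.zeros(env.num_envs, env.action_space.shape[-1], device=env.device)
    obs, reward, terminated, truncated, info = env.step(actions)

env.close()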

Designing Reward Functions

Reward Engineering Principles

Effective reward functions should:

  1. Be smooth and informative: small changes in behavior should produce small changes in reward
  2. Be sparse but informative: Guide learning without being overly specific
  3. Be temporally consistent: Encourage long-term behavior
  4. Include safety considerations: Penalize dangerous behaviors

Walking Task Reward Function

import torch
import numpy as np

class WalkingRewardFunction:
    def __init__(self, cfg):
        self.cfg = cfg
        self.reset()

    def compute_reward(self, obs_dict, action_dict):
        """Compute the reward for the humanoid walking task."""
        # Extract key states
        base_pos = obs_dict['base_pos']              # Robot base position
        base_quat = obs_dict['base_quat']            # Robot base orientation
        base_lin_vel = obs_dict['base_lin_vel']      # Linear velocity
        base_ang_vel = obs_dict['base_ang_vel']      # Angular velocity
        joint_pos = obs_dict['joint_pos']            # Joint positions
        joint_vel = obs_dict['joint_vel']            # Joint velocities
        contact_forces = obs_dict['contact_forces']  # Ground contact forces

        # Walking forward reward
        forward_reward = self.compute_forward_reward(base_lin_vel)

        # Balance reward (maintain upright orientation)
        balance_reward = self.compute_balance_reward(base_quat)

        # Energy efficiency (penalize excessive joint movements)
        energy_reward = self.compute_energy_reward(joint_vel, action_dict['actions'])

        # Smoothness reward (penalize jerky movements)
        smoothness_reward = self.compute_smoothness_reward(action_dict['actions'])

        # Contact reward (proper foot contact)
        contact_reward = self.compute_contact_reward(contact_forces)

        # Safety penalties
        safety_penalty = self.compute_safety_penalty(obs_dict)

        # Total reward
        total_reward = (
            self.cfg.forward_reward_scale * forward_reward +
            self.cfg.balance_reward_scale * balance_reward +
            self.cfg.energy_reward_scale * energy_reward +
            self.cfg.smoothness_reward_scale * smoothness_reward +
            self.cfg.contact_reward_scale * contact_reward -
            self.cfg.safety_penalty_scale * safety_penalty
        )

        return total_reward

    def compute_forward_reward(self, base_lin_vel):
        """Reward for moving forward at the desired speed."""
        # Target forward velocity
        target_vel_x = self.cfg.target_velocity

        # Forward velocity achieved
        forward_vel_x = base_lin_vel[:, 0]

        # Difference from target
        vel_error = torch.abs(forward_vel_x - target_vel_x)

        # Exponential reward for achieving the target velocity
        forward_reward = torch.exp(-vel_error / self.cfg.velocity_scale)

        return forward_reward

    def compute_balance_reward(self, base_quat):
        """Reward for maintaining an upright orientation."""
        # Convert quaternion to roll/pitch angles
        roll, pitch, _ = self.quat_to_euler(base_quat)

        # Penalize deviations from upright (roll = pitch = 0)
        orientation_error = torch.sqrt(roll**2 + pitch**2)

        # Exponential penalty for orientation deviation
        balance_reward = torch.exp(-orientation_error / self.cfg.orientation_scale)

        return balance_reward

    def compute_energy_reward(self, joint_vel, actions):
        """Reward for energy efficiency."""
        # Penalize excessive joint velocity
        joint_power = torch.sum(torch.abs(joint_vel), dim=1)

        # Also penalize action magnitude (control effort)
        action_power = torch.sum(torch.abs(actions), dim=1)

        # Energy penalty
        energy_penalty = joint_power + action_power

        # Convert to reward (negative penalty)
        energy_reward = torch.exp(-energy_penalty * self.cfg.energy_scale)

        return energy_reward

    def compute_smoothness_reward(self, actions):
        """Reward for smooth control actions."""
        # Penalize large action changes (jerk)
        if self.prev_actions is not None:
            action_change = torch.sum(torch.abs(actions - self.prev_actions), dim=1)
        else:
            action_change = torch.zeros_like(actions[:, 0])

        self.prev_actions = actions.clone()

        # Smoothness reward
        smoothness_reward = torch.exp(-action_change * self.cfg.smoothness_scale)

        return smoothness_reward

    def compute_contact_reward(self, contact_forces):
        """Reward for proper foot contact."""
        # Identify feet contacts
        left_foot_force = contact_forces[:, 0]   # Assuming index 0 is the left foot
        right_foot_force = contact_forces[:, 1]  # Assuming index 1 is the right foot

        # Reward proper alternation of foot contact during walking
        contact_reward = torch.zeros_like(left_foot_force)

        # More sophisticated contact-pattern evaluation would go here
        contact_reward += torch.exp(-torch.abs(left_foot_force - right_foot_force) / 100.0)

        return contact_reward

    def compute_safety_penalty(self, obs_dict):
        """Penalty for unsafe behaviors."""
        base_pos = obs_dict['base_pos']
        joint_pos = obs_dict['joint_pos']

        # Check if the robot is falling (base too low)
        fall_penalty = torch.where(
            base_pos[:, 2] < self.cfg.min_height,
            torch.ones_like(base_pos[:, 2]),
            torch.zeros_like(base_pos[:, 2])
        )

        # Penalize joint positions beyond their limits (summed over joints)
        joint_limit_penalty = torch.sum(
            torch.where(
                torch.abs(joint_pos) > self.cfg.joint_limit,
                torch.abs(joint_pos) - self.cfg.joint_limit,
                torch.zeros_like(joint_pos)
            ),
            dim=1
        )

        # Total safety penalty
        safety_penalty = fall_penalty + joint_limit_penalty

        return safety_penalty

    def quat_to_euler(self, quat):
        """Convert a quaternion (assumed x, y, z, w layout) to roll, pitch, yaw."""
        x, y, z, w = quat[:, 0], quat[:, 1], quat[:, 2], quat[:, 3]
        roll = torch.atan2(2.0 * (w * x + y * z), 1.0 - 2.0 * (x * x + y * y))
        pitch = torch.asin(torch.clamp(2.0 * (w * y - x * z), -1.0, 1.0))
        yaw = torch.atan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))
        return roll, pitch, yaw

    def reset(self):
        """Reset for a new episode."""
        self.prev_actions = None
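
The reward terms above read their weights and thresholds from self.cfg. A minimal sketch of that configuration follows; the field names mirror the attributes accessed above, while the numeric values are illustrative starting points rather than tuned gains.

from dataclasses import dataclass

@dataclass
class WalkingRewardCfg:
    # Term weights
    forward_reward_scale: float = 1.0
    balance_reward_scale: float = 0.5
    energy_reward_scale: float = 0.1
    smoothness_reward_scale: float = 0.1
    contact_reward_scale: float = 0.2
    safety_penalty_scale: float = 2.0
    # Term parameters
    target_velocity: float = 1.0      # m/s commanded forward speed
    velocity_scale: float = 0.25
    orientation_scale: float = 0.2    # rad
    energy_scale: float = 0.01
    smoothness_scale: float = 0.05
    min_height: float = 0.6           # m; below this the robot is considered fallen
    joint_limit: float = 2.5          # rad; symmetric joint-position limit

reward_fn = WalkingRewardFunction(WalkingRewardCfg())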

Complex Task Reward Examples

import torch

class ComplexTaskRewardFunction:
    """Examples of reward functions for different tasks."""

    def manipulation_reward(self, obs_dict):
        """Reward function for manipulation tasks."""
        # Distance to target object
        object_pos = obs_dict['object_pos']
        hand_pos = obs_dict['hand_pos']
        target_pos = obs_dict['target_pos']

        # Grasp reward
        grasp_reward = self.compute_grasp_reward(obs_dict)

        # Reach reward
        reach_reward = self.compute_reach_reward(hand_pos, object_pos)

        # Place reward
        place_reward = self.compute_place_reward(hand_pos, target_pos)

        return grasp_reward + reach_reward + place_reward

    def navigation_reward(self, obs_dict):
        """Reward function for navigation tasks."""
        robot_pos = obs_dict['robot_pos']
        goal_pos = obs_dict['goal_pos']
        obstacles = obs_dict['obstacles']

        # Distance to goal
        dist_to_goal = torch.norm(robot_pos - goal_pos, dim=1)
        goal_reward = torch.exp(-dist_to_goal / 0.5)

        # Collision penalty
        collision_penalty = self.compute_collision_penalty(obs_dict)

        # Path efficiency
        path_efficiency = self.compute_path_efficiency(obs_dict)

        return goal_reward - collision_penalty + path_efficiency

    def compute_collision_penalty(self, obs_dict):
        """Penalty for collisions with obstacles."""
        # Implementation for collision detection and penalty
        pass

RL Algorithms for Robotics

Deep Deterministic Policy Gradient (DDPG) with TD3-Style Twin Critics

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()

        self.l1 = nn.Linear(state_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, action_dim)

        self.max_action = max_action

    def forward(self, state):
        a = F.relu(self.l1(state))
        a = F.relu(self.l2(a))
        return self.max_action * torch.tanh(self.l3(a))

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()

        # Q1 architecture
        self.l1 = nn.Linear(state_dim + action_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)

        # Q2 architecture
        self.l4 = nn.Linear(state_dim + action_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)

        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)

        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2

    def Q1(self, state, action):
        sa = torch.cat([state, action], 1)

        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        return q1

class DDPGAgent:
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).cuda()
        self.actor_target = Actor(state_dim, action_dim, max_action).cuda()
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=1e-4)

        self.critic = Critic(state_dim, action_dim).cuda()
        self.critic_target = Critic(state_dim, action_dim).cuda()
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=1e-3)

        self.max_action = max_action

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).cuda()
        return self.actor(state).cpu().data.numpy().flatten()

    def update(self, replay_buffer, batch_size=100, discount=0.99, tau=0.005):
        # Sample replay buffer
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

        with torch.no_grad():
            # Select action according to the target policy and add clipped noise
            noise = torch.randn_like(action) * 0.2
            noise = noise.clamp(-0.5, 0.5)

            next_action = (self.actor_target(next_state) + noise).clamp(-self.max_action, self.max_action)

            # Compute the target Q value (minimum over the twin critics)
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + not_done * discount * target_Q

        # Get current Q estimates
        current_Q1, current_Q2 = self.critic(state, action)

        # Compute critic loss
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

        # Optimize the critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Compute actor loss
        actor_loss = -self.critic.Q1(state, self.actor(state)).mean()

        # Optimize the actor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update the frozen target models (Polyak averaging)
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
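
Both the agent above and the SAC agent below sample transitions via replay_buffer.sample(batch_size). A minimal buffer compatible with that interface might look like the following sketch (not tied to any particular library; the device argument is an assumption about where training tensors live).

import numpy as np
import torch

class ReplayBuffer:
    """Minimal sketch of a replay buffer matching replay_buffer.sample() above."""
    def __init__(self, state_dim, action_dim, max_size=int(1e6), device="cuda"):
        self.max_size = max_size
        self.ptr = 0
        self.size = 0
        self.device = device
        self.state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.action = np.zeros((max_size, action_dim), dtype=np.float32)
        self.next_state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.reward = np.zeros((max_size, 1), dtype=np.float32)
        self.not_done = np.zeros((max_size, 1), dtype=np.float32)

    def add(self, state, action, next_state, reward, done):
        # Overwrite the oldest entry once the buffer is full (ring buffer)
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1.0 - float(done)
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        idx = np.random.randint(0, self.size, size=batch_size)
        to_t = lambda x: torch.as_tensor(x[idx], device=self.device)
        return (to_t(self.state), to_t(self.action), to_t(self.next_state),
                to_t(self.reward), to_t(self.not_done))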

Soft Actor-Critic (SAC) for Continuous Control

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np

class SACActor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(SACActor, self).__init__()

        self.l1 = nn.Linear(state_dim, 256)
        self.l2 = nn.Linear(256, 256)

        self.mean_linear = nn.Linear(256, action_dim)
        self.log_std_linear = nn.Linear(256, action_dim)

        self.max_action = max_action

    def forward(self, state):
        x = F.relu(self.l1(state))
        x = F.relu(self.l2(x))

        mean = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, min=-20, max=2)

        return mean, log_std

    def sample(self, state):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        normal = Normal(mean, std)
        x_t = normal.rsample()  # reparameterization trick
        y_t = torch.tanh(x_t)
        action = y_t * self.max_action
        log_prob = normal.log_prob(x_t)
        # Enforce the action bound (tanh squashing correction)
        log_prob -= torch.log(self.max_action * (1 - y_t.pow(2)) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)
        return action, log_prob

class SACCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(SACCritic, self).__init__()

        # Q1 architecture
        self.l1 = nn.Linear(state_dim + action_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)

        # Q2 architecture
        self.l4 = nn.Linear(state_dim + action_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)

        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)

        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2

class SACAgent:
    def __init__(self, state_dim, action_dim, max_action, discount=0.99, tau=0.005, alpha=0.2):
        self.actor = SACActor(state_dim, action_dim, max_action).cuda()
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=3e-4)

        # Two independent critics; SACCritic returns twin Q-heads, and only the
        # first head of each instance is used below.
        self.critic_1 = SACCritic(state_dim, action_dim).cuda()
        self.critic_2 = SACCritic(state_dim, action_dim).cuda()

        self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=3e-4)
        self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=3e-4)

        self.target_critic_1 = SACCritic(state_dim, action_dim).cuda()
        self.target_critic_2 = SACCritic(state_dim, action_dim).cuda()

        self.target_critic_1.load_state_dict(self.critic_1.state_dict())
        self.target_critic_2.load_state_dict(self.critic_2.state_dict())

        self.max_action = max_action
        self.discount = discount
        self.tau = tau
        self.alpha = alpha

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).cuda()
        action, _ = self.actor.sample(state)
        return action.cpu().data.numpy().flatten()

    def update(self, replay_buffer, batch_size=256):
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

        with torch.no_grad():
            next_action, next_log_prob = self.actor.sample(next_state)
            target_q1, _ = self.target_critic_1(next_state, next_action)
            target_q2, _ = self.target_critic_2(next_state, next_action)
            # Soft Bellman backup with entropy bonus
            target_q = torch.min(target_q1, target_q2) - self.alpha * next_log_prob
            target_q = reward + not_done * self.discount * target_q

        # Update critics
        current_q1, _ = self.critic_1(state, action)
        current_q2, _ = self.critic_2(state, action)

        critic_loss_1 = F.mse_loss(current_q1, target_q)
        critic_loss_2 = F.mse_loss(current_q2, target_q)

        self.critic_1_optimizer.zero_grad()
        critic_loss_1.backward()
        self.critic_1_optimizer.step()

        self.critic_2_optimizer.zero_grad()
        critic_loss_2.backward()
        self.critic_2_optimizer.step()

        # Update actor (maximize entropy-regularized Q)
        new_action, log_prob = self.actor.sample(state)
        q1, _ = self.critic_1(state, new_action)
        q2, _ = self.critic_2(state, new_action)
        q = torch.min(q1, q2)

        actor_loss = (self.alpha * log_prob - q).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update target networks (Polyak averaging)
        for param, target_param in zip(self.critic_1.parameters(), self.target_critic_1.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

        for param, target_param in zip(self.critic_2.parameters(), self.target_critic_2.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
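
As a rough sketch of how the agent and the replay buffer from earlier fit together, the loop below assumes a generic single-environment Gym-style wrapper (env, its step/reset signature, and the step counts are placeholders, not part of the Isaac API):

# Hypothetical single-environment training sketch
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

agent = SACAgent(state_dim, action_dim, max_action)
buffer = ReplayBuffer(state_dim, action_dim)

obs = env.reset()
for step in range(100_000):
    action = agent.select_action(obs)
    next_obs, reward, done, info = env.step(action)
    buffer.add(obs, action, next_obs, reward, done)
    obs = env.reset() if done else next_obs

    if step > 1_000:   # warm-up period before updates begin
        agent.update(buffer)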

Training Loop Implementation

Isaac Lab Training Loop

from omni.isaac.orbit.envs import ManagerBasedRLEnv
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.dict import class_to_dict
from omni.isaac.orbit_tasks.utils.training import get_default_env_cfg_cli_args, parse_env_cfg
from omni.isaac.orbit_tasks.locomotion.velocity.velocity_env_cfg import LocomotionVelocityRoughEnvCfg
import numpy as np
import torch
import wandb

class IsaacRLTrainer:
    def __init__(self, cfg):
        self.cfg = cfg
        self.device = cfg.device
        self.env = None
        self.agent = None
        self.setup_training()

    def setup_training(self):
        """Set up the training environment and agent."""
        # Parse environment configuration
        env_cfg = self.cfg.env_cfg
        env_cfg = parse_env_cfg(env_cfg)

        # Create environment
        self.env = ManagerBasedRLEnv(cfg=env_cfg)

        # Initialize agent (DDPG, SAC, PPO, etc.)
        self.agent = self.initialize_agent()

        # Initialize logging
        if self.cfg.use_wandb:
            wandb.init(
                project="humanoid-rl",
                config=class_to_dict(self.cfg),
                name=self.cfg.experiment_name
            )

    def initialize_agent(self):
        """Initialize the RL agent based on the configuration."""
        if self.cfg.algorithm == "sac":
            from omni.isaac.orbit_assets.rl_agents.sac import SAC
            return SAC(
                state_dim=self.env.observation_space.shape[0],
                action_dim=self.env.action_space.shape[0],
                action_bound=self.env.action_space.high[0],
                device=self.device
            )
        elif self.cfg.algorithm == "ppo":
            from omni.isaac.orbit_assets.rl_agents.ppo import PPO
            return PPO(
                state_dim=self.env.observation_space.shape[0],
                action_dim=self.env.action_space.shape[0],
                device=self.device
            )
        else:
            raise ValueError(f"Unknown algorithm: {self.cfg.algorithm}")

    def train(self, num_iterations=1000000):
        """Main training loop."""
        obs, _ = self.env.reset()
        episode_return = 0
        episode_length = 0
        best_return = float('-inf')

        for iteration in range(num_iterations):
            # Select action
            action = self.agent.select_action(obs)

            # Add exploration noise if needed
            if self.cfg.exploration_noise > 0:
                action = action + np.random.normal(0, self.cfg.exploration_noise, size=action.shape)
                action = np.clip(action, self.env.action_space.low, self.env.action_space.high)

            # Step environment
            next_obs, reward, terminated, truncated, info = self.env.step(action)

            # Store transition
            self.agent.store_transition(obs, action, reward, next_obs, terminated)

            # Update episode stats
            episode_return += reward
            episode_length += 1

            # Update agent
            if iteration >= self.cfg.start_timesteps and iteration % self.cfg.update_freq == 0:
                for _ in range(self.cfg.update_freq):
                    self.agent.update()

            # Check for episode termination
            if terminated or truncated:
                # Log episode info
                if self.cfg.use_wandb:
                    wandb.log({
                        "episode_return": episode_return,
                        "episode_length": episode_length,
                        "iteration": iteration
                    })

                # Save best model
                if episode_return > best_return:
                    best_return = episode_return
                    self.save_model(f"best_model_{iteration}.pth")

                # Reset for new episode
                obs, _ = self.env.reset()
                episode_return = 0
                episode_length = 0
            else:
                obs = next_obs

            # Save checkpoints
            if iteration % self.cfg.save_freq == 0:
                self.save_model(f"checkpoint_{iteration}.pth")

            # Log progress
            if iteration % self.cfg.log_freq == 0:
                print(f"Iteration {iteration}: Episode Return = {episode_return:.2f}")

    def save_model(self, path):
        """Save the trained model."""
        torch.save({
            'agent_state_dict': self.agent.state_dict(),
            'env_cfg': self.env.cfg
        }, path)

    def load_model(self, path):
        """Load a trained model."""
        checkpoint = torch.load(path)
        self.agent.load_state_dict(checkpoint['agent_state_dict'])
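
The trainer reads several fields from self.cfg that are not defined above. The sketch below shows one assumed shape for that configuration; the field names mirror the attributes accessed in the class, the values are illustrative, and whether parse_env_cfg accepts a task name or a config object depends on your Isaac Lab version.

from dataclasses import dataclass

@dataclass
class TrainerCfg:
    # Environment / hardware
    env_cfg: object = None            # e.g. a HumanoidRLEnvCfg instance or a task name
    device: str = "cuda:0"
    # Algorithm
    algorithm: str = "sac"            # "sac" or "ppo"
    exploration_noise: float = 0.1
    start_timesteps: int = 10_000     # steps collected before updates begin
    update_freq: int = 50
    # Logging / checkpointing
    use_wandb: bool = True
    experiment_name: str = "humanoid_walk_v0"
    save_freq: int = 50_000
    log_freq: int = 1_000

trainer = IsaacRLTrainer(TrainerCfg(env_cfg=HumanoidRLEnvCfg()))
trainer.train(num_iterations=1_000_000)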

Sim-to-Real Transfer Techniques

Domain Randomization

import random
import numpy as np

class DomainRandomization:
    def __init__(self, env):
        self.env = env
        self.randomization_params = {
            'friction': (0.5, 1.5),        # Randomize friction coefficients
            'mass': (0.9, 1.1),            # Randomize mass by ±10%
            'com_offset': 0.01,            # Randomize center-of-mass offset
            'actuator_delay': (0, 0.02),   # Randomize actuator delays
            'sensor_noise': (0.0, 0.01)    # Randomize sensor noise
        }

    def randomize_environment(self):
        """Apply domain randomization to the environment."""
        # Randomize friction
        friction_range = self.randomization_params['friction']
        new_friction = random.uniform(*friction_range)
        self.set_environment_friction(new_friction)

        # Randomize robot mass
        mass_range = self.randomization_params['mass']
        mass_multiplier = random.uniform(*mass_range)
        self.multiply_robot_mass(mass_multiplier)

        # Randomize center of mass
        com_offset_range = (-self.randomization_params['com_offset'],
                            self.randomization_params['com_offset'])
        com_offset = [random.uniform(*com_offset_range) for _ in range(3)]
        self.offset_center_of_mass(com_offset)

        # Randomize actuator properties
        delay_range = self.randomization_params['actuator_delay']
        actuator_delay = random.uniform(*delay_range)
        self.set_actuator_delay(actuator_delay)

        # Randomize sensor noise
        noise_range = self.randomization_params['sensor_noise']
        sensor_noise = random.uniform(*noise_range)
        self.set_sensor_noise(sensor_noise)

    def set_environment_friction(self, friction):
        """Set friction parameters in simulation."""
        # Apply to ground plane and robot links
        pass

    def multiply_robot_mass(self, multiplier):
        """Multiply robot link masses by a factor."""
        # Update each link's mass
        pass

    def offset_center_of_mass(self, offset):
        """Apply an offset to the center of mass."""
        # Update the robot's center of mass
        pass

    def set_actuator_delay(self, delay):
        """Add delay to actuator responses."""
        # Implement actuator delay
        pass

    def set_sensor_noise(self, noise):
        """Add noise to sensor readings."""
        # Add noise to all sensors
        pass
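
In practice the randomization is applied at every episode reset so each rollout sees a slightly different physical model. A minimal sketch of how this hooks into a training loop (env and agent stand in for the components defined earlier in this chapter, and the episode count is illustrative):

# Hypothetical usage: re-randomize physics at every episode boundary
randomizer = DomainRandomization(env)
num_episodes = 1_000

for episode in range(num_episodes):
    randomizer.randomize_environment()   # new friction, masses, delays, noise
    obs = env.reset()
    done = False
    while not done:
        action = agent.select_action(obs)
        obs, reward, done, info = env.step(action)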

System Identification and Parameter Adaptation

import numpy as np
from scipy.optimize import minimize

class SystemIdentification:
    def __init__(self, robot_model):
        self.robot_model = robot_model
        self.sim_params = {}
        self.real_params = {}

    def collect_data(self, inputs, method='step_response'):
        """Collect matching data from simulation and the real robot."""
        if method == 'step_response':
            sim_outputs = self.simulate_step_response(inputs)
            real_outputs = self.measure_real_step_response(inputs)
        elif method == 'frequency_response':
            sim_outputs = self.simulate_frequency_response(inputs)
            real_outputs = self.measure_real_frequency_response(inputs)
        else:
            raise ValueError(f"Unknown identification method: {method}")

        return sim_outputs, real_outputs

    def identify_parameters(self, sim_data, real_data):
        """Identify physical parameters from the collected data."""
        # Define the objective function to minimize
        def objective_function(params):
            # Update simulation with new parameters
            self.update_simulation_params(params)

            # Simulate with the new parameters
            sim_response = self.simulate_with_params(params)

            # Mean squared error between simulation and real data
            error = np.mean((sim_response - real_data)**2)
            return error

        # Initial parameter guess
        initial_params = self.get_initial_parameters()

        # Optimize parameters
        result = minimize(
            objective_function,
            initial_params,
            method='BFGS',
            options={'disp': True}
        )

        return result.x

    def adapt_controller(self, identified_params):
        """Adapt the controller based on the identified parameters."""
        # Update controller gains based on the new parameters
        # Apply adaptive control techniques
        pass

    def update_simulation_params(self, params):
        """Update the simulation with new parameters."""
        # Update friction, mass, inertia, etc.
        pass

Deployment and Real Robot Integration

Policy Deployment

import torch
import numpy as np

class PolicyDeployer:
    def __init__(self, model_path, robot_interface):
        self.policy = self.load_policy(model_path)
        self.robot_interface = robot_interface
        self.normalize_obs = True
        self.denormalize_action = True

    def load_policy(self, model_path):
        """Load a trained policy for deployment."""
        # Load model
        model = torch.load(model_path)

        # Set to evaluation mode
        model.eval()

        # Move to GPU for faster inference if available
        if torch.cuda.is_available():
            model = model.cuda()

        return model

    def deploy(self):
        """Run the policy on the robot in a closed control loop."""
        # Reset robot to initial state
        self.robot_interface.reset()

        # Start control loop
        obs = self.robot_interface.get_observation()

        while True:
            # Preprocess observation
            processed_obs = self.preprocess_observation(obs)

            # Get action from policy
            with torch.no_grad():
                action = self.policy(processed_obs)

            # Postprocess action
            processed_action = self.postprocess_action(action)

            # Execute action
            obs, reward, done, info = self.robot_interface.step(processed_action)

            if done:
                self.robot_interface.reset()
                obs = self.robot_interface.get_observation()

    def preprocess_observation(self, obs):
        """Preprocess an observation for the policy."""
        # Convert to tensor
        obs_tensor = torch.FloatTensor(obs).unsqueeze(0)

        # Normalize if needed
        if self.normalize_obs:
            obs_tensor = self.normalize(obs_tensor)

        # Move to device
        if torch.cuda.is_available():
            obs_tensor = obs_tensor.cuda()

        return obs_tensor

    def postprocess_action(self, action):
        """Postprocess an action from the policy."""
        # Convert to numpy
        action_np = action.cpu().numpy().flatten()

        # Denormalize if needed
        if self.denormalize_action:
            action_np = self.denormalize_action_values(action_np)

        return action_np

    def normalize(self, obs):
        """Apply normalization to an observation."""
        # Apply learned normalization parameters
        pass

    def denormalize_action_values(self, action):
        """Denormalize action values to robot limits."""
        # Scale actions to robot joint limits
        pass
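
For real-time control, the PyTorch policy is often exported to an optimized inference runtime before deployment. The sketch below uses ONNX export; the observation dimension and file names are placeholders, and the exported model can then be optimized offline, for example with TensorRT's trtexec tool.

import torch

# Assumes 'policy' is the trained actor network (an nn.Module) and obs_dim its input size
obs_dim = 48                                  # placeholder observation dimension
dummy_obs = torch.zeros(1, obs_dim).cuda()

torch.onnx.export(
    policy,                                   # trained policy network
    dummy_obs,                                # example input used for tracing
    "humanoid_policy.onnx",
    input_names=["observation"],
    output_names=["action"],
    opset_version=17,
)
# Offline optimization, e.g.:
#   trtexec --onnx=humanoid_policy.onnx --saveEngine=humanoid_policy.engine --fp16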

Training Optimization and Best Practices

Multi-GPU Training

import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import torch.multiprocessing as mp

class MultiGPURLTrainer:
    def __init__(self, num_gpus, env_cfg, model_cfg):
        self.num_gpus = num_gpus
        self.env_cfg = env_cfg
        self.model_cfg = model_cfg
        self.device_ids = list(range(num_gpus))

    def setup_distributed(self, rank, world_size):
        """Set up distributed training for one process."""
        dist.init_process_group(
            backend='nccl',
            init_method='env://',
            world_size=world_size,
            rank=rank
        )
        torch.cuda.set_device(rank)

    def train_distributed(self):
        """Start distributed training across all GPUs."""
        world_size = self.num_gpus
        mp.spawn(
            self.train_worker,
            args=(world_size,),
            nprocs=world_size,
            join=True
        )

    def train_worker(self, rank, world_size):
        """Training worker process (one per GPU)."""
        self.setup_distributed(rank, world_size)

        # Create environment
        env = self.create_env()

        # Create model and wrap it with DDP
        model = self.create_model().cuda(rank)
        ddp_model = DDP(model, device_ids=[rank])

        # Create optimizer
        optimizer = torch.optim.Adam(ddp_model.parameters())

        # Training loop
        for episode in range(self.model_cfg.num_episodes):
            # Sample batch
            batch = env.sample_batch()

            # Compute loss
            loss = self.compute_loss(ddp_model, batch)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Synchronize across GPUs
            if episode % self.model_cfg.sync_frequency == 0:
                self.synchronize_parameters(ddp_model)

    def create_model(self):
        """Create the RL model."""
        # Create actor-critic network
        pass

    def create_env(self):
        """Create the distributed environment."""
        # Create parallel environments across GPUs
        pass

    def synchronize_parameters(self, model):
        """Synchronize model parameters across GPUs."""
        # Average parameters across all GPUs
        pass
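
torch.distributed with init_method='env://' expects the rendezvous address in environment variables. A minimal single-node launch sketch (the configuration objects and values passed to the trainer are placeholders):

import os
import torch
from types import SimpleNamespace

if __name__ == "__main__":
    # Rendezvous info required by init_method='env://' (single-node example)
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")

    model_cfg = SimpleNamespace(num_episodes=1000, sync_frequency=10)  # placeholder values
    trainer = MultiGPURLTrainer(torch.cuda.device_count(), env_cfg=None, model_cfg=model_cfg)
    trainer.train_distributed()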

Validation and Testing

Policy Evaluation

import numpy as np
import torch

class PolicyEvaluator:
    def __init__(self, policy, env):
        self.policy = policy
        self.env = env
        self.metrics = {}

    def evaluate_policy(self, num_episodes=100, render=False):
        """Evaluate policy performance over multiple episodes."""
        returns = []
        episode_lengths = []
        success_rates = []

        for episode in range(num_episodes):
            obs = self.env.reset()
            total_return = 0
            episode_length = 0
            success = False

            while True:
                # Get action from policy
                with torch.no_grad():
                    action = self.policy.select_action(obs)

                # Step environment
                next_obs, reward, done, info = self.env.step(action)

                total_return += reward
                episode_length += 1

                if done:
                    # Check if the task was completed successfully
                    success = info.get('success', False)
                    break

                obs = next_obs

            returns.append(total_return)
            episode_lengths.append(episode_length)
            success_rates.append(success)

        # Calculate metrics
        self.metrics = {
            'mean_return': np.mean(returns),
            'std_return': np.std(returns),
            'mean_length': np.mean(episode_lengths),
            'success_rate': np.mean(success_rates),
            'median_return': np.median(returns)
        }

        return self.metrics

    def robustness_test(self, noise_levels=[0.01, 0.05, 0.1]):
        """Test policy robustness to sensor noise."""
        robustness_results = {}

        for noise_level in noise_levels:
            # Add noise to sensors
            self.env.set_sensor_noise(noise_level)

            # Evaluate policy
            metrics = self.evaluate_policy(num_episodes=50)
            robustness_results[noise_level] = metrics

        return robustness_results

Best Practices for Isaac RL

1. Environment Design

  • Start with simplified environments
  • Gradually increase complexity
  • Use curriculum learning (see the sketch after this list)
  • Implement proper reset logic
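
A common curriculum for locomotion is to ramp up the commanded velocity (or terrain difficulty) as the policy improves. A minimal sketch of such a schedule, with illustrative thresholds and increments:

class VelocityCurriculum:
    """Gradually raise the target walking speed as the policy succeeds."""
    def __init__(self, start_vel=0.3, max_vel=1.5, step=0.1, success_threshold=0.8):
        self.target_velocity = start_vel
        self.max_vel = max_vel
        self.step = step
        self.success_threshold = success_threshold

    def update(self, recent_success_rate):
        # Advance the curriculum only once the current stage is mostly solved
        if recent_success_rate >= self.success_threshold:
            self.target_velocity = min(self.target_velocity + self.step, self.max_vel)
        return self.target_velocity

# Usage inside training: reward_cfg.target_velocity = curriculum.update(success_rate)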

2. Reward Engineering

  • Keep rewards simple initially
  • Use sparse rewards where possible
  • Add shaping rewards carefully
  • Test reward function separately

3. Hyperparameter Tuning

  • Use population-based training
  • Implement automated hyperparameter search
  • Monitor training curves closely
  • Validate on multiple seeds

4. Safety Considerations

  • Implement safety constraints
  • Use action clipping
  • Monitor robot states during training
  • Implement emergency stops

Troubleshooting Common Issues

Training Issues

  • Exploding gradients: Reduce learning rate, use gradient clipping
  • Poor exploration: Adjust exploration parameters, reward shaping
  • Stability issues: Regularize policies, improve reward design

Performance Issues

  • Slow training: Use more parallel environments, optimize code
  • Memory issues: Reduce batch size, optimize memory usage
  • Convergence problems: Adjust hyperparameters, check reward design

Deployment Issues

  • Sim-to-real gap: Use domain randomization, system identification
  • Timing constraints: Optimize inference, reduce model complexity
  • Hardware limitations: Quantize models, use TensorRT optimization

Summary

Reinforcement learning provides powerful tools for training complex humanoid robot behaviors. Isaac's GPU-accelerated environments and optimized RL frameworks enable efficient training of sophisticated policies. Success requires careful reward design, proper environment setup, and systematic sim-to-real transfer techniques.

Next Steps

In the next chapter, we'll explore sim-to-real transfer techniques in more detail, including advanced methods for bridging the reality gap between simulation and real-world robot performance.

