Skip to main content

Building ROS 2 Packages with Python

This chapter focuses on creating complete ROS 2 packages using Python, which is one of the most popular languages for robotics development due to its ease of use and extensive library ecosystem. You'll learn how to structure packages, create nodes, and implement complete robot applications.

Learning Objectives

By the end of this chapter, you will be able to:

  • Create and structure ROS 2 packages for Python
  • Implement complete robot nodes with proper lifecycle management
  • Use ROS 2 parameters for configurable behavior
  • Create launch files to start multiple nodes together
  • Implement proper logging and error handling
  • Package and distribute your ROS 2 applications

ROS 2 Package Structure

A well-structured ROS 2 package follows a standard layout:

my_robot_package/
├── CMakeLists.txt # Build configuration for C++
├── package.xml # Package metadata and dependencies
├── setup.py # Python package configuration
├── setup.cfg # Python build configuration
├── my_robot_package/ # Python module directory
│ ├── __init__.py # Python package initialization
│ ├── my_node.py # Main node implementation
│ └── my_module.py # Additional modules
├── launch/ # Launch files
│ └── my_launch_file.py
├── config/ # Configuration files
│ └── my_config.yaml
├── test/ # Unit tests
│ └── test_my_node.py
└── resource/ # Package resource files
└── my_robot_package # Package marker file

Creating a New ROS 2 Package

Using ros2 pkg Command

# Create a new Python package
ros2 pkg create --build-type ament_python my_robot_package

# Create a new package with dependencies
ros2 pkg create --build-type ament_python my_robot_package \
--dependencies rclpy std_msgs sensor_msgs geometry_msgs

Package.xml Configuration

The package.xml file contains metadata about your package:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_package</name>
<version>0.0.0</version>
<description>Package for my robot functionality</description>
<maintainer email="maintainer@todo.todo">maintainer</maintainer>
<license>TODO: License declaration</license>

<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>

setup.py Configuration

The setup.py file configures your Python package:

from setuptools import setup
import os
from glob import glob

package_name = 'my_robot_package'

setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include launch files
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
# Include config files
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='your.email@todo.todo',
description='Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'my_node = my_robot_package.my_node:main',
'another_node = my_robot_package.another_node:main',
],
},
)

Implementing ROS 2 Nodes in Python

Basic Node Structure

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MyRobotNode(Node):
def __init__(self):
super().__init__('my_robot_node')

# Create a publisher
self.publisher = self.create_publisher(String, 'topic_name', 10)

# Create a subscriber
self.subscription = self.create_subscription(
String,
'input_topic',
self.listener_callback,
10
)

# Create a timer
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)

self.get_logger().info('MyRobotNode initialized')

def timer_callback(self):
msg = String()
msg.data = f'Hello from {self.get_name()}'
self.publisher.publish(msg)

def listener_callback(self, msg):
self.get_logger().info(f'I heard: {msg.data}')

def main(args=None):
rclpy.init(args=args)
node = MyRobotNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Using Parameters

import rclpy
from rclpy.node import Node

class ParameterizedNode(Node):
def __init__(self):
super().__init__('parameterized_node')

# Declare parameters with default values
self.declare_parameter('robot_name', 'default_robot')
self.declare_parameter('max_velocity', 1.0)
self.declare_parameter('safety_mode', True)

# Get parameter values
self.robot_name = self.get_parameter('robot_name').value
self.max_velocity = self.get_parameter('max_velocity').value
self.safety_mode = self.get_parameter('safety_mode').value

# Create parameter callback
self.add_on_set_parameters_callback(self.parameter_callback)

def parameter_callback(self, params):
for param in params:
if param.name == 'max_velocity' and param.type_ == param.Type.DOUBLE:
self.max_velocity = param.value
self.get_logger().info(f'Max velocity updated to: {param.value}')
return SetParametersResult(successful=True)

Working with Custom Message Types

# Assuming you have a custom message in msg/RobotCommand.msg
from my_robot_package_interfaces.msg import RobotCommand

class CommandNode(Node):
def __init__(self):
super().__init__('command_node')
self.publisher = self.create_publisher(RobotCommand, 'robot_command', 10)

def send_command(self, command_type, target_position, velocity):
msg = RobotCommand()
msg.command_type = command_type
msg.target_position = target_position
msg.velocity = velocity
self.publisher.publish(msg)

Creating Launch Files

Launch files allow you to start multiple nodes with a single command.

Python Launch File

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
# Declare launch arguments
robot_name_launch_arg = DeclareLaunchArgument(
'robot_name',
default_value='default_robot',
description='Name of the robot'
)

# Create nodes
my_node = Node(
package='my_robot_package',
executable='my_node',
name='my_robot_node',
parameters=[
{'robot_name': LaunchConfiguration('robot_name')},
{'max_velocity': 1.0},
{'safety_mode': True}
],
remappings=[
('input_topic', 'remapped_input_topic'),
('output_topic', 'remapped_output_topic')
]
)

another_node = Node(
package='my_robot_package',
executable='another_node',
name='another_robot_node',
parameters=[{'param1': 'value1'}]
)

return LaunchDescription([
robot_name_launch_arg,
my_node,
another_node
])

Running Launch Files

# Run with default arguments
ros2 launch my_robot_package my_launch_file.py

# Run with custom arguments
ros2 launch my_robot_package my_launch_file.py robot_name:=my_robot

Configuration Files

YAML configuration files allow you to separate configuration from code:

config/robot_config.yaml

my_robot_node:
ros__parameters:
robot_name: "my_robot"
max_velocity: 2.0
safety_mode: true
sensor_topics:
- "/sensors/camera/image_raw"
- "/sensors/imu/data"
control_topics:
- "/cmd_vel"
- "/joint_commands"

Loading Configuration in Launch Files

from launch import LaunchDescription
from launch_ros.actions import Node
import os
from ament_index_python.packages import get_package_share_directory

def generate_launch_description():
# Get config file path
config = os.path.join(
get_package_share_directory('my_robot_package'),
'config',
'robot_config.yaml'
)

# Create node with config
my_node = Node(
package='my_robot_package',
executable='my_node',
name='my_robot_node',
parameters=[config]
)

return LaunchDescription([my_node])

Advanced Node Features

Timers and Callback Groups

from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor

class AdvancedNode(Node):
def __init__(self):
super().__init__('advanced_node')

# Create callback groups for threading
timer_callback_group = MutuallyExclusiveCallbackGroup()
subscriber_callback_group = MutuallyExclusiveCallbackGroup()

# Create timers with specific callback groups
self.timer1 = self.create_timer(
1.0, self.timer1_callback,
callback_group=timer_callback_group
)
self.timer2 = self.create_timer(
0.5, self.timer2_callback,
callback_group=timer_callback_group
)

# Create subscriber with specific callback group
self.subscription = self.create_subscription(
String, 'input_topic', self.subscriber_callback,
10, callback_group=subscriber_callback_group
)

Client Libraries and Services

from example_interfaces.srv import AddTwoInts

class ServiceClientNode(Node):
def __init__(self):
super().__init__('service_client_node')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')

while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')

self.req = AddTwoInts.Request()

def send_request(self, a, b):
self.req.a = a
self.req.b = b

# Synchronous call
future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, future)
return future.result()

def send_request_async(self, a, b):
self.req.a = a
self.req.b = b

# Asynchronous call
future = self.cli.call_async(self.req)
future.add_done_callback(self.response_callback)
return future

def response_callback(self, future):
try:
result = future.result()
self.get_logger().info(f'Result: {result.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')

Logging and Debugging

Proper Logging

import rclpy
from rclpy.node import Node

class LoggingNode(Node):
def __init__(self):
super().__init__('logging_node')

# Different log levels
self.get_logger().debug('Debug message')
self.get_logger().info('Info message')
self.get_logger().warn('Warning message')
self.get_logger().error('Error message')
self.get_logger().fatal('Fatal message')

# Formatted logging
robot_id = 42
self.get_logger().info(f'Robot {robot_id} initialized')

# Logging with exception
try:
# Some operation that might fail
result = 10 / 0
except ZeroDivisionError as e:
self.get_logger().error(f'Division error: {e}')

Debugging Tools

# Add debugging information to your nodes
def debug_node_info(self):
# Get node information
node_name = self.get_name()
node_namespace = self.get_namespace()

# Get topic information
topics = self.get_topic_names_and_types()

# Get service information
services = self.get_service_names_and_types()

# Log for debugging
self.get_logger().info(f'Node: {node_name}, Namespace: {node_namespace}')
self.get_logger().info(f'Topics: {topics}')
self.get_logger().info(f'Services: {services}')

Unit Testing

Creating Unit Tests

# test/test_my_node.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from my_robot_package.my_node import MyRobotNode

class TestMyRobotNode(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def setUp(self):
self.node = MyRobotNode()
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)

def tearDown(self):
self.node.destroy_node()

def test_node_initialization(self):
self.assertIsNotNone(self.node)
self.assertEqual(self.node.get_name(), 'my_robot_node')

def test_parameter_declaration(self):
# Test that parameters were declared properly
parameters = self.node.get_parameters_by_prefix('')
self.assertIn('robot_name', parameters)
self.assertIn('max_velocity', parameters)

if __name__ == '__main__':
unittest.main()

Running Tests

# Run all tests in the package
colcon test --packages-select my_robot_package

# Run specific tests
colcon test --packages-select my_robot_package --ctest-args -R test_my_node

Error Handling and Robustness

Exception Handling

class RobustNode(Node):
def __init__(self):
super().__init__('robust_node')
# Initialize with error handling
try:
self.setup_components()
except Exception as e:
self.get_logger().error(f'Failed to initialize components: {e}')
raise

def setup_components(self):
try:
# Component initialization that might fail
self.initialize_sensors()
self.initialize_actuators()
except Exception as e:
self.get_logger().error(f'Component initialization failed: {e}')
# Implement recovery strategy
self.recovery_procedure()

def recovery_procedure(self):
# Implement safe recovery
self.get_logger().warn('Attempting recovery procedure...')
# Reset components, reinitialize, etc.

Graceful Shutdown

import signal
import sys

def signal_handler(sig, frame):
print('Shutting down gracefully...')
if hasattr(node, 'destroy_node'):
node.destroy_node()
rclpy.shutdown()
sys.exit(0)

# Register signal handler
signal.signal(signal.SIGINT, signal_handler)

Performance Optimization

Efficient Message Handling

class EfficientNode(Node):
def __init__(self):
super().__init__('efficient_node')

# Use appropriate QoS settings
qos_profile = rclpy.qos.QoSProfile(
depth=10,
reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT,
durability=rclpy.qos.DurabilityPolicy.VOLATILE
)

self.subscription = self.create_subscription(
sensor_msgs.msg.Image,
'image_topic',
self.image_callback,
qos_profile
)

# Pre-allocate message objects when possible
self.msg_buffer = String()

def image_callback(self, msg):
# Process message efficiently
# Avoid unnecessary copies
# Use numpy arrays for image processing when applicable
pass

Packaging and Distribution

Creating Debian Packages

# Build the package
colcon build --packages-select my_robot_package

# Create Debian package
vcs import < dependencies.repos
rosdep install --from-paths src --ignore-src -r -y
colcon build
colcon package

Docker Containerization

# Dockerfile
FROM ros:humble-ros-base

# Install dependencies
RUN apt-get update && apt-get install -y \
python3-pip \
&& rm -rf /var/lib/apt/lists/*

# Copy package
COPY . /workspace/src/my_robot_package
WORKDIR /workspace

# Build
RUN . /opt/ros/humble/setup.sh && \
colcon build --packages-select my_robot_package

# Source and run
RUN echo "source /workspace/install/setup.bash" >> ~/.bashrc
CMD ["bash"]

ROS 2 in Humanoid Robotics Context

Robot-Specific Package Structure

humanoid_robot_bringup/
├── launch/
│ ├── robot.launch.py # Bring up entire robot
│ ├── navigation.launch.py # Navigation stack
│ └── perception.launch.py # Perception stack
├── config/
│ ├── controllers.yaml # Joint controllers
│ ├── sensors.yaml # Sensor configurations
│ └── robot_description.yaml
└── scripts/ # Helper scripts
└── robot_control.py

Integration with Hardware

  • Driver packages: Interface with physical sensors and actuators
  • Controller packages: Handle robot control loops
  • Behavior packages: Implement high-level robot behaviors
  • Simulation packages: Bridge between simulation and real hardware

Summary

Creating ROS 2 packages with Python involves understanding the package structure, implementing proper node lifecycle management, using parameters for configuration, and creating launch files for system startup. Proper logging, error handling, and testing ensure robust robot applications. The modular approach of ROS 2 packages enables building complex humanoid robot systems from smaller, manageable components.

Next Steps

In the next chapter, we'll explore launch files and parameter management in more detail, learning how to configure and start complex robot systems with multiple nodes.


Estimated Reading Time: 25 minutes