Building ROS 2 Packages with Python
This chapter focuses on creating complete ROS 2 packages using Python, which is one of the most popular languages for robotics development due to its ease of use and extensive library ecosystem. You'll learn how to structure packages, create nodes, and implement complete robot applications.
Learning Objectives
By the end of this chapter, you will be able to:
- Create and structure ROS 2 packages for Python
- Implement complete robot nodes with proper lifecycle management
- Use ROS 2 parameters for configurable behavior
- Create launch files to start multiple nodes together
- Implement proper logging and error handling
- Package and distribute your ROS 2 applications
ROS 2 Package Structure
A well-structured ROS 2 package follows a standard layout:
my_robot_package/
├── CMakeLists.txt # Build configuration for C++
├── package.xml # Package metadata and dependencies
├── setup.py # Python package configuration
├── setup.cfg # Python build configuration
├── my_robot_package/ # Python module directory
│ ├── __init__.py # Python package initialization
│ ├── my_node.py # Main node implementation
│ └── my_module.py # Additional modules
├── launch/ # Launch files
│ └── my_launch_file.py
├── config/ # Configuration files
│ └── my_config.yaml
├── test/ # Unit tests
│ └── test_my_node.py
└── resource/ # Package resource files
└── my_robot_package # Package marker file
Creating a New ROS 2 Package
Using ros2 pkg Command
# Create a new Python package
ros2 pkg create --build-type ament_python my_robot_package
# Create a new package with dependencies
ros2 pkg create --build-type ament_python my_robot_package \
--dependencies rclpy std_msgs sensor_msgs geometry_msgs
Package.xml Configuration
The package.xml file contains metadata about your package:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_package</name>
<version>0.0.0</version>
<description>Package for my robot functionality</description>
<maintainer email="maintainer@todo.todo">maintainer</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
setup.py Configuration
The setup.py file configures your Python package:
from setuptools import setup
import os
from glob import glob
package_name = 'my_robot_package'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include launch files
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
# Include config files
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='your.email@todo.todo',
description='Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'my_node = my_robot_package.my_node:main',
'another_node = my_robot_package.another_node:main',
],
},
)
Implementing ROS 2 Nodes in Python
Basic Node Structure
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MyRobotNode(Node):
def __init__(self):
super().__init__('my_robot_node')
# Create a publisher
self.publisher = self.create_publisher(String, 'topic_name', 10)
# Create a subscriber
self.subscription = self.create_subscription(
String,
'input_topic',
self.listener_callback,
10
)
# Create a timer
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.get_logger().info('MyRobotNode initialized')
def timer_callback(self):
msg = String()
msg.data = f'Hello from {self.get_name()}'
self.publisher.publish(msg)
def listener_callback(self, msg):
self.get_logger().info(f'I heard: {msg.data}')
def main(args=None):
rclpy.init(args=args)
node = MyRobotNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Using Parameters
import rclpy
from rclpy.node import Node
class ParameterizedNode(Node):
def __init__(self):
super().__init__('parameterized_node')
# Declare parameters with default values
self.declare_parameter('robot_name', 'default_robot')
self.declare_parameter('max_velocity', 1.0)
self.declare_parameter('safety_mode', True)
# Get parameter values
self.robot_name = self.get_parameter('robot_name').value
self.max_velocity = self.get_parameter('max_velocity').value
self.safety_mode = self.get_parameter('safety_mode').value
# Create parameter callback
self.add_on_set_parameters_callback(self.parameter_callback)
def parameter_callback(self, params):
for param in params:
if param.name == 'max_velocity' and param.type_ == param.Type.DOUBLE:
self.max_velocity = param.value
self.get_logger().info(f'Max velocity updated to: {param.value}')
return SetParametersResult(successful=True)
Working with Custom Message Types
# Assuming you have a custom message in msg/RobotCommand.msg
from my_robot_package_interfaces.msg import RobotCommand
class CommandNode(Node):
def __init__(self):
super().__init__('command_node')
self.publisher = self.create_publisher(RobotCommand, 'robot_command', 10)
def send_command(self, command_type, target_position, velocity):
msg = RobotCommand()
msg.command_type = command_type
msg.target_position = target_position
msg.velocity = velocity
self.publisher.publish(msg)
Creating Launch Files
Launch files allow you to start multiple nodes with a single command.
Python Launch File
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
# Declare launch arguments
robot_name_launch_arg = DeclareLaunchArgument(
'robot_name',
default_value='default_robot',
description='Name of the robot'
)
# Create nodes
my_node = Node(
package='my_robot_package',
executable='my_node',
name='my_robot_node',
parameters=[
{'robot_name': LaunchConfiguration('robot_name')},
{'max_velocity': 1.0},
{'safety_mode': True}
],
remappings=[
('input_topic', 'remapped_input_topic'),
('output_topic', 'remapped_output_topic')
]
)
another_node = Node(
package='my_robot_package',
executable='another_node',
name='another_robot_node',
parameters=[{'param1': 'value1'}]
)
return LaunchDescription([
robot_name_launch_arg,
my_node,
another_node
])
Running Launch Files
# Run with default arguments
ros2 launch my_robot_package my_launch_file.py
# Run with custom arguments
ros2 launch my_robot_package my_launch_file.py robot_name:=my_robot
Configuration Files
YAML configuration files allow you to separate configuration from code:
config/robot_config.yaml
my_robot_node:
ros__parameters:
robot_name: "my_robot"
max_velocity: 2.0
safety_mode: true
sensor_topics:
- "/sensors/camera/image_raw"
- "/sensors/imu/data"
control_topics:
- "/cmd_vel"
- "/joint_commands"
Loading Configuration in Launch Files
from launch import LaunchDescription
from launch_ros.actions import Node
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
# Get config file path
config = os.path.join(
get_package_share_directory('my_robot_package'),
'config',
'robot_config.yaml'
)
# Create node with config
my_node = Node(
package='my_robot_package',
executable='my_node',
name='my_robot_node',
parameters=[config]
)
return LaunchDescription([my_node])
Advanced Node Features
Timers and Callback Groups
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
class AdvancedNode(Node):
def __init__(self):
super().__init__('advanced_node')
# Create callback groups for threading
timer_callback_group = MutuallyExclusiveCallbackGroup()
subscriber_callback_group = MutuallyExclusiveCallbackGroup()
# Create timers with specific callback groups
self.timer1 = self.create_timer(
1.0, self.timer1_callback,
callback_group=timer_callback_group
)
self.timer2 = self.create_timer(
0.5, self.timer2_callback,
callback_group=timer_callback_group
)
# Create subscriber with specific callback group
self.subscription = self.create_subscription(
String, 'input_topic', self.subscriber_callback,
10, callback_group=subscriber_callback_group
)
Client Libraries and Services
from example_interfaces.srv import AddTwoInts
class ServiceClientNode(Node):
def __init__(self):
super().__init__('service_client_node')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
# Synchronous call
future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def send_request_async(self, a, b):
self.req.a = a
self.req.b = b
# Asynchronous call
future = self.cli.call_async(self.req)
future.add_done_callback(self.response_callback)
return future
def response_callback(self, future):
try:
result = future.result()
self.get_logger().info(f'Result: {result.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
Logging and Debugging
Proper Logging
import rclpy
from rclpy.node import Node
class LoggingNode(Node):
def __init__(self):
super().__init__('logging_node')
# Different log levels
self.get_logger().debug('Debug message')
self.get_logger().info('Info message')
self.get_logger().warn('Warning message')
self.get_logger().error('Error message')
self.get_logger().fatal('Fatal message')
# Formatted logging
robot_id = 42
self.get_logger().info(f'Robot {robot_id} initialized')
# Logging with exception
try:
# Some operation that might fail
result = 10 / 0
except ZeroDivisionError as e:
self.get_logger().error(f'Division error: {e}')
Debugging Tools
# Add debugging information to your nodes
def debug_node_info(self):
# Get node information
node_name = self.get_name()
node_namespace = self.get_namespace()
# Get topic information
topics = self.get_topic_names_and_types()
# Get service information
services = self.get_service_names_and_types()
# Log for debugging
self.get_logger().info(f'Node: {node_name}, Namespace: {node_namespace}')
self.get_logger().info(f'Topics: {topics}')
self.get_logger().info(f'Services: {services}')
Unit Testing
Creating Unit Tests
# test/test_my_node.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from my_robot_package.my_node import MyRobotNode
class TestMyRobotNode(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()
@classmethod
def tearDownClass(cls):
rclpy.shutdown()
def setUp(self):
self.node = MyRobotNode()
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)
def tearDown(self):
self.node.destroy_node()
def test_node_initialization(self):
self.assertIsNotNone(self.node)
self.assertEqual(self.node.get_name(), 'my_robot_node')
def test_parameter_declaration(self):
# Test that parameters were declared properly
parameters = self.node.get_parameters_by_prefix('')
self.assertIn('robot_name', parameters)
self.assertIn('max_velocity', parameters)
if __name__ == '__main__':
unittest.main()
Running Tests
# Run all tests in the package
colcon test --packages-select my_robot_package
# Run specific tests
colcon test --packages-select my_robot_package --ctest-args -R test_my_node
Error Handling and Robustness
Exception Handling
class RobustNode(Node):
def __init__(self):
super().__init__('robust_node')
# Initialize with error handling
try:
self.setup_components()
except Exception as e:
self.get_logger().error(f'Failed to initialize components: {e}')
raise
def setup_components(self):
try:
# Component initialization that might fail
self.initialize_sensors()
self.initialize_actuators()
except Exception as e:
self.get_logger().error(f'Component initialization failed: {e}')
# Implement recovery strategy
self.recovery_procedure()
def recovery_procedure(self):
# Implement safe recovery
self.get_logger().warn('Attempting recovery procedure...')
# Reset components, reinitialize, etc.
Graceful Shutdown
import signal
import sys
def signal_handler(sig, frame):
print('Shutting down gracefully...')
if hasattr(node, 'destroy_node'):
node.destroy_node()
rclpy.shutdown()
sys.exit(0)
# Register signal handler
signal.signal(signal.SIGINT, signal_handler)
Performance Optimization
Efficient Message Handling
class EfficientNode(Node):
def __init__(self):
super().__init__('efficient_node')
# Use appropriate QoS settings
qos_profile = rclpy.qos.QoSProfile(
depth=10,
reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT,
durability=rclpy.qos.DurabilityPolicy.VOLATILE
)
self.subscription = self.create_subscription(
sensor_msgs.msg.Image,
'image_topic',
self.image_callback,
qos_profile
)
# Pre-allocate message objects when possible
self.msg_buffer = String()
def image_callback(self, msg):
# Process message efficiently
# Avoid unnecessary copies
# Use numpy arrays for image processing when applicable
pass
Packaging and Distribution
Creating Debian Packages
# Build the package
colcon build --packages-select my_robot_package
# Create Debian package
vcs import < dependencies.repos
rosdep install --from-paths src --ignore-src -r -y
colcon build
colcon package
Docker Containerization
# Dockerfile
FROM ros:humble-ros-base
# Install dependencies
RUN apt-get update && apt-get install -y \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Copy package
COPY . /workspace/src/my_robot_package
WORKDIR /workspace
# Build
RUN . /opt/ros/humble/setup.sh && \
colcon build --packages-select my_robot_package
# Source and run
RUN echo "source /workspace/install/setup.bash" >> ~/.bashrc
CMD ["bash"]
ROS 2 in Humanoid Robotics Context
Robot-Specific Package Structure
humanoid_robot_bringup/
├── launch/
│ ├── robot.launch.py # Bring up entire robot
│ ├── navigation.launch.py # Navigation stack
│ └── perception.launch.py # Perception stack
├── config/
│ ├── controllers.yaml # Joint controllers
│ ├── sensors.yaml # Sensor configurations
│ └── robot_description.yaml
└── scripts/ # Helper scripts
└── robot_control.py
Integration with Hardware
- Driver packages: Interface with physical sensors and actuators
- Controller packages: Handle robot control loops
- Behavior packages: Implement high-level robot behaviors
- Simulation packages: Bridge between simulation and real hardware
Summary
Creating ROS 2 packages with Python involves understanding the package structure, implementing proper node lifecycle management, using parameters for configuration, and creating launch files for system startup. Proper logging, error handling, and testing ensure robust robot applications. The modular approach of ROS 2 packages enables building complex humanoid robot systems from smaller, manageable components.
Next Steps
In the next chapter, we'll explore launch files and parameter management in more detail, learning how to configure and start complex robot systems with multiple nodes.
Estimated Reading Time: 25 minutes