Setup NATS with Python SDK
This guide shows you how to connect to NOVA’s NATS server using the nats.py Python SDK and subscribe to real-time data streams from your robot cell.
Prerequisites
- Python 3.8 or higher
- Access to a NOVA instance with NATS enabled
- Basic knowledge of Python async/await
Ensure your network configuration allows NATS communication. NATS typically uses port 4222 for client connections and port 8222 for HTTP monitoring.
Installation
Install NATS Python client
pip install nats-py
For WebSocket support (useful for web applications or restricted networks):
pip install nats-py[aiohttp]
Verify your NATS connection details
NOVA’s NATS server is accessible via WebSocket:
- WebSocket URL:
ws://{controller_ip}:80/api/nats
(orwss://
for TLS) - Replace
{controller_ip}
with your NOVA instance IP address - Authentication: Bearer token authentication (if required by your NOVA instance)
Check with your system administrator for the exact NATS connection details and authentication requirements for your NOVA installation.
Basic connection
Here’s how to connect to NOVA’s NATS server via WebSocket:
Without Authentication
import asyncio
import nats
async def main():
# Connect to NOVA's NATS server via WebSocket
# Replace 'your-nova-ip' with your actual NOVA instance IP
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
print(f"Connected to NATS at {nc.connected_url.netloc}")
# Close connection when done
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
If your NOVA instance requires authentication, use the token-based approach. The token is typically a Bearer token obtained from your authentication system.
Environment variable setup
Set your token as an environment variable:
# Linux/macOS
export NOVA_NATS_TOKEN="your-bearer-token"
# Windows PowerShell
$env:NOVA_NATS_TOKEN="your-bearer-token"
# Windows CMD
set NOVA_NATS_TOKEN=your-bearer-token
Authentication
NOVA supports token-based authentication for NATS connections. Here’s a practical example of how to manage authentication:
import asyncio
import nats
import os
async def connect_to_nats(nats_url: str, token: str = None):
"""Connect to NATS with optional token authentication."""
try:
print(f"Connecting to NATS broker: {nats_url}")
# Prepare connection parameters
connect_kwargs = {"servers": [nats_url]}
# Use Bearer token authentication if available
if token:
connect_kwargs["token"] = token
print("Using token authentication")
# Connect to NATS
nc = await nats.connect(**connect_kwargs)
print("Successfully connected to NATS broker")
return nc
except Exception as e:
print(f"Failed to connect to NATS: {e}")
return None
async def main():
nats_url = "ws://your-nova-ip:80/api/nats"
token = os.getenv("NOVA_NATS_TOKEN")
nc = await connect_to_nats(nats_url, token)
if nc:
# Use the connection
print(f"Connected to {nc.connected_url.netloc}")
# Don't forget to close when done
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Then run your Python script:
python your_script.py
Never hardcode authentication tokens in your source code. Always use environment variables, configuration files, or secure secret management solutions.
Subscribing to cobot controller state
Stream a real-time robot controller state including joint positions, velocities, and operation mode.
The controller
in the subject is the robot controller name you created in the NOVA Setup app, not a literal string. Replace it with your actual robot name (e.g., ur5
, abb-robot
, etc.).
import asyncio
import nats
import json
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
async def state_handler(msg):
# Parse the robot controller state
state = json.loads(msg.data)
print(f"Controller: {state['controller']}")
print(f"Mode: {state['mode']}")
print(f"Operation Mode: {state['operation_mode']}")
# Access motion group data
for mg in state['motion_groups']:
print(f"Motion Group: {mg['motion_group']}")
print(f"Joint Position: {mg['joint_position']}")
print(f"Standstill: {mg['standstill']}")
# IMPORTANT: Replace these with your actual identifiers from NOVA Setup
cell_name = "cell" # Your cell name (usually "cell")
controller_name = "ur5-robot" # Your robot controller name from Setup app
# Build the subject dynamically
subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.state"
await nc.subscribe(subject, cb=state_handler)
print(f"Listening for robot controller state updates on: {subject}")
# Keep the connection alive
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
pass
finally:
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Streaming I/O values
Monitor input and output signals from your robot controller or BUS I/O service:
import asyncio
import nats
import json
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
async def io_handler(msg):
data = json.loads(msg.data)
print(f"Timestamp: {data['timestamp']}")
print(f"Sequence: {data['sequence_number']}")
# Process I/O values
for io_value in data['io_values']:
io_id = io_value['io']
value = io_value['value']
value_type = io_value['value_type']
print(f" {io_id} ({value_type}): {value}")
# Replace with your actual identifiers from NOVA Setup
cell_name = "cell"
controller_name = "ur5-robot" # Your robot controller name
# Subscribe to controller I/O updates
subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.ios"
await nc.subscribe(subject, cb=io_handler)
# Or subscribe to BUS I/O updates
# bus_io_subject = f"nova.v2.cells.{cell_name}.bus-ios.ios"
# await nc.subscribe(bus_io_subject, cb=io_handler)
print("Listening for I/O updates...")
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
pass
finally:
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Setting I/O output values
Write to output signals using a request-reply pattern:
import asyncio
import nats
import json
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
# Replace with your actual identifiers
cell_name = "cell"
controller_name = "ur5-robot" # Your robot controller name
# Prepare I/O values to set
io_values = [
{
"io": "output1",
"value": True,
"value_type": "boolean"
},
{
"io": "output2",
"value": "42",
"value_type": "integer"
}
]
# Build the subject for setting I/O
subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.ios.set"
# Set I/O values (with reply)
try:
response = await nc.request(
subject,
json.dumps(io_values).encode(),
timeout=2.0
)
# Check for errors in response
if response.data:
result = json.loads(response.data)
if 'message' in result:
print(f"Error: {result['message']}")
else:
print("I/O values set successfully")
except asyncio.TimeoutError:
print("Request timed out")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Subscribing with wildcards
Use wildcards to subscribe to multiple subjects at once:
import asyncio
import nats
import json
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
async def multi_handler(msg):
print(f"Received on {msg.subject}")
data = json.loads(msg.data)
# Process the message
print(json.dumps(data, indent=2))
# Subscribe to all cell statuses (wildcard for any cell)
await nc.subscribe("nova.v2.cells.*.status", cb=multi_handler)
# Subscribe to all controller states in a specific cell
# The * wildcard matches any controller name
cell_name = "cell"
await nc.subscribe(
f"nova.v2.cells.{cell_name}.controllers.*.state",
cb=multi_handler
)
# Subscribe to all system events
await nc.subscribe("nova.v2.events.>", cb=multi_handler)
print("Listening for multiple subjects...")
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
pass
finally:
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Using JetStream for persistence
Access persisting state and events using JetStream:
import asyncio
import nats
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
# Create JetStream context
js = nc.jetstream()
# Pull the latest system state
try:
# Create a pull consumer
psub = await js.pull_subscribe(
"nova.v2.system.status",
stream="system-state"
)
# Fetch the latest message
msgs = await psub.fetch(1, timeout=2.0)
for msg in msgs:
print("Latest system status:")
print(msg.data.decode())
await msg.ack()
except Exception as e:
print(f"Error accessing JetStream: {e}")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Monitoring system events
Track system update lifecycle events:
import asyncio
import nats
import json
async def main():
nc = await nats.connect("ws://your-nova-ip:80/api/nats")
async def event_handler(msg):
event = json.loads(msg.data)
# CloudEvents v1.0 structure
print(f"Event Type: {event['type']}")
print(f"Event ID: {event['id']}")
print(f"Source: {event['source']}")
print(f"Time: {event['time']}")
# Event-specific data
if 'data' in event:
data = event['data']
if event['type'] == 'nova.v2.events.system.update.started':
print(f"Update started: {data['oldVersion']} -> {data['newVersion']}")
elif event['type'] == 'nova.v2.events.system.update.completed':
if 'error' in data:
print(f"Update failed: {data['error']}")
else:
print(f"Update completed: {data['newVersion']}")
# Subscribe to update events
await nc.subscribe(
"nova.v2.events.system.update.started",
cb=event_handler
)
await nc.subscribe(
"nova.v2.events.system.update.completed",
cb=event_handler
)
print("Monitoring system events...")
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
pass
finally:
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Advanced: Connection with error handling
It provides production-ready connection with reconnection logic, error handling, and authentication:
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError
import os
async def main():
async def disconnected_cb():
print("Got disconnected from NATS!")
async def reconnected_cb():
print("Reconnected to NATS")
async def error_cb(e):
print(f"NATS error: {e}")
async def closed_cb():
print("Connection to NATS is closed")
# Prepare connection parameters
nats_url = "ws://your-nova-ip:80/api/nats"
token = os.getenv("NATS_TOKEN") # Get token from environment variable
connect_kwargs = {
"servers": [nats_url],
"reconnected_cb": reconnected_cb,
"disconnected_cb": disconnected_cb,
"error_cb": error_cb,
"closed_cb": closed_cb,
"max_reconnect_attempts": 10,
"reconnect_time_wait": 2,
}
# Add token if available
if token:
connect_kwargs["token"] = token
print("Connecting with token authentication...")
# Connect with all settings
nc = await nats.connect(**connect_kwargs)
print(f"Connected to {nc.connected_url.netloc}")
async def message_handler(msg):
try:
# Process message
print(f"Received on {msg.subject}: {msg.data.decode()}")
except Exception as e:
print(f"Error processing message: {e}")
# Subscribe with error handling
cell_name = "cell"
controller_name = "ur5-robot" # Your robot controller name from Setup
subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.state"
try:
await nc.subscribe(subject, cb=message_handler)
# Keep connection alive
await asyncio.Event().wait()
except KeyboardInterrupt:
print("Shutting down...")
except ConnectionClosedError:
print("Connection closed unexpectedly")
finally:
if nc.is_connected:
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Variable meanings
{cell}
: Your cell identifier (typically"cell"
unless you have multiple cells){controller}
: The robot controller name you created in the NOVA Setup app (e.g.,"ur5-robot"
,"abb-irb1600"
,"fanuc-m10id12"
){motion-group}
: The motion group name within the controller (e.g.,"manipulator"
,"robot"
)
Important: The {controller}
is the actual name you assigned to your robot in the NOVA Setup app. Find this name in Setup under your cell’s robot controllers list.
How to find identifiers
- Open the NOVA Setup app
- Navigate to your cell
- Look at the robot controllers section
- The name shown there is your
{controller}
value to use in subjects
For example, if your Setup shows a robot named fanuc-m10id12
, use:
subject = "nova.v2.cells.cell.controllers.fanuc-m10id12.state"
Next steps
- Explore the NATS Python SDK documentation for advanced features
- Check the AsyncAPI specification for complete message schemas
- Review the NOVA API documentation for REST API integration
Need help? The NATS community provides extensive examples in the nats.py repository .