Skip to Content
Communication protocols

Setup NATS with Python SDK

This guide shows you how to connect to NOVA’s NATS server using the nats.py  Python SDK and subscribe to real-time data streams from your robot cell.

Prerequisites

  • Python 3.8 or higher
  • Access to a NOVA instance with NATS enabled
  • Basic knowledge of Python async/await

Ensure your network configuration allows NATS communication. NATS typically uses port 4222 for client connections and port 8222 for HTTP monitoring.

Installation

Install NATS Python client

pip install nats-py

For WebSocket support (useful for web applications or restricted networks):

pip install nats-py[aiohttp]

Verify your NATS connection details

NOVA’s NATS server is accessible via WebSocket:

  • WebSocket URL: ws://{controller_ip}:80/api/nats (or wss:// for TLS)
  • Replace {controller_ip} with your NOVA instance IP address
  • Authentication: Bearer token authentication (if required by your NOVA instance)

Check with your system administrator for the exact NATS connection details and authentication requirements for your NOVA installation.

Basic connection

Here’s how to connect to NOVA’s NATS server via WebSocket:

import asyncio import nats async def main(): # Connect to NOVA's NATS server via WebSocket # Replace 'your-nova-ip' with your actual NOVA instance IP nc = await nats.connect("ws://your-nova-ip:80/api/nats") print(f"Connected to NATS at {nc.connected_url.netloc}") # Close connection when done await nc.close() if __name__ == '__main__': asyncio.run(main())

If your NOVA instance requires authentication, use the token-based approach. The token is typically a Bearer token obtained from your authentication system.

Environment variable setup

Set your token as an environment variable:

# Linux/macOS export NOVA_NATS_TOKEN="your-bearer-token" # Windows PowerShell $env:NOVA_NATS_TOKEN="your-bearer-token" # Windows CMD set NOVA_NATS_TOKEN=your-bearer-token

Authentication

NOVA supports token-based authentication for NATS connections. Here’s a practical example of how to manage authentication:

import asyncio import nats import os async def connect_to_nats(nats_url: str, token: str = None): """Connect to NATS with optional token authentication.""" try: print(f"Connecting to NATS broker: {nats_url}") # Prepare connection parameters connect_kwargs = {"servers": [nats_url]} # Use Bearer token authentication if available if token: connect_kwargs["token"] = token print("Using token authentication") # Connect to NATS nc = await nats.connect(**connect_kwargs) print("Successfully connected to NATS broker") return nc except Exception as e: print(f"Failed to connect to NATS: {e}") return None async def main(): nats_url = "ws://your-nova-ip:80/api/nats" token = os.getenv("NOVA_NATS_TOKEN") nc = await connect_to_nats(nats_url, token) if nc: # Use the connection print(f"Connected to {nc.connected_url.netloc}") # Don't forget to close when done await nc.close() if __name__ == '__main__': asyncio.run(main())

Then run your Python script:

python your_script.py

Never hardcode authentication tokens in your source code. Always use environment variables, configuration files, or secure secret management solutions.

Subscribing to cobot controller state

Stream a real-time robot controller state including joint positions, velocities, and operation mode.

The controller in the subject is the robot controller name you created in the NOVA Setup app, not a literal string. Replace it with your actual robot name (e.g., ur5, abb-robot, etc.).

import asyncio import nats import json async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") async def state_handler(msg): # Parse the robot controller state state = json.loads(msg.data) print(f"Controller: {state['controller']}") print(f"Mode: {state['mode']}") print(f"Operation Mode: {state['operation_mode']}") # Access motion group data for mg in state['motion_groups']: print(f"Motion Group: {mg['motion_group']}") print(f"Joint Position: {mg['joint_position']}") print(f"Standstill: {mg['standstill']}") # IMPORTANT: Replace these with your actual identifiers from NOVA Setup cell_name = "cell" # Your cell name (usually "cell") controller_name = "ur5-robot" # Your robot controller name from Setup app # Build the subject dynamically subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.state" await nc.subscribe(subject, cb=state_handler) print(f"Listening for robot controller state updates on: {subject}") # Keep the connection alive try: await asyncio.Event().wait() except KeyboardInterrupt: pass finally: await nc.close() if __name__ == '__main__': asyncio.run(main())

Streaming I/O values

Monitor input and output signals from your robot controller or BUS I/O service:

import asyncio import nats import json async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") async def io_handler(msg): data = json.loads(msg.data) print(f"Timestamp: {data['timestamp']}") print(f"Sequence: {data['sequence_number']}") # Process I/O values for io_value in data['io_values']: io_id = io_value['io'] value = io_value['value'] value_type = io_value['value_type'] print(f" {io_id} ({value_type}): {value}") # Replace with your actual identifiers from NOVA Setup cell_name = "cell" controller_name = "ur5-robot" # Your robot controller name # Subscribe to controller I/O updates subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.ios" await nc.subscribe(subject, cb=io_handler) # Or subscribe to BUS I/O updates # bus_io_subject = f"nova.v2.cells.{cell_name}.bus-ios.ios" # await nc.subscribe(bus_io_subject, cb=io_handler) print("Listening for I/O updates...") try: await asyncio.Event().wait() except KeyboardInterrupt: pass finally: await nc.close() if __name__ == '__main__': asyncio.run(main())

Setting I/O output values

Write to output signals using a request-reply pattern:

import asyncio import nats import json async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") # Replace with your actual identifiers cell_name = "cell" controller_name = "ur5-robot" # Your robot controller name # Prepare I/O values to set io_values = [ { "io": "output1", "value": True, "value_type": "boolean" }, { "io": "output2", "value": "42", "value_type": "integer" } ] # Build the subject for setting I/O subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.ios.set" # Set I/O values (with reply) try: response = await nc.request( subject, json.dumps(io_values).encode(), timeout=2.0 ) # Check for errors in response if response.data: result = json.loads(response.data) if 'message' in result: print(f"Error: {result['message']}") else: print("I/O values set successfully") except asyncio.TimeoutError: print("Request timed out") await nc.close() if __name__ == '__main__': asyncio.run(main())

Subscribing with wildcards

Use wildcards to subscribe to multiple subjects at once:

import asyncio import nats import json async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") async def multi_handler(msg): print(f"Received on {msg.subject}") data = json.loads(msg.data) # Process the message print(json.dumps(data, indent=2)) # Subscribe to all cell statuses (wildcard for any cell) await nc.subscribe("nova.v2.cells.*.status", cb=multi_handler) # Subscribe to all controller states in a specific cell # The * wildcard matches any controller name cell_name = "cell" await nc.subscribe( f"nova.v2.cells.{cell_name}.controllers.*.state", cb=multi_handler ) # Subscribe to all system events await nc.subscribe("nova.v2.events.>", cb=multi_handler) print("Listening for multiple subjects...") try: await asyncio.Event().wait() except KeyboardInterrupt: pass finally: await nc.close() if __name__ == '__main__': asyncio.run(main())

Using JetStream for persistence

Access persisting state and events using JetStream:

import asyncio import nats async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") # Create JetStream context js = nc.jetstream() # Pull the latest system state try: # Create a pull consumer psub = await js.pull_subscribe( "nova.v2.system.status", stream="system-state" ) # Fetch the latest message msgs = await psub.fetch(1, timeout=2.0) for msg in msgs: print("Latest system status:") print(msg.data.decode()) await msg.ack() except Exception as e: print(f"Error accessing JetStream: {e}") await nc.close() if __name__ == '__main__': asyncio.run(main())

Monitoring system events

Track system update lifecycle events:

import asyncio import nats import json async def main(): nc = await nats.connect("ws://your-nova-ip:80/api/nats") async def event_handler(msg): event = json.loads(msg.data) # CloudEvents v1.0 structure print(f"Event Type: {event['type']}") print(f"Event ID: {event['id']}") print(f"Source: {event['source']}") print(f"Time: {event['time']}") # Event-specific data if 'data' in event: data = event['data'] if event['type'] == 'nova.v2.events.system.update.started': print(f"Update started: {data['oldVersion']} -> {data['newVersion']}") elif event['type'] == 'nova.v2.events.system.update.completed': if 'error' in data: print(f"Update failed: {data['error']}") else: print(f"Update completed: {data['newVersion']}") # Subscribe to update events await nc.subscribe( "nova.v2.events.system.update.started", cb=event_handler ) await nc.subscribe( "nova.v2.events.system.update.completed", cb=event_handler ) print("Monitoring system events...") try: await asyncio.Event().wait() except KeyboardInterrupt: pass finally: await nc.close() if __name__ == '__main__': asyncio.run(main())

Advanced: Connection with error handling

It provides production-ready connection with reconnection logic, error handling, and authentication:

import asyncio import nats from nats.errors import ConnectionClosedError, TimeoutError import os async def main(): async def disconnected_cb(): print("Got disconnected from NATS!") async def reconnected_cb(): print("Reconnected to NATS") async def error_cb(e): print(f"NATS error: {e}") async def closed_cb(): print("Connection to NATS is closed") # Prepare connection parameters nats_url = "ws://your-nova-ip:80/api/nats" token = os.getenv("NATS_TOKEN") # Get token from environment variable connect_kwargs = { "servers": [nats_url], "reconnected_cb": reconnected_cb, "disconnected_cb": disconnected_cb, "error_cb": error_cb, "closed_cb": closed_cb, "max_reconnect_attempts": 10, "reconnect_time_wait": 2, } # Add token if available if token: connect_kwargs["token"] = token print("Connecting with token authentication...") # Connect with all settings nc = await nats.connect(**connect_kwargs) print(f"Connected to {nc.connected_url.netloc}") async def message_handler(msg): try: # Process message print(f"Received on {msg.subject}: {msg.data.decode()}") except Exception as e: print(f"Error processing message: {e}") # Subscribe with error handling cell_name = "cell" controller_name = "ur5-robot" # Your robot controller name from Setup subject = f"nova.v2.cells.{cell_name}.controllers.{controller_name}.state" try: await nc.subscribe(subject, cb=message_handler) # Keep connection alive await asyncio.Event().wait() except KeyboardInterrupt: print("Shutting down...") except ConnectionClosedError: print("Connection closed unexpectedly") finally: if nc.is_connected: await nc.close() if __name__ == '__main__': asyncio.run(main())

Variable meanings

  • {cell}: Your cell identifier (typically "cell" unless you have multiple cells)
  • {controller}: The robot controller name you created in the NOVA Setup app (e.g., "ur5-robot", "abb-irb1600", "fanuc-m10id12")
  • {motion-group}: The motion group name within the controller (e.g., "manipulator", "robot")

Important: The {controller} is the actual name you assigned to your robot in the NOVA Setup app. Find this name in Setup under your cell’s robot controllers list.

How to find identifiers

  1. Open the NOVA Setup app
  2. Navigate to your cell
  3. Look at the robot controllers section
  4. The name shown there is your {controller} value to use in subjects

For example, if your Setup shows a robot named fanuc-m10id12, use:

subject = "nova.v2.cells.cell.controllers.fanuc-m10id12.state"

Next steps

Need help? The NATS community provides extensive examples in the nats.py repository .

Last updated on