Skip to content

Event Callbacks

The EventManager enables asynchronous target state notifications from OpenOCD. When enabled, OpenOCD pushes messages over a dedicated TCP connection whenever the target halts, resumes, resets, or a GDB client attaches/detaches. You register callbacks to react to these events without polling.

The Session class also provides convenience shortcuts (on_halt, on_reset) for the most common cases.

OpenOCD’s TCL RPC notification system uses a separate TCP connection from the command channel. This prevents notifications from interleaving with command responses on the same stream.

When you enable notifications:

  1. A second TCP connection opens to the same OpenOCD host and port
  2. The command tcl_notifications on is sent on this second connection
  3. A background asyncio task reads notification messages from this dedicated socket
  4. Incoming messages are dispatched to registered callbacks

The command connection remains unaffected — you can send commands and receive notifications simultaneously without race conditions.

The simplest way to react to events is through the Session.on_halt() and Session.on_reset() methods. These register notification callbacks that filter for specific keywords in the message.

import asyncio
from openocd import Session
async def main():
async with await Session.connect() as session:
# Register event handlers
session.on_halt(lambda msg: print(f"Target halted: {msg}"))
session.on_reset(lambda msg: print(f"Target reset: {msg}"))
# Resume the target and wait for it to hit a breakpoint
await session.breakpoints.add(0x0800_1234, hw=True)
await session.target.resume()
# Give it time to trigger
await asyncio.sleep(2.0)
asyncio.run(main())

For finer control, use the EventManager class. It supports multiple event types and allows registering and unregistering individual callbacks.

enable() opens the notification socket and starts the background listener. Call this before registering callbacks.

from openocd.events import EventManager
async with await Session.connect() as session:
events = EventManager(session._conn)
await events.enable()
print(f"Notifications enabled: {events.enabled}")

on(event_type, callback) registers a callback for a specific event. Matching is case-insensitive substring: a notification containing “halted” anywhere in its text triggers all callbacks registered for the "halted" event type.

from openocd.events import (
EventManager,
EVENT_HALTED,
EVENT_RESUMED,
EVENT_RESET,
EVENT_GDB_ATTACHED,
EVENT_GDB_DETACHED,
)
def on_halted(msg: str) -> None:
print(f"[HALT] {msg}")
def on_resumed(msg: str) -> None:
print(f"[RESUME] {msg}")
events = EventManager(session._conn)
await events.enable()
events.on(EVENT_HALTED, on_halted)
events.on(EVENT_RESUMED, on_resumed)
events.on(EVENT_RESET, lambda msg: print(f"[RESET] {msg}"))
events.on(EVENT_GDB_ATTACHED, lambda msg: print(f"[GDB+] {msg}"))
events.on(EVENT_GDB_DETACHED, lambda msg: print(f"[GDB-] {msg}"))

off(event_type, callback) removes a previously registered callback. If the callback was not registered, off() silently does nothing.

events.off(EVENT_HALTED, on_halted)

The events module defines constants for known notification types:

ConstantString ValueTrigger
EVENT_HALTED"halted"Target entered halted state
EVENT_RESUMED"resumed"Target resumed execution
EVENT_RESET"reset"Target was reset
EVENT_GDB_ATTACHED"gdb-attached"GDB client connected
EVENT_GDB_DETACHED"gdb-detached"GDB client disconnected

These are not an exhaustive list — OpenOCD may emit other notification strings depending on configuration. You can register callbacks for any substring pattern.

import asyncio
from openocd import Session
from openocd.events import EventManager, EVENT_HALTED, EVENT_RESUMED
halt_count = 0
def count_halts(msg: str) -> None:
global halt_count
halt_count += 1
print(f"Halt #{halt_count}: {msg}")
async def main():
async with await Session.connect() as session:
# Set up event monitoring
events = EventManager(session._conn)
await events.enable()
events.on(EVENT_HALTED, count_halts)
events.on(EVENT_RESUMED, lambda m: print(f"Resumed: {m}"))
# Set a breakpoint and let the target run
await session.breakpoints.add(0x0800_1234, hw=True)
await session.target.resume()
# Monitor for 5 seconds
await asyncio.sleep(5.0)
print(f"Total halts observed: {halt_count}")
asyncio.run(main())

Callbacks run synchronously within the notification reader’s asyncio task. Keep these points in mind:

  • Callbacks receive the full notification message string as their single argument.
  • Callbacks should be fast and non-blocking. Dispatch long-running work to a separate task.
  • Exceptions in callbacks are caught and logged — they do not crash the notification loop.
  • Multiple callbacks for the same event type are called in registration order.
  • The same callback function will not be registered twice for the same event type.
# Fast callback -- good
def on_halt(msg: str) -> None:
print(f"Halted: {msg}")
# Dispatching slow work -- good
def on_halt_with_work(msg: str) -> None:
asyncio.create_task(analyze_state(msg))
MemberTypeDescription
enable()asyncSend tcl_notifications on, open notification socket
on(event_type, callback)syncRegister a callback (case-insensitive substring match)
off(event_type, callback)syncUnregister a callback
enabled (property)boolWhether notifications are active
MethodTrigger KeywordDescription
session.on_halt(callback)"halted"Register a halt callback on the connection
session.on_reset(callback)"reset"Register a reset callback on the connection

These shortcuts register directly on the connection’s notification handler and work whenever the notification listener is active.