
Session Lifecycle

Session is the single entry point to openocd-python. It manages the TCP connection to OpenOCD, optionally manages an OpenOCD subprocess, and provides lazy access to every subsystem (target, memory, registers, flash, JTAG, breakpoints, RTT, SVD, transport, and events).

There are two factory methods, each with an async and a sync variant:

| Method | Returns | Purpose |
| --- | --- | --- |
| Session.connect() | Session | Connect to an already-running OpenOCD |
| Session.start() | Session | Spawn an OpenOCD process, then connect |
| Session.connect_sync() | SyncSession | Sync wrapper around connect() |
| Session.start_sync() | SyncSession | Sync wrapper around start() |

connect() — attach to a running instance

@classmethod
async def connect(
    cls,
    host: str = "localhost",
    port: int = 6666,
    timeout: float = 10.0,
) -> Session

Creates a TclRpcConnection, opens a TCP socket to the given host and port, and returns a Session. The timeout applies to the initial TCP connection attempt.

import asyncio

from openocd import Session

async def main():
    async with Session.connect(host="localhost", port=6666) as ocd:
        print(await ocd.command("version"))

asyncio.run(main())

start() spawns an OpenOCD process and then connects to it:

@classmethod
async def start(
    cls,
    config: str | Path,
    *,
    tcl_port: int = 6666,
    openocd_bin: str | None = None,
    timeout: float = 10.0,
    extra_args: list[str] | None = None,
) -> Session

This method:

  1. Creates an OpenOCDProcess instance
  2. Spawns OpenOCD with the given config and port settings
  3. Polls the TCL RPC port until it accepts connections (or the timeout expires)
  4. Opens a TclRpcConnection to the now-ready port
  5. Returns a Session that owns both the connection and the process

If the TCP connection fails after OpenOCD starts, the process is automatically stopped before the exception propagates.
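
The cleanup guarantee above can be pictured with a small stand-alone sketch. `FakeProcess` and `start_with_cleanup` are hypothetical names invented for illustration; they are not part of openocd-python, and the library's real implementation may differ.

```python
import asyncio


class FakeProcess:
    """Hypothetical stand-in for a spawned OpenOCD process."""

    def __init__(self):
        self.running = True

    async def stop(self):
        self.running = False


async def start_with_cleanup(process, connect):
    """Try to connect; if that fails, stop the process and re-raise."""
    try:
        return await connect()
    except Exception:
        # The process was started by us, so it must not be left orphaned.
        await process.stop()
        raise


async def failing_connect():
    # Simulates the TCP connection being refused after the spawn succeeded.
    raise ConnectionError("connection refused")
```

The caller still sees the original exception; the only side effect is that the spawned process is gone by the time it arrives.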

import asyncio

from openocd import Session

async def main():
    async with Session.start(
        "-f interface/cmsis-dap.cfg -f target/stm32f1x.cfg",
        tcl_port=6666,
        timeout=15.0,
        extra_args=["-d2"],
    ) as ocd:
        state = await ocd.target.state()
        print(state.name, state.state)

asyncio.run(main())

Session implements __aenter__ / __aexit__ (and SyncSession implements __enter__ / __exit__). When the context manager exits, close() is called, which:

  1. Closes the TCP connection to OpenOCD
  2. If this session spawned an OpenOCD process, terminates it (sends SIGTERM, waits up to 5 seconds, then sends SIGKILL if needed)

async with Session.start(config) as ocd:
    await ocd.target.halt()
# At this point:
# - TCP connection is closed
# - OpenOCD process has been terminated
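
The close order described above can be sketched with stand-in objects. `SketchSession`, `FakeConnection`, and `FakeProcess` are hypothetical classes written only for this illustration; the sketch assumes nothing about the library beyond the two steps listed.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for the TCP connection."""

    def __init__(self, log):
        self._log = log

    async def close(self):
        self._log.append("connection closed")


class FakeProcess:
    """Hypothetical stand-in for a spawned OpenOCD process."""

    def __init__(self, log):
        self._log = log

    async def stop(self):
        self._log.append("process stopped")


class SketchSession:
    def __init__(self, conn, process=None):
        self._conn = conn
        self._process = process  # None when attached to an existing OpenOCD

    async def close(self):
        await self._conn.close()
        if self._process is not None:
            await self._process.stop()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs whether the body finished normally or raised.
        await self.close()


async def demo():
    log = []
    async with SketchSession(FakeConnection(log), FakeProcess(log)):
        log.append("body ran")
    return log
```

Running `asyncio.run(demo())` returns the log in the order body, connection, process, which matches the two numbered steps.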

For manual lifecycle management without a context manager:

ocd = await Session.connect()
try:
    await ocd.target.state()
finally:
    await ocd.close()

Session exposes ten subsystem properties. Each is created on first access — not at connection time. This means connecting to OpenOCD is fast and you only pay the cost of subsystems you actually use.

| Property | Type | Purpose |
| --- | --- | --- |
| target | Target | Halt, resume, step, reset, state queries |
| memory | Memory | Read/write memory at various widths |
| registers | Registers | CPU register read/write |
| flash | Flash | Flash programming, erase, verify |
| jtag | JTAGController | JTAG chain scanning, TAP state control |
| breakpoints | BreakpointManager | Breakpoint and watchpoint management |
| rtt | RTTManager | Real-Time Transfer communication |
| svd | SVDManager | SVD-based peripheral register decoding |
| transport | Transport | Transport selection and adapter configuration |

Each subsystem holds a reference to the shared TclRpcConnection. The SVDManager is special — it also receives a reference to the Memory subsystem so it can read hardware registers.

async with Session.connect() as ocd:
    # No subsystems created yet
    state = await ocd.target.state()
    # Target subsystem now exists
    pc = await ocd.registers.pc()
    # Registers subsystem now exists
    # Memory, flash, jtag, etc. are still None internally
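
A minimal sketch of the lazy-property pattern described above, assuming each subsystem is stored as `None` until first access. The classes here are simplified stand-ins that only share names with the real subsystems, and `LazySession` is a hypothetical name.

```python
class Target:
    def __init__(self, conn):
        self.conn = conn


class Memory:
    def __init__(self, conn):
        self.conn = conn


class SVDManager:
    def __init__(self, conn, memory):
        self.conn = conn
        self.memory = memory  # used to read hardware registers


class LazySession:
    def __init__(self, conn):
        self._conn = conn
        # Nothing is constructed at connection time.
        self._target = None
        self._memory = None
        self._svd = None

    @property
    def target(self):
        if self._target is None:
            self._target = Target(self._conn)
        return self._target

    @property
    def memory(self):
        if self._memory is None:
            self._memory = Memory(self._conn)
        return self._memory

    @property
    def svd(self):
        if self._svd is None:
            # Going through self.memory creates Memory on demand as well.
            self._svd = SVDManager(self._conn, self.memory)
        return self._svd
```

In this sketch, touching `svd` is the one case where a second subsystem (`memory`) comes into existence as a side effect.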

The SyncSession wrapper mirrors this pattern. Each sync property creates the corresponding Sync* wrapper on first access:

with Session.connect_sync() as ocd:
    # ocd is a SyncSession
    state = ocd.target.state()  # Creates SyncTarget wrapping Target
    pc = ocd.registers.pc()  # Creates SyncRegisters wrapping Registers
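
The source does not show how the Sync* wrappers run the underlying coroutines. One common approach, sketched here purely as an assumption, is to drive each async call to completion on a private event loop. All class names below are hypothetical.

```python
import asyncio


class AsyncTarget:
    """Hypothetical async subsystem used only for this sketch."""

    async def state(self):
        await asyncio.sleep(0)
        return "halted"


class SyncTargetSketch:
    """Blocking facade: runs each coroutine to completion on a given loop."""

    def __init__(self, inner, loop):
        self._inner = inner
        self._loop = loop

    def state(self):
        return self._loop.run_until_complete(self._inner.state())


class SyncSessionSketch:
    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._async_target = AsyncTarget()
        self._target = None  # sync wrapper is created on first access

    @property
    def target(self):
        if self._target is None:
            self._target = SyncTargetSketch(self._async_target, self._loop)
        return self._target

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self._loop.close()
```

A design like this cannot be called from code that is already running inside an event loop, which is one reason a library might choose a background thread instead.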

Both Session and SyncSession expose a command() method for sending arbitrary OpenOCD TCL commands:

async def command(self, cmd: str) -> str

This sends the command string over the TCL RPC connection and returns the raw response. Use it when you need functionality not covered by the typed subsystems.

async with Session.connect() as ocd:
    # Get OpenOCD version
    version = await ocd.command("version")
    # Set adapter speed directly
    await ocd.command("adapter speed 8000")
    # Run arbitrary TCL
    await ocd.command("set x [expr {1 + 2}]")
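
For orientation, OpenOCD's TCL RPC server delimits every command and every response with a single 0x1a byte. The sketch below shows that framing using only asyncio streams. The function names are hypothetical, and this is not necessarily how TclRpcConnection is implemented.

```python
import asyncio

TERMINATOR = b"\x1a"  # OpenOCD TCL RPC message delimiter


def frame_command(cmd: str) -> bytes:
    """Encode a command string and append the delimiter."""
    return cmd.encode("utf-8") + TERMINATOR


def unframe_response(data: bytes) -> str:
    """Strip the trailing delimiter and decode the response."""
    if not data.endswith(TERMINATOR):
        raise ValueError("response is missing the 0x1a terminator")
    return data[: -len(TERMINATOR)].decode("utf-8")


async def send_command(host: str, port: int, cmd: str, timeout: float = 10.0) -> str:
    """Open a connection, send one command, and return the raw response."""
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout
    )
    try:
        writer.write(frame_command(cmd))
        await writer.drain()
        # readuntil() returns the data including the separator.
        raw = await asyncio.wait_for(reader.readuntil(TERMINATOR), timeout)
        return unframe_response(raw)
    finally:
        writer.close()
        await writer.wait_closed()
```

Opening a fresh connection per command keeps the sketch short; a session-style API would normally keep one connection open and reuse it.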

When using Session.start(), the library creates an OpenOCDProcess that manages the subprocess.

| Property | Type | Description |
| --- | --- | --- |
| pid | int \| None | Process ID of the OpenOCD subprocess |
| running | bool | Whether the process is still alive |
| tcl_port | int | The TCL RPC port this process was started on |

If openocd_bin is not provided, the library uses shutil.which("openocd") to find OpenOCD on the system PATH. If not found, a ProcessError is raised before any process is spawned.
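
The lookup rule above amounts to a few lines around `shutil.which`. In this sketch `resolve_openocd` is a hypothetical helper, and `ProcessError` is redefined locally as a stand-in for the library's exception so the example runs on its own.

```python
import shutil


class ProcessError(RuntimeError):
    """Local stand-in for the library's ProcessError."""


def resolve_openocd(openocd_bin=None, which=shutil.which):
    """Return the OpenOCD executable path, or raise before spawning anything.

    The `which` parameter exists only so the lookup can be swapped out in tests.
    """
    if openocd_bin is not None:
        return openocd_bin  # an explicit path wins over the PATH search
    found = which("openocd")
    if found is None:
        raise ProcessError("openocd not found on PATH")
    return found
```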

The config parameter accepts several formats. The process builder parses the string and wraps bare filenames with -f flags:

# These are equivalent:
await Session.start("interface/cmsis-dap.cfg")
await Session.start("-f interface/cmsis-dap.cfg")
# Multiple configs:
await Session.start("-f interface/cmsis-dap.cfg -f target/stm32f1x.cfg")
# Inline commands:
await Session.start("-f interface/cmsis-dap.cfg -c 'adapter speed 4000'")

The TCL port flag (-c "tcl_port 6666") is always appended automatically based on the tcl_port parameter.
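
One way such a builder could work is sketched below with `shlex`. `build_args` is a hypothetical function, and the exact rules the library applies (for example, how it treats unknown flags) are assumptions here.

```python
import shlex


def build_args(config, tcl_port=6666):
    """Turn a config string into an OpenOCD argument list."""
    tokens = shlex.split(str(config))  # honours quotes such as 'adapter speed 4000'
    args = []
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if tok in ("-f", "-c") and i + 1 < len(tokens):
            # Flag with its value: keep the pair untouched.
            args += [tok, tokens[i + 1]]
            i += 2
        elif tok.startswith("-"):
            # Any other flag is passed through as-is (assumption).
            args.append(tok)
            i += 1
        else:
            # Bare filename: wrap it with -f.
            args += ["-f", tok]
            i += 1
    # The TCL port command is always appended last.
    args += ["-c", f"tcl_port {tcl_port}"]
    return args
```

With this sketch, `build_args("interface/cmsis-dap.cfg")` and `build_args("-f interface/cmsis-dap.cfg")` produce the same list.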

After spawning the process, wait_ready() polls the TCL RPC port at 250 ms intervals until a TCP connection succeeds or the timeout expires. If the process exits before becoming ready, a ProcessError is raised with the last 500 bytes of stderr output.
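
A rough sketch of such a readiness loop follows. The `probe` and `exit_stderr` callables are hypothetical injection points chosen to keep the example testable, and `ProcessError` is a local stand-in; the real wait_ready() signature is not shown in this document.

```python
import asyncio
import time


class ProcessError(RuntimeError):
    """Local stand-in for the library's ProcessError."""


async def port_open(host, port):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        _, writer = await asyncio.open_connection(host, port)
    except OSError:
        return False
    writer.close()
    await writer.wait_closed()
    return True


async def wait_ready(probe, exit_stderr, timeout=10.0, interval=0.25):
    """Poll until probe() is true, the process exits, or the timeout expires.

    exit_stderr() returns None while the process runs, else its stderr bytes.
    """
    deadline = time.monotonic() + timeout
    while True:
        stderr = exit_stderr()
        if stderr is not None:
            # Report only the tail of stderr, as described above.
            tail = stderr[-500:].decode("utf-8", errors="replace")
            raise ProcessError(f"OpenOCD exited before becoming ready: {tail}")
        if await probe():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError("TCL RPC port did not open in time")
        await asyncio.sleep(interval)
```

A caller could pass `lambda: port_open("localhost", 6666)` as the probe.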

stop() terminates the process gracefully:

  1. Send SIGTERM
  2. Wait up to 5 seconds for the process to exit
  3. If still alive, send SIGKILL and wait
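
The same escalation can be written against the standard library's `subprocess.Popen`. This is a synchronous sketch with a hypothetical `stop` helper; the library's own stop() may be asynchronous and differ in detail.

```python
import subprocess
import sys


def stop(proc, grace=5.0):
    """Terminate a child process, escalating to kill after `grace` seconds."""
    if proc.poll() is not None:
        return proc.returncode  # already exited
    proc.terminate()  # SIGTERM on POSIX
    try:
        return proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX
        return proc.wait()


def spawn_sleeper():
    """Start a long-running child to exercise stop(); illustration only."""
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
```

Calling `stop(spawn_sleeper())` returns the child's exit code once it has gone away.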

Session provides shortcut methods for registering callbacks on common target events:

async with Session.connect() as ocd:
    ocd.on_halt(lambda msg: print(f"Target halted: {msg}"))
    ocd.on_reset(lambda msg: print(f"Target reset: {msg}"))
    # Events are delivered on the notification socket
    # Enable notifications to start receiving them
    await ocd._conn.enable_notifications()

These callbacks filter the raw notification stream by keyword (“halted” or “reset” respectively). The notifications arrive on a separate TCP connection to prevent them from interleaving with command responses.
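
Keyword filtering of this kind can be sketched as a small dispatcher. `NotificationRouter` is a hypothetical class, and the messages in the usage note are made-up examples rather than real OpenOCD output.

```python
class NotificationRouter:
    """Routes raw notification strings to callbacks by substring match."""

    def __init__(self):
        self._handlers = []  # list of (keyword, callback) pairs

    def on_keyword(self, keyword, callback):
        self._handlers.append((keyword, callback))

    def on_halt(self, callback):
        self.on_keyword("halted", callback)

    def on_reset(self, callback):
        self.on_keyword("reset", callback)

    def dispatch(self, message):
        # Every handler whose keyword appears in the message is invoked.
        for keyword, callback in self._handlers:
            if keyword in message:
                callback(message)
```

For example, after registering `on_halt(halts.append)` and `on_reset(resets.append)`, dispatching a made-up message such as "target halted" would reach only the halt callback, and a message containing neither keyword would be dropped.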