Interrupts DB
1. Introduction
The Interrupts DB provides a structured backend service for storing and querying two types of objects critical to coordinating distributed interrupt handling:
InterruptMessage: Represents an interrupt sent in the system, with metadata such as TTL, type, and routing policy.
InterruptWaiter: Tracks which subjects are expected to respond to a given interrupt message, along with the responses received so far.
This database interface is backed by TimescaleDB and exposes a Flask-based REST API for interacting with these objects programmatically.
Architecture
The Interrupts DB System is a distributed event-coordination and tracking service responsible for managing asynchronous interrupt messages and their acknowledgments in multi-agent environments. It ensures reliable persistence, policy-driven forwarding, and acknowledgment routing of interrupt and feedback messages between agents or services.
This system integrates a TimescaleDB backend, Flask REST and GraphQL query layers, and a NATS-based messaging fabric, enabling real-time interrupt dispatch, TTL-based expiry, and multi-target response tracking with policy enforcement.
1. Persistence and TTL Lifecycle Engine
This subsystem manages the creation, querying, and expiration of InterruptMessage and InterruptWaiter entries stored in TimescaleDB. It ensures time-sensitive delivery and tracks all response states.
InterruptMessage Table
Stores metadata about each interrupt including its ID, type, TTL, status, and forwarding policy. TTL is used for auto-expiry and to enforce time-bounded coordination.
InterruptWaiter Table
Tracks which subjects are expected to respond to an interrupt and maintains a mapping of received responses. This enables downstream fulfillment and resolution checks.
TTL Checker / Expiry Event Engine
Periodically scans for expired messages. When TTL elapses:
- Updates the message status to "expired"
- Emits expiry notifications to both source and target subjects
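As a rough illustration of the expiry check, the sketch below assumes a message expires once `creation_ts + ttl` is at or before the current Unix time, and that only messages still in `pending` status are eligible. The function names and the dictionary shape are hypothetical; the actual checker may apply different rules.

```python
import time
from typing import Any, Dict, List, Optional


def is_expired(creation_ts: int, ttl: int, now: Optional[int] = None) -> bool:
    """Return True once `ttl` seconds have elapsed since `creation_ts`."""
    if now is None:
        now = int(time.time())
    return now >= creation_ts + ttl


def find_expired(messages: List[Dict[str, Any]], now: int) -> List[str]:
    """Collect IDs of pending messages whose TTL has elapsed (assumed rule)."""
    return [
        m["message_id"]
        for m in messages
        if m["status"] == "pending"
        and is_expired(m["creation_ts"], m["ttl"], now)
    ]
```

A periodic scan could call `find_expired` on the current set of rows and then mark each returned ID as `"expired"` before emitting notifications.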
DB Client API
Provides internal access for creating messages and waiters, updating responses, and checking completion criteria. It’s the primary interface used by all messaging and query layers.
DB Query Module
Exposes REST and GraphQL endpoints to query interrupt messages and waiters based on attributes such as type, subject, or status.
2. Messaging and ACK Coordination Engine
This subsystem handles interrupt emission, feedback acknowledgment, success routing, and policy validation using a NATS-based communication layer.
Interrupt/Feedback ACK Events Listener
A persistent NATS subscription that listens on the interrupt.ack topic. When an ACK is received, it:
- Validates the message ID and subject
- Stores the response in the corresponding InterruptWaiter
- Checks if forwarding policy criteria have been fulfilled
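The validation and storage steps can be sketched as a small pure function. This is only an illustration: it represents the waiter as a plain dictionary with the InterruptWaiter fields, takes the ACK field names (`message_id`, `subject_id`, `response`) from the ACK format documented later, and uses a hypothetical function name.

```python
from typing import Any, Dict


def record_ack(waiter: Dict[str, Any], ack: Dict[str, Any]) -> bool:
    """Store an ACK in the waiter's responses map if it is valid.

    Returns True when the response was stored, False when it was rejected.
    """
    # Reject ACKs that refer to a different interrupt message.
    if ack.get("message_id") != waiter["message_id"]:
        return False
    # Reject ACKs from subjects that were never targeted.
    subject = ack.get("subject_id")
    if subject not in waiter["destination_subject_ids"]:
        return False
    waiter["responses"][subject] = ack.get("response")
    return True
```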
Interrupt/Feedback Forwarding Policy Checker
Evaluates the message’s forwarding_policy (e.g., broadcast, all, any). Once conditions are met:
- A success ACK is sent back to the source subject
- The InterruptMessage status is updated
- Additional hooks (e.g., constraint enforcement) may be triggered
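One way to read the policy check is shown below. The sketch assumes `all` means every destination subject has responded and `any` means at least one has. The semantics of `broadcast` are not spelled out in this document, so the sketch deliberately leaves it unsupported rather than guessing. The function name is hypothetical.

```python
from typing import Any, Dict, List


def policy_satisfied(
    forwarding_policy: str,
    destination_subject_ids: List[str],
    responses: Dict[str, Any],
) -> bool:
    """Return True when the responses collected so far satisfy the policy."""
    # Only count responses from subjects that were actually targeted.
    responded = [s for s in destination_subject_ids if s in responses]
    if forwarding_policy == "all":
        return len(responded) == len(destination_subject_ids)
    if forwarding_policy == "any":
        return len(responded) >= 1
    # Policies with undocumented semantics (e.g. "broadcast") are not guessed.
    raise ValueError(f"unsupported forwarding_policy: {forwarding_policy!r}")
```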
Interrupt/Feedback ACK Handler
Verifies if the source subject should be notified based on partial or complete responses. Delegates event emission to the forwarder or expiry handler.
Expiry Handler
When TTL expires before the forwarding policy is satisfied, this module:
- Marks the message as "expired" in the DB
- Sends expiry notifications to all relevant subjects via NATS
Interrupts Feedback Forwarder
Publishes success ACKs and feedback payloads to the source subject once the forwarding policy is fulfilled. Acts as the dispatch point for final delivery.
3. Constraint and Validation Layer
This optional layer integrates with the broader constraint checker architecture to validate content of responses or enforce rejection rules before forwarding an ACK.
Constraint Checker Integration
Intercepts interrupt/feedback responses before ACK delivery. If the response violates DSL-defined constraints:
- Sends a reject ACK to the source subject
- Flags the interrupt as failed or unresolved
4. Public Query Interfaces
This subsystem allows agents and administrators to query the state of interrupts, waiters, and acknowledgments in both REST and GraphQL formats.
Query REST API
Exposes endpoints for searching by:
- message_id
- status
- source_subject_id
- type, ttl, creation_ts
Query GraphQL Middleware
Enables expressive filtering of interrupt and waiter entities using GraphQL schema. Supports nested search, partial field selection, and projection-based queries.
Schema
InterruptMessage
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class InterruptMessage:
    message_id: str
    type: str
    internal_data: Dict[str, Any]
    status: str
    ttl: int
    creation_ts: int
    forwarding_policy: str
Field | Type | Description |
---|---|---|
message_id | str | Unique ID of the interrupt message |
type | str | Type or category of the interrupt |
internal_data | JSON | Arbitrary structured data used by the system internally |
status | str | Status of the message (e.g., pending, resolved, expired) |
ttl | int | Time to live in seconds |
creation_ts | int | Unix timestamp of when the message was created |
forwarding_policy | str | Policy describing how to forward the interrupt (e.g., broadcast, direct) |
InterruptWaiter
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class InterruptWaiter:
    message_id: str
    source_subject_id: str
    destination_subject_ids: List[str]
    responses: Dict[str, Any]
Field | Type | Description |
---|---|---|
message_id | str | ID of the interrupt message this waiter belongs to |
source_subject_id | str | ID of the subject that initiated the interrupt |
destination_subject_ids | List[str] | List of subject IDs expected to respond to this interrupt |
responses | JSON | Responses from the destination subjects mapped by subject ID |
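To make the waiter schema concrete, the sketch below builds an InterruptWaiter, records one response, and derives which subjects are still outstanding. The `pending_subjects` helper is hypothetical and not part of the documented interface. Serializing with `dataclasses.asdict` is one possible way to produce a JSON body of the same shape as the REST examples.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class InterruptWaiter:
    message_id: str
    source_subject_id: str
    destination_subject_ids: List[str]
    responses: Dict[str, Any]


def pending_subjects(waiter: InterruptWaiter) -> List[str]:
    """Return destination subjects that have not responded yet."""
    return [s for s in waiter.destination_subject_ids if s not in waiter.responses]


waiter = InterruptWaiter(
    message_id="msg123",
    source_subject_id="subjectA",
    destination_subject_ids=["subjectB", "subjectC"],
    responses={},
)
waiter.responses["subjectB"] = {"ack": True}

# JSON body with the same field names as the schema above.
payload = json.dumps(asdict(waiter))
```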
APIs Documentation with cURL Examples
Insert InterruptMessage
POST /interrupt/message
curl -X POST http://localhost:8000/interrupt/message \
  -H "Content-Type: application/json" \
  -d '{
    "message_id": "msg123",
    "type": "shutdown",
    "internal_data": {"reason": "maintenance"},
    "status": "pending",
    "ttl": 60,
    "creation_ts": 1717234567,
    "forwarding_policy": "broadcast"
  }'
Update InterruptMessage
PUT /interrupt/message/:message_id
curl -X PUT http://localhost:8000/interrupt/message/msg123 \
  -H "Content-Type: application/json" \
  -d '{
    "status": "resolved",
    "ttl": 120
  }'
Delete InterruptMessage
DELETE /interrupt/message/:message_id
curl -X DELETE http://localhost:8000/interrupt/message/msg123
Get InterruptMessage by ID
GET /interrupt/message/:message_id
curl -X GET http://localhost:8000/interrupt/message/msg123
Query InterruptMessages
POST /interrupt/message/query
curl -X POST http://localhost:8000/interrupt/message/query \
  -H "Content-Type: application/json" \
  -d '{"status": "pending"}'
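The same query can be prepared from Python using only the standard library. The sketch below builds the request object without sending it, since the response format is not documented here. The base URL is taken from the cURL examples, and `build_query_request` is a hypothetical helper name.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "http://localhost:8000"  # taken from the cURL examples above


def build_query_request(filters: Dict[str, Any]) -> urllib.request.Request:
    """Build a POST request for /interrupt/message/query with JSON filters."""
    body = json.dumps(filters).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/interrupt/message/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_query_request({"status": "pending"})

# To send it against a running server (assuming the reply body is JSON):
# with urllib.request.urlopen(req) as resp:
#     results = json.loads(resp.read())
```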
Insert InterruptWaiter
POST /interrupt/waiter
curl -X POST http://localhost:8000/interrupt/waiter \
  -H "Content-Type: application/json" \
  -d '{
    "message_id": "msg123",
    "source_subject_id": "subjectA",
    "destination_subject_ids": ["subjectB", "subjectC"],
    "responses": {}
  }'
Update InterruptWaiter
PUT /interrupt/waiter/:message_id
curl -X PUT http://localhost:8000/interrupt/waiter/msg123 \
  -H "Content-Type: application/json" \
  -d '{
    "responses": {
      "subjectB": {"ack": true}
    }
  }'
Delete InterruptWaiter
DELETE /interrupt/waiter/:message_id
curl -X DELETE http://localhost:8000/interrupt/waiter/msg123
Get InterruptWaiter by ID
GET /interrupt/waiter/:message_id
curl -X GET http://localhost:8000/interrupt/waiter/msg123
Query InterruptWaiters
POST /interrupt/waiter/query
curl -X POST http://localhost:8000/interrupt/waiter/query \
  -H "Content-Type: application/json" \
  -d '{"source_subject_id": "subjectA"}'
Creating a Message (Interrupt Registration)
To initiate a new interrupt, use the send_interrupt() method from interrupt_emitter.py. This registers the message in the database and publishes it to all designated target subjects via NATS.
Python Example
from interrupt_emitter import send_interrupt

send_interrupt(
    message_id="msg456",
    type_="reboot-request",
    source_subject_id="controller-A",
    destination_subject_ids=["agent-1", "agent-2"],
    internal_data={"reason": "config mismatch"},
    ttl=120,
    forwarding_policy="all"
)
This will:
- Save the InterruptMessage and corresponding InterruptWaiter to TimescaleDB.
- Send NATS messages to subjects: interrupt.agent-1 and interrupt.agent-2.
Flow Explanation
The full interrupt lifecycle works as follows:
- Message Registration: The source (e.g., controller-A) calls send_interrupt() with the message ID, target subjects, type, and payload.
- Persistence and Dispatch
  - The message is saved in the interrupt_messages table.
  - A waiter entry is created to track which subjects are expected to respond.
  - NATS messages are dispatched to each interrupt.<destination> subject.
- ACK Listener Handling
  - Subjects send responses to the interrupt.ack topic.
  - The listener updates the InterruptWaiter.responses map with each subject's response.
- Success Criteria Evaluation
  - When all destination subjects respond, the forwarding_policy is checked.
  - If satisfied, a Success ACK is published back to the source.
- Constraint Enforcement
  - (Optional) Constraint checks can reject messages even if all responses are received.
- Expiry Logic
  - If the TTL expires before all expected responses arrive:
    - The message status is updated to expired.
    - Expiry notifications are sent to source and target subjects.
Sending a Response (Interrupt ACK Format)
Each responding subject must publish to the interrupt.ack subject using the format below:
{
  "message_id": "msg456",
  "subject_id": "agent-1",
  "response": {
    "accepted": true,
    "details": "Reboot scheduled"
  }
}
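A receiver of this format might validate incoming payloads before using them. The sketch below checks only for the three top-level fields shown above. The function name and the choice to raise `ValueError` are assumptions, not documented behavior of the listener.

```python
import json
from typing import Any, Dict, Tuple

REQUIRED_KEYS = ("message_id", "subject_id", "response")


def parse_ack(raw: bytes) -> Tuple[str, str, Dict[str, Any]]:
    """Decode an ACK payload and return (message_id, subject_id, response).

    Raises ValueError if the payload is not valid JSON, is not a JSON
    object, or lacks one of the required top-level fields.
    """
    ack = json.loads(raw.decode("utf-8"))  # JSONDecodeError is a ValueError
    if not isinstance(ack, dict):
        raise ValueError("ACK payload must be a JSON object")
    missing = [k for k in REQUIRED_KEYS if k not in ack]
    if missing:
        raise ValueError(f"ACK is missing fields: {missing}")
    return ack["message_id"], ack["subject_id"], ack["response"]
```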
Python Example to Send ACK
import asyncio
import json

from nats.aio.client import Client as NATS


async def send_ack():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    ack_msg = {
        "message_id": "msg456",
        "subject_id": "agent-1",
        "response": {
            "accepted": True,
            "details": "Reboot scheduled"
        }
    }

    # Publish the ACK, then flush pending data and close the connection
    await nc.publish("interrupt.ack", json.dumps(ack_msg).encode())
    await nc.drain()


asyncio.run(send_ack())
Receiving a Notification as a Target Subject
All target subjects must subscribe to interrupt.<your_subject_id> to receive interrupt messages.
Python Example to Listen for Interrupts
import asyncio

from nats.aio.client import Client as NATS


async def listen_interrupts():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    async def callback(msg):
        data = msg.data.decode()
        print("Received interrupt:", data)

    await nc.subscribe("interrupt.agent-1", cb=callback)
    print("Listening for interrupts...")

    # Keep the coroutine alive so the subscription keeps receiving messages
    while True:
        await asyncio.sleep(1)


asyncio.run(listen_interrupts())
Expiry Notifications
If a message expires before all subjects respond (based on ttl):
- The system updates the message status to "expired" in the database.
- It sends expiry notices to:
  - interrupt.<source_subject_id>
  - interrupt.<each_target_subject_id>
Example Expiry Notification
{
  "message_id": "msg456",
  "event": "expired"
}
These notifications allow components to handle stale interrupts proactively.
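Since expiry notices arrive on the same interrupt.<subject_id> subject as interrupts, a subscriber may need to tell the two apart. The sketch below assumes that any payload with `"event": "expired"` is an expiry notice and that every other payload is a regular interrupt carrying a `message_id`. The wire format of regular interrupts is not specified in this document, so that part is an assumption, as is the helper name.

```python
import json
from typing import Set


def handle_payload(raw: bytes, active: Set[str]) -> str:
    """Update the set of active interrupt IDs and report the payload kind."""
    payload = json.loads(raw.decode("utf-8"))
    message_id = payload["message_id"]
    if payload.get("event") == "expired":
        # Stale interrupt: stop tracking it so no late response is sent.
        active.discard(message_id)
        return "expired"
    # Assumed: anything else is a regular interrupt to act on.
    active.add(message_id)
    return "interrupt"
```

A NATS callback could pass `msg.data` to this function and skip sending an ACK for any message ID that is no longer in `active`.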