Observe Trigger and Interrupt library
Introduction
The Observation and Trigger Execution Framework is a modular, event-driven system designed for real-time or periodic decision-making based on external data sources. It enables continuous data observation, inference execution, and trigger resolution, using a combination of:
- API source managers to fetch real-world data,
- DSL workflows to process and infer insights,
- Trigger systems to evaluate outcomes and decide next steps,
- Human intervention hooks for assisted decision making when necessary.
This framework is ideal for use cases such as monitoring external systems, reacting to streamed or polled data, and orchestrating AI workflows where automated decisions may lead to execution of actions, human approvals, or triggering downstream processes.
Key features include:
- Pluggable API sources with credential handling and historical tracking
- Periodic observation loop with stateful/stateless modes
- Asynchronous observation and trigger execution using queues
- DSL-based inference workflows
- Flexible trigger resolution via custom functions or DSL
- Integrated support for human-in-the-loop actions
This documentation provides a complete breakdown of the system's architecture, components, and lifecycle, including setup, configuration, and usage patterns.
Architecture
The Observe Trigger and Interrupt Execution System is a distributed, extensible, and event-driven framework designed to continuously monitor external signals (via APIs or event streams), infer actionable insights through DSL workflows, and execute programmable triggers. It is built for reactive decision-making in autonomous or semi-autonomous AI systems, with native support for human-in-the-loop interrupt handling and feedback acknowledgment.
This system integrates multiple subsystems, including a webhook and topic-based observation engine, trigger workflow executor, interrupt emitter, function resolver, and real-time acknowledgment routing. It operates in both periodic and event-driven modes and supports stateful observation patterns across asynchronous queues.
1. Observation and Data Collection Engine
This subsystem manages all data intake for observation workflows, either through scheduled API polling or subscription to message broker events. It maintains webhook definitions, broker topic configurations, and DSL execution threads.
Webhook Sources (Predefined)
Supports a set of standardized webhooks (subject metrics, job status, org data, feedback chat, etc.) used to poll structured data from known REST APIs. These are defined at deployment and registered with the system on startup.
Custom Webhooks Loader
Loads and caches custom webhook functions dynamically from the Functions DB. Functions are stored as JSON definitions and downloaded on demand, supporting dynamic execution of external logic during observation.
Message Topics Configuration
Manages a list of broker topics (NATS, Kafka, etc.) that the system subscribes to for push-based event streams. Topics are associated with MessageTopic entries in the DB.
Event Listener
A passive listener that connects to the broker and routes topic-based events into the observation system. These events are queued for DSL execution.
Observe DSL Manager
Stores and manages DSL workflow definitions for observation tasks. Each observation DSL can be configured with internal timers or real-time listeners.
DSL Execution Loop
Each observation runs in its own thread and is governed by a timer. DSL workflows are triggered with data from webhook/API sources and push their results to the observation queue.
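The loop below is a minimal, self-contained sketch of this pattern. The poll_sources() and run_dsl() helpers are hypothetical stand-ins for the webhook caller and DSL executor, not the library's internal code.
import queue
import threading

def poll_sources() -> dict:
    """Placeholder for querying the registered webhook/API sources."""
    return {"value": 42}

def run_dsl(data: dict) -> dict:
    """Placeholder for executing the observation DSL workflow on fetched data."""
    return {"observation": data}

def observation_loop(interval_s: float, out_queue: queue.Queue, stop: threading.Event) -> None:
    # Timer-governed loop: poll, run the DSL, push the result, then wait for the next tick.
    while not stop.is_set():
        out_queue.put(run_dsl(poll_sources()))
        stop.wait(interval_s)

obs_queue = queue.Queue(maxsize=100)
stop_event = threading.Event()
threading.Thread(target=observation_loop, args=(30, obs_queue, stop_event), daemon=True).start()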
Webhook Caller
Responsible for invoking APIs, formatting requests, managing authentication, and retrieving data from registered webhook sources.
2. Trigger Evaluation and Interrupt Generator
This subsystem evaluates the output of observation workflows using DSL-based trigger logic and determines appropriate follow-up actions. Actions may include function execution, interrupt generation, or feedback signaling.
Trigger Checker DSL Manager
Stores and manages trigger-check DSLs. Each defines a set of rules to evaluate whether a trigger should fire based on observation outputs.
Trigger Router
Given a DSL output, this module routes the observation to the correct trigger checker DSL based on configuration and routing rules.
Trigger DSL Executor
Executes the associated trigger DSL logic, which may evaluate thresholds, compute results, or call helper functions.
Trigger Initiator
Determines the downstream effect of a trigger:
- Execute a function
- Raise an interrupt
- Emit feedback
- Continue the observation cycle
Function Executor Client
Executes functions retrieved from the Functions DB. If the function is local, it is executed directly; otherwise, a remote invocation is performed.
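The snippet below sketches how a trigger outcome from this subsystem could be dispatched to the effects listed above. The action keys, handler stubs, and local/remote split are illustrative assumptions, not the library's API.
from typing import Any, Callable, Dict

LOCAL_FUNCTIONS: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "notify_ops": lambda payload: print("notify_ops:", payload),   # locally cached function
}

def remote_invoke(function_id: str, payload: Dict[str, Any]) -> None:
    print("remote invocation of", function_id, "with", payload)    # stand-in for a remote call

def raise_interrupt(outcome: Dict[str, Any]) -> None:
    print("interrupt raised:", outcome)

def emit_feedback(outcome: Dict[str, Any]) -> None:
    print("feedback emitted:", outcome)

def handle_trigger_outcome(outcome: Dict[str, Any]) -> str:
    action = outcome.get("action", "continue")
    if action == "execute_function":
        fn = LOCAL_FUNCTIONS.get(outcome["function_id"])
        if fn is not None:
            fn(outcome.get("payload", {}))                          # local execution
        else:
            remote_invoke(outcome["function_id"], outcome.get("payload", {}))
    elif action == "raise_interrupt":
        raise_interrupt(outcome)
    elif action == "emit_feedback":
        emit_feedback(outcome)
    return action  # "continue" simply lets the observation cycle keep running

handle_trigger_outcome({"action": "execute_function", "function_id": "notify_ops", "payload": {"cpu": 97}})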
3. Interrupt and Feedback Processor
This subsystem handles generation, registration, acknowledgment, and routing of interrupts and feedback signals.
Interrupt/Feedback Generator
Creates a structured interrupt or feedback object from a trigger event. These entries are persisted to RQLite and pushed to the message broker.
DB Client
Handles persistence of LocalInterrupt and LocalFeedback entries. Writes are performed before events are published.
Ack Listener
Listens for acknowledgment events related to interrupts or feedback. Updates the relevant DB entries once acknowledgments are received.
Interrupt/Feedback Ack Handler
Manages the acknowledgment lifecycle, ensuring all events are tracked and updates are reflected in real time.
Message Broker Client
Publishes interrupts and feedback to the broker for delivery to external systems, agents, or UIs.
4. Configuration and Code Resolution Layer
This subsystem manages DSL definitions, webhook configurations, and dynamic code loading from the central functions registry.
Controller
Primary orchestrator that manages additions, updates, and deletions of DSLs, webhook entries, and topic subscriptions.
Functions DB Client API
Provides access to the function metadata and code used in custom webhooks and triggers.
Functions Code Loader API
Fetches executable function code from the central store and stores it in the local cache.
Service Config Cache
Maintains in-memory configuration state (DSLs, topics, webhooks) for fast access and execution consistency.
5. RQLite-Based State Store
The RQLite database provides persistent storage for all system metadata, including topics, DSLs, webhook configurations, trigger definitions, and generated interrupts/feedback.
| Table Name | Description |
|---|---|
| `message_topics_list` | Tracks all active broker topics and their associated services |
| `observation_dsls` | DSL metadata and configuration for observation workflows |
| `registered_webhooks` | Webhook-to-function mappings, including the execution plan |
| `trigger_checker_workflows` | DSL IDs and associated config for all trigger workflows |
| `local_interrupts_table` | Stores all raised interrupt messages with status tracking |
| `local_feedback_table` | Logs feedback signals, their targets, and handler metadata |
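For orientation only, the sketch below shows what one of these tables could look like, derived from the LocalInterrupt fields documented later. The actual schema is created and managed by the framework itself, and the rqlite address is an assumption.
import json
import requests

# Illustrative schema for local_interrupts_table, applied through rqlite's HTTP execute endpoint.
schema = """
CREATE TABLE IF NOT EXISTS local_interrupts_table (
    interrupt_id TEXT PRIMARY KEY,
    interrupt_topic TEXT,
    interrupt_message_data TEXT,             -- JSON-encoded payload
    target_subject_ids TEXT,                 -- JSON-encoded list of subject IDs
    interrupt_ts TEXT,
    interrupt_status TEXT,
    interrupt_handled_by_function_id TEXT
)
"""

resp = requests.post(
    "http://localhost:4001/db/execute",       # assumed rqlite address
    data=json.dumps([schema]),
    headers={"Content-Type": "application/json"},
)
print(resp.status_code, resp.json())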
6. Internal Event Queues and Execution Flow
The internal routing and queue system ensures that observations and triggers are processed in order, at scale, and with rate control.
Internal Queue System
Each observation thread maintains a queue where observation outputs are placed. Triggers consume from these queues asynchronously.
Event Collector
Injects externally captured events (e.g., NATS topic messages) into observation queues.
Timer Ticker
Each observation thread uses a ticker to govern execution frequency, ensuring that webhooks are polled at defined intervals.
Queue-Based Decoupling
Observation and trigger DSLs communicate via internal queues to isolate evaluation frequency and ensure non-blocking execution.
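A complementary sketch of the consumer side of this decoupling: the trigger side drains the observation queue on its own schedule, so slow trigger evaluation never blocks the observation ticker. The evaluate_trigger() stub is a hypothetical stand-in for the trigger DSL executor.
import queue

def evaluate_trigger(observation: dict) -> None:
    print("trigger check on", observation)

def trigger_consumer(obs_queue: queue.Queue) -> None:
    while True:
        try:
            observation = obs_queue.get(timeout=10)   # consume without blocking the producer
        except queue.Empty:
            break          # in the running system this loop would keep waiting
        evaluate_trigger(observation)
        obs_queue.task_done()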
SDK Structures
This section documents the core data structures used in the Observation and Trigger Execution Framework. These structures are modeled as Python @dataclass objects and represent persistent entities typically stored in MongoDB. Each data class includes helper methods from_dict() and to_dict() to support serialization and deserialization.
1. MessageTopic
@dataclass
class MessageTopic:
    subscription_id: str
    broker_svc_url: str
    message_topic: str
| Field | Type | Description |
|---|---|---|
| `subscription_id` | str | Unique ID used to track the subscription. |
| `broker_svc_url` | str | URL of the message broker service (e.g., NATS, Kafka). |
| `message_topic` | str | The name of the message topic the system subscribes to. |
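As a quick illustration of the serialization helpers mentioned at the start of this section, a round trip with MessageTopic might look like the following. The import path is an assumption; adjust it to wherever your installation exposes the SDK dataclasses.
from observation_trigger.structures import MessageTopic   # import path is assumed

topic = MessageTopic(
    subscription_id="sub-001",
    broker_svc_url="nats://broker.local:4222",
    message_topic="sensor.readings",
)

payload = topic.to_dict()                    # plain dict, ready for persistence or transport
restored = MessageTopic.from_dict(payload)   # back to the dataclass
assert restored == topic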
2. ObservationDSL
@dataclass
class ObservationDSL:
    observation_id: str
    dsl_workflow_id: str
    workflow_params: Dict[str, Any] = field(default_factory=dict)
    workflow_config: Dict[str, Any] = field(default_factory=dict)
    webhook_sources_ids: List[str] = field(default_factory=list)
    event_subscription_ids: List[str] = field(default_factory=list)
| Field | Type | Description |
|---|---|---|
| `observation_id` | str | Unique ID for this observation. |
| `dsl_workflow_id` | str | ID of the DSL workflow used for observation. |
| `workflow_params` | dict | Parameters passed to the DSL execution. |
| `workflow_config` | dict | Additional DSL configuration, e.g., scheduler settings or module config. |
| `webhook_sources_ids` | List[str] | List of registered webhook source IDs used by the observation. |
| `event_subscription_ids` | List[str] | List of subscription IDs to event streams (e.g., NATS, Kafka topics). |
3. RegisteredWebhook
@dataclass
class RegisteredWebhook:
    webhook_id: str
    webhook_function_id: str
    function_json: Dict[str, Any]
| Field | Type | Description |
|---|---|---|
| `webhook_id` | str | Unique ID for the webhook instance. |
| `webhook_function_id` | str | ID of the function invoked when the webhook is triggered. |
| `function_json` | dict | Complete function definition/metadata in JSON format. |
4. TriggerCheckerWorkflow
@dataclass
class TriggerCheckerWorkflow:
    trigger_checker_id: str
    trigger_checker_dsl_workflow_id: str
    trigger_dsl_workflow_config: Dict[str, Any] = field(default_factory=dict)
    observation_ids: List[str] = field(default_factory=list)
| Field | Type | Description |
|---|---|---|
| `trigger_checker_id` | str | Unique ID identifying the trigger workflow instance. |
| `trigger_checker_dsl_workflow_id` | str | ID of the DSL workflow responsible for trigger evaluation. |
| `trigger_dsl_workflow_config` | dict | Configuration settings passed to the trigger DSL workflow. |
| `observation_ids` | List[str] | References to associated observation workflow IDs. |
5. LocalInterrupt
@dataclass
class LocalInterrupt:
    interrupt_id: str
    interrupt_topic: str
    interrupt_message_data: Dict[str, Any] = field(default_factory=dict)
    target_subject_ids: List[str] = field(default_factory=list)
    interrupt_ts: str = ""
    interrupt_status: str = ""
    interrupt_handled_by_function_id: str = ""
| Field | Type | Description |
|---|---|---|
| `interrupt_id` | str | Unique ID for this interrupt. |
| `interrupt_topic` | str | Topic or category under which the interrupt is classified. |
| `interrupt_message_data` | dict | Actual content/payload of the interrupt message. |
| `target_subject_ids` | List[str] | Subjects or actors this interrupt is intended for. |
| `interrupt_ts` | str | Timestamp when the interrupt was raised. |
| `interrupt_status` | str | Status of the interrupt (e.g., pending, handled, ignored). |
| `interrupt_handled_by_function_id` | str | ID of the function that handled the interrupt, if applicable. |
6. LocalFeedback
@dataclass
class LocalFeedback:
    feedback_id: str
    feedback_topic: str
    feedback_message_data: Dict[str, Any] = field(default_factory=dict)
    target_subject_ids: List[str] = field(default_factory=list)
    feedback_ts: str = ""
    feedback_status: str = ""
    feedback_handled_by_function_id: str = ""
| Field | Type | Description |
|---|---|---|
| `feedback_id` | str | Unique ID for the feedback entry. |
| `feedback_topic` | str | Topic/category of the feedback. |
| `feedback_message_data` | dict | Payload content for the feedback. |
| `target_subject_ids` | List[str] | Recipients or entities the feedback applies to. |
| `feedback_ts` | str | Timestamp of when the feedback was received or generated. |
| `feedback_status` | str | Status of the feedback (e.g., acknowledged, processed, pending). |
| `feedback_handled_by_function_id` | str | Function ID that processed this feedback, if any. |
Installation Instructions
The observation_trigger library provides the foundational components for observation pipelines, trigger execution, and interrupt handling in event-driven systems. To use this package in your own project or environment, follow the steps below.
1. Clone or Download the Repository
If you're working from a local codebase:
git clone <your-repo-url>
cd observation_trigger
2. Install the Package
You can install the package using pip in editable mode (for development) or standard mode (for usage).
Standard Installation
pip install .
This installs the package along with its required dependencies as defined in install_requires.
Development Installation
If you intend to modify or test the codebase:
pip install -e .[dev]
This includes development dependencies such as pytest and flake8.
Project Structure (Key Components)
After installation, the following core modules become available for import:
| Module | Description |
|---|---|
| `observation_trigger.dsl_executor` | Executes DSL-based workflows |
| `observation_trigger.events` | Generates timed events for triggering observations |
| `observation_trigger.trigger` | Runs the trigger pipeline with DSL inference |
| `observation_trigger.webhooks_manager` | Manages external API data sources and query logic |
| `observation_trigger.resolver` | Resolves DSL output into executable actions |
Python Version
The package requires Python 3.7 or above, as specified in python_requires.
Runtime Dependencies
| Package | Version | Purpose |
|---|---|---|
| `requests` | 2.31.0 | HTTP API calls |
| `Flask` | 3.0.3 | (Optional) exposing REST/websocket APIs |
| `websockets` | 13.1 | Real-time event communication |
These are installed automatically by the pip install . command.
General Usage
This section explains how to use the observation_trigger library to initialize and run an observation-trigger execution pipeline. The main entry point for setting up this system is the new_observe_trigger_flow function, which encapsulates the creation and configuration of both the observation and trigger layers.
Function: new_observe_trigger_flow(...)
def new_observe_trigger_flow(
    observation_dsl_id: str,
    trigger_dsl_id: str,
    stateful=True,
    interval=30,
    observation_queue_size=100,
    init_observation_apis={}
) -> ObserveTrigger
| Parameter | Type | Description |
|---|---|---|
| `observation_dsl_id` | str | The DSL ID to be used for periodic observation. |
| `trigger_dsl_id` | str | The DSL ID responsible for evaluating trigger logic. |
| `stateful` | bool | If True, uses historical API data in observation (default: True). |
| `interval` | int | Interval (in seconds) between observation events (default: 30). |
| `observation_queue_size` | int | Max number of events to buffer between observation and trigger (default: 100). |
| `init_observation_apis` | dict | Configuration for initial API sources (URL, method, headers, etc.). |
Returns an instance of ObserveTrigger, ready for initialization and execution.
Example Usage
from observation_trigger import new_observe_trigger_flow
# Define observation and trigger DSL workflow IDs
observation_id = "observe_temperature_sensors"
trigger_id = "check_for_temperature_alerts"
# Define initial API sources to be queried by observation workflow
api_config = {
"temperature_sensor_api": {
"full_url": "http://iot-sensors.local/api/temperature",
"method": "GET",
"headers": {"Authorization": "Bearer abc123"},
"params": {"location": "warehouse-7"},
"history_size": 20
}
}
# Create the ObserveTrigger instance
observe_trigger = new_observe_trigger_flow(
observation_dsl_id=observation_id,
trigger_dsl_id=trigger_id,
interval=15,
stateful=True,
init_observation_apis=api_config
)
# Initialize the DSL executor
observe_trigger.init_observation()
# Start observation and trigger loop in background threads
observe_trigger.start_observation()
observe_trigger.start_trigger()
# Continuously listen for trigger outputs
for trigger_result in observe_trigger.listen_for_trigger_results():
print(f"[TRIGGER] {trigger_result}")
Execution Flow
- API Configuration: You define one or more external API endpoints used to fetch real-time data (e.g., sensor metrics, prices, system health, etc.).
- Observation Initialization: The observation DSL executor is initialized with the defined workflow ID and configured API sources.
- Observation Loop (Threaded): Every interval seconds, API sources are queried. The result is passed into the observation DSL workflow, and the output is pushed to a queue.
- Trigger Execution (Threaded): The trigger DSL consumes this observation queue, applies its logic, and produces results.
- Trigger Result Handling: Results can be received via the streaming interface listen_for_trigger_results() and routed to other systems or resolvers (e.g., function calls, human approvals), as sketched below.
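Continuing the example above, the last step might look like the following. Only listen_for_trigger_results() comes from the documented API; the result keys ("action", "payload") and the handler functions are illustrative assumptions.
def call_function(payload: dict) -> None:
    print("executing downstream function with", payload)

def request_human_approval(payload: dict) -> None:
    print("routing to human-in-the-loop approval:", payload)

HANDLERS = {
    "execute_function": call_function,
    "human_approval": request_human_approval,
}

for trigger_result in observe_trigger.listen_for_trigger_results():
    action = trigger_result.get("action", "noop")
    HANDLERS.get(action, lambda payload: None)(trigger_result.get("payload", {}))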
Registering Webhooks
The observation_trigger system supports integration with external APIs via a webhook-based mechanism, enabling real-time or scheduled data fetching from HTTP endpoints. Webhooks are managed through the APISourcesManager class, which allows you to dynamically register, configure, and query remote APIs.
What is a Webhook in This Context?
In this framework, a "webhook" is an HTTP-based API endpoint that is periodically queried or called as part of an observation workflow. These endpoints return data that becomes input to a DSL workflow. You can register multiple such sources and fetch them individually or collectively.
Step 1: Define a Webhook
Each webhook is defined using the following parameters:
| Parameter | Type | Description |
|---|---|---|
| `name` | str | Unique name for the webhook; used to reference the source. |
| `full_url` | str | Complete URL of the external API. |
| `method` | str | HTTP method (GET or POST). |
| `headers` | dict | Optional request headers (e.g., auth token, content type). |
| `credential_callback` | func | Optional function to dynamically inject auth headers. |
| `history_size` | int | Number of historical responses to retain (sliding window). |
| `params` | dict | Optional query params for GET requests. |
| `data` | dict | Optional body for POST requests. |
Step 2: Registering a Webhook via APISourcesManager
from observation_trigger.webhooks_manager import APISourcesManager
# Initialize manager
api_manager = APISourcesManager()
# Register a webhook
api_manager.register_api(
name="weather_data",
full_url="https://api.weather.com/v3/wx/conditions/current",
method="GET",
headers={"Authorization": "Bearer YOUR_TOKEN"},
history_size=10,
params={"geocode": "37.7749,-122.4194", "format": "json"}
)
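If a source needs short-lived credentials, the credential_callback parameter listed in Step 1 can be supplied instead of static headers. The contract assumed below (a zero-argument callable returning headers to merge into each request) should be verified against your installed version of APISourcesManager; the billing endpoint is hypothetical.
import time

def fresh_auth_headers() -> dict:
    token = f"token-{int(time.time())}"      # e.g., mint or refresh a short-lived token here
    return {"Authorization": f"Bearer {token}"}

api_manager.register_api(
    name="billing_api",
    full_url="https://billing.example.com/v1/usage",   # hypothetical endpoint
    method="GET",
    credential_callback=fresh_auth_headers,
    history_size=5,
)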
Step 3: Query a Webhook
# Query the webhook
response = api_manager.query_api("weather_data")
# Query and include history
response, history = api_manager.query_api_with_history("weather_data")
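The history returned above is the sliding window configured via history_size. A small example of putting it to use follows; the per-entry shape, including a numeric "temperature" field, is an assumption about this particular API.
# Assumes each history entry is the parsed JSON body with a numeric "temperature" field.
readings = [entry.get("temperature") for entry in history if isinstance(entry, dict)]
readings = [r for r in readings if isinstance(r, (int, float))]
if readings:
    print(f"average of last {len(readings)} readings: {sum(readings) / len(readings):.2f}")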
Step 4: Use with Observation Flow
Once the webhook(s) are registered, you can pass the configuration to the observation system:
webhook_config = api_manager.get_config()
observe_trigger = new_observe_trigger_flow(
observation_dsl_id="weather_observer",
trigger_dsl_id="weather_alert_checker",
interval=60,
init_observation_apis=webhook_config
)
observe_trigger.init_observation()
observe_trigger.start_observation()
observe_trigger.start_trigger()
Loading Webhooks from Configuration
You can also define all your API sources in a dictionary and load them at once:
config = {
"sensor_api": {
"full_url": "http://sensors.local/api/metrics",
"method": "GET",
"headers": {"Authorization": "Bearer xyz"},
"history_size": 20,
"params": {"zone": "A1"},
}
}
api_manager.load_from_config(config)
Factory Functions and Database Managers
The observation_trigger framework provides several MongoDB-backed database managers, each responsible for managing a specific type of document in the system (e.g., observation definitions, triggers, feedback). These managers are instantiated using factory functions that abstract away the initialization logic.
Each manager includes standard CRUD methods:
- insert()
- update()
- delete()
- query()
- get_by_id()
These allow you to persist, modify, and retrieve entries using the associated @dataclass schema.
Factory Functions and Their Responsibilities
| Factory Function | Returns | Purpose |
|---|---|---|
| `new_message_topic_manager()` | `MessageTopicDatabase` | Manages message topic subscriptions (e.g., broker topic metadata). |
| `new_observation_dsl_manager()` | `ObservationDSLDatabase` | Manages observation DSL workflows and API source references. |
| `new_registered_webhook_manager()` | `RegisteredWebhookDatabase` | Manages webhook-function mappings used in response triggers. |
| `new_trigger_checker_workflow_manager()` | `TriggerCheckerWorkflowDatabase` | Manages trigger evaluation workflows. |
| `new_local_interrupt_manager()` | `LocalInterruptDatabase` | Stores and manages interrupt signals and metadata. |
| `new_local_feedback_manager()` | `LocalFeedbackDatabase` | Stores and manages feedback records from users/systems. |
Methods Provided by Each Manager
1. insert(obj: Dataclass) -> Tuple[bool, Union[str, None]]
Inserts the object into the database. Returns a tuple with status and the inserted ID or an error string.
2. update(id: str, update_fields: Dict) -> Tuple[bool, Union[int, str]]
Updates the document with the given ID using the provided fields. Returns status and the number of modified documents or an error.
3. delete(id: str) -> Tuple[bool, Union[int, str]]
Deletes the document matching the given ID. Returns status and the number of deletions or an error.
4. query(filter: Dict) -> Tuple[bool, Union[List[Dict], str]]
Queries documents using the given MongoDB-style filter. Returns a list of matching documents or an error message.
5. get_by_id(id: str) -> Tuple[bool, Union[Dataclass, str]]
Retrieves a single document using the primary key. Returns the deserialized object or an error message.
Example Usage
from observation_trigger.factories import new_observation_dsl_manager
obs_db = new_observation_dsl_manager()
# Insert
success, result = obs_db.insert(ObservationDSL(
observation_id="obs_001",
dsl_workflow_id="observe_temp_dsl",
webhook_sources_ids=["weather_api"],
event_subscription_ids=[]
))
# Query
success, results = obs_db.query({"dsl_workflow_id": "observe_temp_dsl"})
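The remaining CRUD methods follow the same (success, result) pattern. Continuing the example, and assuming observation_id doubles as the lookup key for update/get_by_id/delete:
# Update the observation's webhook sources
success, modified = obs_db.update("obs_001", {"webhook_sources_ids": ["weather_api", "backup_weather_api"]})

# Fetch it back as an ObservationDSL instance
success, obs = obs_db.get_by_id("obs_001")
if success:
    print(obs.webhook_sources_ids)

# Remove it when no longer needed
success, deleted = obs_db.delete("obs_001")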