Observe Trigger and Interrupt library

Introduction

The Observation and Trigger Execution Framework is a modular, event-driven system designed for real-time or periodic decision-making based on external data sources. It enables continuous data observation, inference execution, and trigger resolution, using a combination of:

  • API source managers to fetch real-world data,
  • DSL workflows to process and infer insights,
  • Trigger systems to evaluate outcomes and decide next steps,
  • Human intervention hooks for assisted decision making when necessary.

This framework is ideal for use cases such as monitoring external systems, reacting to streamed or polled data, and orchestrating AI workflows where automated decisions may lead to execution of actions, human approvals, or triggering downstream processes.

Key features include:

  • Pluggable API sources with credential handling and historical tracking
  • Periodic observation loop with stateful/stateless modes
  • Asynchronous observation and trigger execution using queues
  • DSL-based inference workflows
  • Flexible trigger resolution via custom functions or DSL
  • Integrated support for human-in-the-loop actions

This documentation provides a complete breakdown of the system's architecture, components, and lifecycle, including setup, configuration, and usage patterns.


Architecture

The Observe Trigger and Interrupt Execution System is a distributed, extensible, and event-driven framework designed to continuously monitor external signals (via APIs or event streams), infer actionable insights through DSL workflows, and execute programmable triggers. It is built for reactive decision-making in autonomous or semi-autonomous AI systems, with native support for human-in-the-loop interrupt handling and feedback acknowledgment.

This system integrates multiple subsystems, including a webhook and topic-based observation engine, trigger workflow executor, interrupt emitter, function resolver, and real-time acknowledgment routing. It operates in both periodic and event-driven modes and supports stateful observation patterns across asynchronous queues.


1. Observation and Data Collection Engine

This subsystem manages all data intake for observation workflows, either through scheduled API polling or subscription to message broker events. It maintains webhook definitions, broker topic configurations, and DSL execution threads.

Webhook Sources (Predefined)

Supports a set of standardized webhooks (subject metrics, job status, org data, feedback chat, etc.) used to poll structured data from known REST APIs. These are defined at deployment and registered with the system on startup.

Custom Webhooks Loader

Loads and caches custom webhook functions dynamically from the Functions DB. Functions are stored as JSON definitions and downloaded on demand, supporting dynamic execution of external logic during observation.

Message Topics Configuration

Manages a list of broker topics (NATS, Kafka, etc.) that the system subscribes to for push-based event streams. Topics are associated with MessageTopic entries in the DB.

Event Listener

A passive listener that connects to the broker and routes topic-based events into the observation system. These events are queued for DSL execution.

Observe DSL Manager

Stores and manages DSL workflow definitions for observation tasks. Each observation DSL can be configured with internal timers or real-time listeners.

DSL Execution Loop

Each observation runs in its own thread and is governed by a timer. DSL workflows are triggered with data from webhook/API sources and push their results to the observation queue.
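The loop described above can be sketched with the standard library alone. This is a minimal illustration, not the framework's implementation: run_observation_loop, fetch, and run_dsl are hypothetical names standing in for the webhook call and the observation DSL.

```python
import queue
import threading


def run_observation_loop(fetch, run_dsl, out_queue, interval, stop_event):
    """Poll `fetch` every `interval` seconds and queue the DSL output."""
    while not stop_event.is_set():
        data = fetch()                # stand-in for the webhook/API call
        out_queue.put(run_dsl(data))  # stand-in for the observation DSL
        # wait() acts as the ticker and returns early once stop is requested
        stop_event.wait(interval)


observation_queue = queue.Queue(maxsize=100)
stop = threading.Event()
worker = threading.Thread(
    target=run_observation_loop,
    args=(
        lambda: {"temperature": 21.5},        # fake API response
        lambda d: {"avg": d["temperature"]},  # fake DSL workflow
        observation_queue,
        0.01,
        stop,
    ),
    daemon=True,
)
worker.start()
first = observation_queue.get(timeout=1)  # block until an observation arrives
stop.set()
worker.join(timeout=1)
```

Using Event.wait() as the ticker, rather than time.sleep(), lets the thread stop promptly when shutdown is requested.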

Webhook Caller

Responsible for invoking APIs, formatting requests, managing authentication, and retrieving data from registered webhook sources.


2. Trigger Evaluation and Interrupt Generator

This subsystem evaluates the output of observation workflows using DSL-based trigger logic and determines appropriate follow-up actions. Actions may include function execution, interrupt generation, or feedback signaling.

Trigger Checker DSL Manager

Stores and manages trigger-check DSLs. Each defines a set of rules to evaluate whether a trigger should fire based on observation outputs.

Trigger Router

Given a DSL output, this module routes the observation to the correct trigger checker DSL based on configuration and routing rules.
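One plausible routing rule is to match the observation's ID against the observation_ids field of each TriggerCheckerWorkflow (the structure documented under SDK Structures). The sketch below assumes that rule; the route function is hypothetical and the actual routing configuration may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class TriggerCheckerWorkflow:
    trigger_checker_id: str
    trigger_checker_dsl_workflow_id: str
    trigger_dsl_workflow_config: Dict[str, Any] = field(default_factory=dict)
    observation_ids: List[str] = field(default_factory=list)


def route(observation_id: str, checkers: List[TriggerCheckerWorkflow]) -> List[str]:
    """Return the IDs of all trigger checkers that reference this observation."""
    return [
        c.trigger_checker_id
        for c in checkers
        if observation_id in c.observation_ids
    ]


checkers = [
    TriggerCheckerWorkflow("tc_1", "dsl_a", observation_ids=["obs_001"]),
    TriggerCheckerWorkflow("tc_2", "dsl_b", observation_ids=["obs_002", "obs_001"]),
]
matched = route("obs_001", checkers)
```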

Trigger DSL Executor

Executes the associated trigger DSL logic, which may evaluate thresholds, compute results, or call helper functions.

Trigger Initiator

Determines the downstream effect of a trigger:

  • Execute a function
  • Raise an interrupt
  • Emit feedback
  • Continue the observation cycle
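The four outcomes above map naturally onto a dispatch table. The following is a sketch under assumptions: the action names, the shape of the trigger result dictionary, and the initiate function are all hypothetical, and the handlers only return a description instead of performing real work.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], str]

# Hypothetical action names; the real trigger output format is not documented here.
HANDLERS: Dict[str, Handler] = {
    "execute_function": lambda r: f"run function {r['function_id']}",
    "raise_interrupt": lambda r: f"interrupt on {r['topic']}",
    "emit_feedback": lambda r: f"feedback on {r['topic']}",
}


def initiate(trigger_result: Dict[str, Any]) -> str:
    """Dispatch a trigger result; unknown or missing actions continue observing."""
    handler = HANDLERS.get(trigger_result.get("action", ""))
    if handler is None:
        return "continue observation"
    return handler(trigger_result)
```

Treating "continue the observation cycle" as the fallback keeps the loop running when a trigger produces no actionable result.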

Function Executor Client

Executes functions retrieved from the Functions DB. If the function is local, it is executed directly; otherwise, a remote invocation is performed.


3. Interrupt and Feedback Processor

This subsystem handles generation, registration, acknowledgment, and routing of interrupts and feedback signals.

Interrupt/Feedback Generator

Creates a structured interrupt or feedback object from a trigger event. These entries are persisted to RQLite and pushed to the message broker.

DB Client

Handles persistence of LocalInterrupt and LocalFeedback entries. Writes are performed before events are published.
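The write-before-publish ordering can be illustrated as follows. This sketch uses Python's built-in sqlite3 module as a local stand-in for RQLite, a reduced column set, and a hypothetical raise_interrupt helper with a publish callback in place of the broker client.

```python
import json
import sqlite3


def raise_interrupt(conn, publish, interrupt):
    """Persist the interrupt first, then publish it to the broker."""
    with conn:  # commits on success, rolls back if the insert raises
        conn.execute(
            "INSERT INTO local_interrupts_table VALUES (?, ?, ?, ?)",
            (
                interrupt["interrupt_id"],
                interrupt["interrupt_topic"],
                json.dumps(interrupt["interrupt_message_data"]),
                "pending",
            ),
        )
    # Only reached once the row has been committed
    publish(interrupt["interrupt_topic"], interrupt)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE local_interrupts_table ("
    "interrupt_id TEXT PRIMARY KEY, interrupt_topic TEXT, "
    "interrupt_message_data TEXT, interrupt_status TEXT)"
)
published = []
raise_interrupt(
    conn,
    lambda topic, msg: published.append((topic, msg["interrupt_id"])),
    {
        "interrupt_id": "int_001",
        "interrupt_topic": "overheat",
        "interrupt_message_data": {"temp": 91},
    },
)
```

Persisting first means an acknowledgment that arrives immediately after publication always finds a matching row to update.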

Ack Listener

Listens for acknowledgment events related to interrupts or feedback. Updates the relevant DB entries once acknowledgments are received.

Interrupt/Feedback Ack Handler

Manages the acknowledgment lifecycle, ensuring all events are tracked and updates are reflected in real time.
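A minimal sketch of the acknowledgment step, assuming an in-memory dictionary in place of the database and a hypothetical acknowledgment message shape (interrupt_id plus an optional function_id). The field names on the stored entry follow the LocalInterrupt structure.

```python
from typing import Any, Dict


def handle_ack(store: Dict[str, Dict[str, Any]], ack: Dict[str, Any]) -> bool:
    """Mark the referenced interrupt as handled; return False for unknown IDs."""
    entry = store.get(ack["interrupt_id"])
    if entry is None:
        return False
    entry["interrupt_status"] = "handled"
    entry["interrupt_handled_by_function_id"] = ack.get("function_id", "")
    return True


store = {
    "int_001": {
        "interrupt_status": "pending",
        "interrupt_handled_by_function_id": "",
    }
}
ok = handle_ack(store, {"interrupt_id": "int_001", "function_id": "notify_ops"})
unknown = handle_ack(store, {"interrupt_id": "missing"})
```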

Message Broker Client

Publishes interrupts and feedback to the broker for delivery to external systems, agents, or UIs.


4. Configuration and Code Resolution Layer

This subsystem manages DSL definitions, webhook configurations, and dynamic code loading from the central functions registry.

Controller

Primary orchestrator that manages additions, updates, and deletions of DSLs, webhook entries, and topic subscriptions.

Functions DB Client API

Provides access to the function metadata and code used in custom webhooks and triggers.

Functions Code Loader API

Fetches executable function code from the central store and stores it in the local cache.
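The download-on-demand behaviour can be sketched as a small cache wrapper. FunctionCodeCache and the download callback are hypothetical names; the real loader's interface is not shown in this document.

```python
from typing import Callable, Dict


class FunctionCodeCache:
    """Download function code on first use and serve later requests locally."""

    def __init__(self, download: Callable[[str], str]) -> None:
        self._download = download
        self._cache: Dict[str, str] = {}

    def get(self, function_id: str) -> str:
        if function_id not in self._cache:
            self._cache[function_id] = self._download(function_id)
        return self._cache[function_id]


calls = []


def fake_download(function_id: str) -> str:
    """Stand-in for a request to the central functions store."""
    calls.append(function_id)
    return f"def {function_id}(): pass"


cache = FunctionCodeCache(fake_download)
first_code = cache.get("notify_ops")
second_code = cache.get("notify_ops")  # served from the local cache
```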

Service Config Cache

Maintains in-memory configuration state (DSLs, topics, webhooks) for fast access and execution consistency.


5. RQLite-Based State Store

The RQLite database provides persistent storage for all system metadata, including topics, DSLs, webhook configurations, trigger definitions, and generated interrupts/feedback.

| Table Name | Description |
| --- | --- |
| message_topics_list | Tracks all active broker topics and their associated services |
| observation_dsls | DSL metadata and configuration for observation workflows |
| registered_webhooks | Webhook to function mappings, including the execution plan |
| trigger_checker_workflows | DSL IDs and associated config for all trigger workflows |
| local_interrupts_table | Stores all raised interrupt messages with status tracking |
| local_feedback_table | Logs feedback signals, their targets, and handler metadata |

6. Internal Event Queues and Execution Flow

The internal routing and queue system ensures that observations and triggers are processed in order, at scale, and with rate control.

Internal Queue System

Each observation thread maintains a queue where observation outputs are placed. Triggers consume from these queues asynchronously.

Event Collector

Injects externally captured events (e.g., NATS topic messages) into observation queues.

Timer Ticker

Each observation thread uses a ticker to govern execution frequency, ensuring that webhooks are polled at defined intervals.

Queue-Based Decoupling

Observation and trigger DSLs communicate via internal queues to isolate evaluation frequency and ensure non-blocking execution.
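The decoupling can be illustrated with a bounded queue.Queue shared between a producer and a consumer thread. This is a sketch only: the threshold check stands in for a trigger DSL, and the None sentinel is an assumption used here to end the example cleanly.

```python
import queue
import threading

observations = queue.Queue(maxsize=2)  # small bound to make back-pressure visible
results = []


def trigger_worker():
    """Consume observations at the worker's own pace."""
    while True:
        item = observations.get()
        if item is None:  # sentinel: the producer has finished
            break
        results.append(item["value"] > 30)  # stand-in for trigger DSL logic


consumer = threading.Thread(target=trigger_worker)
consumer.start()
for value in (25, 35, 40):
    observations.put({"value": value})  # blocks only while the queue is full
observations.put(None)
consumer.join()
```

Because the queue is bounded, a slow trigger stage slows the producer down instead of letting unprocessed observations accumulate without limit.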


SDK Structures

This section documents the core data structures used in the Observation and Trigger Execution Framework. These structures are modeled as Python @dataclass objects and represent persistent entities typically stored in MongoDB. Each data class includes helper methods from_dict() and to_dict() to support serialization and deserialization.


1. MessageTopic

@dataclass
class MessageTopic:
    subscription_id: str
    broker_svc_url: str
    message_topic: str

| Field | Type | Description |
| --- | --- | --- |
| subscription_id | str | Unique ID used to track the subscription. |
| broker_svc_url | str | URL of the message broker service (e.g., NATS, Kafka). |
| message_topic | str | The name of the message topic the system subscribes to. |
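The from_dict() and to_dict() helpers mentioned above are not shown in this document. One way they might look for MessageTopic is sketched below, assuming a plain field-for-field mapping.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class MessageTopic:
    subscription_id: str
    broker_svc_url: str
    message_topic: str

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MessageTopic":
        """Build an instance from a stored document; missing keys raise KeyError."""
        return cls(
            subscription_id=data["subscription_id"],
            broker_svc_url=data["broker_svc_url"],
            message_topic=data["message_topic"],
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary suitable for persistence."""
        return asdict(self)


topic = MessageTopic.from_dict(
    {
        "subscription_id": "sub_001",
        "broker_svc_url": "nats://broker.local:4222",
        "message_topic": "sensor.alerts",
    }
)
```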

2. ObservationDSL

@dataclass
class ObservationDSL:
    observation_id: str
    dsl_workflow_id: str
    workflow_params: Dict[str, Any] = field(default_factory=dict)
    workflow_config: Dict[str, Any] = field(default_factory=dict)
    webhook_sources_ids: List[str] = field(default_factory=list)
    event_subscription_ids: List[str] = field(default_factory=list)

| Field | Type | Description |
| --- | --- | --- |
| observation_id | str | Unique ID for this observation. |
| dsl_workflow_id | str | ID of the DSL workflow used for observation. |
| workflow_params | dict | Parameters passed to the DSL execution. |
| workflow_config | dict | Additional DSL configuration, e.g., scheduler settings or module config. |
| webhook_sources_ids | List[str] | List of registered webhook source IDs used by the observation. |
| event_subscription_ids | List[str] | List of subscription IDs to event streams (e.g., NATS, Kafka topics). |

3. RegisteredWebhook

@dataclass
class RegisteredWebhook:
    webhook_id: str
    webhook_function_id: str
    function_json: Dict[str, Any]

| Field | Type | Description |
| --- | --- | --- |
| webhook_id | str | Unique ID for the webhook instance. |
| webhook_function_id | str | ID of the function invoked when the webhook is triggered. |
| function_json | dict | Complete function definition/metadata in JSON format. |

4. TriggerCheckerWorkflow

@dataclass
class TriggerCheckerWorkflow:
    trigger_checker_id: str
    trigger_checker_dsl_workflow_id: str
    trigger_dsl_workflow_config: Dict[str, Any] = field(default_factory=dict)
    observation_ids: List[str] = field(default_factory=list)

| Field | Type | Description |
| --- | --- | --- |
| trigger_checker_id | str | Unique ID identifying the trigger workflow instance. |
| trigger_checker_dsl_workflow_id | str | ID of the DSL workflow responsible for trigger evaluation. |
| trigger_dsl_workflow_config | dict | Configuration settings passed to the trigger DSL workflow. |
| observation_ids | List[str] | References to associated observation workflow IDs. |

5. LocalInterrupt

@dataclass
class LocalInterrupt:
    interrupt_id: str
    interrupt_topic: str
    interrupt_message_data: Dict[str, Any] = field(default_factory=dict)
    target_subject_ids: List[str] = field(default_factory=list)
    interrupt_ts: str = ""
    interrupt_status: str = ""
    interrupt_handled_by_function_id: str = ""

| Field | Type | Description |
| --- | --- | --- |
| interrupt_id | str | Unique ID for this interrupt. |
| interrupt_topic | str | Topic or category under which the interrupt is classified. |
| interrupt_message_data | dict | Actual content/payload of the interrupt message. |
| target_subject_ids | List[str] | Subjects or actors this interrupt is intended for. |
| interrupt_ts | str | Timestamp when the interrupt was raised. |
| interrupt_status | str | Status of the interrupt (e.g., pending, handled, ignored). |
| interrupt_handled_by_function_id | str | ID of the function that handled the interrupt, if applicable. |

6. LocalFeedback

@dataclass
class LocalFeedback:
    feedback_id: str
    feedback_topic: str
    feedback_message_data: Dict[str, Any] = field(default_factory=dict)
    target_subject_ids: List[str] = field(default_factory=list)
    feedback_ts: str = ""
    feedback_status: str = ""
    feedback_handled_by_function_id: str = ""

| Field | Type | Description |
| --- | --- | --- |
| feedback_id | str | Unique ID for the feedback entry. |
| feedback_topic | str | Topic/category of the feedback. |
| feedback_message_data | dict | Payload content for the feedback. |
| target_subject_ids | List[str] | Recipients or entities the feedback applies to. |
| feedback_ts | str | Timestamp of when the feedback was received or generated. |
| feedback_status | str | Status of the feedback (e.g., acknowledged, processed, pending). |
| feedback_handled_by_function_id | str | Function ID that processed this feedback, if any. |

Installation Instructions

The observation_trigger library provides the foundational components for observation pipelines, trigger execution, and interrupt handling in event-driven systems. To use this package in your own project or environment, follow the steps below.


1. Clone or Download the Repository

If you're working from a local codebase:

git clone <your-repo-url>
cd observation_trigger

2. Install the Package

You can install the package using pip in editable mode (for development) or standard mode (for usage).

Standard Installation

pip install .

This installs the package along with its required dependencies as defined in install_requires.

Development Installation

If you intend to modify or test the codebase:

pip install -e .[dev]

This includes development dependencies such as pytest and flake8.


Project Structure (Key Components)

After installation, the following core modules become available for import:

| Module | Description |
| --- | --- |
| observation_trigger.dsl_executor | Executes DSL-based workflows |
| observation_trigger.events | Generates timed events for triggering observations |
| observation_trigger.trigger | Runs the trigger pipeline with DSL inference |
| observation_trigger.webhooks_manager | Manages external API data sources and query logic |
| observation_trigger.resolver | Resolves DSL output into executable actions |

Python Version

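The package requires Python 3.7 or above, as specified in python_requires. Note that the pinned Flask 3.0.3 and websockets 13.1 releases listed under Runtime Dependencies require Python 3.8 or newer, so 3.8 is the practical minimum when those dependencies are installed.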


Runtime Dependencies

| Package | Version | Purpose |
| --- | --- | --- |
| requests | 2.31.0 | HTTP API calls |
| Flask | 3.0.3 | (Optional) exposing REST/websocket APIs |
| websockets | 13.1 | Real-time event communication |

These are installed automatically when you run the pip install . command shown under Standard Installation.


General Usage

This section explains how to use the observation_trigger library to initialize and run an observation-trigger execution pipeline. The main entry point for setting up this system is the new_observe_trigger_flow function, which encapsulates the creation and configuration of both the observation and trigger layers.


Function: new_observe_trigger_flow(...)

def new_observe_trigger_flow(
    observation_dsl_id: str,
    trigger_dsl_id: str,
    stateful=True,
    interval=30,
    observation_queue_size=100,
    init_observation_apis={}
) -> ObserveTrigger

| Parameter | Type | Description |
| --- | --- | --- |
| observation_dsl_id | str | The DSL ID to be used for periodic observation. |
| trigger_dsl_id | str | The DSL ID responsible for evaluating trigger logic. |
| stateful | bool | If True, uses historical API data in observation (default: True). |
| interval | int | Interval (in seconds) between observation events (default: 30). |
| observation_queue_size | int | Max number of events to buffer between observation and trigger (default: 100). |
| init_observation_apis | dict | Configuration for initial API sources (URL, method, headers, etc.). |

Returns an instance of ObserveTrigger, ready for initialization and execution.


Example Usage

from observation_trigger import new_observe_trigger_flow

# Define observation and trigger DSL workflow IDs
observation_id = "observe_temperature_sensors"
trigger_id = "check_for_temperature_alerts"

# Define initial API sources to be queried by observation workflow
api_config = {
    "temperature_sensor_api": {
        "full_url": "http://iot-sensors.local/api/temperature",
        "method": "GET",
        "headers": {"Authorization": "Bearer abc123"},
        "params": {"location": "warehouse-7"},
        "history_size": 20
    }
}

# Create the ObserveTrigger instance
observe_trigger = new_observe_trigger_flow(
    observation_dsl_id=observation_id,
    trigger_dsl_id=trigger_id,
    interval=15,
    stateful=True,
    init_observation_apis=api_config
)

# Initialize the DSL executor
observe_trigger.init_observation()

# Start observation and trigger loop in background threads
observe_trigger.start_observation()
observe_trigger.start_trigger()

# Continuously listen for trigger outputs
for trigger_result in observe_trigger.listen_for_trigger_results():
    print(f"[TRIGGER] {trigger_result}")

Execution Flow

  1. API Configuration: You define one or more external API endpoints used to fetch real-time data (e.g., sensor metrics, prices, or system health).

  2. Observation Initialization: The observation DSL executor is initialized with the defined workflow ID and configured API sources.

  3. Observation Loop (Threaded): Every interval seconds, API sources are queried. The result is passed into the observation DSL workflow, and the output is pushed to a queue.

  4. Trigger Execution (Threaded): The trigger DSL consumes this observation queue, applies its logic, and produces results.

  5. Trigger Result Handling: Results can be received via a streaming interface (listen_for_trigger_results()) and routed to other systems or resolvers (e.g., function calls or human approvals).
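A streaming interface of the kind described in step 5 is commonly built as a generator over a queue. The sketch below shows that pattern with hypothetical names (listen_for_results, _STOP); how the library's own listen_for_trigger_results() terminates, if it does at all, is not specified in this document.

```python
import queue
from typing import Any, Iterator

_STOP = object()  # hypothetical sentinel marking the end of the stream


def listen_for_results(result_queue: "queue.Queue[Any]") -> Iterator[Any]:
    """Yield trigger results as they arrive until the sentinel is seen."""
    while True:
        item = result_queue.get()  # blocks until a result is available
        if item is _STOP:
            return
        yield item


results = queue.Queue()
for item in ({"alert": False}, {"alert": True}, _STOP):
    results.put(item)

# Route only the results that signal an alert
alerts = [r for r in listen_for_results(results) if r["alert"]]
```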


Registering Webhooks

The observation_trigger system supports integration with external APIs via a webhook-based mechanism, enabling real-time or scheduled data fetching from HTTP endpoints. Webhooks are managed through the APISourcesManager class, which allows you to dynamically register, configure, and query remote APIs.


What is a Webhook in This Context?

In this framework, a "webhook" is an HTTP-based API endpoint that is periodically queried or called as part of an observation workflow. These endpoints return data that becomes input to a DSL workflow. You can register multiple such sources and fetch them individually or collectively.


Step 1: Define a Webhook

Each webhook is defined using the following parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Unique name for the webhook. Used to reference the source. |
| full_url | str | Complete URL of the external API. |
| method | str | HTTP method (GET or POST). |
| headers | dict | Optional request headers (e.g., auth token, content-type). |
| credential_callback | func | Optional function to dynamically inject auth headers. |
| history_size | int | Number of historical responses to retain (sliding window). |
| params | dict | Optional query params for GET requests. |
| data | dict | Optional body for POST requests. |
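Two of these parameters, credential_callback and history_size, are easier to picture in code. The ApiSource class below is a hypothetical stand-in, not the library's implementation: it assumes the callback returns a dictionary of headers that is merged over the static ones, and it models the sliding window with a bounded deque.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, Optional


class ApiSource:
    """Illustrative stand-in for one registered webhook source."""

    def __init__(
        self,
        headers: Optional[Dict[str, str]] = None,
        credential_callback: Optional[Callable[[], Dict[str, str]]] = None,
        history_size: int = 10,
    ) -> None:
        self.headers = dict(headers or {})
        self.credential_callback = credential_callback
        # A deque with maxlen discards the oldest entry once it is full
        self.history: Deque[Any] = deque(maxlen=history_size)

    def build_headers(self) -> Dict[str, str]:
        """Merge static headers with freshly generated credentials."""
        merged = dict(self.headers)
        if self.credential_callback is not None:
            merged.update(self.credential_callback())
        return merged

    def record(self, response: Any) -> None:
        """Append a response to the sliding history window."""
        self.history.append(response)


source = ApiSource(
    headers={"Accept": "application/json"},
    credential_callback=lambda: {"Authorization": "Bearer fresh-token"},
    history_size=2,
)
for reading in (1, 2, 3):
    source.record(reading)
```

A callback is useful when tokens expire, since the header is regenerated on every request instead of being fixed at registration time.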

Step 2: Registering a Webhook via APISourcesManager

from observation_trigger.webhooks_manager import APISourcesManager

# Initialize manager
api_manager = APISourcesManager()

# Register a webhook
api_manager.register_api(
    name="weather_data",
    full_url="https://api.weather.com/v3/wx/conditions/current",
    method="GET",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    history_size=10,
    params={"geocode": "37.7749,-122.4194", "format": "json"}
)

Step 3: Query a Webhook

# Query the webhook
response = api_manager.query_api("weather_data")

# Query and include history
response, history = api_manager.query_api_with_history("weather_data")

Step 4: Use with Observation Flow

Once the webhook(s) are registered, you can pass the configuration to the observation system:

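from observation_trigger import new_observe_trigger_flow

# Export the registered sources so they can be passed to the observation flow
webhook_config = api_manager.get_config()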

observe_trigger = new_observe_trigger_flow(
    observation_dsl_id="weather_observer",
    trigger_dsl_id="weather_alert_checker",
    interval=60,
    init_observation_apis=webhook_config
)

observe_trigger.init_observation()
observe_trigger.start_observation()
observe_trigger.start_trigger()

Loading Webhooks from Configuration

You can also define all your API sources in a dictionary and load them at once:

config = {
    "sensor_api": {
        "full_url": "http://sensors.local/api/metrics",
        "method": "GET",
        "headers": {"Authorization": "Bearer xyz"},
        "history_size": 20,
        "params": {"zone": "A1"},
    }
}

api_manager.load_from_config(config)

Factory Functions and Database Managers

The observation_trigger framework provides several MongoDB-backed database managers, each responsible for managing a specific type of document in the system (e.g., observation definitions, triggers, feedback). These managers are instantiated using factory functions that abstract away the initialization logic.

Each manager includes standard CRUD methods:

  • insert()
  • update()
  • delete()
  • query()
  • get_by_id()

These allow you to persist, modify, and retrieve entries using the associated @dataclass schema.


Factory Functions and Their Responsibilities

| Factory Function | Returns | Purpose |
| --- | --- | --- |
| new_message_topic_manager() | MessageTopicDatabase | Manages message topic subscriptions (e.g., broker topic metadata). |
| new_observation_dsl_manager() | ObservationDSLDatabase | Manages observation DSL workflows and API source references. |
| new_registered_webhook_manager() | RegisteredWebhookDatabase | Manages webhook-function mappings used in response triggers. |
| new_trigger_checker_workflow_manager() | TriggerCheckerWorkflowDatabase | Manages trigger evaluation workflows. |
| new_local_interrupt_manager() | LocalInterruptDatabase | Stores and manages interrupt signals and metadata. |
| new_local_feedback_manager() | LocalFeedbackDatabase | Stores and manages feedback records from users/systems. |

Methods Provided by Each Manager

1. insert(obj: Dataclass) -> Tuple[bool, Union[str, None]]

Description
Inserts the object into the database.
Returns a tuple with status and inserted ID or error string.

2. update(id: str, update_fields: Dict) -> Tuple[bool, Union[int, str]]

Description
Updates the document with given ID using the provided fields.
Returns status and number of modified documents or error.

3. delete(id: str) -> Tuple[bool, Union[int, str]]

Description
Deletes the document matching the given ID.
Returns status and number of deletions or error.

4. query(filter: Dict) -> Tuple[bool, Union[List[Dict], str]]

Description
Queries documents using the given MongoDB-style filter.
Returns list of matching documents or error message.

5. get_by_id(id: str) -> Tuple[bool, Union[Dataclass, str]]

Description
Retrieves a single document using the primary key.
Returns deserialized object or error message.
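All five methods share a (success, result-or-error) return convention. The in-memory stand-in below illustrates that convention for insert() and query() only; InMemoryManager is hypothetical, the ObservationDSL shown is a trimmed copy of the documented dataclass, and the filter supports exact-match fields rather than full MongoDB-style operators.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Tuple, Union


@dataclass
class ObservationDSL:  # trimmed copy of the documented structure
    observation_id: str
    dsl_workflow_id: str
    webhook_sources_ids: List[str] = field(default_factory=list)


class InMemoryManager:
    """Hypothetical manager that mimics the (success, result) tuples."""

    def __init__(self, id_field: str) -> None:
        self._id_field = id_field
        self._docs: Dict[str, Any] = {}

    def insert(self, obj: Any) -> Tuple[bool, Union[str, None]]:
        doc_id = getattr(obj, self._id_field)
        if doc_id in self._docs:
            return False, f"duplicate id: {doc_id}"
        self._docs[doc_id] = obj
        return True, doc_id

    def query(self, filter: Dict[str, Any]) -> Tuple[bool, Union[List[Dict], str]]:
        docs = [asdict(obj) for obj in self._docs.values()]
        matches = [d for d in docs if all(d.get(k) == v for k, v in filter.items())]
        return True, matches


obs_db = InMemoryManager(id_field="observation_id")
ok, inserted_id = obs_db.insert(ObservationDSL("obs_001", "observe_temp_dsl"))
dup_ok, dup_error = obs_db.insert(ObservationDSL("obs_001", "observe_temp_dsl"))
found_ok, found = obs_db.query({"dsl_workflow_id": "observe_temp_dsl"})
```

Callers should check the boolean before using the second element, since on failure it holds an error string rather than data.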

Example Usage

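from observation_trigger.factories import new_observation_dsl_manager
# ObservationDSL is the dataclass described under SDK Structures. Its import path
# is not shown in this document, so import it from the module that defines it
# in your installation before running this example.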

obs_db = new_observation_dsl_manager()

# Insert
success, result = obs_db.insert(ObservationDSL(
    observation_id="obs_001",
    dsl_workflow_id="observe_temp_dsl",
    webhook_sources_ids=["weather_api"],
    event_subscription_ids=[]
))

# Query
success, results = obs_db.query({"dsl_workflow_id": "observe_temp_dsl"})