Auto backpressure control in OpenCTI connectors messaging
Managing connectors in OpenCTI is crucial for maintaining smooth data ingestion and avoiding system overloads, particularly with RabbitMQ. Prior to version 6.2.12, managing connector execution intervals could be complex, unintuitive, and sometimes prone to errors due to the different types of intervals in place. Now, a new scheduling feature has been introduced to automate and simplify this process for all ‘External Import’ connectors.
This article explores this new feature in detail, its associated environment variables, and the benefits it offers.
Purpose of the Scheduler
Functional overview
The scheduler allows for more efficient management of connector execution. When a connector is scheduled to run, the scheduler first checks the total size of the messages accumulated in RabbitMQ before starting the connector process, except during the first execution. If this size exceeds the server capacity defined by the queue_threshold environment variable, the connector switches to ‘Buffering’ mode and postpones its execution by the period set in the duration_period environment variable.
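The check described above can be sketched as a small decision function. This is only an illustration of the described behavior, not the actual pycti implementation; the function name next_action and its parameters are hypothetical.

```python
def next_action(
    queue_size_mb: float,
    queue_threshold_mb: float = 500.0,  # default threshold stated in this article
    first_run: bool = False,
) -> str:
    """Return "run" or "buffering" for one scheduled execution (hypothetical helper)."""
    # The very first execution skips the queue check entirely.
    if first_run:
        return "run"
    # Postpone the run only when the queue size exceeds the threshold.
    if queue_size_mb > queue_threshold_mb:
        return "buffering"
    return "run"
```

For instance, next_action(600) yields "buffering" with the default threshold, while next_action(600, first_run=True) still yields "run".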
This feature is essential for preventing overloads that can occur in the queues, allowing connectors to ingest data more smoothly without manual intervention. Additionally, the OpenCTI interface now displays more precise information, such as ‘Last run,’ ‘Next run,’ and ‘Server capacity,’ which are updated every 40 seconds via pingAlive.
New environment variables
Two new environment variables have been introduced to enhance this feature:
duration_period: Specifies the execution period duration using the ISO 8601 format.
queue_threshold: Defines the queue threshold (in MB) at which connectors switch to ‘Buffering’ mode. By default, this threshold is set to 500 MB.
These variables allow for adjusting the scheduler’s behavior based on the specific needs of your environment.
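As a rough sketch of how these two settings might be read with their defaults, the snippet below uses only the standard library. It assumes the CONNECTOR_-prefixed variable names shown in the docker-compose example of this article; load_scheduler_settings is a hypothetical helper, and real connectors read their configuration through pycti instead.

```python
import os
from typing import Mapping, Optional

DEFAULT_QUEUE_THRESHOLD_MB = 500.0  # default value stated in this article


def load_scheduler_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read the two scheduler settings from an environment mapping (hypothetical helper)."""
    env = os.environ if env is None else env
    # ISO 8601 duration string such as "PT5H"; None when the variable is absent.
    duration_period = env.get("CONNECTOR_DURATION_PERIOD")
    # Threshold in MB; fall back to the default when the variable is absent or empty.
    raw_threshold = env.get("CONNECTOR_QUEUE_THRESHOLD")
    queue_threshold = float(raw_threshold) if raw_threshold else DEFAULT_QUEUE_THRESHOLD_MB
    return {"duration_period": duration_period, "queue_threshold": queue_threshold}
```

Passing an explicit mapping, for example load_scheduler_settings({"CONNECTOR_DURATION_PERIOD": "PT5H"}), makes the behavior easy to check without touching the real environment.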
Which types of connectors is this feature limited to?
This new feature applies exclusively to ‘External Import’ connectors. These connectors require regular scheduling for importing data from external APIs, unlike other types of connectors that do not require scheduled re-execution.
If you’d like to learn more about the different types of connectors, we invite you to read our documentation: https://docs.opencti.io/latest/deployment/connectors/
Why is this important?
Principal use case
The new scheduling feature is particularly crucial for platform administrators, who must closely monitor data ingestion for each connector. Indeed, a connector might send a massive amount of data to OpenCTI within a very short time frame. If this process is not well managed, it can lead to significant issues.
Use Case #1: Initially, the Platform Team had to constantly monitor the queues and manually identify problematic connectors. To avoid overloading the queues in RabbitMQ, it was necessary to stop these connectors (without resetting their state), while allowing the workers to process the remaining data. This approach, although functional, was cumbersome as it consumed a lot of time and resources.
Use Case #2: If a connector struggles to ingest data and is not stopped in time, and its interval is too short, the RabbitMQ queues keep filling up until they completely saturate the host system’s physical memory, which is one of the worst possible scenarios. This situation can cause errors and crashes in RabbitMQ, forcing the team to manually clear the overloaded queues and restart RabbitMQ. This intervention results in the loss of all pending data and requires resetting the connector’s state. The team must then manually restore the closest possible state, restart the connector, and closely monitor its ingestion to prevent further overloads.
Solution: With the addition of the ‘Scheduler,’ platform administrators no longer need to constantly monitor all connectors. Connectors can now automatically switch to ‘Buffering’ mode when the predefined threshold is reached. This saves significant time and resources while ensuring more efficient and secure management of RabbitMQ queues, all without heavy and potentially risky manual intervention.
How does it work?
The management of connector execution intervals has been standardized and simplified through two distinct methods:
schedule_iso() (with breaking change)
The schedule_iso() method uses the ISO 8601 format, allowing for standardized and precise scheduling of connector execution intervals, while providing better readability for users. For example, ‘P1D’ represents a period of 1 day, while ‘PT24H’ indicates a duration of 24 hours. This approach is now the standard for all new connector integrations, ensuring clarity and consistency in managing connector execution intervals.
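To make the duration notation concrete, here is a minimal parser for a subset of ISO 8601 durations (weeks, days, hours, minutes, seconds, integer values only). It is a simplified illustration written for this article, not the parser used by the scheduler; parse_iso_duration is a hypothetical name, and months and years are deliberately left out because their length depends on the calendar.

```python
import re
from datetime import timedelta

# Subset of ISO 8601 durations: PnWnDTnHnMnS with integer components.
_ISO_DURATION = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_iso_duration(value: str) -> timedelta:
    """Convert a duration such as "P1D" or "PT5H" into a timedelta (hypothetical helper)."""
    match = _ISO_DURATION.match(value)
    # Reject non-matching input, bare "P"/"PT", and a dangling "T" such as "P1DT".
    if not match or not any(match.groupdict().values()) or value.endswith("T"):
        raise ValueError(f"Unsupported ISO 8601 duration: {value!r}")
    parts = {name: int(number) for name, number in match.groupdict().items() if number}
    return timedelta(**parts)
```

With this sketch, parse_iso_duration("P1D") and parse_iso_duration("PT24H") both give a timedelta of one day, which matches the two examples above.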
schedule_iso() requires two arguments to function, message_callback and duration_period:
message_callback: Equivalent to initiating the connector process.
duration_period: Its value must be in ISO 8601 format, and the corresponding environment variable must exist in the configuration. This internationally recognized format ensures consistent interpretation of durations, making future integrations easier to read and minimizing errors.
Implementation Example
In the config.yml or docker-compose.yml:
# config.yml (under the connector section)
duration_period: "PT5H" # Add this variable
queue_threshold: 600 # Add this variable only if the value differs from the default (500 MB)
# docker-compose.yml
- CONNECTOR_DURATION_PERIOD=PT5H # Add this variable
- CONNECTOR_QUEUE_THRESHOLD=600 # Add this variable only if the value differs from the default (500 MB)
In the connector code:
# Get Connector config
self.duration_period = get_config_variable(
    "CONNECTOR_DURATION_PERIOD", ["connector", "duration_period"], config
)

def run(self):
    # Use schedule_iso() method
    # In the message_callback, you must indicate the connector process, here self.starter
    # New duration_period that takes into account the standardized ISO 8601 format
    self.helper.schedule_iso(
        message_callback=self.starter,
        duration_period=self.duration_period,  # duration_period: "PT5H" => 5 hours
    )
schedule_unit() (without breaking change)
The schedule_unit() method allows you to reuse the existing connector interval by specifying the time unit, such as hours or minutes. This approach facilitates a gradual transition to the new scheduling system without immediately impacting connectors or ongoing operations, although this method is intended to be phased out over time.
schedule_unit() requires three arguments to function, message_callback, duration_period, and time_unit:
message_callback: Equivalent to initiating the connector process.
duration_period: Unlike with schedule_iso(), the duration_period does not need to be in the environment variables, as it uses the existing interval (e.g. interval_hours=6). This is normally a numeric value.
time_unit: Specifies the time unit used by duration_period; in our example, time_unit should be set to ‘HOURS’.
Implementation example
Unlike schedule_iso(), there’s no need to configure config.yml or docker-compose.yml, as this method will use the existing connector interval variable.
In the connector code:
def run(self):
    # Use schedule_unit() method
    # In the message_callback, you must indicate the connector process, here self.starter
    # duration_period reuses the existing connector interval, here self.interval_hours
    # time_unit keeps backward compatibility; valid enum values: YEARS, WEEKS, DAYS, HOURS, MINUTES, SECONDS
    self.helper.schedule_unit(
        message_callback=self.starter,
        duration_period=self.interval_hours,
        time_unit=self.helper.TimeUnit.HOURS,
    )
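The pairing of a numeric interval with a time unit can be illustrated with a small conversion sketch. SketchTimeUnit below is a local stand-in defined for this example and is not the TimeUnit enum exposed by the helper; treating a year as 365 days is also an assumption made here.

```python
from datetime import timedelta
from enum import Enum


class SketchTimeUnit(Enum):
    """Local stand-in enum: each member holds the number of seconds in one unit."""

    YEARS = 365 * 24 * 3600  # assumption: a year is approximated as 365 days
    WEEKS = 7 * 24 * 3600
    DAYS = 24 * 3600
    HOURS = 3600
    MINUTES = 60
    SECONDS = 1


def to_timedelta(duration_period: float, time_unit: SketchTimeUnit) -> timedelta:
    """Turn a legacy numeric interval plus its unit into a timedelta (hypothetical helper)."""
    return timedelta(seconds=duration_period * time_unit.value)
```

An existing interval_hours=6 setting would therefore correspond to to_timedelta(6, SketchTimeUnit.HOURS), the same duration that ‘PT6H’ expresses in ISO 8601.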
Additional information on the Scheduler
Previously, connectors managed execution intervals using a while True loop combined with time.sleep(self.interval) directly in the code, with the interval being customizable in the configuration. These elements are now removed from the connector code, as is the in-connector implementation of ‘Run & Terminate’; all of this is handled entirely by the ‘Scheduler’.
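For comparison, the legacy pattern looked roughly like the sketch below. The function name legacy_run is hypothetical, and the sleep and max_runs parameters are additions made here only so the loop can be exercised without waiting or running forever.

```python
import time
from typing import Callable, Optional


def legacy_run(
    process: Callable[[], None],
    interval_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
    max_runs: Optional[int] = None,  # illustration only; the legacy loop never stopped
) -> int:
    """Run the connector process on a fixed interval, ignoring the queue state."""
    runs = 0
    while True:
        process()
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        # Fixed wait between runs, regardless of how full RabbitMQ is.
        sleep(interval_seconds)
```

The loop keeps pushing data at a fixed pace and never looks at the queue, which is the gap the scheduler's threshold check closes.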
Display in the user interface
Behavior and Display for Connectors in ‘Run & Terminate’ Mode
When a connector is configured in ‘Run & Terminate’ mode, the value for ‘Next run’ is defined by ‘External schedule.’ Organizations that choose this mode typically manage execution intervals using external scheduling, allowing them flexibility in managing the connector’s execution cycles.
Note: A special case has been implemented. If the duration_period in the configuration is set to a zero value (for example 0, ‘0’, ‘P0D’, or ‘PT0S’), the connector behaves as in ‘Run & Terminate’ mode.
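One loose way to recognize such zero values is sketched below. This is an assumption about how the check could be written, not the scheduler's actual code; is_run_and_terminate is a hypothetical name, and the function does not validate that the string is well-formed ISO 8601.

```python
import re

# Captures the number in front of each ISO 8601 designator, e.g. "0" in "PT0S".
_COMPONENT = re.compile(r"(\d+(?:\.\d+)?)[YMWDHS]")


def is_run_and_terminate(duration_period) -> bool:
    """Return True when the configured period amounts to zero (hypothetical helper)."""
    # Numeric zero (bool is excluded because it is a subclass of int).
    if isinstance(duration_period, (int, float)) and not isinstance(duration_period, bool):
        return duration_period == 0
    if not isinstance(duration_period, str):
        return False
    text = duration_period.strip().upper()
    if text == "0":
        return True
    if not text.startswith("P"):
        return False
    # Zero duration: at least one component, and every component equals zero.
    numbers = _COMPONENT.findall(text)
    return bool(numbers) and all(float(number) == 0 for number in numbers)
```

The four values listed in the note all return True here, while a value such as "PT5H" returns False.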
Display of Details for Connectors in ‘Buffering’ Mode
When the size of the messages in the RabbitMQ queue exceeds the defined threshold (for example, if queue_message_size is at 9.90 MB and queue_threshold is configured to 8 MB), the connector automatically switches to ‘Buffering’ mode.
In ‘Buffering’ mode, the connector’s execution is paused until the queue size falls below the specified threshold. The user interface displays visual indicators to signal this state change, including a warning message and a color change in the ‘Server Capacity’ section.
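The pause-and-resume behavior can be pictured as a timeline of scheduled checks spaced one duration_period apart. The simulation below is a simplified sketch under that assumption; simulate_ticks and its parameters are hypothetical, and the queue sizes are made-up sample values apart from the 9.90 MB and 8 MB figures used above.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Sequence, Tuple


def simulate_ticks(
    start: datetime,
    period: timedelta,
    queue_sizes_mb: Sequence[float],
    queue_threshold_mb: float,
) -> List[Tuple[datetime, str]]:
    """Label each scheduled check as "Buffering" or "Running" (hypothetical helper)."""
    timeline = []
    for tick, size in enumerate(queue_sizes_mb):
        # A check above the threshold postpones the run to the next period.
        state = "Buffering" if size > queue_threshold_mb else "Running"
        timeline.append((start + tick * period, state))
    return timeline


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
timeline = simulate_ticks(start, timedelta(hours=5), [9.90, 8.5, 7.2], 8.0)
states = [state for _, state in timeline]
```

With a 5-hour period, the connector stays in ‘Buffering’ for the first two checks and runs at the third one, 10 hours after the start.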
Display of Details for Connectors Not Using the Scheduler
Connectors that do not support the scheduler will display ‘Not Provided’ in the user interface.
Note: If a last_run is present in the state and is in timestamp format, it will be automatically converted and displayed with the notation ‘(from State)’ next to the ‘Last run’ title, indicating to the user that this information comes from the state and not from the scheduler.
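The conversion mentioned in this note could look like the following sketch. It assumes that last_run holds a Unix timestamp in seconds and that the date is rendered in UTC; the exact format shown in the interface is not specified here, and last_run_label is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Optional


def last_run_label(state: Optional[dict]) -> str:
    """Build a display label for the last run from a connector state (hypothetical helper)."""
    last_run = (state or {}).get("last_run")
    # Only numeric timestamps are converted (bool is excluded on purpose).
    if isinstance(last_run, (int, float)) and not isinstance(last_run, bool):
        moment = datetime.fromtimestamp(last_run, tz=timezone.utc)
        return f"{moment.isoformat()} (from State)"
    # Same wording the interface uses when no scheduler information is available.
    return "Not Provided"
```

A state without a usable last_run falls back to ‘Not Provided’, mirroring what the interface shows for connectors that do not use the scheduler.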
Conclusion
The new scheduling feature (Scheduler) introduced in OpenCTI version 6.2.12 provides more standardized, secure, and automated management for ‘External Import’ connectors. It reduces the risks associated with RabbitMQ queue overloads and manual interventions, freeing up valuable time for users to focus on higher-value tasks.
With the schedule_iso() and schedule_unit() methods, users can adjust the execution of connectors based on the specific needs of their environment, while allowing a gradual transition to the new feature. It is important to note that the schedule_iso() method should now be preferred, while the schedule_unit() method is only present to facilitate the transition from the absence of a scheduler to using schedule_iso().
This improvement also provides a more understandable and intuitive overview of connector execution processes in the user interface with ‘Last run,’ ‘Next run,’ and ‘Server Capacity.’ You can already test this new feature with the ‘external-import’ template or directly with connectors that already integrate the scheduler: CISA KEV, Mandiant, SEKOIA, AlienVault, Recorded Future, CrowdStrike (v6.3).
We hope this article has helped you better understand the benefits and possibilities offered by this new feature. Interested in more tips and advice on OpenCTI? Join our Slack community and connect with other users to share ideas and solutions.