Source code for prefilter.prefilter

import datetime
import os
import sys
import uuid
import asyncio
import marshmallow_dataclass
from collections import defaultdict

sys.path.append(os.getcwd())
from src.base.clickhouse_kafka_sender import ClickHouseKafkaSender
from src.base.data_classes.batch import Batch
from src.base.logline_handler import LoglineHandler
from src.base.kafka_handler import (
    ExactlyOnceKafkaProduceHandler,
    ExactlyOnceKafkaConsumeHandler,
    KafkaMessageFetchException,
)
from src.base.log_config import get_logger
from src.base.utils import (
    setup_config,
    get_zeek_sensor_topic_base_names,
    generate_collisions_resistant_uuid,
)

module_name = "log_filtering.prefilter"
logger = get_logger(module_name)

config = setup_config()

CONSUME_TOPIC_PREFIX = config["environment"]["kafka_topics_prefix"]["pipeline"][
    "batch_sender_to_prefilter"
]
PRODUCE_TOPIC_PREFIX = config["environment"]["kafka_topics_prefix"]["pipeline"][
    "prefilter_to_inspector"
]

SENSOR_PROTOCOLS = get_zeek_sensor_topic_base_names(config)
PREFILTERS = config["pipeline"]["log_filtering"]
INSPECTORS = config["pipeline"]["data_inspection"]
COLLECTORS = [
    collector for collector in config["pipeline"]["log_collection"]["collectors"]
]


[docs] class Prefilter: """Main component of the Log Filtering stage to process and filter batches Consumes batches from the Log Collection stage and applies relevance-based filtering using the :class:`LoglineHandler`. Filters out irrelevant loglines and forwards only relevant data to the next pipeline stage for anomaly detection. """
[docs] def __init__( self, validation_config, consume_topic, produce_topics, relevance_function_name ): """Initializes a new ``Prefilter`` instance with the specified configuration. Args: validation_config (list): Configuration for validating log line fields consume_topic (str): Kafka topic to consume data from produce_topics (list[str]): Kafka topics to produce filtered data to relevance_function_name (str): Name of the relevance function to apply """ self.name = None self.consume_topic = consume_topic self.produce_topics = produce_topics self.batch_id = None self.begin_timestamp = None self.end_timestamp = None self.subnet_id = None self.parent_row_id = None self.relevance_function_name = relevance_function_name self.unfiltered_data = [] self.filtered_data = [] self.logline_handler = LoglineHandler(validation_config) self.kafka_consume_handler = ExactlyOnceKafkaConsumeHandler(self.consume_topic) self.kafka_produce_handler = ExactlyOnceKafkaProduceHandler() # databases self.batch_tree = ClickHouseKafkaSender("batch_tree") self.batch_timestamps = ClickHouseKafkaSender("batch_timestamps") self.logline_timestamps = ClickHouseKafkaSender("logline_timestamps") self.fill_levels = ClickHouseKafkaSender("fill_levels") self.fill_levels.insert( dict( timestamp=datetime.datetime.now(), stage=module_name, entry_type="total_loglines", entry_count=0, ) )
[docs] def get_and_fill_data(self) -> None: """Retrieves and processes new data from Kafka. This method: 1. Clears any existing data 2. Consumes a new batch of data from Kafka 3. Extracts batch metadata (ID, timestamps, subnet ID) 4. Stores the unfiltered data internally 5. Records processing timestamps and metrics Note: This method blocks until data is available on the Kafka topic. Empty batches are handled gracefully but logged for monitoring. """ self.clear_data() # clear in case we already have data stored key, data = self.kafka_consume_handler.consume_as_object() self.subnet_id = key if data.data: self.parent_row_id = data.batch_tree_row_id self.batch_id = data.batch_id self.begin_timestamp = data.begin_timestamp self.end_timestamp = data.end_timestamp self.unfiltered_data = data.data self.batch_timestamps.insert( dict( batch_id=self.batch_id, stage=module_name, instance_name=self.name, status="in_process", timestamp=datetime.datetime.now(), is_active=True, message_count=len(self.unfiltered_data), ) ) row_id = generate_collisions_resistant_uuid() self.batch_tree.insert( dict( batch_row_id=row_id, stage=module_name, instance_name=self.name, status="in_process", timestamp=datetime.datetime.now(), parent_batch_row_id=self.parent_row_id, batch_id=self.batch_id, ) ) self.fill_levels.insert( dict( timestamp=datetime.datetime.now(), stage=module_name, entry_type="total_loglines", entry_count=len(self.unfiltered_data), ) ) if not self.unfiltered_data: logger.info( f"Received message:\n" f" ⤷ Empty data field: No unfiltered data available. subnet_id: '{self.subnet_id}'" ) else: logger.info( f"{self.consume_topic} Received message:\n" f" ⤷ Contains data field of {len(self.unfiltered_data)} message(s) with " f"subnet_id: '{self.subnet_id}'." )
[docs] def check_data_relevance_using_rules(self) -> None: """Applies relevance filtering to the unfiltered data. This method: 1. Iterates through each log line in unfiltered_data 2. Applies the configured relevance function to determine if the log line is relevant 3. Adds relevant log lines to filtered_data 4. Records non-relevant log lines as filtered out in the database Note: The specific relevance function used is determined by the relevance_function_name parameter provided during initialization. """ for logline in self.unfiltered_data: if self.logline_handler.check_relevance( logline_dict=logline, function_name=self.relevance_function_name ): self.filtered_data.append(logline) else: # not relevant, filtered out logline_id = uuid.UUID(logline.get("logline_id")) self.logline_timestamps.insert( dict( logline_id=logline_id, stage=module_name, status="filtered_out", timestamp=datetime.datetime.now(), is_active=False, ) ) self.fill_levels.insert( dict( timestamp=datetime.datetime.now(), stage=module_name, entry_type="total_loglines", entry_count=len(self.filtered_data), ) )
[docs] def send_filtered_data(self): """Sends the filtered data to the configured Kafka topics. This method: 1. Verifies there is filtered data to send 2. Prepares the data in the required batch format 3. Records completion timestamps in the database 4. Sends the data to all configured produce topics Raises: ValueError: If there is no filtered data to send """ row_id = generate_collisions_resistant_uuid() if not self.filtered_data: logger.debug("Failed to send data: No filtered data.") return data_to_send = { "batch_tree_row_id": row_id, "batch_id": self.batch_id, "begin_timestamp": self.begin_timestamp, "end_timestamp": self.end_timestamp, "data": self.filtered_data, } # important to finish before sending, otherwise inspector can process before finished here! self.batch_timestamps.insert( dict( batch_id=self.batch_id, stage=module_name, instance_name=self.name, status="finished", timestamp=datetime.datetime.now(), is_active=True, message_count=len(self.filtered_data), ) ) self.batch_tree.insert( dict( batch_row_id=row_id, stage=module_name, instance_name=self.name, status="finished", timestamp=datetime.datetime.now(), parent_batch_row_id=self.parent_row_id, batch_id=self.batch_id, ) ) self.fill_levels.insert( dict( timestamp=datetime.datetime.now(), stage=module_name, entry_type="total_loglines", entry_count=0, ) ) batch_schema = marshmallow_dataclass.class_schema(Batch)() for topic in self.produce_topics: self.kafka_produce_handler.produce( topic=topic, data=batch_schema.dumps(data_to_send), key=self.subnet_id, ) logger.info( f"Filtered data was successfully sent:\n" f" ⤷ Contains data field of {len(self.filtered_data)} message(s). Originally: " f"{len(self.unfiltered_data)} message(s). Belongs to subnet_id '{self.subnet_id}'." )
[docs] def clear_data(self) -> None: """Clears all data from the internal data structures. Resets both unfiltered_data and filtered_data lists to empty state, preparing for the next batch processing cycle. """ self.unfiltered_data = [] self.filtered_data = []
[docs] def bootstrap_prefiltering_process(self): """Runs the main prefiltering process loop. This method implements an infinite loop that: 1. Fetches new data from Kafka 2. Filters the data for relevance 3. Sends the filtered data to inspectors """ logger.info(f"I am {self.consume_topic}") while True: self.get_and_fill_data() self.check_data_relevance_using_rules() self.send_filtered_data()
[docs] async def start(self): # pragma: no cover """Starts the ``Prefilter`` processing loop. This method: 1. Logs startup information 2. Runs the main processing loop in a separate thread 3. Handles graceful shutdown on interruption Args: one_iteration (bool): If True, processes only one batch (for testing). Default: False """ loop = asyncio.get_running_loop() logger.info( "Prefilter started:\n" f" ⤷ receiving on Kafka topic '{self.consume_topic}'" ) await loop.run_in_executor(None, self.bootstrap_prefiltering_process) logger.info("Closing down Prefilter...") self.clear_data()
[docs] async def main() -> None: """Creates and starts all configured Prefilter instances. This function: 1. Iterates through all prefilter configurations defined in config.yaml 2. For each prefilter: - Determines the relevance function to use - Sets up the validation configuration based on the config.yaml - Determines the topics to consume from and produce to - Creates an according ``Prefilter`` instance - Runs the ``start`` method """ tasks = [] for prefilter in PREFILTERS: relevance_function_name = prefilter["relevance_method"] validation_config = [ item for collector in COLLECTORS if collector["name"] == prefilter["collector_name"] for item in collector["required_log_information"] ] consume_topic = f"{CONSUME_TOPIC_PREFIX}-{prefilter['name']}" produce_topics = [ f"{PRODUCE_TOPIC_PREFIX}-{inspector['name']}" for inspector in INSPECTORS if prefilter["name"] == inspector["prefilter_name"] ] prefilter_instance = Prefilter( validation_config=validation_config, consume_topic=consume_topic, produce_topics=produce_topics, relevance_function_name=relevance_function_name, ) prefilter_instance.name = prefilter["name"] tasks.append(asyncio.create_task(prefilter_instance.start())) await asyncio.gather(*tasks)
if __name__ == "__main__": # pragma: no cover asyncio.run(main())