1 files changed, 60 insertions(+), 42 deletions(-)

M syslogd.py
M syslogd.py +60 -42
@@ 11,13 11,13 @@ SQL_DUMP = os.environ.get("SQL_DUMP") ==
 SQL_WRITE = os.environ.get("SQL_WRITE") == "True"
 BINDING_IP = os.environ.get("BINDING_IP", "0.0.0.0")
 BINDING_PORT = os.environ.get("BINDING_PORT", "5140")
+
 del os
 
-# Code
 import asyncio
 import aiosqlite
 import signal
-from datetime import datetime
+from datetime import datetime, timedelta
 from priority import SyslogMatrix
 from rfc5424 import convert_rfc5424_to_rfc3164
 

          
@@ 28,7 28,6 @@ except ImportError:
 
 
 class SyslogUDPServer:
-
     syslog_matrix = SyslogMatrix()
 
     def __init__(self, host, port, loop=None):

          
@@ 36,10 35,9 @@ class SyslogUDPServer:
         self.port = port
         self.loop = loop or asyncio.get_event_loop()
         self.running = False
-
-        # Register the signal handler for SIGINT (Control-C)
+        self.fts_buffer = []  # Buffer for FTS messages
+        self.last_fts_sync = None  # Timestamp of last FTS sync
         self.loop.add_signal_handler(signal.SIGINT, self.handle_sigint_wrapper)
-
         self.setup()
         print(f"Listening on UDP {host}:{port} using {self.loop.__module__}")
 

          
@@ 50,6 48,9 @@ class SyslogUDPServer:
     async def handle_sigint(self, signum, frame):
         print("\nSIGINT received. Shutting down gracefully...")
         self.running = False
+        # Sync any remaining FTS messages before shutdown
+        if self.fts_buffer:
+            await self.sync_fts_buffer()
         await self.shutdown()
 
     def handle_sigint_wrapper(self):

          
@@ 70,9 71,7 @@ class SyslogUDPServer:
 
     async def connect_to_sqlite(self):
         self.db = await aiosqlite.connect("syslog.db", loop=self.loop)
-        # Enable WAL mode for better write performance
         await self.db.execute("PRAGMA journal_mode=WAL")
-        # Enable auto_vacuum to keep the database size in check
         await self.db.execute("PRAGMA auto_vacuum = FULL")
         await self.db.commit()
 

          
@@ 105,37 104,50 @@ class SyslogUDPServer:
         return table_name
 
     def escape(self, msg):
-        """
-        Escapes special characters in the message to prevent SQL injection.
-        Handles strings safely for SQLite insertion.
-        """
         if not isinstance(msg, str):
             return str(msg)
-        escaped_msg = (
-            msg.replace("'", "''")  # Escapes single quotes for SQLite
-            .replace('"', '""')  # Escapes double quotes
-            .replace("\\", "\\\\")  # Escapes backslashes
-        )
-        return escaped_msg
+        return msg.replace("'", "''").replace('"', '""').replace("\\", "\\\\")
+
+    async def sync_fts_buffer(self, table_name=None):
+        if not self.fts_buffer:
+            return
+
+        year_month = self.fts_buffer[0]["ReceivedAt"].strftime("%Y%m")
+        fts_table_name = f"SystemEventsFTS{year_month}"
+        messages = [(entry["Message"],) for entry in self.fts_buffer]
+
+        try:
+            await self.db.executemany(
+                f"INSERT INTO {fts_table_name} (Message) VALUES (?)", messages
+            )
+            await self.db.commit()
+            self.last_fts_sync = datetime.now()
+            self.fts_buffer = []
+            if DEBUG:
+                print(f"Synced {len(messages)} messages to {fts_table_name}")
+        except Exception as e:
+            if DEBUG:
+                print(f"Failed to sync FTS buffer: {e}")
+            await self.db.rollback()
 
     async def handle_datagram(self, data, address):
-        """
-        Handle RFC-3164 message
-        e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT
-             [m][d] [---t--] [hostn-program]  [--------message-------]
-        """
         ReceivedAt = datetime.now()
         data = data.decode()
         data = convert_rfc5424_to_rfc3164(data)
         if LOG_DUMP:
             print(f"\n  DATA: {data}")
 
-        # Get current year and month for table name
-        year_month = ReceivedAt.strftime("%Y%m")  # e.g., "202503"
+        year_month = ReceivedAt.strftime("%Y%m")
         table_name = await self.create_monthly_table(year_month)
 
         datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")]
-        m, d, t, hostname = datetime_hostname_program.split()[:4]
+        try:
+            m, d, t, hostname = datetime_hostname_program.split()[:4]
+        except ValueError:
+            if DEBUG:
+                print(f"Invalid message format: {data}")
+            return
+
         formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year}"
         DeviceReportedTime = datetime.strptime(formatted_datetime, "%b %d %H:%M:%S %Y")
         time_delta = ReceivedAt - DeviceReportedTime

          
@@ 146,17 158,15 @@ class SyslogUDPServer:
             )
         time_delta = ReceivedAt - DeviceReportedTime
         if abs(time_delta.days) > 1:
-            pass  # Something wrong, just ignore it.
+            pass
         else:
             program = "-".join(datetime_hostname_program.split()[4:])
-            if "[" in program:
-                SysLogTag = program[: program.find("[")]
-            else:
-                SysLogTag = program
-            if "[" in program:
-                ProcessID = program[program.find("[") + 1 : program.find("]")]
-            else:
-                ProcessID = "0"
+            SysLogTag = program[: program.find("[")] if "[" in program else program
+            ProcessID = (
+                program[program.find("[") + 1 : program.find("]")]
+                if "[" in program
+                else "0"
+            )
             Message = data[data.find(": ") + 2 :]
             code = data[data.find("<") + 1 : data.find(">")]
             Facility, Priority = self.syslog_matrix.decode_int(code)

          
@@ 173,8 183,8 @@ class SyslogUDPServer:
                 "Priority": Priority,
                 "FromHost": FromHost,
                 "InfoUnitID": InfoUnitID,
-                "ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"),
-                "DeviceReportedTime": DeviceReportedTime.strftime("%Y-%m-%d %H:%M:%S"),
+                "ReceivedAt": ReceivedAt,
+                "DeviceReportedTime": DeviceReportedTime,
                 "SysLogTag": SysLogTag,
                 "ProcessID": ProcessID,
                 "Message": Message,

          
@@ 196,12 206,20 @@ class SyslogUDPServer:
                 try:
                     async with self.db.execute(sql_command, params) as cursor:
                         pass
-                    # Optionally sync to FTS table if needed
-                    await self.db.execute(
-                        f"INSERT INTO SystemEventsFTS{year_month} (Message) VALUES (?)",
-                        (Message,),
-                    )
                     await self.db.commit()
+
+                    # Add to FTS buffer
+                    self.fts_buffer.append(params)
+
+                    # Check buffer size or time since last sync
+                    if len(self.fts_buffer) >= 100:
+                        await self.sync_fts_buffer(table_name)
+                    elif (
+                        self.last_fts_sync
+                        and (ReceivedAt - self.last_fts_sync).total_seconds() >= 10
+                    ):
+                        await self.sync_fts_buffer(table_name)
+
                 except Exception as e:
                     if DEBUG:
                         print(f"\n   SQL: {sql_command}")