split table into monthly table
3 files changed, 59 insertions(+), 47 deletions(-)

M priority.py
M rfc5424.py
M syslogd.py
M priority.py +3 -1
@@ 49,7 49,9 @@ class SyslogMatrix:
 
     def decode(self, code):
         code = str(code) if isinstance(code, int) else code
-        facility, level = self.matrix.get(code)
+        facility, level = self.matrix.get(
+            code, ("kernel", "emergency")
+        )  # Fallback to 0, 0
         return (
             (facility, self.FACILITIES.index(facility)),
             (level, self.LEVELS.index(level)),

          
M rfc5424.py +3 -3
@@ 10,6 10,8 @@ def convert_rfc5424_to_rfc3164(message):
     match = re.match(pattern, message)
 
     if not match:
+        if DEBUG:
+            print(f"Failed to parse RFC-5424 message: {message}")
         return message
 
     # Extract the necessary fields

          
@@ 30,8 32,6 @@ def convert_rfc5424_to_rfc3164(message):
     timestamp_rfc3164 = timestamp_dt.strftime("%b %d %H:%M:%S")
 
     # Rearrange the fields according to RFC-3164 format
-    rfc3164_message = (
-        f"<{rfc3164_pri}>{timestamp_rfc3164} {hostname}: {message}"
-    )
+    rfc3164_message = f"<{rfc3164_pri}>{timestamp_rfc3164} {hostname}: {message}"
 
     return rfc3164_message

          
M syslogd.py +53 -43
@@ 13,15 13,6 @@ BINDING_IP = os.environ.get("BINDING_IP"
 BINDING_PORT = os.environ.get("BINDING_PORT", "5140")
 del os
 
-# Query
-SQL = (
-    "INSERT INTO SystemEvents (Facility, Priority, FromHost, "
-    "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
-    "ProcessID, Message) VALUES "
-    "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
-    ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)"
-)
-
 # Code
 import asyncio
 import aiosqlite

          
@@ 59,12 50,9 @@ class SyslogUDPServer:
     async def handle_sigint(self, signum, frame):
         print("\nSIGINT received. Shutting down gracefully...")
         self.running = False
-
-        # Run the shutdown coroutine in the event loop and wait for completion
         await self.shutdown()
 
     def handle_sigint_wrapper(self):
-        # Pass signum and frame arguments to handle_sigint()
         asyncio.ensure_future(self.handle_sigint(signal.SIGINT, None))
 
     async def close_db_connection(self):

          
@@ 76,21 64,23 @@ class SyslogUDPServer:
                     print(f"Error while closing the database connection: {e}")
 
     async def shutdown(self):
-        # Stop the server
         self.stop()
-
-        # Close the database connection if SQL_WRITE is True
         await self.close_db_connection()
-
-        # Stop the event loop
         self.loop.stop()
 
     async def connect_to_sqlite(self):
         self.db = await aiosqlite.connect("syslog.db", loop=self.loop)
         # Enable WAL mode for better write performance
         await self.db.execute("PRAGMA journal_mode=WAL")
+        # Enable auto_vacuum to keep the database size in check
+        await self.db.execute("PRAGMA auto_vacuum = FULL")
+        await self.db.commit()
+
+    async def create_monthly_table(self, year_month):
+        table_name = f"SystemEvents{year_month}"
+        fts_table_name = f"SystemEventsFTS{year_month}"
         await self.db.execute(
-            """CREATE TABLE IF NOT EXISTS SystemEvents (
+            f"""CREATE TABLE IF NOT EXISTS {table_name} (
             ID INTEGER PRIMARY KEY AUTOINCREMENT,
             Facility INTEGER,
             Priority INTEGER,

          
@@ 103,24 93,30 @@ class SyslogUDPServer:
             Message TEXT
         )"""
         )
-        await self.db.commit()
-
-        # Create an index on the ReceivedAt field
         await self.db.execute(
-            """CREATE INDEX IF NOT EXISTS idx_ReceivedAt 
-            ON SystemEvents (ReceivedAt)"""
+            f"""CREATE INDEX IF NOT EXISTS idx_ReceivedAt_{year_month} 
+            ON {table_name} (ReceivedAt)"""
         )
-        await self.db.commit()
-
-        # Create the virtual table with the full-text search index
         await self.db.execute(
-            """CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
+            f"""CREATE VIRTUAL TABLE IF NOT EXISTS {fts_table_name}
             USING FTS5(Message)"""
         )
         await self.db.commit()
+        return table_name
 
-    async def escape(self, msg):
-        return msg
+    def escape(self, msg):
+        """
+        Escapes special characters in the message to prevent SQL injection.
+        Handles strings safely for SQLite insertion.
+        """
+        if not isinstance(msg, str):
+            return str(msg)
+        escaped_msg = (
+            msg.replace("'", "''")  # Escapes single quotes for SQLite
+            .replace('"', '""')  # Escapes double quotes
+            .replace("\\", "\\\\")  # Escapes backslashes
+        )
+        return escaped_msg
 
     async def handle_datagram(self, data, address):
         """

          
@@ 128,17 124,20 @@ class SyslogUDPServer:
         e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT
              [m][d] [---t--] [hostn-program]  [--------message-------]
         """
-        ReceivedAt = datetime.now()  # Generated
+        ReceivedAt = datetime.now()
         data = data.decode()
         data = convert_rfc5424_to_rfc3164(data)
         if LOG_DUMP:
             print(f"\n  DATA: {data}")
+
+        # Get current year and month for table name
+        year_month = ReceivedAt.strftime("%Y%m")  # e.g., "202503"
+        table_name = await self.create_monthly_table(year_month)
+
         datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")]
         m, d, t, hostname = datetime_hostname_program.split()[:4]
         formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year}"
-        DeviceReportedTime = datetime.strptime(
-            formatted_datetime, "%b %d %H:%M:%S %Y"
-        )
+        DeviceReportedTime = datetime.strptime(formatted_datetime, "%b %d %H:%M:%S %Y")
         time_delta = ReceivedAt - DeviceReportedTime
         if abs(time_delta.days) > 1:
             formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year - 1}"

          
@@ 162,22 161,32 @@ class SyslogUDPServer:
             code = data[data.find("<") + 1 : data.find(">")]
             Facility, Priority = self.syslog_matrix.decode_int(code)
             FromHost = hostname or address[0]
-            InfoUnitID = 1  # Hardcoded
+            InfoUnitID = 1
+
+            FromHost = self.escape(FromHost)
+            SysLogTag = self.escape(SysLogTag)
+            ProcessID = self.escape(ProcessID)
+            Message = self.escape(Message)
+
             params = {
                 "Facility": Facility,
                 "Priority": Priority,
                 "FromHost": FromHost,
                 "InfoUnitID": InfoUnitID,
                 "ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"),
-                "DeviceReportedTime": DeviceReportedTime.strftime(
-                    "%Y-%m-%d %H:%M:%S"
-                ),
+                "DeviceReportedTime": DeviceReportedTime.strftime("%Y-%m-%d %H:%M:%S"),
                 "SysLogTag": SysLogTag,
                 "ProcessID": ProcessID,
                 "Message": Message,
             }
 
-            sql_command = SQL
+            sql_command = (
+                f"INSERT INTO {table_name} (Facility, Priority, FromHost, "
+                "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
+                "ProcessID, Message) VALUES "
+                "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
+                ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)"
+            )
 
             if SQL_DUMP:
                 print(f"\n   SQL: {sql_command}")

          
@@ 187,14 196,18 @@ class SyslogUDPServer:
                 try:
                     async with self.db.execute(sql_command, params) as cursor:
                         pass
+                    # Optionally sync to FTS table if needed
+                    await self.db.execute(
+                        f"INSERT INTO SystemEventsFTS{year_month} (Message) VALUES (?)",
+                        (Message,),
+                    )
+                    await self.db.commit()
                 except Exception as e:
                     if DEBUG:
                         print(f"\n   SQL: {sql_command}")
                         print(f"\nPARAMS: {params}")
                         print(f"\nEXCEPT: {e}")
                     await self.db.rollback()
-                else:
-                    await self.db.commit()
 
     def start(self):
         self.endpoint, _ = self.loop.run_until_complete(

          
@@ 233,10 246,7 @@ if __name__ == "__main__":
         syslog_server = SyslogUDPServer(BINDING_IP, BINDING_PORT, loop)
         syslog_server.start()
         loop = syslog_server.loop
-
-        # Run the event loop
         loop.run_forever()
-
     except (IOError, SystemExit):
         raise
     except Exception as e: