@@ 49,7 49,9 @@ class SyslogMatrix:
def decode(self, code):
code = str(code) if isinstance(code, int) else code
- facility, level = self.matrix.get(code)
+ facility, level = self.matrix.get(
+ code, ("kernel", "emergency")
+ ) # Fallback to 0, 0
return (
(facility, self.FACILITIES.index(facility)),
(level, self.LEVELS.index(level)),
@@ 13,15 13,6 @@ BINDING_IP = os.environ.get("BINDING_IP"
BINDING_PORT = os.environ.get("BINDING_PORT", "5140")
del os
-# Query
-SQL = (
- "INSERT INTO SystemEvents (Facility, Priority, FromHost, "
- "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
- "ProcessID, Message) VALUES "
- "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
- ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)"
-)
-
# Code
import asyncio
import aiosqlite
@@ 59,12 50,9 @@ class SyslogUDPServer:
async def handle_sigint(self, signum, frame):
print("\nSIGINT received. Shutting down gracefully...")
self.running = False
-
- # Run the shutdown coroutine in the event loop and wait for completion
await self.shutdown()
def handle_sigint_wrapper(self):
- # Pass signum and frame arguments to handle_sigint()
asyncio.ensure_future(self.handle_sigint(signal.SIGINT, None))
async def close_db_connection(self):
@@ 76,21 64,23 @@ class SyslogUDPServer:
print(f"Error while closing the database connection: {e}")
async def shutdown(self):
- # Stop the server
self.stop()
-
- # Close the database connection if SQL_WRITE is True
await self.close_db_connection()
-
- # Stop the event loop
self.loop.stop()
async def connect_to_sqlite(self):
self.db = await aiosqlite.connect("syslog.db", loop=self.loop)
# Enable WAL mode for better write performance
await self.db.execute("PRAGMA journal_mode=WAL")
+ # Enable auto_vacuum to keep the database size in check
+ await self.db.execute("PRAGMA auto_vacuum = FULL")
+ await self.db.commit()
+
+ async def create_monthly_table(self, year_month):
+ table_name = f"SystemEvents{year_month}"
+ fts_table_name = f"SystemEventsFTS{year_month}"
await self.db.execute(
- """CREATE TABLE IF NOT EXISTS SystemEvents (
+ f"""CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
Facility INTEGER,
Priority INTEGER,
@@ 103,24 93,30 @@ class SyslogUDPServer:
Message TEXT
)"""
)
- await self.db.commit()
-
- # Create an index on the ReceivedAt field
await self.db.execute(
- """CREATE INDEX IF NOT EXISTS idx_ReceivedAt
- ON SystemEvents (ReceivedAt)"""
+ f"""CREATE INDEX IF NOT EXISTS idx_ReceivedAt_{year_month}
+ ON {table_name} (ReceivedAt)"""
)
- await self.db.commit()
-
- # Create the virtual table with the full-text search index
await self.db.execute(
- """CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
+ f"""CREATE VIRTUAL TABLE IF NOT EXISTS {fts_table_name}
USING FTS5(Message)"""
)
await self.db.commit()
+ return table_name
- async def escape(self, msg):
- return msg
+ def escape(self, msg):
+ """
+ Escapes special characters in the message to prevent SQL injection.
+ Handles strings safely for SQLite insertion.
+ """
+ if not isinstance(msg, str):
+ return str(msg)
+ escaped_msg = (
+ msg.replace("'", "''") # Escapes single quotes for SQLite
+ .replace('"', '""') # Escapes double quotes
+ .replace("\\", "\\\\") # Escapes backslashes
+ )
+ return escaped_msg
async def handle_datagram(self, data, address):
"""
@@ 128,17 124,20 @@ class SyslogUDPServer:
e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT
[m][d] [---t--] [hostn-program] [--------message-------]
"""
- ReceivedAt = datetime.now() # Generated
+ ReceivedAt = datetime.now()
data = data.decode()
data = convert_rfc5424_to_rfc3164(data)
if LOG_DUMP:
print(f"\n DATA: {data}")
+
+ # Get current year and month for table name
+ year_month = ReceivedAt.strftime("%Y%m") # e.g., "202503"
+ table_name = await self.create_monthly_table(year_month)
+
datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")]
m, d, t, hostname = datetime_hostname_program.split()[:4]
formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year}"
- DeviceReportedTime = datetime.strptime(
- formatted_datetime, "%b %d %H:%M:%S %Y"
- )
+ DeviceReportedTime = datetime.strptime(formatted_datetime, "%b %d %H:%M:%S %Y")
time_delta = ReceivedAt - DeviceReportedTime
if abs(time_delta.days) > 1:
formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year - 1}"
@@ 162,22 161,32 @@ class SyslogUDPServer:
code = data[data.find("<") + 1 : data.find(">")]
Facility, Priority = self.syslog_matrix.decode_int(code)
FromHost = hostname or address[0]
- InfoUnitID = 1 # Hardcoded
+ InfoUnitID = 1
+
+ FromHost = self.escape(FromHost)
+ SysLogTag = self.escape(SysLogTag)
+ ProcessID = self.escape(ProcessID)
+ Message = self.escape(Message)
+
params = {
"Facility": Facility,
"Priority": Priority,
"FromHost": FromHost,
"InfoUnitID": InfoUnitID,
"ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"),
- "DeviceReportedTime": DeviceReportedTime.strftime(
- "%Y-%m-%d %H:%M:%S"
- ),
+ "DeviceReportedTime": DeviceReportedTime.strftime("%Y-%m-%d %H:%M:%S"),
"SysLogTag": SysLogTag,
"ProcessID": ProcessID,
"Message": Message,
}
- sql_command = SQL
+ sql_command = (
+ f"INSERT INTO {table_name} (Facility, Priority, FromHost, "
+ "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
+ "ProcessID, Message) VALUES "
+ "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
+ ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)"
+ )
if SQL_DUMP:
print(f"\n SQL: {sql_command}")
@@ 187,14 196,18 @@ class SyslogUDPServer:
try:
async with self.db.execute(sql_command, params) as cursor:
pass
+ # Optionally sync to FTS table if needed
+ await self.db.execute(
+ f"INSERT INTO SystemEventsFTS{year_month} (Message) VALUES (?)",
+ (Message,),
+ )
+ await self.db.commit()
except Exception as e:
if DEBUG:
print(f"\n SQL: {sql_command}")
print(f"\nPARAMS: {params}")
print(f"\nEXCEPT: {e}")
await self.db.rollback()
- else:
- await self.db.commit()
def start(self):
self.endpoint, _ = self.loop.run_until_complete(
@@ 233,10 246,7 @@ if __name__ == "__main__":
syslog_server = SyslogUDPServer(BINDING_IP, BINDING_PORT, loop)
syslog_server.start()
loop = syslog_server.loop
-
- # Run the event loop
loop.run_forever()
-
except (IOError, SystemExit):
raise
except Exception as e: