@@ 11,13 11,13 @@ SQL_DUMP = os.environ.get("SQL_DUMP") ==
SQL_WRITE = os.environ.get("SQL_WRITE") == "True"
BINDING_IP = os.environ.get("BINDING_IP", "0.0.0.0")
BINDING_PORT = os.environ.get("BINDING_PORT", "5140")
+
del os
-# Code
import asyncio
import aiosqlite
import signal
-from datetime import datetime
+from datetime import datetime, timedelta
from priority import SyslogMatrix
from rfc5424 import convert_rfc5424_to_rfc3164
@@ 28,7 28,6 @@ except ImportError:
class SyslogUDPServer:
-
syslog_matrix = SyslogMatrix()
def __init__(self, host, port, loop=None):
@@ 36,10 35,9 @@ class SyslogUDPServer:
self.port = port
self.loop = loop or asyncio.get_event_loop()
self.running = False
-
- # Register the signal handler for SIGINT (Control-C)
+ self.fts_buffer = [] # Buffer for FTS messages
+ self.last_fts_sync = None # Timestamp of last FTS sync
self.loop.add_signal_handler(signal.SIGINT, self.handle_sigint_wrapper)
-
self.setup()
print(f"Listening on UDP {host}:{port} using {self.loop.__module__}")
@@ 50,6 48,9 @@ class SyslogUDPServer:
async def handle_sigint(self, signum, frame):
print("\nSIGINT received. Shutting down gracefully...")
self.running = False
+ # Sync any remaining FTS messages before shutdown
+ if self.fts_buffer:
+ await self.sync_fts_buffer()
await self.shutdown()
def handle_sigint_wrapper(self):
@@ 70,9 71,7 @@ class SyslogUDPServer:
async def connect_to_sqlite(self):
self.db = await aiosqlite.connect("syslog.db", loop=self.loop)
- # Enable WAL mode for better write performance
await self.db.execute("PRAGMA journal_mode=WAL")
- # Enable auto_vacuum to keep the database size in check
await self.db.execute("PRAGMA auto_vacuum = FULL")
await self.db.commit()
@@ 105,37 104,50 @@ class SyslogUDPServer:
return table_name
def escape(self, msg):
- """
- Escapes special characters in the message to prevent SQL injection.
- Handles strings safely for SQLite insertion.
- """
if not isinstance(msg, str):
return str(msg)
- escaped_msg = (
- msg.replace("'", "''") # Escapes single quotes for SQLite
- .replace('"', '""') # Escapes double quotes
- .replace("\\", "\\\\") # Escapes backslashes
- )
- return escaped_msg
+ return msg.replace("'", "''").replace('"', '""').replace("\\", "\\\\")
+
+ async def sync_fts_buffer(self, table_name=None):
+ if not self.fts_buffer:
+ return
+
+ year_month = self.fts_buffer[0]["ReceivedAt"].strftime("%Y%m")
+ fts_table_name = f"SystemEventsFTS{year_month}"
+ messages = [(entry["Message"],) for entry in self.fts_buffer]
+
+ try:
+ await self.db.executemany(
+ f"INSERT INTO {fts_table_name} (Message) VALUES (?)", messages
+ )
+ await self.db.commit()
+ self.last_fts_sync = datetime.now()
+ self.fts_buffer = []
+ if DEBUG:
+ print(f"Synced {len(messages)} messages to {fts_table_name}")
+ except Exception as e:
+ if DEBUG:
+ print(f"Failed to sync FTS buffer: {e}")
+ await self.db.rollback()
async def handle_datagram(self, data, address):
- """
- Handle RFC-3164 message
- e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT
- [m][d] [---t--] [hostn-program] [--------message-------]
- """
ReceivedAt = datetime.now()
data = data.decode()
data = convert_rfc5424_to_rfc3164(data)
if LOG_DUMP:
print(f"\n DATA: {data}")
- # Get current year and month for table name
- year_month = ReceivedAt.strftime("%Y%m") # e.g., "202503"
+ year_month = ReceivedAt.strftime("%Y%m")
table_name = await self.create_monthly_table(year_month)
datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")]
- m, d, t, hostname = datetime_hostname_program.split()[:4]
+ try:
+ m, d, t, hostname = datetime_hostname_program.split()[:4]
+ except ValueError:
+ if DEBUG:
+ print(f"Invalid message format: {data}")
+ return
+
formatted_datetime = f"{m} {d.zfill(2)} {t} {ReceivedAt.year}"
DeviceReportedTime = datetime.strptime(formatted_datetime, "%b %d %H:%M:%S %Y")
time_delta = ReceivedAt - DeviceReportedTime
@@ 146,17 158,15 @@ class SyslogUDPServer:
)
time_delta = ReceivedAt - DeviceReportedTime
if abs(time_delta.days) > 1:
- pass # Something wrong, just ignore it.
+ pass
else:
program = "-".join(datetime_hostname_program.split()[4:])
- if "[" in program:
- SysLogTag = program[: program.find("[")]
- else:
- SysLogTag = program
- if "[" in program:
- ProcessID = program[program.find("[") + 1 : program.find("]")]
- else:
- ProcessID = "0"
+ SysLogTag = program[: program.find("[")] if "[" in program else program
+ ProcessID = (
+ program[program.find("[") + 1 : program.find("]")]
+ if "[" in program
+ else "0"
+ )
Message = data[data.find(": ") + 2 :]
code = data[data.find("<") + 1 : data.find(">")]
Facility, Priority = self.syslog_matrix.decode_int(code)
@@ 173,8 183,8 @@ class SyslogUDPServer:
"Priority": Priority,
"FromHost": FromHost,
"InfoUnitID": InfoUnitID,
- "ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"),
- "DeviceReportedTime": DeviceReportedTime.strftime("%Y-%m-%d %H:%M:%S"),
+ "ReceivedAt": ReceivedAt,
+ "DeviceReportedTime": DeviceReportedTime,
"SysLogTag": SysLogTag,
"ProcessID": ProcessID,
"Message": Message,
@@ 196,12 206,20 @@ class SyslogUDPServer:
try:
async with self.db.execute(sql_command, params) as cursor:
pass
- # Optionally sync to FTS table if needed
- await self.db.execute(
- f"INSERT INTO SystemEventsFTS{year_month} (Message) VALUES (?)",
- (Message,),
- )
await self.db.commit()
+
+ # Add to FTS buffer
+ self.fts_buffer.append(params)
+
+ # Check buffer size or time since last sync
+ if len(self.fts_buffer) >= 100:
+ await self.sync_fts_buffer(table_name)
+ elif (
+ self.last_fts_sync
+ and (ReceivedAt - self.last_fts_sync).total_seconds() >= 10
+ ):
+ await self.sync_fts_buffer(table_name)
+
except Exception as e:
if DEBUG:
print(f"\n SQL: {sql_command}")