# HG changeset patch # User Chaiwat Suttipongsakul # Date 1693290255 -25200 # Tue Aug 29 13:24:15 2023 +0700 # Node ID cb88c658c3e502684ba1ca86c200b990fbade96d # Parent 2429b47908d3a44e0f4189d90f3b480143f46aa3 black format diff --git a/priority.py b/priority.py --- a/priority.py +++ b/priority.py @@ -1,15 +1,42 @@ # -*- coding: utf-8 -*- + class SyslogMatrix(object): LEVELS = ( - 'emergency', 'alert', 'critica', 'error', - 'warning', 'notice', 'info', 'debug' + "emergency", + "alert", + "critica", + "error", + "warning", + "notice", + "info", + "debug", ) FACILITIES = ( - 'kernel', 'user', 'mail', 'system', 'security0', 'syslog', - 'lpd', 'nntp', 'uucp', 'time', 'security1', 'ftpd', 'ntpd', - 'logaudit', 'logalert', 'clock', 'local0', 'local1', 'local2', - 'local3', 'local4', 'local5', 'local6', 'local7' + "kernel", + "user", + "mail", + "system", + "security0", + "syslog", + "lpd", + "nntp", + "uucp", + "time", + "security1", + "ftpd", + "ntpd", + "logaudit", + "logalert", + "clock", + "local0", + "local1", + "local2", + "local3", + "local4", + "local5", + "local6", + "local7", ) def __init__(self): @@ -25,7 +52,7 @@ facility, level = self.matrix.get(code) return ( (facility, self.FACILITIES.index(facility)), - (level, self.LEVELS.index(level)) + (level, self.LEVELS.index(level)), ) def decode_int(self, code): diff --git a/rfc5424.py b/rfc5424.py --- a/rfc5424.py +++ b/rfc5424.py @@ -2,10 +2,11 @@ from datetime import datetime from itertools import zip_longest + def convert_rfc5424_to_rfc3164(message): # Extract the required fields from the RFC-5424 message # Example RFC-5424 message: '<189>Jun 20 06:31:32 FortiGate-101F_02 - eventtime=1687242692932574340 ... - pattern = r'<(\d+)>(\d+ ){0,1}(\S+ \d+ \d+:\d+:\d+) ([\S\s]+) - ([\S\s]+)' + pattern = r"<(\d+)>(\d+ ){0,1}(\S+ \d+ \d+:\d+:\d+) ([\S\s]+) - ([\S\s]+)" match = re.match(pattern, message) if not match: @@ -15,7 +16,7 @@ priority = match.group(1) version = match.group(2) timestamp = match.group(3) - hostname = match.group(4) #Hostname with optional appname and procid + hostname = match.group(4) # Hostname with optional appname and procid message = match.group(5) # Convert the priority to RFC-3164 format @@ -25,11 +26,12 @@ rfc3164_pri = facility * 8 + severity # Convert the timestamp to RFC-3164 format (MMM DD HH:MM:SS) - timestamp_dt = datetime.strptime(timestamp, '%b %d %H:%M:%S') - timestamp_rfc3164 = timestamp_dt.strftime('%b %d %H:%M:%S') + timestamp_dt = datetime.strptime(timestamp, "%b %d %H:%M:%S") + timestamp_rfc3164 = timestamp_dt.strftime("%b %d %H:%M:%S") # Rearrange the fields according to RFC-3164 format - rfc3164_message = '<{}>{} {}: {}'.format(rfc3164_pri, timestamp_rfc3164, hostname, message) + rfc3164_message = "<{}>{} {}: {}".format( + rfc3164_pri, timestamp_rfc3164, hostname, message + ) return rfc3164_message - diff --git a/syslogd.py b/syslogd.py --- a/syslogd.py +++ b/syslogd.py @@ -4,20 +4,23 @@ # Configuration import os -DEBUG = os.environ.get('DEBUG') == 'True' -LOG_DUMP = os.environ.get('LOG_DUMP') == 'True' -SQL_DUMP = os.environ.get('SQL_DUMP') == 'True' -SQL_WRITE = os.environ.get('SQL_WRITE') == 'True' -BINDING_IP = os.environ.get('BINDING_IP', '0.0.0.0') -BINDING_PORT = os.environ.get('BINDING_PORT', '5140') -del(os) + +DEBUG = os.environ.get("DEBUG") == "True" +LOG_DUMP = os.environ.get("LOG_DUMP") == "True" +SQL_DUMP = os.environ.get("SQL_DUMP") == "True" +SQL_WRITE = os.environ.get("SQL_WRITE") == "True" +BINDING_IP = os.environ.get("BINDING_IP", "0.0.0.0") +BINDING_PORT = os.environ.get("BINDING_PORT", "5140") +del os # Query -SQL = ("INSERT INTO SystemEvents (Facility, Priority, FromHost, " - "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, " - "ProcessID, Message) VALUES " - "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, " - ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)") +SQL = ( + "INSERT INTO SystemEvents (Facility, Priority, FromHost, " + "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, " + "ProcessID, Message) VALUES " + "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, " + ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)" +) # Code import asyncio @@ -32,6 +35,7 @@ except ImportError: uvloop = None + class SyslogUDPServer: syslog_matrix = SyslogMatrix() @@ -46,7 +50,7 @@ self.loop.add_signal_handler(signal.SIGINT, self.handle_sigint_wrapper) self.setup() - print(f'Listening on UDP {host}:{port} using {self.loop.__module__}') + print(f"Listening on UDP {host}:{port} using {self.loop.__module__}") def setup(self): if SQL_WRITE: @@ -64,7 +68,7 @@ asyncio.ensure_future(self.handle_sigint(signal.SIGINT, None)) async def close_db_connection(self): - if SQL_WRITE and hasattr(self, 'db') and self.db: + if SQL_WRITE and hasattr(self, "db") and self.db: try: await self.db.close() except Exception as e: @@ -82,10 +86,11 @@ self.loop.stop() async def connect_to_sqlite(self): - self.db = await aiosqlite.connect('syslog.db', loop=self.loop) + self.db = await aiosqlite.connect("syslog.db", loop=self.loop) # Enable WAL mode for better write performance - await self.db.execute('PRAGMA journal_mode=WAL') - await self.db.execute('''CREATE TABLE IF NOT EXISTS SystemEvents ( + await self.db.execute("PRAGMA journal_mode=WAL") + await self.db.execute( + """CREATE TABLE IF NOT EXISTS SystemEvents ( ID INTEGER PRIMARY KEY AUTOINCREMENT, Facility INTEGER, Priority INTEGER, @@ -96,84 +101,92 @@ SysLogTag TEXT, ProcessID TEXT, Message TEXT - )''') + )""" + ) await self.db.commit() # Create an index on the ReceivedAt field - await self.db.execute('''CREATE INDEX IF NOT EXISTS idx_ReceivedAt - ON SystemEvents (ReceivedAt)''') + await self.db.execute( + """CREATE INDEX IF NOT EXISTS idx_ReceivedAt + ON SystemEvents (ReceivedAt)""" + ) await self.db.commit() # Create the virtual table with the full-text search index - await self.db.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS - USING FTS5(Message)''') + await self.db.execute( + """CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS + USING FTS5(Message)""" + ) await self.db.commit() async def escape(self, msg): return msg async def handle_datagram(self, data, address): - ''' + """ Handle RFC-3164 message e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT [m][d] [---t--] [hostn-program] [--------message-------] - ''' + """ ReceivedAt = datetime.now() # Generated data = data.decode() data = convert_rfc5424_to_rfc3164(data) if LOG_DUMP: - print('\n DATA:', data) - datetime_hostname_program = data[data.find('>')+1:data.find(': ')] + print("\n DATA:", data) + datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")] m, d, t, hostname = datetime_hostname_program.split()[:4] - formatted_datetime = '%s %s %s %s' % ( - m, d.zfill(2), t, ReceivedAt.year - ) + formatted_datetime = "%s %s %s %s" % (m, d.zfill(2), t, ReceivedAt.year) DeviceReportedTime = datetime.strptime( - formatted_datetime, '%b %d %H:%M:%S %Y' + formatted_datetime, "%b %d %H:%M:%S %Y" ) time_delta = ReceivedAt - DeviceReportedTime if abs(time_delta.days) > 1: - formatted_datetime = '%s %s %s %s' % ( - m, d.zfill(2), t, ReceivedAt.year - 1 + formatted_datetime = "%s %s %s %s" % ( + m, + d.zfill(2), + t, + ReceivedAt.year - 1, ) DeviceReportedTime = datetime.strptime( - formatted_datetime, '%b %d %H:%M:%S %Y' + formatted_datetime, "%b %d %H:%M:%S %Y" ) time_delta = ReceivedAt - DeviceReportedTime if abs(time_delta.days) > 1: pass # Something wrong, just ignore it. else: - program = '-'.join(datetime_hostname_program.split()[4:]) - if '[' in program: - SysLogTag = program[:program.find('[')] + program = "-".join(datetime_hostname_program.split()[4:]) + if "[" in program: + SysLogTag = program[: program.find("[")] else: SysLogTag = program - if '[' in program: - ProcessID = program[program.find('[')+1:program.find(']')] + if "[" in program: + ProcessID = program[program.find("[") + 1 : program.find("]")] else: - ProcessID = '0' - Message = data[data.find(': ')+2:] - code = data[data.find('<')+1:data.find('>')] + ProcessID = "0" + Message = data[data.find(": ") + 2 :] + code = data[data.find("<") + 1 : data.find(">")] Facility, Priority = self.syslog_matrix.decode_int(code) FromHost = hostname or address[0] InfoUnitID = 1 # Hardcoded params = { - 'Facility': Facility, - 'Priority': Priority, - 'FromHost': FromHost, - 'InfoUnitID': InfoUnitID, - 'ReceivedAt': ReceivedAt.strftime('%Y-%m-%d %H:%M:%S'), - 'DeviceReportedTime': DeviceReportedTime.strftime('%Y-%m-%d %H:%M:%S'), - 'SysLogTag': SysLogTag, - 'ProcessID': ProcessID, - 'Message': Message + "Facility": Facility, + "Priority": Priority, + "FromHost": FromHost, + "InfoUnitID": InfoUnitID, + "ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"), + "DeviceReportedTime": DeviceReportedTime.strftime( + "%Y-%m-%d %H:%M:%S" + ), + "SysLogTag": SysLogTag, + "ProcessID": ProcessID, + "Message": Message, } sql_command = SQL if SQL_DUMP: - print('\n SQL:', sql_command) - print('\nPARAMS:', params) + print("\n SQL:", sql_command) + print("\nPARAMS:", params) if SQL_WRITE: try: @@ -181,9 +194,9 @@ pass except Exception as e: if DEBUG: - print('\n SQL:', sql_command) - print('\nPARAMS:', params) - print('\nEXCEPT:', e) + print("\n SQL:", sql_command) + print("\nPARAMS:", params) + print("\nEXCEPT:", e) await self.db.rollback() else: await self.db.commit() @@ -192,7 +205,7 @@ self.endpoint, _ = self.loop.run_until_complete( self.loop.create_datagram_endpoint( lambda: DatagramProtocol(self.handle_datagram), - local_addr=(self.host, self.port) + local_addr=(self.host, self.port), ) ) @@ -212,11 +225,11 @@ def error_received(self, exc): if DEBUG: - print('Error received:', exc) + print("Error received:", exc) def connection_lost(self, exc): if DEBUG: - print('Closing transport') + print("Closing transport") if __name__ == "__main__": @@ -238,4 +251,3 @@ syslog_server.stop() loop.run_until_complete(syslog_server.close_db_connection()) loop.close() -