# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------
# priority.py -- syslog PRI code <-> (facility, severity) lookup
# ---------------------------------------------------------------------------


class SyslogMatrix(object):
    """Lookup table from a syslog PRI code to its facility and severity.

    The table is built so that ``code == facility_index * 8 + level_index``,
    i.e. codes run from 0 (kernel/emergency) to 191 (local7/debug).
    """

    # Severity names, ordered by numeric severity (0..7).
    LEVELS = (
        'emergency', 'alert', 'critical', 'error',
        'warning', 'notice', 'info', 'debug'
    )
    # Facility names, ordered by numeric facility (0..23).
    FACILITIES = (
        'kernel', 'user', 'mail', 'system', 'security0', 'syslog',
        'lpd', 'nntp', 'uucp', 'time', 'security1', 'ftpd', 'ntpd',
        'logaudit', 'logalert', 'clock', 'local0', 'local1', 'local2',
        'local3', 'local4', 'local5', 'local6', 'local7'
    )

    def __init__(self):
        # Keys are the decimal string form of the code, because codes are
        # normally sliced out of the raw "<PRI>" text of a message.
        width = len(self.LEVELS)
        self.matrix = {
            str(f_idx * width + l_idx): (facility, level)
            for f_idx, facility in enumerate(self.FACILITIES)
            for l_idx, level in enumerate(self.LEVELS)
        }

    def decode(self, code):
        """Return ``((facility, facility_index), (level, level_index))``.

        ``code`` may be an ``int`` or its decimal string.

        Raises:
            ValueError: if ``code`` is not a known PRI code (previously this
                surfaced as an opaque ``TypeError`` from unpacking ``None``).
        """
        key = str(code) if isinstance(code, int) else code
        try:
            facility, level = self.matrix[key]
        except (KeyError, TypeError):
            # TypeError covers unhashable junk passed as a code.
            raise ValueError(
                'unknown syslog priority code: {!r}'.format(code)
            ) from None
        return (
            (facility, self.FACILITIES.index(facility)),
            (level, self.LEVELS.index(level))
        )

    def decode_int(self, code):
        """Return ``(facility_index, level_index)`` for ``code``.

        Raises:
            ValueError: if ``code`` is not a known PRI code.
        """
        facility, level = self.decode(code)
        return (facility[1], level[1])


# ---------------------------------------------------------------------------
# rfc5424.py -- rewrite "<PRI>[VER ]TIMESTAMP HOST [APP [PID]] - MSG" lines
# into the "<PRI>TIMESTAMP HOST TAG: MSG" layout
# ---------------------------------------------------------------------------
import re
from datetime import datetime
from itertools import zip_longest

# Example input:
#   '<189>Jun 20 06:31:32 FortiGate-101F_02 - eventtime=1687242692932574340 ...'
_RFC5424_RE = re.compile(
    r'<(\d+)>'                   # PRI
    r'(?:\d+ )?'                 # optional version number (ignored)
    r'(\S+ \d+ \d+:\d+:\d+) '    # timestamp: "Mon D HH:MM:SS"
    # 1-3 header tokens (host [app [procid]]).  Lazy, so the header ends at
    # the FIRST " - "; a message body that itself contains " - " is no
    # longer swallowed into the header.
    r'(\S+(?: \S+){0,2}?) - '
    r'([\S\s]+)'                 # message body (may span lines)
)


def convert_rfc5424_to_rfc3164(message):
    """Rewrite a dash-separated syslog line into ``HOST TAG: MSG`` form.

    Args:
        message: the decoded syslog line.

    Returns:
        ``'<PRI>Mon DD HH:MM:SS host tag: msg'`` when ``message`` matches the
        expected layout; otherwise ``message`` unchanged.  The tag is
        ``app[procid]`` when both are present, ``app`` when only the
        application name is, and ``-`` when neither is.  Lines whose header
        has more than three tokens or whose timestamp is not a real date are
        returned unchanged instead of raising.
    """
    match = _RFC5424_RE.match(message)
    if not match:
        return message

    priority, timestamp, header, body = match.groups()

    # Header may contain application name and process id (space separated).
    hostname, appname, procid = (header.split(' ') + ['', ''])[:3]
    if appname and procid:
        # "name[pid]" is the form the receiver splits back into
        # SysLogTag / ProcessID.
        syslog_tag = '{}[{}]'.format(appname, procid)
    else:
        syslog_tag = appname or '-'  # no appname and no procid

    # Validate and normalise the timestamp (zero-padded day, canonical month
    # spelling).  A leap year is supplied for parsing because a year-less
    # strptime defaults to 1900 and would reject "Feb 29".
    try:
        parsed = datetime.strptime('2000 ' + timestamp, '%Y %b %d %H:%M:%S')
    except ValueError:
        return message
    timestamp_rfc3164 = parsed.strftime('%b %d %H:%M:%S')

    # PRI is numerically identical in both layouts; int() only normalises it.
    return '<{}>{} {} {}: {}'.format(
        int(priority), timestamp_rfc3164, hostname, syslog_tag, body
    )
#!/usr/bin/env python
# -*- coding: utf-8 -*-
## Syslog Server in Python with asyncio and SQLite.  (syslogd.py)

# Configuration -- read from the environment once, at import time.  The
# boolean switches are on only for the exact string 'True'.
import os
DEBUG = os.environ.get('DEBUG') == 'True'
LOG_DUMP = os.environ.get('LOG_DUMP') == 'True'
SQL_DUMP = os.environ.get('SQL_DUMP') == 'True'
SQL_WRITE = os.environ.get('SQL_WRITE') == 'True'
BINDING_IP = os.environ.get('BINDING_IP', '0.0.0.0')
BINDING_PORT = os.environ.get('BINDING_PORT', '5140')
del(os)

# Query
SQL = ("INSERT INTO SystemEvents (Facility, Priority, FromHost, "
       "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
       "ProcessID, Message) VALUES "
       "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
       ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)")

# Code
import asyncio
import aiosqlite
import signal
from datetime import datetime
from priority import SyslogMatrix
from rfc5424 import convert_rfc5424_to_rfc3164


class SyslogUDPServer:
    """UDP syslog receiver that parses each datagram and, when ``SQL_WRITE``
    is on, inserts it into the ``SystemEvents`` table of ``syslog.db``.

    Args:
        host: local address to bind.
        port: local UDP port to bind.
        loop: event loop to run on; defaults to the current event loop.
    """

    syslog_matrix = SyslogMatrix()

    def __init__(self, host, port, loop=None):
        self.host = host
        self.port = port
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No current loop is available; make one.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        self.loop = loop
        self.running = False
        # Defined up front so stop()/close_db_connection() are safe to call
        # even if start()/setup() never completed.
        self.db = None
        self.endpoint = None
        self._shutdown_task = None

        # Register the signal handler for SIGINT (Control-C)
        try:
            self.loop.add_signal_handler(
                signal.SIGINT, self.handle_sigint_wrapper
            )
        except NotImplementedError:
            # This loop cannot install signal handlers; Control-C then
            # surfaces as KeyboardInterrupt instead.
            if DEBUG:
                print('Signal handlers are not supported by this event loop')

        self.setup()
        if DEBUG:
            print(f'Listening on UDP {host}:{port}')

    def setup(self):
        """Open the database (blocking) when ``SQL_WRITE`` is enabled."""
        if SQL_WRITE:
            self.loop.run_until_complete(self.connect_to_sqlite())

    async def handle_sigint(self, signum, frame):
        """Shut the server down in response to SIGINT."""
        print("\nSIGINT received. Shutting down gracefully...")
        self.running = False

        # Run the shutdown coroutine in the event loop and wait for completion
        await self.shutdown()

    def handle_sigint_wrapper(self):
        """Synchronous signal callback: schedule ``handle_sigint``."""
        # Keep a reference so the task cannot be garbage-collected early.
        self._shutdown_task = self.loop.create_task(
            self.handle_sigint(signal.SIGINT, None)
        )

    async def close_db_connection(self):
        """Close the SQLite connection, if any; errors are only reported."""
        db = getattr(self, 'db', None)
        if SQL_WRITE and db:
            try:
                await db.close()
            except Exception as e:
                if DEBUG:
                    print("Error while closing the database connection:", e)

    async def shutdown(self):
        """Stop receiving, close the database, then stop the event loop."""
        # Stop the server
        self.stop()

        # Close the database connection if SQL_WRITE is True
        await self.close_db_connection()

        # Stop the event loop
        self.loop.stop()

    async def connect_to_sqlite(self):
        """Open ``syslog.db`` and create the table and indexes if missing."""
        self.db = await aiosqlite.connect('syslog.db', loop=self.loop)
        await self.db.execute('''CREATE TABLE IF NOT EXISTS SystemEvents (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            Facility INTEGER,
            Priority INTEGER,
            FromHost TEXT,
            InfoUnitID INTEGER,
            ReceivedAt TIMESTAMP,
            DeviceReportedTime TIMESTAMP,
            SysLogTag TEXT,
            ProcessID TEXT,
            Message TEXT
        )''')
        await self.db.commit()

        # Create an index on the ReceivedAt field
        await self.db.execute('''CREATE INDEX IF NOT EXISTS idx_ReceivedAt
            ON SystemEvents (ReceivedAt)''')
        await self.db.commit()

        # Create the virtual table with the full-text search index
        # NOTE(review): nothing in this file inserts into SystemEventsFTS.
        await self.db.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
            USING FTS5(Message)''')
        await self.db.commit()

    async def escape(self, msg):
        """Return ``msg`` unchanged (inserts are parameterised)."""
        return msg

    @staticmethod
    def _device_time(month, day, clock, received_at):
        """Attach a year to a year-less device timestamp.

        Tries the receive year, then the previous one (for a message stamped
        just before New Year).  Returns ``None`` if the text is not a valid
        date or neither candidate is within about a day of ``received_at``.
        """
        for year in (received_at.year, received_at.year - 1):
            stamp = '%s %s %s %s' % (month, day.zfill(2), clock, year)
            try:
                candidate = datetime.strptime(stamp, '%b %d %H:%M:%S %Y')
            except ValueError:
                continue  # e.g. "Feb 29" in a non-leap year
            if abs((received_at - candidate).days) <= 1:
                return candidate
        return None

    def _build_params(self, data, address, received_at):
        """Parse one ``<PRI>Mon DD HH:MM:SS [host] tag[pid]: msg`` line.

        Returns the parameter dict for ``SQL``, or ``None`` when the line
        cannot be parsed.
        """
        pri_open = data.find('<')
        pri_close = data.find('>')
        header_end = data.find(': ')
        if -1 in (pri_open, pri_close, header_end):
            return None

        header = data[pri_close + 1:header_end].split()
        if len(header) < 3:
            return None
        month, day, clock = header[:3]
        device_time = self._device_time(month, day, clock, received_at)
        if device_time is None:
            return None  # Something wrong, just ignore it.

        # Whatever follows the timestamp is "[host] tag"; with only a tag
        # present the sender address is used as the host.
        origin = header[3:]
        program = origin[-1] if origin else ''
        hostname = origin[-2] if len(origin) > 1 else ''

        tag, bracket, rest = program.partition('[')
        process_id = rest.partition(']')[0] if bracket else '0'

        try:
            facility, priority = self.syslog_matrix.decode_int(
                data[pri_open + 1:pri_close]
            )
        except (TypeError, ValueError):
            return None  # PRI is not a known code

        return {
            'Facility': facility,
            'Priority': priority,
            'FromHost': hostname or address[0],
            'InfoUnitID': 1,  # Hardcoded
            'ReceivedAt': received_at.strftime('%Y-%m-%d %H:%M:%S'),
            'DeviceReportedTime': device_time.strftime('%Y-%m-%d %H:%M:%S'),
            'SysLogTag': tag,
            'ProcessID': process_id,
            'Message': data[header_end + 2:],
        }

    async def handle_datagram(self, data, address):
        """Parse one received datagram and store it.

        Args:
            data: raw datagram payload (bytes).
            address: sender address; ``address[0]`` is used as the host when
                the message does not carry one.

        Unparsable datagrams are dropped (reported when ``DEBUG`` is on)
        rather than raising inside the fire-and-forget task.
        """
        received_at = datetime.now()  # Generated
        # errors='replace': one bad byte must not abort the whole message.
        text = data.decode(errors='replace')
        try:
            text = convert_rfc5424_to_rfc3164(text)
        except ValueError:
            text = None
        if LOG_DUMP and text is not None:
            print(text)

        params = None
        if text is not None:
            params = self._build_params(text, address, received_at)
        if params is None:
            if DEBUG:
                print('Ignoring unparsable datagram from', address[0])
            return

        sql_command = SQL

        if SQL_DUMP:
            print('\nSQL:', sql_command, params)

        if SQL_WRITE:
            try:
                async with self.db.execute(sql_command, params):
                    pass
            except Exception as e:
                if DEBUG:
                    print(sql_command, params)
                    print(e)
                await self.db.rollback()
            else:
                await self.db.commit()

    def start(self):
        """Bind the UDP endpoint (blocking until it is ready)."""
        self.endpoint, _ = self.loop.run_until_complete(
            self.loop.create_datagram_endpoint(
                lambda: DatagramProtocol(self.handle_datagram),
                local_addr=(self.host, self.port)
            )
        )
        self.running = True

    def stop(self):
        """Close the UDP endpoint; safe to call before start() or twice."""
        self.running = False
        endpoint = getattr(self, 'endpoint', None)
        if endpoint is not None:
            endpoint.close()


class DatagramProtocol(asyncio.DatagramProtocol):
    """Protocol that hands every received datagram to an async callback.

    Args:
        datagram_callback: coroutine function called as
            ``datagram_callback(data, addr)`` for each datagram.
    """

    def __init__(self, datagram_callback):
        self.datagram_callback = datagram_callback
        self.transport = None
        # Strong references to in-flight tasks: the event loop itself keeps
        # only weak ones, so an unreferenced task may be collected early.
        self._tasks = set()

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        task = asyncio.create_task(self.datagram_callback(data, addr))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def error_received(self, exc):
        if DEBUG:
            print('Error received:', exc)

    def connection_lost(self, exc):
        if DEBUG:
            print('Closing transport')


if __name__ == "__main__":
    # Pre-bound so the finally block cannot hit NameError (and mask the real
    # error) when construction itself fails.
    syslog_server = None
    loop = None
    try:
        syslog_server = SyslogUDPServer(BINDING_IP, BINDING_PORT)
        loop = syslog_server.loop
        syslog_server.start()

        # Run the event loop
        loop.run_forever()

    except (IOError, SystemExit):
        raise
    except Exception as e:
        print("Error occurred:", e)
    finally:
        print("Shutting down the server...")
        if syslog_server is not None:
            syslog_server.stop()
            loop.run_until_complete(syslog_server.close_db_connection())
            loop.close()