A => .hgignore +1 -0
A => priority.py +33 -0
@@ 0,0 1,33 @@
+# -*- coding: utf-8 -*-
+
+class SyslogMatrix(object):
+ LEVELS = (
+ 'emergency', 'alert', 'critica', 'error',
+ 'warning', 'notice', 'info', 'debug'
+ )
+ FACILITIES = (
+ 'kernel', 'user', 'mail', 'system', 'security0', 'syslog',
+ 'lpd', 'nntp', 'uucp', 'time', 'security1', 'ftpd', 'ntpd',
+ 'logaudit', 'logalert', 'clock', 'local0', 'local1', 'local2',
+ 'local3', 'local4', 'local5', 'local6', 'local7'
+ )
+
+ def __init__(self):
+ self.matrix = {}
+ i = 0
+ for facility in self.FACILITIES:
+ for level in self.LEVELS:
+ self.matrix[str(i)] = (facility, level)
+ i += 1
+
+ def decode(self, code):
+ code = str(code) if isinstance(code, int) else code
+ facility, level = self.matrix.get(code)
+ return (
+ (facility, self.FACILITIES.index(facility)),
+ (level, self.LEVELS.index(level))
+ )
+
+ def decode_int(self, code):
+ facility, level = self.decode(code)
+ return (facility[1], level[1])
A => requirements.pip +2 -0
@@ 0,0 1,2 @@
+aiosqlite
+
A => rfc5424.py +43 -0
@@ 0,0 1,43 @@
+import re
+from datetime import datetime
+from itertools import zip_longest
+
+def convert_rfc5424_to_rfc3164(message):
+ # Extract the required fields from the RFC-5424 message
+ # Example RFC-5424 message: '<189>Jun 20 06:31:32 FortiGate-101F_02 - eventtime=1687242692932574340 ...
+ pattern = r'<(\d+)>(\d+ ){0,1}(\S+ \d+ \d+:\d+:\d+) ([\S\s]+) - ([\S\s]+)'
+ match = re.match(pattern, message)
+
+ if not match:
+ return message
+
+ # Extract the necessary fields
+ priority = match.group(1)
+ version = match.group(2)
+ timestamp = match.group(3)
+ hostname = match.group(4)
+ message = match.group(5)
+
+ # Hostname may contains application name and process id (space separated)
+ hostname, appname, procid = [x or y for x, y in zip_longest(['', '', ''], hostname.split(' '), fillvalue='')]
+ syslog_tag = appname
+ if procid:
+ syslog_tag += '-{}'.format(procid)
+ syslog_tag = syslog_tag or '-' # In case that there is no appname and procid
+
+ # Convert the priority to RFC-3164 format
+ rfc5424_pri = int(priority)
+ facility = rfc5424_pri // 8
+ severity = rfc5424_pri % 8
+ rfc3164_pri = facility * 8 + severity
+
+
+ # Convert the timestamp to RFC-3164 format (MMM DD HH:MM:SS)
+ timestamp_dt = datetime.strptime(timestamp, '%b %d %H:%M:%S')
+ timestamp_rfc3164 = timestamp_dt.strftime('%b %d %H:%M:%S')
+
+ # Rearrange the fields according to RFC-3164 format
+ rfc3164_message = '<{}>{} {} {}: {}'.format(rfc3164_pri, timestamp_rfc3164, hostname, syslog_tag, message)
+
+ return rfc3164_message
+
A => rsyslog-forward.conf +6 -0
@@ 0,0 1,6 @@
+$ActionQueueFileName fwdRule1 # unique name prefix for spool files
+$ActionQueueMaxDiskSpace 1g # 1gb space limit (use as much as possible)
+$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
+$ActionQueueType LinkedList # run asynchronously
+$ActionResumeRetryCount -1 # infinite retries if host is down
+*.* @127.0.0.1:5140
A => rsyslog-minimal.conf +23 -0
@@ 0,0 1,23 @@
+$WorkDirectory /var/lib/rsyslog
+
+$FileOwner root
+$FileGroup adm
+$FileCreateMode 0640
+$DirCreateMode 0755
+$Umask 0022
+
+module(load="immark")
+module(load="imuxsock")
+module(load="imudp")
+input(
+ type="imudp"
+ port="514"
+)
+
+$ActionQueueFileName fwdRule1
+$ActionQueueMaxDiskSpace 1g
+$ActionQueueSaveOnShutdown on
+$ActionQueueType LinkedList
+$ActionResumeRetryCount -1
+*.* @127.0.0.1:5140
+
A => syslogd.py +228 -0
@@ 0,0 1,228 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+## Syslog Server in Python with asyncio and SQLite.
+
+# Configuration
+import os
+DEBUG = os.environ.get('DEBUG') == 'True'
+LOG_DUMP = os.environ.get('LOG_DUMP') == 'True'
+SQL_DUMP = os.environ.get('SQL_DUMP') == 'True'
+SQL_WRITE = os.environ.get('SQL_WRITE') == 'True'
+BINDING_IP = os.environ.get('BINDING_IP', '0.0.0.0')
+BINDING_PORT = os.environ.get('BINDING_PORT', '5140')
+del(os)
+
+# Query
+SQL = ("INSERT INTO SystemEvents (Facility, Priority, FromHost, "
+ "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
+ "ProcessID, Message) VALUES "
+ "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
+ ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)")
+
+# Code
+import asyncio
+import aiosqlite
+import signal
+from datetime import datetime
+from priority import SyslogMatrix
+from rfc5424 import convert_rfc5424_to_rfc3164
+
+class SyslogUDPServer:
+
+ syslog_matrix = SyslogMatrix()
+
+ def __init__(self, host, port, loop=None):
+ self.host = host
+ self.port = port
+ self.loop = loop or asyncio.get_event_loop()
+ self.running = False
+
+ # Register the signal handler for SIGINT (Control-C)
+ self.loop.add_signal_handler(signal.SIGINT, self.handle_sigint_wrapper)
+
+ self.setup()
+ if DEBUG:
+ print(f'Listening on UDP {host}:{port}')
+
+ def setup(self):
+ if SQL_WRITE:
+ self.loop.run_until_complete(self.connect_to_sqlite())
+
+ async def handle_sigint(self, signum, frame):
+ print("\nSIGINT received. Shutting down gracefully...")
+ self.running = False
+
+ # Run the shutdown coroutine in the event loop and wait for completion
+ await self.shutdown()
+
+ def handle_sigint_wrapper(self):
+ # Pass signum and frame arguments to handle_sigint()
+ asyncio.ensure_future(self.handle_sigint(signal.SIGINT, None))
+
+ async def close_db_connection(self):
+ if SQL_WRITE and hasattr(self, 'db') and self.db:
+ try:
+ await self.db.close()
+ except Exception as e:
+ if DEBUG:
+ print("Error while closing the database connection:", e)
+
+ async def shutdown(self):
+ # Stop the server
+ self.stop()
+
+ # Close the database connection if SQL_WRITE is True
+ await self.close_db_connection()
+
+ # Stop the event loop
+ self.loop.stop()
+
+ async def connect_to_sqlite(self):
+ self.db = await aiosqlite.connect('syslog.db', loop=self.loop)
+ await self.db.execute('''CREATE TABLE IF NOT EXISTS SystemEvents (
+ ID INTEGER PRIMARY KEY AUTOINCREMENT,
+ Facility INTEGER,
+ Priority INTEGER,
+ FromHost TEXT,
+ InfoUnitID INTEGER,
+ ReceivedAt TIMESTAMP,
+ DeviceReportedTime TIMESTAMP,
+ SysLogTag TEXT,
+ ProcessID TEXT,
+ Message TEXT
+ )''')
+ await self.db.commit()
+
+ # Create an index on the ReceivedAt field
+ await self.db.execute('''CREATE INDEX IF NOT EXISTS idx_ReceivedAt
+ ON SystemEvents (ReceivedAt)''')
+ await self.db.commit()
+
+ # Create the virtual table with the full-text search index
+ await self.db.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
+ USING FTS5(Message)''')
+ await self.db.commit()
+
+ async def escape(self, msg):
+ return msg
+
+ async def handle_datagram(self, data, address):
+ ReceivedAt = datetime.now() # Generated
+ data = data.decode()
+ data = convert_rfc5424_to_rfc3164(data)
+ if LOG_DUMP:
+ print(data)
+ datetime_hostname_program = data[data.find('>') + 1:data.find(': ')]
+ m, d, t = datetime_hostname_program.split()[:3]
+ formatted_datetime = '%s %s %s %s' % (
+ m, d.zfill(2), t, ReceivedAt.year
+ )
+ DeviceReportedTime = datetime.strptime(
+ formatted_datetime, '%b %d %H:%M:%S %Y'
+ )
+ time_delta = ReceivedAt - DeviceReportedTime
+ if abs(time_delta.days) > 1:
+ formatted_datetime = '%s %s %s %s' % (
+ m, d.zfill(2), t, ReceivedAt.year - 1
+ )
+ DeviceReportedTime = datetime.strptime(
+ formatted_datetime, '%b %d %H:%M:%S %Y'
+ )
+ time_delta = ReceivedAt - DeviceReportedTime
+ if abs(time_delta.days) > 1:
+ pass # Something wrong, just ignore it.
+ else:
+ hostname, program = datetime_hostname_program.split()[-2:]
+ if '[' in program:
+ SysLogTag = program[:program.find('[')]
+ else:
+ SysLogTag = program
+ if '[' in program:
+ ProcessID = program[program.find('[') + 1:program.find(']')]
+ else:
+ ProcessID = '0'
+ Message = data[data.find(': ') + 2:]
+ code = data[data.find('<') + 1:data.find('>')]
+ Facility, Priority = self.syslog_matrix.decode_int(code)
+ FromHost = hostname or address[0]
+ InfoUnitID = 1 # Hardcoded
+
+ params = {
+ 'Facility': Facility,
+ 'Priority': Priority,
+ 'FromHost': FromHost,
+ 'InfoUnitID': InfoUnitID,
+ 'ReceivedAt': ReceivedAt.strftime('%Y-%m-%d %H:%M:%S'),
+ 'DeviceReportedTime': DeviceReportedTime.strftime('%Y-%m-%d %H:%M:%S'),
+ 'SysLogTag': SysLogTag,
+ 'ProcessID': ProcessID,
+ 'Message': Message
+ }
+
+ sql_command = SQL
+
+ if SQL_DUMP:
+ print('\nSQL:', sql_command, params)
+
+ if SQL_WRITE:
+ try:
+ async with self.db.execute(sql_command, params) as cursor:
+ pass
+ except Exception as e:
+ if DEBUG:
+ print(sql_command, params)
+ print(e)
+ await self.db.rollback()
+ else:
+ await self.db.commit()
+
+ def start(self):
+ self.endpoint, _ = self.loop.run_until_complete(
+ self.loop.create_datagram_endpoint(
+ lambda: DatagramProtocol(self.handle_datagram),
+ local_addr=(self.host, self.port)
+ )
+ )
+
+ def stop(self):
+ self.endpoint.close()
+
+
+class DatagramProtocol:
+ def __init__(self, datagram_callback):
+ self.datagram_callback = datagram_callback
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def datagram_received(self, data, addr):
+ asyncio.create_task(self.datagram_callback(data, addr))
+
+ def error_received(self, exc):
+ if DEBUG:
+ print('Error received:', exc)
+
+ def connection_lost(self, exc):
+ if DEBUG:
+ print('Closing transport')
+
+
+if __name__ == "__main__":
+ try:
+ syslog_server = SyslogUDPServer(BINDING_IP, BINDING_PORT)
+ syslog_server.start()
+ loop = syslog_server.loop
+
+ # Run the event loop
+ loop.run_forever()
+
+ except (IOError, SystemExit):
+ raise
+ except Exception as e:
+ print("Error occurred:", e)
+ finally:
+ print("Shutting down the server...")
+ syslog_server.stop()
+ loop.run_until_complete(syslog_server.close_db_connection())
+ loop.close()
+