@@ 1,15 1,42 @@
# -*- coding: utf-8 -*-
+
class SyslogMatrix(object):
LEVELS = (
- 'emergency', 'alert', 'critica', 'error',
- 'warning', 'notice', 'info', 'debug'
+ "emergency",
+ "alert",
+ "critica",
+ "error",
+ "warning",
+ "notice",
+ "info",
+ "debug",
)
FACILITIES = (
- 'kernel', 'user', 'mail', 'system', 'security0', 'syslog',
- 'lpd', 'nntp', 'uucp', 'time', 'security1', 'ftpd', 'ntpd',
- 'logaudit', 'logalert', 'clock', 'local0', 'local1', 'local2',
- 'local3', 'local4', 'local5', 'local6', 'local7'
+ "kernel",
+ "user",
+ "mail",
+ "system",
+ "security0",
+ "syslog",
+ "lpd",
+ "nntp",
+ "uucp",
+ "time",
+ "security1",
+ "ftpd",
+ "ntpd",
+ "logaudit",
+ "logalert",
+ "clock",
+ "local0",
+ "local1",
+ "local2",
+ "local3",
+ "local4",
+ "local5",
+ "local6",
+ "local7",
)
def __init__(self):
@@ 25,7 52,7 @@ class SyslogMatrix(object):
facility, level = self.matrix.get(code)
return (
(facility, self.FACILITIES.index(facility)),
- (level, self.LEVELS.index(level))
+ (level, self.LEVELS.index(level)),
)
def decode_int(self, code):
@@ 4,20 4,23 @@
# Configuration
import os
-DEBUG = os.environ.get('DEBUG') == 'True'
-LOG_DUMP = os.environ.get('LOG_DUMP') == 'True'
-SQL_DUMP = os.environ.get('SQL_DUMP') == 'True'
-SQL_WRITE = os.environ.get('SQL_WRITE') == 'True'
-BINDING_IP = os.environ.get('BINDING_IP', '0.0.0.0')
-BINDING_PORT = os.environ.get('BINDING_PORT', '5140')
-del(os)
+
+DEBUG = os.environ.get("DEBUG") == "True"
+LOG_DUMP = os.environ.get("LOG_DUMP") == "True"
+SQL_DUMP = os.environ.get("SQL_DUMP") == "True"
+SQL_WRITE = os.environ.get("SQL_WRITE") == "True"
+BINDING_IP = os.environ.get("BINDING_IP", "0.0.0.0")
+BINDING_PORT = os.environ.get("BINDING_PORT", "5140")
+del os
# Query
-SQL = ("INSERT INTO SystemEvents (Facility, Priority, FromHost, "
- "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
- "ProcessID, Message) VALUES "
- "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
- ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)")
+SQL = (
+ "INSERT INTO SystemEvents (Facility, Priority, FromHost, "
+ "InfoUnitID, ReceivedAt, DeviceReportedTime, SysLogTag, "
+ "ProcessID, Message) VALUES "
+ "(:Facility, :Priority, :FromHost, :InfoUnitID, :ReceivedAt, "
+ ":DeviceReportedTime, :SysLogTag, :ProcessID, :Message)"
+)
# Code
import asyncio
@@ 32,6 35,7 @@ try:
except ImportError:
uvloop = None
+
class SyslogUDPServer:
syslog_matrix = SyslogMatrix()
@@ 46,7 50,7 @@ class SyslogUDPServer:
self.loop.add_signal_handler(signal.SIGINT, self.handle_sigint_wrapper)
self.setup()
- print(f'Listening on UDP {host}:{port} using {self.loop.__module__}')
+ print(f"Listening on UDP {host}:{port} using {self.loop.__module__}")
def setup(self):
if SQL_WRITE:
@@ 64,7 68,7 @@ class SyslogUDPServer:
asyncio.ensure_future(self.handle_sigint(signal.SIGINT, None))
async def close_db_connection(self):
- if SQL_WRITE and hasattr(self, 'db') and self.db:
+ if SQL_WRITE and hasattr(self, "db") and self.db:
try:
await self.db.close()
except Exception as e:
@@ 82,10 86,11 @@ class SyslogUDPServer:
self.loop.stop()
async def connect_to_sqlite(self):
- self.db = await aiosqlite.connect('syslog.db', loop=self.loop)
+ self.db = await aiosqlite.connect("syslog.db", loop=self.loop)
# Enable WAL mode for better write performance
- await self.db.execute('PRAGMA journal_mode=WAL')
- await self.db.execute('''CREATE TABLE IF NOT EXISTS SystemEvents (
+ await self.db.execute("PRAGMA journal_mode=WAL")
+ await self.db.execute(
+ """CREATE TABLE IF NOT EXISTS SystemEvents (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
Facility INTEGER,
Priority INTEGER,
@@ 96,84 101,92 @@ class SyslogUDPServer:
SysLogTag TEXT,
ProcessID TEXT,
Message TEXT
- )''')
+ )"""
+ )
await self.db.commit()
# Create an index on the ReceivedAt field
- await self.db.execute('''CREATE INDEX IF NOT EXISTS idx_ReceivedAt
- ON SystemEvents (ReceivedAt)''')
+ await self.db.execute(
+ """CREATE INDEX IF NOT EXISTS idx_ReceivedAt
+ ON SystemEvents (ReceivedAt)"""
+ )
await self.db.commit()
# Create the virtual table with the full-text search index
- await self.db.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
- USING FTS5(Message)''')
+ await self.db.execute(
+ """CREATE VIRTUAL TABLE IF NOT EXISTS SystemEventsFTS
+ USING FTS5(Message)"""
+ )
await self.db.commit()
async def escape(self, msg):
return msg
async def handle_datagram(self, data, address):
- '''
+ """
Handle RFC-3164 message
e.g. Jun 21 10:54:52 Main-LB forward: in:LAN out:eth11-WAN-TOT
[m][d] [---t--] [hostn-program] [--------message-------]
- '''
+ """
ReceivedAt = datetime.now() # Generated
data = data.decode()
data = convert_rfc5424_to_rfc3164(data)
if LOG_DUMP:
- print('\n DATA:', data)
- datetime_hostname_program = data[data.find('>')+1:data.find(': ')]
+ print("\n DATA:", data)
+ datetime_hostname_program = data[data.find(">") + 1 : data.find(": ")]
m, d, t, hostname = datetime_hostname_program.split()[:4]
- formatted_datetime = '%s %s %s %s' % (
- m, d.zfill(2), t, ReceivedAt.year
- )
+ formatted_datetime = "%s %s %s %s" % (m, d.zfill(2), t, ReceivedAt.year)
DeviceReportedTime = datetime.strptime(
- formatted_datetime, '%b %d %H:%M:%S %Y'
+ formatted_datetime, "%b %d %H:%M:%S %Y"
)
time_delta = ReceivedAt - DeviceReportedTime
if abs(time_delta.days) > 1:
- formatted_datetime = '%s %s %s %s' % (
- m, d.zfill(2), t, ReceivedAt.year - 1
+ formatted_datetime = "%s %s %s %s" % (
+ m,
+ d.zfill(2),
+ t,
+ ReceivedAt.year - 1,
)
DeviceReportedTime = datetime.strptime(
- formatted_datetime, '%b %d %H:%M:%S %Y'
+ formatted_datetime, "%b %d %H:%M:%S %Y"
)
time_delta = ReceivedAt - DeviceReportedTime
if abs(time_delta.days) > 1:
pass # Something wrong, just ignore it.
else:
- program = '-'.join(datetime_hostname_program.split()[4:])
- if '[' in program:
- SysLogTag = program[:program.find('[')]
+ program = "-".join(datetime_hostname_program.split()[4:])
+ if "[" in program:
+ SysLogTag = program[: program.find("[")]
else:
SysLogTag = program
- if '[' in program:
- ProcessID = program[program.find('[')+1:program.find(']')]
+ if "[" in program:
+ ProcessID = program[program.find("[") + 1 : program.find("]")]
else:
- ProcessID = '0'
- Message = data[data.find(': ')+2:]
- code = data[data.find('<')+1:data.find('>')]
+ ProcessID = "0"
+ Message = data[data.find(": ") + 2 :]
+ code = data[data.find("<") + 1 : data.find(">")]
Facility, Priority = self.syslog_matrix.decode_int(code)
FromHost = hostname or address[0]
InfoUnitID = 1 # Hardcoded
params = {
- 'Facility': Facility,
- 'Priority': Priority,
- 'FromHost': FromHost,
- 'InfoUnitID': InfoUnitID,
- 'ReceivedAt': ReceivedAt.strftime('%Y-%m-%d %H:%M:%S'),
- 'DeviceReportedTime': DeviceReportedTime.strftime('%Y-%m-%d %H:%M:%S'),
- 'SysLogTag': SysLogTag,
- 'ProcessID': ProcessID,
- 'Message': Message
+ "Facility": Facility,
+ "Priority": Priority,
+ "FromHost": FromHost,
+ "InfoUnitID": InfoUnitID,
+ "ReceivedAt": ReceivedAt.strftime("%Y-%m-%d %H:%M:%S"),
+ "DeviceReportedTime": DeviceReportedTime.strftime(
+ "%Y-%m-%d %H:%M:%S"
+ ),
+ "SysLogTag": SysLogTag,
+ "ProcessID": ProcessID,
+ "Message": Message,
}
sql_command = SQL
if SQL_DUMP:
- print('\n SQL:', sql_command)
- print('\nPARAMS:', params)
+ print("\n SQL:", sql_command)
+ print("\nPARAMS:", params)
if SQL_WRITE:
try:
@@ 181,9 194,9 @@ class SyslogUDPServer:
pass
except Exception as e:
if DEBUG:
- print('\n SQL:', sql_command)
- print('\nPARAMS:', params)
- print('\nEXCEPT:', e)
+ print("\n SQL:", sql_command)
+ print("\nPARAMS:", params)
+ print("\nEXCEPT:", e)
await self.db.rollback()
else:
await self.db.commit()
@@ 192,7 205,7 @@ class SyslogUDPServer:
self.endpoint, _ = self.loop.run_until_complete(
self.loop.create_datagram_endpoint(
lambda: DatagramProtocol(self.handle_datagram),
- local_addr=(self.host, self.port)
+ local_addr=(self.host, self.port),
)
)
@@ 212,11 225,11 @@ class DatagramProtocol:
def error_received(self, exc):
if DEBUG:
- print('Error received:', exc)
+ print("Error received:", exc)
def connection_lost(self, exc):
if DEBUG:
- print('Closing transport')
+ print("Closing transport")
if __name__ == "__main__":
@@ 238,4 251,3 @@ if __name__ == "__main__":
syslog_server.stop()
loop.run_until_complete(syslog_server.close_db_connection())
loop.close()
-