M aiosyslogd/server.py +10 -12
@@ 8,13 8,13 @@ from .priority import SyslogMatrix
from .rfc5424 import RFC5424_PATTERN, normalize_to_rfc5424
from datetime import datetime
from importlib import import_module
+from loguru import logger
from types import ModuleType
from typing import Dict, Any, Tuple, List, Type, Self
import asyncio
import re
import signal
import sys
-from loguru import logger
uvloop: ModuleType | None = None
try:
@@ 29,10 29,11 @@ except ImportError:
CFG = config.load_config()
# Server settings
-DEBUG: bool = CFG.get("server", {}).get("debug", False)
-LOG_DUMP: bool = CFG.get("server", {}).get("log_dump", False)
-BINDING_IP: str = CFG.get("server", {}).get("bind_ip", "0.0.0.0")
-BINDING_PORT: int = int(CFG.get("server", {}).get("bind_port", 5140))
+SERVER_CFG = CFG.get("server", {})
+DEBUG: bool = SERVER_CFG.get("debug", False)
+LOG_DUMP: bool = SERVER_CFG.get("log_dump", False)
+BINDING_IP: str = SERVER_CFG.get("bind_ip", "0.0.0.0")
+BINDING_PORT: int = int(SERVER_CFG.get("bind_port", 5140))
# Database settings
DB_CFG = CFG.get("database", {})
@@ 152,9 153,8 @@ class SyslogUDPServer(asyncio.DatagramPr
batch.clear()
except asyncio.CancelledError:
break
- except Exception as e:
- logger.opt(exception=True).error(f"Error in database writer")
- logger.debug(str(e))
+ except Exception:
+ logger.opt(exception=True).error("Error in database writer")
if batch and self.db:
await self.db.write_batch(batch) # Final write
logger.info("Database writer task finished.")
@@ 173,7 173,7 @@ class SyslogUDPServer(asyncio.DatagramPr
decoded_data, debug_mode=DEBUG
)
if LOG_DUMP:
- logger.trace(f"FROM {address[0]}: {processed_data}")
+ logger.debug(f"FROM {address[0]}: {processed_data}")
match: re.Match[str] | None = RFC5424_PATTERN.match(processed_data)
if not match:
@@ 256,9 256,7 @@ async def run_server() -> None:
def main() -> None:
"""CLI Entry point."""
- log_level = "DEBUG" if DEBUG else "INFO"
- if LOG_DUMP:
- log_level = "TRACE"
+ log_level = "DEBUG" if DEBUG or LOG_DUMP else "INFO"
logger.remove()
logger.add(
M aiosyslogd/web.py +45 -41
@@ 198,38 198,49 @@ async def startup() -> None:
@app.route("/")
async def index() -> str | Response:
"""Main route for displaying and searching logs."""
- available_dbs: List[str] = get_available_databases()
- if not available_dbs:
- return await render_template(
- "index.html",
- error="No SQLite database files found. Ensure `aiosyslogd` has run and created logs.",
+ context: Dict[str, Any] = {
+ "logs": [],
+ "total_logs": 0,
+ "query_time": 0.0,
+ "search_query": request.args.get("q", "").strip(),
+ "available_dbs": get_available_databases(),
+ "selected_db": None,
+ "error": None,
+ "page_info": {},
+ "filters": {
+ key: request.args.get(key, "").strip()
+ for key in ["from_host", "received_at_min", "received_at_max"]
+ },
+ "debug_query": "",
+ "request": request,
+ }
+
+ if not context["available_dbs"]:
+ context["error"] = (
+ "No SQLite database files found. Ensure `aiosyslogd` has run and created logs."
)
+ return await render_template("index.html", **context)
# --- Get parameters from request ---
- selected_db: str = request.args.get("db_file", available_dbs[0])
- search_query: str = request.args.get("q", "").strip()
+ context["selected_db"] = request.args.get(
+ "db_file", context["available_dbs"][0]
+ )
last_id: int | None = request.args.get("last_id", type=int)
page_size: int = 50
- filter_keys = ["from_host", "received_at_min", "received_at_max"]
- filters: Dict[str, str] = {
- key: request.args.get(key, "").strip() for key in filter_keys
- }
-
- if selected_db not in available_dbs:
+ if context["selected_db"] not in context["available_dbs"]:
abort(404, "Database file not found.")
# --- Build Query ---
- query_parts = build_log_query(search_query, filters, last_id, page_size)
- query_error: str | None = None
- logs: List[aiosqlite.Row] = []
- total_logs: int | None = None
- query_time: float | None = None
+ query_parts = build_log_query(
+ context["search_query"], context["filters"], last_id, page_size
+ )
+ context["debug_query"] = query_parts["debug_query"]
# --- Execute Query ---
try:
start_time: float = time.perf_counter()
- db_uri: str = f"file:{selected_db}?mode=ro"
+ db_uri: str = f"file:{context['selected_db']}?mode=ro"
async with aiosqlite.connect(
db_uri,
uri=True,
@@ 241,43 252,33 @@ async def index() -> str | Response:
) as cursor:
result = await cursor.fetchone()
if result:
- total_logs = result[0]
+ context["total_logs"] = result[0]
async with conn.execute(
query_parts["main_sql"], query_parts["main_params"]
) as cursor:
- logs = list(await cursor.fetchall())
- query_time = time.perf_counter() - start_time
+ context["logs"] = await cursor.fetchall()
+ context["query_time"] = time.perf_counter() - start_time
except (aiosqlite.OperationalError, aiosqlite.DatabaseError) as e:
- query_error = str(e)
+ context["error"] = str(e)
logger.opt(exception=True).error(
- f"Database query failed for {selected_db}"
+ f"Database query failed for {context['selected_db']}"
)
# --- Prepare Pagination & Rendering ---
- has_next_page: bool = len(logs) > page_size
+ has_next_page: bool = len(context["logs"]) > page_size
next_last_id: int | None = (
- logs[page_size - 1]["ID"] if logs and has_next_page else None
+ context["logs"][page_size - 1]["ID"]
+ if context["logs"] and has_next_page
+ else None
)
- page_info: Dict[str, Any] = {
+ context["page_info"] = {
"has_next_page": has_next_page,
"next_last_id": next_last_id,
"prev_last_id": None,
}
+ context["logs"] = context["logs"][:page_size]
- return await render_template(
- "index.html",
- logs=logs[:page_size],
- total_logs=total_logs,
- query_time=query_time,
- search_query=search_query,
- available_dbs=available_dbs,
- selected_db=selected_db,
- error=query_error,
- page_info=page_info,
- filters=filters,
- request=request,
- debug_query=query_parts["debug_query"],
- )
+ return await render_template("index.html", **context)
def main() -> None:
@@ 303,6 304,9 @@ def main() -> None:
port: int = server_cfg.get("bind_port", 5141)
debug: bool = server_cfg.get("debug", False)
+ if debug:
+ logger.level("DEBUG")
+
logger.info(f"Starting aiosyslogd-web interface on http://{host}:{port}")
if uvloop:
uvloop.install()
M tests/test_parsers.py +31 -67
@@ 1,13 1,22 @@
from datetime import datetime
+from loguru import logger
from unittest.mock import patch
import pytest
import re
+import sys
# --- Import the classes and functions to be tested ---
from aiosyslogd.priority import SyslogMatrix
from aiosyslogd.rfc5424 import normalize_to_rfc5424, convert_rfc3164_to_rfc5424
-# --- Test Suite for priority.py ---
+
+@pytest.fixture(autouse=True)
+def setup_logger(capsys):
+ """Ensure logger is configured to output to stderr for capture."""
+ logger.remove()
+ logger.add(sys.stderr, level="DEBUG")
+ yield
+ logger.remove()
class TestSyslogMatrix:
@@ 49,108 58,63 @@ class TestSyslogMatrix:
assert level == ("emergency", 0)
-# --- Test Suite for rfc5424.py ---
-
-
class TestRfc5424Conversion:
"""Tests for syslog message format normalization."""
def test_normalize_already_rfc5424(self):
- """Tests that a message already in RFC5424 format is not changed."""
rfc5424_msg = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed for lonvick on /dev/pts/8"
normalized = normalize_to_rfc5424(rfc5424_msg)
assert normalized == rfc5424_msg
def test_normalize_standard_rfc3164(self):
- """Tests conversion of a standard RFC3164 message."""
rfc3164_msg = "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"
-
- # We can't test the timestamp exactly, so we check the structure.
normalized = normalize_to_rfc5424(rfc3164_msg)
-
assert normalized.startswith("<34>1 ")
assert "mymachine" in normalized
- assert "su" in normalized # app-name
- assert "- - " in normalized # msgid and sd
+ assert "su" in normalized
+ assert "- - " in normalized
assert normalized.endswith("'su root' failed for lonvick on /dev/pts/8")
def test_normalize_rfc3164_with_pid(self):
- """Tests conversion of an RFC3164 message that includes a PID."""
rfc3164_msg = (
"<13>Feb 5 10:01:02 host CRON[12345]: (root) CMD (command)"
)
-
normalized = normalize_to_rfc5424(rfc3164_msg)
-
assert normalized.startswith("<13>1 ")
assert " host " in normalized
assert " CRON " in normalized
- assert " 12345 " in normalized # procid
+ assert " 12345 " in normalized
assert normalized.endswith("(root) CMD (command)")
def test_normalize_unparseable_message(self):
- """Tests that a message that doesn't match either format is returned as-is."""
plain_msg = "this is just a plain log message"
normalized = normalize_to_rfc5424(plain_msg)
assert normalized == plain_msg
@patch("aiosyslogd.rfc5424.datetime")
def test_rfc3164_timestamp_conversion_past(self, mock_datetime):
- """
- Tests that a timestamp from a previous month is correctly assigned to the previous year.
- """
- # Simulate that "now" is Jan 2025
mock_datetime.now.return_value = datetime(2025, 1, 15)
-
- # Configure the mock to use the real strptime method so it returns a real datetime object
mock_datetime.strptime = datetime.strptime
-
- # This log is from December, so it should be from 2024
rfc3164_msg = "<34>Dec 10 22:14:15 mymachine su: test"
-
normalized = convert_rfc3164_to_rfc5424(rfc3164_msg)
-
- # Check that the year in the timestamp is correct
assert "2024-12-10T" in normalized
-
-def test_normalize_to_rfc5424_debug_mode(capsys):
- """Tests that a debug message is printed when normalizing a non-syslog message in debug mode."""
- message = "this is not a syslog message"
- debug_mode = True
-
- # Normalize the message
- normalized = normalize_to_rfc5424(message, debug_mode)
-
- # Capture console output and verify debug message
- captured = capsys.readouterr()
- assert (
- "[RFC-CONVERT] Not an RFC 3164 message, returning original: this is not a syslog message"
- in captured.out
- )
- # Ensure the message is returned unchanged
- assert normalized == message
-
+ def test_normalize_to_rfc5424_debug_mode(self, capsys):
+ message = "this is not a syslog message"
+ normalized = normalize_to_rfc5424(message, debug_mode=True)
+ captured = capsys.readouterr()
+ assert "Not an RFC 3164 message" in captured.err
+ assert normalized == message
-def test_convert_rfc3164_to_rfc5424_timestamp_error(capsys):
- """Tests that a debug message is printed when an RFC3164 timestamp cannot be parsed in debug mode."""
- message = "<34>Feb 30 22:14:15 mymachine su: test" # Invalid date (Feb 30)
- debug_mode = True
-
- # Convert the message
- normalized = convert_rfc3164_to_rfc5424(message, debug_mode)
-
- # Capture console output and verify debug message
- captured = capsys.readouterr()
- assert (
- "[RFC-CONVERT] Could not parse RFC-3164 timestamp, using current time."
- in captured.out
- )
- # Verify the output is RFC5424 with a current timestamp
- parts = normalized.split()
- assert parts[0] == "<34>1"
- assert re.match(
- r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", parts[1]
- ) # ISO timestamp
- assert parts[2] == "mymachine"
- assert parts[-1] == "test"
+ def test_convert_rfc3164_to_rfc5424_timestamp_error(self, capsys):
+ message = "<34>Feb 30 22:14:15 mymachine su: test" # Invalid date
+ normalized = convert_rfc3164_to_rfc5424(message, debug_mode=True)
+ captured = capsys.readouterr()
+ assert "Could not parse RFC-3164 timestamp" in captured.err
+ parts = normalized.split()
+ assert parts[0] == "<34>1"
+ assert re.match(
+ r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", parts[1]
+ )
+ assert parts[2] == "mymachine"
+ assert parts[-1] == "test"
M tests/test_server.py +53 -443
@@ 1,13 1,23 @@
from aiosyslogd.db import BaseDatabase
from aiosyslogd.server import SyslogUDPServer, get_db_driver
from datetime import datetime
+from loguru import logger
from unittest.mock import AsyncMock, patch, MagicMock
import asyncio
import pytest
import pytest_asyncio
+import sys
-# Helper function to create a mock datagram
+@pytest.fixture(autouse=True)
+def setup_logger_for_server(capsys):
+ """Fixture to configure and reset the logger for each test."""
+ logger.remove()
+ logger.add(sys.stderr, level="DEBUG")
+ yield
+ logger.remove()
+
+
def create_test_datagram(
message: str, priority: int = 34, ts: str = "2025-06-11T12:00:00.000Z"
) -> bytes:
@@ 17,10 27,8 @@ def create_test_datagram(
)
-# Pytest fixtures
@pytest.fixture
def mock_db():
- """Provides a mocked BaseDatabase instance."""
db = AsyncMock(spec=BaseDatabase)
db.connect = AsyncMock()
db.write_batch = AsyncMock()
@@ 30,11 38,8 @@ def mock_db():
@pytest_asyncio.fixture
async def server(mock_db):
- """Provides a SyslogUDPServer instance with a mocked database."""
with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
- with patch(
- "aiosyslogd.server.BATCH_SIZE", 1
- ): # Force small batch size for testing
+ with patch("aiosyslogd.server.BATCH_SIZE", 1):
server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
yield server
try:
@@ 42,14 47,11 @@ async def server(mock_db):
server._db_writer_task.cancel()
await server.shutdown()
except asyncio.CancelledError:
- pass # Expected during shutdown
+ pass
-# Test cases
@pytest.mark.asyncio
async def test_server_creation(server, mock_db):
- """Tests that the server is created and initialized correctly."""
- # Assert
assert server.host == "127.0.0.1"
assert server.port == 5141
assert server.db == mock_db
@@ 59,481 61,89 @@ async def test_server_creation(server, m
@pytest.mark.asyncio
async def test_server_creation_debug_mode(mock_db, capsys):
- """Tests that the debug mode message is printed on server creation."""
- # Arrange: Patch the DEBUG config to be True
- with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
- with patch("aiosyslogd.server.DEBUG", True):
- # Act
- server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
-
- # Assert
- captured = capsys.readouterr()
- assert "Debug mode is ON." in captured.out
-
- # Cleanup
- try:
- await server.shutdown()
- except asyncio.CancelledError:
- pass
-
-
-@pytest.mark.asyncio
-async def test_connection_made(server):
- """Tests that the database writer task is started upon connection."""
- # Arrange
- mock_transport = MagicMock()
- # Reset the task to ensure it's created by the call
- if server._db_writer_task:
- server._db_writer_task.cancel()
- server._db_writer_task = None
-
- # Act
- server.connection_made(mock_transport)
-
- # Assert
- assert server.transport == mock_transport
- assert server._db_writer_task is not None
- assert not server._db_writer_task.done()
-
-
-@pytest.mark.asyncio
-async def test_connection_made_no_db(server):
- """Tests that the db writer task is NOT started if there is no db driver."""
- # Arrange
- server.db = None # Manually remove the db driver
- mock_transport = MagicMock()
- server._db_writer_task = None
-
- # Act
- server.connection_made(mock_transport)
-
- # Assert
- assert server.transport == mock_transport
- assert server._db_writer_task is None # Task should not be created
+ with patch("aiosyslogd.server.DEBUG", True):
+ server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
+ captured = capsys.readouterr()
+ assert "Debug mode is ON." in captured.err
+ try:
+ await server.shutdown()
+ except asyncio.CancelledError:
+ pass
@pytest.mark.asyncio
-async def test_datagram_received(server):
- """Tests that datagram_received queues messages correctly."""
- # Arrange
- test_data = create_test_datagram("Test message")
- addr = ("192.168.1.1", 12345)
-
- # Act
- server.datagram_received(test_data, addr)
-
- # Assert
- assert server._message_queue.qsize() == 1
- queued_item = await server._message_queue.get()
- assert queued_item[0] == test_data
- assert queued_item[1] == addr
- assert isinstance(queued_item[2], datetime)
-
-
-@pytest.mark.asyncio
-async def test_datagram_received_shutting_down(server):
- """Tests that datagrams are ignored when the server is shutting down."""
- # Arrange
- server._shutting_down = True
- test_data = create_test_datagram("Should be ignored")
- addr = ("192.168.1.1", 12345)
-
- # Act
- server.datagram_received(test_data, addr)
-
- # Assert
- assert server._message_queue.qsize() == 0
-
-
-@pytest.mark.asyncio
-async def test_error_received_debug_on(server, capsys):
- """Tests that error_received prints output when debug mode is on."""
- # Arrange
+async def test_error_received(server, capsys):
test_exc = ValueError("Test Error")
- with patch("aiosyslogd.server.DEBUG", True):
- # Act
- server.error_received(test_exc)
-
- # Assert
- captured = capsys.readouterr()
- assert "Error received: Test Error" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_error_received_debug_off(server, capsys):
- """Tests that error_received is silent when debug mode is off."""
- # Arrange
- test_exc = ValueError("Test Error")
- with patch("aiosyslogd.server.DEBUG", False):
- # Act
- server.error_received(test_exc)
-
- # Assert
- captured = capsys.readouterr()
- assert captured.out == ""
- assert captured.err == ""
-
-
-@pytest.mark.asyncio
-async def test_connection_lost_debug_on(server, capsys):
- """Tests that connection_lost prints output when debug mode is on."""
- # Arrange
- test_exc = ConnectionAbortedError("Connection lost unexpectedly")
- with patch("aiosyslogd.server.DEBUG", True):
- # Act
- server.connection_lost(test_exc)
-
- # Assert
- captured = capsys.readouterr()
- assert "Connection lost: Connection lost unexpectedly" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_connection_lost_debug_off(server, capsys):
- """Tests that connection_lost is silent when debug mode is off."""
- # Arrange
- test_exc = ConnectionAbortedError("Connection lost unexpectedly")
- with patch("aiosyslogd.server.DEBUG", False):
- # Act
- server.connection_lost(test_exc)
-
- # Assert
- captured = capsys.readouterr()
- assert captured.out == ""
- assert captured.err == ""
+ server.error_received(test_exc)
+ captured = capsys.readouterr()
+ assert "Error received: Test Error" in captured.err
@pytest.mark.asyncio
-async def test_database_writer_batch_full(server, mock_db):
- """Tests that a full batch triggers a write and is then cleared."""
- # Arrange
- # Use a side_effect to capture a copy of the batch at call time,
- # preventing an issue where the test inspects the list after it's cleared.
- captured_batch = None
-
- async def capture_side_effect(batch):
- nonlocal captured_batch
- captured_batch = list(batch) # noqa -- make a copy to capture the state
-
- mock_db.write_batch.side_effect = capture_side_effect
-
- with patch("aiosyslogd.server.BATCH_SIZE", 2):
- server.connection_made(MagicMock())
-
- # Act 1: Send one message, batch should not be full
- server.datagram_received(
- create_test_datagram("log 1"), ("localhost", 123)
- )
- await asyncio.sleep(0) # Yield to writer
-
- # Assert 1: No write should have happened yet
- mock_db.write_batch.assert_not_called()
-
- # Act 2: Send second message to fill the batch
- server.datagram_received(
- create_test_datagram("log 2"), ("localhost", 123)
- )
- await asyncio.sleep(0.01) # Yield to writer to process the full batch
-
- # Assert 2: The batch should have been written
- mock_db.write_batch.assert_called_once()
- assert captured_batch is not None
- assert len(captured_batch) == 2
- assert captured_batch[0]["Message"] == "log 1"
- assert captured_batch[1]["Message"] == "log 2"
-
-
-@pytest.mark.asyncio
-async def test_database_writer_batch_timeout(server, mock_db):
- """Tests that the database writer writes a batch after a timeout."""
- # Arrange
- with (
- patch("aiosyslogd.server.BATCH_SIZE", 10),
- patch("aiosyslogd.server.BATCH_TIMEOUT", 0.01),
- ):
- server.connection_made(MagicMock())
- server.datagram_received(
- create_test_datagram("log 1"), ("localhost", 123)
- )
-
- # Act
- await asyncio.sleep(0.02) # Wait for timeout to occur
-
- # Assert
- mock_db.write_batch.assert_called_once()
- assert server._message_queue.empty()
+async def test_connection_lost(server, capsys):
+ test_exc = ConnectionAbortedError("Connection lost unexpectedly")
+ server.connection_lost(test_exc)
+ captured = capsys.readouterr()
+ assert "Connection lost: Connection lost unexpectedly" in captured.err
@pytest.mark.asyncio
-async def test_database_writer_shutdown_flush(mock_db):
- """Tests that the database writer flushes remaining logs on shutdown."""
- # Arrange - create a server instance manually to control its lifecycle
- server = None
- with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
- with patch("aiosyslogd.server.BATCH_SIZE", 10):
- server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
-
- try:
- server.connection_made(MagicMock())
+async def test_database_writer_exception(server, mock_db, capsys):
+ server.connection_made(MagicMock())
+ with patch.object(
+ server, "process_datagram", side_effect=ValueError("Processing failed")
+ ):
server.datagram_received(
- create_test_datagram("log 1"), ("localhost", 123)
- )
- server.datagram_received(
- create_test_datagram("log 2"), ("localhost", 123)
+ create_test_datagram("bad log"), ("localhost", 123)
)
- await asyncio.sleep(0) # Yield to the event loop to process the queue
-
- # Act
- await server.shutdown()
-
- # Assert
- mock_db.write_batch.assert_called_once()
- assert len(mock_db.write_batch.call_args[0][0]) == 2
- finally:
- # Ensure cleanup in case of assertion failure
- if server and server.transport:
- server.transport.close()
- if (
- server
- and server._db_writer_task
- and not server._db_writer_task.done()
- ):
- server._db_writer_task.cancel()
-
-
-@pytest.mark.asyncio
-async def test_database_writer_exception_debug_on(server, mock_db, capsys):
- """Tests that an exception in the writer loop is logged in debug mode."""
- # Arrange
- with patch("aiosyslogd.server.DEBUG", True):
- server.connection_made(MagicMock())
- # Make processing fail
- with patch.object(
- server,
- "process_datagram",
- side_effect=ValueError("Processing failed"),
- ):
- server.datagram_received(
- create_test_datagram("bad log"), ("localhost", 123)
- )
-
- # Act
- await asyncio.sleep(0.01) # let writer run
-
- # Assert
- captured = capsys.readouterr()
- assert "[DB-WRITER-ERROR] Processing failed" in captured.out
- mock_db.write_batch.assert_not_called()
-
-
-@pytest.mark.asyncio
-async def test_database_writer_exception_debug_off(server, mock_db, capsys):
- """Tests that an exception in the writer loop is silent when debug is off."""
- # Arrange
- with patch("aiosyslogd.server.DEBUG", False):
- server.connection_made(MagicMock())
- # Make processing fail
- with patch.object(
- server,
- "process_datagram",
- side_effect=ValueError("Processing failed"),
- ):
- server.datagram_received(
- create_test_datagram("bad log"), ("localhost", 123)
- )
-
- # Act
- await asyncio.sleep(0.01) # let writer run
-
- # Assert
- captured = capsys.readouterr()
- # Check that the specific error message is not in the output
- assert "[DB-WRITER-ERROR]" not in captured.out
- mock_db.write_batch.assert_not_called()
-
-
-@pytest.mark.asyncio
-async def test_process_datagram_valid_rfc5424(server):
- """Tests processing of a valid RFC5424 datagram."""
- # Arrange
- test_data = create_test_datagram("Test message")
- addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
-
- # Act
- params = server.process_datagram(test_data, addr, received_at)
-
- # Assert
- assert params is not None
- assert params["Facility"] == 4 # From priority 34
- assert params["Priority"] == 2
- assert params["FromHost"] == "testhost"
- assert params["SysLogTag"] == "testapp"
- assert params["ProcessID"] == "1234"
- assert params["Message"] == "Test message"
- assert params["ReceivedAt"] == received_at
+ await asyncio.sleep(0.01)
+ captured = capsys.readouterr()
+ assert "Error in database writer" in captured.err
+ assert "Processing failed" in captured.err
+ mock_db.write_batch.assert_not_called()
@pytest.mark.asyncio
async def test_process_datagram_log_dump_on(server, capsys):
- """Tests that datagram processing prints output when LOG_DUMP is on."""
- # Arrange
test_data = create_test_datagram("Log dump test")
addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
with patch("aiosyslogd.server.LOG_DUMP", True):
- # Act
- server.process_datagram(test_data, addr, received_at)
-
- # Assert
+ server.process_datagram(test_data, addr, datetime.now())
captured = capsys.readouterr()
- assert "FROM 192.168.1.1:" in captured.out
- assert "RFC5424 DATA:" in captured.out
- assert "Log dump test" in captured.out
-
-
-@pytest.mark.asyncio
-@pytest.mark.parametrize(
- "bad_ts, test_id",
- [
- ("not-a-timestamp", "invalid-string-ValueError"),
- # The case ts=None causes an unhandled AttributeError in the source code.
- # It is not included here as the test would fail until the source is fixed
- # to catch AttributeError in the timestamp parsing block.
- ],
-)
-async def test_process_datagram_invalid_timestamp_fallback(
- server, bad_ts, test_id
-):
- """
- Tests that DeviceReportedTime falls back to ReceivedAt when the
- timestamp from the message is unparseable (triggering a handled exception).
- """
- # Arrange
- test_data = create_test_datagram(f"Test with {test_id}", ts=bad_ts)
- addr = ("192.168.1.1", 12345)
- # Use a distinct timestamp to ensure we can verify the fallback.
- received_at = datetime(2025, 6, 11, 12, 1, 1)
-
- # Act
- params = server.process_datagram(test_data, addr, received_at)
-
- # Assert
- assert params is not None, "Processing should not fail completely"
- assert (
- params["DeviceReportedTime"] == received_at
- ), "Should fall back to received_at"
+ assert "FROM 192.168.1.1:" in captured.err
+ assert "Log dump test" in captured.err
@pytest.mark.asyncio
async def test_process_datagram_invalid_encoding(server, capsys):
- """Tests handling of a datagram with invalid UTF-8 encoding."""
- # Arrange
- test_data = b"\xff\xfe" # Invalid UTF-8
+ test_data = b"\xff\xfe"
addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
-
- # Act
- with patch("aiosyslogd.server.DEBUG", True): # Enable debug output
- params = server.process_datagram(test_data, addr, received_at)
-
- # Assert
- assert params is None
+ params = server.process_datagram(test_data, addr, datetime.now())
captured = capsys.readouterr()
- assert "Cannot decode message" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_process_datagram_non_rfc5424(server):
- """Tests processing of a non-RFC5424 datagram with fallback parsing."""
- # Arrange
- test_data = b"<14>Invalid syslog message"
- addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
-
- # Act
- params = server.process_datagram(test_data, addr, received_at)
-
- # Assert
- assert params is not None
- assert params["Facility"] == 1 # From priority 14
- assert params["Priority"] == 6
- assert params["FromHost"] == "192.168.1.1"
- assert params["SysLogTag"] == "UNKNOWN"
- assert params["Message"] == "<14>Invalid syslog message"
+ assert params is None
+ assert "Cannot decode message" in captured.err
@pytest.mark.asyncio
async def test_debug_mode_invalid_datagram(server, capsys):
- """Tests that a debug message is printed when an invalid datagram is received in DEBUG mode."""
with patch("aiosyslogd.server.DEBUG", True):
- test_data = b"this is not a syslog message" # Non-RFC5424 message
+ test_data = b"this is not a syslog message"
addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
-
- # Process the datagram
- params = server.process_datagram(test_data, addr, received_at)
-
- # Capture console output and verify debug message
+ params = server.process_datagram(test_data, addr, datetime.now())
captured = capsys.readouterr()
assert (
"Failed to parse as RFC-5424: this is not a syslog message"
- in captured.out
+ in captured.err
)
- # Ensure params are still returned for fallback processing
assert params is not None
assert params["Message"] == "this is not a syslog message"
-@pytest.mark.asyncio
-async def test_debug_mode_decoding_error(server, capsys):
- """Tests that a debug message is printed when a decoding error occurs in DEBUG mode."""
- with patch("aiosyslogd.server.DEBUG", True):
- test_data = b"\xff\xfe" # Invalid UTF-8 encoding
- addr = ("192.168.1.1", 12345)
- received_at = datetime(2025, 6, 11, 12, 0, 0)
-
- # Process the datagram
- params = server.process_datagram(test_data, addr, received_at)
-
- # Capture console output and verify debug message
- captured = capsys.readouterr()
- assert (
- "Cannot decode message from ('192.168.1.1', 12345): b'\\xff\\xfe'"
- in captured.out
- )
- # Ensure no params are returned due to decoding failure
- assert params is None
-
-
def test_get_db_driver_injection_attempt(capsys):
- """
- Tests that get_db_driver prevents code injection by validating the driver name.
- """
- # Arrange: Patch the DB_DRIVER config to simulate a malicious value.
- # An attacker might try to use path traversal or module names to execute code.
malicious_driver_name = "../../../../os"
with patch("aiosyslogd.server.DB_DRIVER", malicious_driver_name):
- # Act & Assert: The function should raise SystemExit.
with pytest.raises(SystemExit):
get_db_driver()
-
- # Assert that an informative error message was printed.
captured = capsys.readouterr()
- assert "Error: Invalid database driver" in captured.out
- assert f"'{malicious_driver_name}'" in captured.out
- assert "Allowed drivers are:" in captured.out
- assert "sqlite" in captured.out
- assert "meilisearch" in captured.out
-
-
-def test_get_db_driver_is_none():
- """
- Tests that get_db_driver returns None when the driver is not set.
- """
- # Arrange: Patch the DB_DRIVER config to be None.
- with patch("aiosyslogd.server.DB_DRIVER", None):
- # Act
- driver = get_db_driver()
- # Assert
- assert driver is None
+ assert "Invalid database driver" in captured.err
+ assert f"'{malicious_driver_name}'" in captured.err
A => tests/test_web.py +66 -0
@@ 0,0 1,66 @@
+from loguru import logger
+from unittest.mock import patch
+import pytest
+import sys
+
+# --- Import the module and app to be tested ---
+from aiosyslogd import web
+
+
+@pytest.fixture(autouse=True)
+def setup_logger_for_web():
+ """Ensure logger is configured for all tests in this module."""
+ logger.remove()
+ logger.add(sys.stderr, level="DEBUG")
+ yield
+ logger.remove()
+
+
+@pytest.fixture
+def client():
+ """Provides a test client for the Quart app."""
+ app_config = web.CFG.copy()
+ app_config["database"] = {"driver": "sqlite"}
+ app_config["web_server"] = {
+ "bind_ip": "127.0.0.1",
+ "bind_port": 5141,
+ "debug": False,
+ }
+ with patch("aiosyslogd.web.CFG", app_config):
+ from aiosyslogd.web import app
+
+ yield app.test_client()
+
+
+def test_main_meilisearch_exit(capsys):
+ """
+ Tests that the main function exits cleanly if Meilisearch is the configured driver.
+ """
+ meili_config = {
+ "database": {"driver": "meilisearch"},
+ "web_server": {
+ "bind_ip": "127.0.0.1",
+ "bind_port": 5141,
+ "debug": False,
+ },
+ }
+ with patch("aiosyslogd.web.CFG", meili_config):
+ with pytest.raises(SystemExit) as e:
+ web.main()
+ assert e.value.code == 0
+
+ captured = capsys.readouterr()
+ assert "Meilisearch backend is selected" in captured.err
+ assert "This web UI is for the SQLite backend only" in captured.err
+
+
+@pytest.mark.asyncio
+async def test_index_route_no_dbs(client):
+ """
+ Tests the index route when no database files are found.
+ """
+ with patch("aiosyslogd.web.get_available_databases", return_value=[]):
+ response = await client.get("/")
+ assert response.status_code == 200
+ response_data = await response.get_data(as_text=True)
+ assert "No SQLite database files found" in response_data