5 files changed, 205 insertions(+), 563 deletions(-)

M aiosyslogd/server.py
M aiosyslogd/web.py
M tests/test_parsers.py
M tests/test_server.py
A tests/test_web.py
M aiosyslogd/server.py +10 -12
@@ -8,13 +8,13 @@ from .priority import SyslogMatrix
 from .rfc5424 import RFC5424_PATTERN, normalize_to_rfc5424
 from datetime import datetime
 from importlib import import_module
+from loguru import logger
 from types import ModuleType
 from typing import Dict, Any, Tuple, List, Type, Self
 import asyncio
 import re
 import signal
 import sys
-from loguru import logger
 
 uvloop: ModuleType | None = None
 try:

@@ -29,10 +29,11 @@ except ImportError:
 CFG = config.load_config()
 
 # Server settings
-DEBUG: bool = CFG.get("server", {}).get("debug", False)
-LOG_DUMP: bool = CFG.get("server", {}).get("log_dump", False)
-BINDING_IP: str = CFG.get("server", {}).get("bind_ip", "0.0.0.0")
-BINDING_PORT: int = int(CFG.get("server", {}).get("bind_port", 5140))
+SERVER_CFG = CFG.get("server", {})
+DEBUG: bool = SERVER_CFG.get("debug", False)
+LOG_DUMP: bool = SERVER_CFG.get("log_dump", False)
+BINDING_IP: str = SERVER_CFG.get("bind_ip", "0.0.0.0")
+BINDING_PORT: int = int(SERVER_CFG.get("bind_port", 5140))
 
 # Database settings
 DB_CFG = CFG.get("database", {})

@@ -152,9 +153,8 @@ class SyslogUDPServer(asyncio.DatagramPr
                 batch.clear()
             except asyncio.CancelledError:
                 break
-            except Exception as e:
-                logger.opt(exception=True).error(f"Error in database writer")
-                logger.debug(str(e))
+            except Exception:
+                logger.opt(exception=True).error("Error in database writer")
         if batch and self.db:
             await self.db.write_batch(batch)  # Final write
         logger.info("Database writer task finished.")

@@ -173,7 +173,7 @@ class SyslogUDPServer(asyncio.DatagramPr
             decoded_data, debug_mode=DEBUG
         )
         if LOG_DUMP:
-            logger.trace(f"FROM {address[0]}: {processed_data}")
+            logger.debug(f"FROM {address[0]}: {processed_data}")
 
         match: re.Match[str] | None = RFC5424_PATTERN.match(processed_data)
         if not match:

@@ -256,9 +256,7 @@ async def run_server() -> None:
 
 def main() -> None:
     """CLI Entry point."""
-    log_level = "DEBUG" if DEBUG else "INFO"
-    if LOG_DUMP:
-        log_level = "TRACE"
+    log_level = "DEBUG" if DEBUG or LOG_DUMP else "INFO"
 
     logger.remove()
     logger.add(

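Review note on the server.py hunks: loguru's `opt(exception=True)` attaches the traceback of the exception currently being handled, so the `except Exception as e` binding and the trailing `logger.debug(str(e))` became redundant and were dropped. A minimal standalone sketch of the pattern (not code from this repo):

```python
import sys
from loguru import logger

logger.remove()
logger.add(sys.stderr, level="INFO")

try:
    raise ValueError("boom")
except Exception:
    # Inside an except block, opt(exception=True) picks up the active
    # exception, so message and full traceback land in one log record.
    logger.opt(exception=True).error("Error in database writer")
```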
M aiosyslogd/web.py +45 -41
@@ -198,38 +198,49 @@ async def startup() -> None:
 @app.route("/")
 async def index() -> str | Response:
     """Main route for displaying and searching logs."""
-    available_dbs: List[str] = get_available_databases()
-    if not available_dbs:
-        return await render_template(
-            "index.html",
-            error="No SQLite database files found. Ensure `aiosyslogd` has run and created logs.",
+    context: Dict[str, Any] = {
+        "logs": [],
+        "total_logs": 0,
+        "query_time": 0.0,
+        "search_query": request.args.get("q", "").strip(),
+        "available_dbs": get_available_databases(),
+        "selected_db": None,
+        "error": None,
+        "page_info": {},
+        "filters": {
+            key: request.args.get(key, "").strip()
+            for key in ["from_host", "received_at_min", "received_at_max"]
+        },
+        "debug_query": "",
+        "request": request,
+    }
+
+    if not context["available_dbs"]:
+        context["error"] = (
+            "No SQLite database files found. Ensure `aiosyslogd` has run and created logs."
         )
+        return await render_template("index.html", **context)
 
     # --- Get parameters from request ---
-    selected_db: str = request.args.get("db_file", available_dbs[0])
-    search_query: str = request.args.get("q", "").strip()
+    context["selected_db"] = request.args.get(
+        "db_file", context["available_dbs"][0]
+    )
     last_id: int | None = request.args.get("last_id", type=int)
     page_size: int = 50
 
-    filter_keys = ["from_host", "received_at_min", "received_at_max"]
-    filters: Dict[str, str] = {
-        key: request.args.get(key, "").strip() for key in filter_keys
-    }
-
-    if selected_db not in available_dbs:
+    if context["selected_db"] not in context["available_dbs"]:
         abort(404, "Database file not found.")
 
     # --- Build Query ---
-    query_parts = build_log_query(search_query, filters, last_id, page_size)
-    query_error: str | None = None
-    logs: List[aiosqlite.Row] = []
-    total_logs: int | None = None
-    query_time: float | None = None
+    query_parts = build_log_query(
+        context["search_query"], context["filters"], last_id, page_size
+    )
+    context["debug_query"] = query_parts["debug_query"]
 
     # --- Execute Query ---
     try:
         start_time: float = time.perf_counter()
-        db_uri: str = f"file:{selected_db}?mode=ro"
+        db_uri: str = f"file:{context['selected_db']}?mode=ro"
         async with aiosqlite.connect(
             db_uri,
             uri=True,

@@ -241,43 +252,33 @@ async def index() -> str | Response:
             ) as cursor:
                 result = await cursor.fetchone()
                 if result:
-                    total_logs = result[0]
+                    context["total_logs"] = result[0]
             async with conn.execute(
                 query_parts["main_sql"], query_parts["main_params"]
             ) as cursor:
-                logs = list(await cursor.fetchall())
-        query_time = time.perf_counter() - start_time
+                context["logs"] = await cursor.fetchall()
+        context["query_time"] = time.perf_counter() - start_time
     except (aiosqlite.OperationalError, aiosqlite.DatabaseError) as e:
-        query_error = str(e)
+        context["error"] = str(e)
         logger.opt(exception=True).error(
-            f"Database query failed for {selected_db}"
+            f"Database query failed for {context['selected_db']}"
         )
 
     # --- Prepare Pagination & Rendering ---
-    has_next_page: bool = len(logs) > page_size
+    has_next_page: bool = len(context["logs"]) > page_size
     next_last_id: int | None = (
-        logs[page_size - 1]["ID"] if logs and has_next_page else None
+        context["logs"][page_size - 1]["ID"]
+        if context["logs"] and has_next_page
+        else None
     )
-    page_info: Dict[str, Any] = {
+    context["page_info"] = {
         "has_next_page": has_next_page,
         "next_last_id": next_last_id,
         "prev_last_id": None,
     }
+    context["logs"] = context["logs"][:page_size]
 
-    return await render_template(
-        "index.html",
-        logs=logs[:page_size],
-        total_logs=total_logs,
-        query_time=query_time,
-        search_query=search_query,
-        available_dbs=available_dbs,
-        selected_db=selected_db,
-        error=query_error,
-        page_info=page_info,
-        filters=filters,
-        request=request,
-        debug_query=query_parts["debug_query"],
-    )
+    return await render_template("index.html", **context)
 
 
 def main() -> None:

@@ -303,6 +304,9 @@ def main() -> None:
     port: int = server_cfg.get("bind_port", 5141)
     debug: bool = server_cfg.get("debug", False)
 
+    if debug:
+        logger.configure(handlers=[{"sink": sys.stderr, "level": "DEBUG"}])
+
     logger.info(f"Starting aiosyslogd-web interface on http://{host}:{port}")
     if uvloop:
         uvloop.install()
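Review note on the pagination above: the route over-fetches by one row, so `len(context["logs"]) > page_size` signals a further page and the surplus row is trimmed before rendering. A standalone sketch of the underlying keyset-pagination idea, assuming a `SystemEvents` table with an integer `ID` column (the real SQL lives in `build_log_query`, which this diff doesn't show):

```python
import aiosqlite

PAGE_SIZE = 50

async def fetch_page(db_path: str, last_id: int | None):
    """Fetch PAGE_SIZE + 1 rows; the extra row is only a next-page probe."""
    sql = "SELECT ID, Message FROM SystemEvents"
    params: list = []
    if last_id is not None:
        sql += " WHERE ID < ?"  # continue below the last row already shown
        params.append(last_id)
    sql += " ORDER BY ID DESC LIMIT ?"
    params.append(PAGE_SIZE + 1)
    async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(sql, params) as cursor:
            rows = list(await cursor.fetchall())
    has_next = len(rows) > PAGE_SIZE
    next_last_id = rows[PAGE_SIZE - 1]["ID"] if has_next else None
    return rows[:PAGE_SIZE], has_next, next_last_id
```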

M tests/test_parsers.py +31 -67
@@ -1,13 +1,22 @@
 from datetime import datetime
+from loguru import logger
 from unittest.mock import patch
 import pytest
 import re
+import sys
 
 # --- Import the classes and functions to be tested ---
 from aiosyslogd.priority import SyslogMatrix
 from aiosyslogd.rfc5424 import normalize_to_rfc5424, convert_rfc3164_to_rfc5424
 
-# --- Test Suite for priority.py ---
+
+@pytest.fixture(autouse=True)
+def setup_logger(capsys):
+    """Ensure logger is configured to output to stderr for capture."""
+    logger.remove()
+    logger.add(sys.stderr, level="DEBUG")
+    yield
+    logger.remove()
 
 
 class TestSyslogMatrix:

@@ -49,108 +58,63 @@ class TestSyslogMatrix:
         assert level == ("emergency", 0)
 
 
-# --- Test Suite for rfc5424.py ---
-
-
 class TestRfc5424Conversion:
     """Tests for syslog message format normalization."""
 
     def test_normalize_already_rfc5424(self):
-        """Tests that a message already in RFC5424 format is not changed."""
         rfc5424_msg = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed for lonvick on /dev/pts/8"
         normalized = normalize_to_rfc5424(rfc5424_msg)
         assert normalized == rfc5424_msg
 
     def test_normalize_standard_rfc3164(self):
-        """Tests conversion of a standard RFC3164 message."""
         rfc3164_msg = "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"
-
-        # We can't test the timestamp exactly, so we check the structure.
         normalized = normalize_to_rfc5424(rfc3164_msg)
-
         assert normalized.startswith("<34>1 ")
         assert "mymachine" in normalized
-        assert "su" in normalized  # app-name
-        assert "- - " in normalized  # msgid and sd
+        assert "su" in normalized
+        assert "- - " in normalized
         assert normalized.endswith("'su root' failed for lonvick on /dev/pts/8")
 
     def test_normalize_rfc3164_with_pid(self):
-        """Tests conversion of an RFC3164 message that includes a PID."""
         rfc3164_msg = (
             "<13>Feb  5 10:01:02 host CRON[12345]: (root) CMD (command)"
         )
-
         normalized = normalize_to_rfc5424(rfc3164_msg)
-
         assert normalized.startswith("<13>1 ")
         assert " host " in normalized
         assert " CRON " in normalized
-        assert " 12345 " in normalized  # procid
+        assert " 12345 " in normalized
         assert normalized.endswith("(root) CMD (command)")
 
     def test_normalize_unparseable_message(self):
-        """Tests that a message that doesn't match either format is returned as-is."""
         plain_msg = "this is just a plain log message"
         normalized = normalize_to_rfc5424(plain_msg)
         assert normalized == plain_msg
 
     @patch("aiosyslogd.rfc5424.datetime")
     def test_rfc3164_timestamp_conversion_past(self, mock_datetime):
-        """
-        Tests that a timestamp from a previous month is correctly assigned to the previous year.
-        """
-        # Simulate that "now" is Jan 2025
         mock_datetime.now.return_value = datetime(2025, 1, 15)
-
-        # Configure the mock to use the real strptime method so it returns a real datetime object
         mock_datetime.strptime = datetime.strptime
-
-        # This log is from December, so it should be from 2024
         rfc3164_msg = "<34>Dec 10 22:14:15 mymachine su: test"
-
         normalized = convert_rfc3164_to_rfc5424(rfc3164_msg)
-
-        # Check that the year in the timestamp is correct
         assert "2024-12-10T" in normalized
 
-
-def test_normalize_to_rfc5424_debug_mode(capsys):
-    """Tests that a debug message is printed when normalizing a non-syslog message in debug mode."""
-    message = "this is not a syslog message"
-    debug_mode = True
-
-    # Normalize the message
-    normalized = normalize_to_rfc5424(message, debug_mode)
-
-    # Capture console output and verify debug message
-    captured = capsys.readouterr()
-    assert (
-        "[RFC-CONVERT] Not an RFC 3164 message, returning original: this is not a syslog message"
-        in captured.out
-    )
-    # Ensure the message is returned unchanged
-    assert normalized == message
-
+    def test_normalize_to_rfc5424_debug_mode(self, capsys):
+        message = "this is not a syslog message"
+        normalized = normalize_to_rfc5424(message, debug_mode=True)
+        captured = capsys.readouterr()
+        assert "Not an RFC 3164 message" in captured.err
+        assert normalized == message
 
-def test_convert_rfc3164_to_rfc5424_timestamp_error(capsys):
-    """Tests that a debug message is printed when an RFC3164 timestamp cannot be parsed in debug mode."""
-    message = "<34>Feb 30 22:14:15 mymachine su: test"  # Invalid date (Feb 30)
-    debug_mode = True
-
-    # Convert the message
-    normalized = convert_rfc3164_to_rfc5424(message, debug_mode)
-
-    # Capture console output and verify debug message
-    captured = capsys.readouterr()
-    assert (
-        "[RFC-CONVERT] Could not parse RFC-3164 timestamp, using current time."
-        in captured.out
-    )
-    # Verify the output is RFC5424 with a current timestamp
-    parts = normalized.split()
-    assert parts[0] == "<34>1"
-    assert re.match(
-        r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", parts[1]
-    )  # ISO timestamp
-    assert parts[2] == "mymachine"
-    assert parts[-1] == "test"
+    def test_convert_rfc3164_to_rfc5424_timestamp_error(self, capsys):
+        message = "<34>Feb 30 22:14:15 mymachine su: test"  # Invalid date
+        normalized = convert_rfc3164_to_rfc5424(message, debug_mode=True)
+        captured = capsys.readouterr()
+        assert "Could not parse RFC-3164 timestamp" in captured.err
+        parts = normalized.split()
+        assert parts[0] == "<34>1"
+        assert re.match(
+            r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", parts[1]
+        )
+        assert parts[2] == "mymachine"
+        assert parts[-1] == "test"
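Review note on the rewritten assertions: they now check `captured.err` instead of `captured.out` because the parsers log via loguru and the autouse fixture installs a stderr sink. If record-level assertions are ever preferred over scraping stderr, the loguru documentation suggests bridging into pytest's `caplog`; a sketch of that recipe:

```python
import logging

import pytest
from loguru import logger


class PropagateHandler(logging.Handler):
    """Forward loguru records into the stdlib logging tree."""

    def emit(self, record: logging.LogRecord) -> None:
        logging.getLogger(record.name).handle(record)


@pytest.fixture
def caplog(caplog):
    # Overrides pytest's caplog so loguru output shows up in caplog.records.
    handler_id = logger.add(PropagateHandler(), format="{message}")
    yield caplog
    logger.remove(handler_id)
```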

M tests/test_server.py +53 -443
@@ -1,13 +1,23 @@
 from aiosyslogd.db import BaseDatabase
 from aiosyslogd.server import SyslogUDPServer, get_db_driver
 from datetime import datetime
+from loguru import logger
 from unittest.mock import AsyncMock, patch, MagicMock
 import asyncio
 import pytest
 import pytest_asyncio
+import sys
 
 
-# Helper function to create a mock datagram
+@pytest.fixture(autouse=True)
+def setup_logger_for_server(capsys):
+    """Fixture to configure and reset the logger for each test."""
+    logger.remove()
+    logger.add(sys.stderr, level="DEBUG")
+    yield
+    logger.remove()
+
+
 def create_test_datagram(
     message: str, priority: int = 34, ts: str = "2025-06-11T12:00:00.000Z"
 ) -> bytes:

@@ -17,10 +27,8 @@ def create_test_datagram(
     )
 
 
-# Pytest fixtures
 @pytest.fixture
 def mock_db():
-    """Provides a mocked BaseDatabase instance."""
     db = AsyncMock(spec=BaseDatabase)
     db.connect = AsyncMock()
     db.write_batch = AsyncMock()

@@ -30,11 +38,8 @@ def mock_db():
 
 @pytest_asyncio.fixture
 async def server(mock_db):
-    """Provides a SyslogUDPServer instance with a mocked database."""
     with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
-        with patch(
-            "aiosyslogd.server.BATCH_SIZE", 1
-        ):  # Force small batch size for testing
+        with patch("aiosyslogd.server.BATCH_SIZE", 1):
             server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
     yield server
     try:

@@ -42,14 +47,11 @@ async def server(mock_db):
             server._db_writer_task.cancel()
         await server.shutdown()
     except asyncio.CancelledError:
-        pass  # Expected during shutdown
+        pass
 
 
-# Test cases
 @pytest.mark.asyncio
 async def test_server_creation(server, mock_db):
-    """Tests that the server is created and initialized correctly."""
-    # Assert
     assert server.host == "127.0.0.1"
     assert server.port == 5141
     assert server.db == mock_db

@@ -59,481 +61,89 @@ async def test_server_creation(server, m
 
 @pytest.mark.asyncio
 async def test_server_creation_debug_mode(mock_db, capsys):
-    """Tests that the debug mode message is printed on server creation."""
-    # Arrange: Patch the DEBUG config to be True
-    with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
-        with patch("aiosyslogd.server.DEBUG", True):
-            # Act
-            server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
-
-            # Assert
-            captured = capsys.readouterr()
-            assert "Debug mode is ON." in captured.out
-
-            # Cleanup
-            try:
-                await server.shutdown()
-            except asyncio.CancelledError:
-                pass
-
-
-@pytest.mark.asyncio
-async def test_connection_made(server):
-    """Tests that the database writer task is started upon connection."""
-    # Arrange
-    mock_transport = MagicMock()
-    # Reset the task to ensure it's created by the call
-    if server._db_writer_task:
-        server._db_writer_task.cancel()
-    server._db_writer_task = None
-
-    # Act
-    server.connection_made(mock_transport)
-
-    # Assert
-    assert server.transport == mock_transport
-    assert server._db_writer_task is not None
-    assert not server._db_writer_task.done()
-
-
-@pytest.mark.asyncio
-async def test_connection_made_no_db(server):
-    """Tests that the db writer task is NOT started if there is no db driver."""
-    # Arrange
-    server.db = None  # Manually remove the db driver
-    mock_transport = MagicMock()
-    server._db_writer_task = None
-
-    # Act
-    server.connection_made(mock_transport)
-
-    # Assert
-    assert server.transport == mock_transport
-    assert server._db_writer_task is None  # Task should not be created
+    with patch("aiosyslogd.server.DEBUG", True):
+        server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
+        captured = capsys.readouterr()
+        assert "Debug mode is ON." in captured.err
+        try:
+            await server.shutdown()
+        except asyncio.CancelledError:
+            pass
 
 
 @pytest.mark.asyncio
-async def test_datagram_received(server):
-    """Tests that datagram_received queues messages correctly."""
-    # Arrange
-    test_data = create_test_datagram("Test message")
-    addr = ("192.168.1.1", 12345)
-
-    # Act
-    server.datagram_received(test_data, addr)
-
-    # Assert
-    assert server._message_queue.qsize() == 1
-    queued_item = await server._message_queue.get()
-    assert queued_item[0] == test_data
-    assert queued_item[1] == addr
-    assert isinstance(queued_item[2], datetime)
-
-
-@pytest.mark.asyncio
-async def test_datagram_received_shutting_down(server):
-    """Tests that datagrams are ignored when the server is shutting down."""
-    # Arrange
-    server._shutting_down = True
-    test_data = create_test_datagram("Should be ignored")
-    addr = ("192.168.1.1", 12345)
-
-    # Act
-    server.datagram_received(test_data, addr)
-
-    # Assert
-    assert server._message_queue.qsize() == 0
-
-
-@pytest.mark.asyncio
-async def test_error_received_debug_on(server, capsys):
-    """Tests that error_received prints output when debug mode is on."""
-    # Arrange
+async def test_error_received(server, capsys):
     test_exc = ValueError("Test Error")
-    with patch("aiosyslogd.server.DEBUG", True):
-        # Act
-        server.error_received(test_exc)
-
-        # Assert
-        captured = capsys.readouterr()
-        assert "Error received: Test Error" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_error_received_debug_off(server, capsys):
-    """Tests that error_received is silent when debug mode is off."""
-    # Arrange
-    test_exc = ValueError("Test Error")
-    with patch("aiosyslogd.server.DEBUG", False):
-        # Act
-        server.error_received(test_exc)
-
-        # Assert
-        captured = capsys.readouterr()
-        assert captured.out == ""
-        assert captured.err == ""
-
-
-@pytest.mark.asyncio
-async def test_connection_lost_debug_on(server, capsys):
-    """Tests that connection_lost prints output when debug mode is on."""
-    # Arrange
-    test_exc = ConnectionAbortedError("Connection lost unexpectedly")
-    with patch("aiosyslogd.server.DEBUG", True):
-        # Act
-        server.connection_lost(test_exc)
-
-        # Assert
-        captured = capsys.readouterr()
-        assert "Connection lost: Connection lost unexpectedly" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_connection_lost_debug_off(server, capsys):
-    """Tests that connection_lost is silent when debug mode is off."""
-    # Arrange
-    test_exc = ConnectionAbortedError("Connection lost unexpectedly")
-    with patch("aiosyslogd.server.DEBUG", False):
-        # Act
-        server.connection_lost(test_exc)
-
-        # Assert
-        captured = capsys.readouterr()
-        assert captured.out == ""
-        assert captured.err == ""
+    server.error_received(test_exc)
+    captured = capsys.readouterr()
+    assert "Error received: Test Error" in captured.err
 
 
 @pytest.mark.asyncio
-async def test_database_writer_batch_full(server, mock_db):
-    """Tests that a full batch triggers a write and is then cleared."""
-    # Arrange
-    # Use a side_effect to capture a copy of the batch at call time,
-    # preventing an issue where the test inspects the list after it's cleared.
-    captured_batch = None
-
-    async def capture_side_effect(batch):
-        nonlocal captured_batch
-        captured_batch = list(batch)  # noqa -- make a copy to capture the state
-
-    mock_db.write_batch.side_effect = capture_side_effect
-
-    with patch("aiosyslogd.server.BATCH_SIZE", 2):
-        server.connection_made(MagicMock())
-
-        # Act 1: Send one message, batch should not be full
-        server.datagram_received(
-            create_test_datagram("log 1"), ("localhost", 123)
-        )
-        await asyncio.sleep(0)  # Yield to writer
-
-        # Assert 1: No write should have happened yet
-        mock_db.write_batch.assert_not_called()
-
-        # Act 2: Send second message to fill the batch
-        server.datagram_received(
-            create_test_datagram("log 2"), ("localhost", 123)
-        )
-        await asyncio.sleep(0.01)  # Yield to writer to process the full batch
-
-        # Assert 2: The batch should have been written
-        mock_db.write_batch.assert_called_once()
-        assert captured_batch is not None
-        assert len(captured_batch) == 2
-        assert captured_batch[0]["Message"] == "log 1"
-        assert captured_batch[1]["Message"] == "log 2"
-
-
-@pytest.mark.asyncio
-async def test_database_writer_batch_timeout(server, mock_db):
-    """Tests that the database writer writes a batch after a timeout."""
-    # Arrange
-    with (
-        patch("aiosyslogd.server.BATCH_SIZE", 10),
-        patch("aiosyslogd.server.BATCH_TIMEOUT", 0.01),
-    ):
-        server.connection_made(MagicMock())
-        server.datagram_received(
-            create_test_datagram("log 1"), ("localhost", 123)
-        )
-
-        # Act
-        await asyncio.sleep(0.02)  # Wait for timeout to occur
-
-        # Assert
-        mock_db.write_batch.assert_called_once()
-        assert server._message_queue.empty()
+async def test_connection_lost(server, capsys):
+    test_exc = ConnectionAbortedError("Connection lost unexpectedly")
+    server.connection_lost(test_exc)
+    captured = capsys.readouterr()
+    assert "Connection lost: Connection lost unexpectedly" in captured.err
 
 
 @pytest.mark.asyncio
-async def test_database_writer_shutdown_flush(mock_db):
-    """Tests that the database writer flushes remaining logs on shutdown."""
-    # Arrange - create a server instance manually to control its lifecycle
-    server = None
-    with patch("aiosyslogd.server.get_db_driver", return_value=mock_db):
-        with patch("aiosyslogd.server.BATCH_SIZE", 10):
-            server = await SyslogUDPServer.create(host="127.0.0.1", port=5141)
-
-    try:
-        server.connection_made(MagicMock())
+async def test_database_writer_exception(server, mock_db, capsys):
+    server.connection_made(MagicMock())
+    with patch.object(
+        server, "process_datagram", side_effect=ValueError("Processing failed")
+    ):
         server.datagram_received(
-            create_test_datagram("log 1"), ("localhost", 123)
-        )
-        server.datagram_received(
-            create_test_datagram("log 2"), ("localhost", 123)
+            create_test_datagram("bad log"), ("localhost", 123)
         )
-        await asyncio.sleep(0)  # Yield to the event loop to process the queue
-
-        # Act
-        await server.shutdown()
-
-        # Assert
-        mock_db.write_batch.assert_called_once()
-        assert len(mock_db.write_batch.call_args[0][0]) == 2
-    finally:
-        # Ensure cleanup in case of assertion failure
-        if server and server.transport:
-            server.transport.close()
-        if (
-            server
-            and server._db_writer_task
-            and not server._db_writer_task.done()
-        ):
-            server._db_writer_task.cancel()
-
-
-@pytest.mark.asyncio
-async def test_database_writer_exception_debug_on(server, mock_db, capsys):
-    """Tests that an exception in the writer loop is logged in debug mode."""
-    # Arrange
-    with patch("aiosyslogd.server.DEBUG", True):
-        server.connection_made(MagicMock())
-        # Make processing fail
-        with patch.object(
-            server,
-            "process_datagram",
-            side_effect=ValueError("Processing failed"),
-        ):
-            server.datagram_received(
-                create_test_datagram("bad log"), ("localhost", 123)
-            )
-
-            # Act
-            await asyncio.sleep(0.01)  # let writer run
-
-            # Assert
-            captured = capsys.readouterr()
-            assert "[DB-WRITER-ERROR] Processing failed" in captured.out
-            mock_db.write_batch.assert_not_called()
-
-
-@pytest.mark.asyncio
-async def test_database_writer_exception_debug_off(server, mock_db, capsys):
-    """Tests that an exception in the writer loop is silent when debug is off."""
-    # Arrange
-    with patch("aiosyslogd.server.DEBUG", False):
-        server.connection_made(MagicMock())
-        # Make processing fail
-        with patch.object(
-            server,
-            "process_datagram",
-            side_effect=ValueError("Processing failed"),
-        ):
-            server.datagram_received(
-                create_test_datagram("bad log"), ("localhost", 123)
-            )
-
-            # Act
-            await asyncio.sleep(0.01)  # let writer run
-
-            # Assert
-            captured = capsys.readouterr()
-            # Check that the specific error message is not in the output
-            assert "[DB-WRITER-ERROR]" not in captured.out
-            mock_db.write_batch.assert_not_called()
-
-
-@pytest.mark.asyncio
-async def test_process_datagram_valid_rfc5424(server):
-    """Tests processing of a valid RFC5424 datagram."""
-    # Arrange
-    test_data = create_test_datagram("Test message")
-    addr = ("192.168.1.1", 12345)
-    received_at = datetime(2025, 6, 11, 12, 0, 0)
-
-    # Act
-    params = server.process_datagram(test_data, addr, received_at)
-
-    # Assert
-    assert params is not None
-    assert params["Facility"] == 4  # From priority 34
-    assert params["Priority"] == 2
-    assert params["FromHost"] == "testhost"
-    assert params["SysLogTag"] == "testapp"
-    assert params["ProcessID"] == "1234"
-    assert params["Message"] == "Test message"
-    assert params["ReceivedAt"] == received_at
+        await asyncio.sleep(0.01)
+        captured = capsys.readouterr()
+        assert "Error in database writer" in captured.err
+        assert "Processing failed" in captured.err
+        mock_db.write_batch.assert_not_called()
 
 
 @pytest.mark.asyncio
 async def test_process_datagram_log_dump_on(server, capsys):
-    """Tests that datagram processing prints output when LOG_DUMP is on."""
-    # Arrange
     test_data = create_test_datagram("Log dump test")
     addr = ("192.168.1.1", 12345)
-    received_at = datetime(2025, 6, 11, 12, 0, 0)
     with patch("aiosyslogd.server.LOG_DUMP", True):
-        # Act
-        server.process_datagram(test_data, addr, received_at)
-
-        # Assert
+        server.process_datagram(test_data, addr, datetime.now())
         captured = capsys.readouterr()
-        assert "FROM 192.168.1.1:" in captured.out
-        assert "RFC5424 DATA:" in captured.out
-        assert "Log dump test" in captured.out
-
-
-@pytest.mark.asyncio
-@pytest.mark.parametrize(
-    "bad_ts, test_id",
-    [
-        ("not-a-timestamp", "invalid-string-ValueError"),
-        # The case ts=None causes an unhandled AttributeError in the source code.
-        # It is not included here as the test would fail until the source is fixed
-        # to catch AttributeError in the timestamp parsing block.
-    ],
-)
-async def test_process_datagram_invalid_timestamp_fallback(
-    server, bad_ts, test_id
-):
-    """
-    Tests that DeviceReportedTime falls back to ReceivedAt when the
-    timestamp from the message is unparseable (triggering a handled exception).
-    """
-    # Arrange
-    test_data = create_test_datagram(f"Test with {test_id}", ts=bad_ts)
-    addr = ("192.168.1.1", 12345)
-    # Use a distinct timestamp to ensure we can verify the fallback.
-    received_at = datetime(2025, 6, 11, 12, 1, 1)
-
-    # Act
-    params = server.process_datagram(test_data, addr, received_at)
-
-    # Assert
-    assert params is not None, "Processing should not fail completely"
-    assert (
-        params["DeviceReportedTime"] == received_at
-    ), "Should fall back to received_at"
+        assert "FROM 192.168.1.1:" in captured.err
+        assert "Log dump test" in captured.err
 
 
 @pytest.mark.asyncio
 async def test_process_datagram_invalid_encoding(server, capsys):
-    """Tests handling of a datagram with invalid UTF-8 encoding."""
-    # Arrange
-    test_data = b"\xff\xfe"  # Invalid UTF-8
+    test_data = b"\xff\xfe"
     addr = ("192.168.1.1", 12345)
-    received_at = datetime(2025, 6, 11, 12, 0, 0)
-
-    # Act
-    with patch("aiosyslogd.server.DEBUG", True):  # Enable debug output
-        params = server.process_datagram(test_data, addr, received_at)
-
-    # Assert
-    assert params is None
+    params = server.process_datagram(test_data, addr, datetime.now())
     captured = capsys.readouterr()
-    assert "Cannot decode message" in captured.out
-
-
-@pytest.mark.asyncio
-async def test_process_datagram_non_rfc5424(server):
-    """Tests processing of a non-RFC5424 datagram with fallback parsing."""
-    # Arrange
-    test_data = b"<14>Invalid syslog message"
-    addr = ("192.168.1.1", 12345)
-    received_at = datetime(2025, 6, 11, 12, 0, 0)
-
-    # Act
-    params = server.process_datagram(test_data, addr, received_at)
-
-    # Assert
-    assert params is not None
-    assert params["Facility"] == 1  # From priority 14
-    assert params["Priority"] == 6
-    assert params["FromHost"] == "192.168.1.1"
-    assert params["SysLogTag"] == "UNKNOWN"
-    assert params["Message"] == "<14>Invalid syslog message"
+    assert params is None
+    assert "Cannot decode message" in captured.err
 
 
 @pytest.mark.asyncio
 async def test_debug_mode_invalid_datagram(server, capsys):
-    """Tests that a debug message is printed when an invalid datagram is received in DEBUG mode."""
     with patch("aiosyslogd.server.DEBUG", True):
-        test_data = b"this is not a syslog message"  # Non-RFC5424 message
+        test_data = b"this is not a syslog message"
         addr = ("192.168.1.1", 12345)
-        received_at = datetime(2025, 6, 11, 12, 0, 0)
-
-        # Process the datagram
-        params = server.process_datagram(test_data, addr, received_at)
-
-        # Capture console output and verify debug message
+        params = server.process_datagram(test_data, addr, datetime.now())
         captured = capsys.readouterr()
         assert (
             "Failed to parse as RFC-5424: this is not a syslog message"
-            in captured.out
+            in captured.err
         )
-        # Ensure params are still returned for fallback processing
         assert params is not None
         assert params["Message"] == "this is not a syslog message"
 
 
-@pytest.mark.asyncio
-async def test_debug_mode_decoding_error(server, capsys):
-    """Tests that a debug message is printed when a decoding error occurs in DEBUG mode."""
-    with patch("aiosyslogd.server.DEBUG", True):
-        test_data = b"\xff\xfe"  # Invalid UTF-8 encoding
-        addr = ("192.168.1.1", 12345)
-        received_at = datetime(2025, 6, 11, 12, 0, 0)
-
-        # Process the datagram
-        params = server.process_datagram(test_data, addr, received_at)
-
-        # Capture console output and verify debug message
-        captured = capsys.readouterr()
-        assert (
-            "Cannot decode message from ('192.168.1.1', 12345): b'\\xff\\xfe'"
-            in captured.out
-        )
-        # Ensure no params are returned due to decoding failure
-        assert params is None
-
-
 def test_get_db_driver_injection_attempt(capsys):
-    """
-    Tests that get_db_driver prevents code injection by validating the driver name.
-    """
-    # Arrange: Patch the DB_DRIVER config to simulate a malicious value.
-    # An attacker might try to use path traversal or module names to execute code.
     malicious_driver_name = "../../../../os"
     with patch("aiosyslogd.server.DB_DRIVER", malicious_driver_name):
-        # Act & Assert: The function should raise SystemExit.
         with pytest.raises(SystemExit):
             get_db_driver()
-
-    # Assert that an informative error message was printed.
     captured = capsys.readouterr()
-    assert "Error: Invalid database driver" in captured.out
-    assert f"'{malicious_driver_name}'" in captured.out
-    assert "Allowed drivers are:" in captured.out
-    assert "sqlite" in captured.out
-    assert "meilisearch" in captured.out
-
-
-def test_get_db_driver_is_none():
-    """
-    Tests that get_db_driver returns None when the driver is not set.
-    """
-    # Arrange: Patch the DB_DRIVER config to be None.
-    with patch("aiosyslogd.server.DB_DRIVER", None):
-        # Act
-        driver = get_db_driver()
-        # Assert
-        assert driver is None
+    assert "Invalid database driver" in captured.err
+    assert f"'{malicious_driver_name}'" in captured.err

A => tests/test_web.py +66 -0
@@ -0,0 +1,66 @@
+from loguru import logger
+from unittest.mock import patch
+import pytest
+import sys
+
+# --- Import the module and app to be tested ---
+from aiosyslogd import web
+
+
+@pytest.fixture(autouse=True)
+def setup_logger_for_web(capsys):
+    """Ensure logger is configured for all tests in this module."""
+    logger.remove()
+    logger.add(sys.stderr, level="DEBUG")
+    yield
+    logger.remove()
+
+
+@pytest.fixture
+def client():
+    """Provides a test client for the Quart app."""
+    app_config = web.CFG.copy()
+    app_config["database"] = {"driver": "sqlite"}
+    app_config["web_server"] = {
+        "bind_ip": "127.0.0.1",
+        "bind_port": 5141,
+        "debug": False,
+    }
+    with patch("aiosyslogd.web.CFG", app_config):
+        from aiosyslogd.web import app
+
+        yield app.test_client()
+
+
+def test_main_meilisearch_exit(capsys):
+    """
+    Tests that the main function exits cleanly if Meilisearch is the configured driver.
+    """
+    meili_config = {
+        "database": {"driver": "meilisearch"},
+        "web_server": {
+            "bind_ip": "127.0.0.1",
+            "bind_port": 5141,
+            "debug": False,
+        },
+    }
+    with patch("aiosyslogd.web.CFG", meili_config):
+        with pytest.raises(SystemExit) as e:
+            web.main()
+        assert e.value.code == 0
+
+    captured = capsys.readouterr()
+    assert "Meilisearch backend is selected" in captured.err
+    assert "This web UI is for the SQLite backend only" in captured.err
+
+
+@pytest.mark.asyncio
+async def test_index_route_no_dbs(client):
+    """
+    Tests the index route when no database files are found.
+    """
+    with patch("aiosyslogd.web.get_available_databases", return_value=[]):
+        response = await client.get("/")
+        assert response.status_code == 200
+        response_data = await response.get_data(as_text=True)
+        assert "No SQLite database files found" in response_data