#!/usr/bin/env python3

"""
Syslog server accepting RFC 5424-compliant messages over TCP.
Records are maintained in flat JSON (or CSV) files.
A built-in web interface provides searching via a simple query language and statistics plotting.
"""

import argparse
import asyncio
import gzip
import json
import logging
import math
import os
import re
import signal
import socket
import ssl
import subprocess
import sys
import threading
import time
from abc import abstractmethod
from base64 import b64encode
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, field
from datetime import datetime, timezone
from enum import Enum
from gc import collect
from hashlib import md5
from hmac import compare_digest
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from queue import Queue, Empty as QueueEmpty
from string import Template
from typing import get_args, Dict, Optional, Tuple, ClassVar, Callable, List, Literal, DefaultDict, Any, Iterator, \
    Final, Union, Type, AsyncContextManager, ContextManager

from aiohttp import web, BasicAuth, hdrs
from lark import Lark, Transformer, Token, LarkError
from plotly.offline import get_plotlyjs


# region CONFIG
###


NAME: Final[str] = "syslogsearch"
TRACE: Final[int] = logging.DEBUG - 1
logging.addLevelName(TRACE, "TRACE")  # register the name so records don't show as "Level 9"


@dataclass(frozen=True)
class Config:
    """Settings from corresponding commandline arguments."""

    verbose: bool = field(default=False, metadata={"help": "enable verbose logging"})
    debug: bool = field(default=False, metadata={"help": "enable even more verbose debug logging"})
    read_only: bool = field(default=False, metadata={"help": "don't actually write received records"})
    reverse: bool = field(default=False, metadata={"help": "order results and pagination by most recent entry first"})
    systemd_notify: bool = field(default=False, metadata={"help": "try to signal systemd readiness"})

    syslog_bind_all: bool = field(default=False, metadata={"help": "bind to 0.0.0.0 instead of localhost only"})
    syslog_port: int = field(default=5140, metadata={"help": "syslog TCP port to listen on", "metavar": "PORT"})
    http_bind_all: bool = field(default=False, metadata={"help": "bind to 0.0.0.0 instead of localhost only"})
    http_port: int = field(default=5141, metadata={"help": "HTTP port to listen on", "metavar": "PORT"})

    http_basic_auth: bool = field(default=False, metadata={"help": "require basic authentication using credentials "
                                                                   "from LOGSEARCH_USER and LOGSEARCH_PASS environment "
                                                                   "variables"})
    http_basic_credentials: Optional[Tuple[str, str]] = field(init=False, default=None)

    https_certfile: Optional[Path] = field(default=None, metadata={"help": "certificate for HTTPS", "metavar": "PEM"})
    https_keyfile: Optional[Path] = field(default=None, metadata={"help": "key file for HTTPS", "metavar": "PEM"})

    data_dir: Path = field(default=Path("./"), metadata={"help": "storage directory", "metavar": "DIR"})
    flush_interval: float = field(default=60.0, metadata={"help": "write out records this often", "metavar": "SEC"})
    max_buffered: int = field(default=1000, metadata={"help": "record limit before early flush", "metavar": "LEN"})
    max_shards: int = field(default=15, metadata={"help": "number of 'daily' files to maintain", "metavar": "NUM"})
    rotate_compress: bool = field(default=False, metadata={"help": "create .gz files for old shards instead of delete"})
    file_format: str = field(default="json", metadata={"choices": ["json", "csv"], "metavar": "EXT",
                                                       "help": "format for new files, either 'json' or 'csv'"})
    json_full: bool = field(default=False, metadata={"help": "assume full objects instead of tuples in JSON mode"})

    filters: Optional[Path] = field(default=None, metadata={"help": "filter query definitions file", "metavar": "TXT"})
    alerts: Optional[Path] = field(default=None, metadata={"help": "alert definitions file", "metavar": "JSON"})

    def __post_init__(self) -> None:
        if self.http_basic_auth:
            super().__setattr__("http_basic_credentials", (os.environ["LOGSEARCH_USER"], os.environ["LOGSEARCH_PASS"]))
            del os.environ["LOGSEARCH_USER"]
            del os.environ["LOGSEARCH_PASS"]

    @classmethod
    def add_args(cls, p: argparse.ArgumentParser) -> None:
        for f in fields(cls):
            flag: str = f"--{f.name.replace('_', '-')}"
            if f.init:
                if f.default is None:  # Union[X, NoneType]
                    p.add_argument(flag, type=get_args(f.type)[0], default=None, **f.metadata)
                elif f.type is bool:
                    p.add_argument(flag, action="store_const", const=not f.default, default=f.default, **f.metadata)
                else:
                    p.add_argument(flag, type=f.type, default=f.default, **f.metadata)

    @classmethod
    def from_args(cls, args: argparse.Namespace) -> 'Config':
        return cls(**{_.name: getattr(args, _.name) for _ in fields(cls) if _.init})
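
    # Illustrative wiring with argparse (a sketch; the actual entry point is
    # expected to do the equivalent):
    #   parser = argparse.ArgumentParser(prog=NAME)
    #   Config.add_args(parser)
    #   config = Config.from_args(parser.parse_args())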


# endregion
# region SYSLOG
###


@dataclass(frozen=True)
class SyslogRecord:
    """Main datastructure as parsing result, API response, and JSON file i/o."""

    timestamp: float
    facility: str
    severity: str

    remote_host: Optional[str]
    hostname: Optional[str]

    application: Optional[str]
    pid: Optional[int]
    mid: Optional[str]

    data: Optional[dict]
    message: str

    __slots__ = ("timestamp", "facility", "severity", "remote_host", "hostname",
                 "application", "pid", "mid", "data", "message")

    def to_dict(self) -> Dict[str, Any]:
        """As API response."""
        return {_: getattr(self, _) for _ in self.fields()}

    def to_row(self) -> Tuple:
        """In storage format."""
        return tuple(getattr(self, _) for _ in self.fields())

    def to_log_line(self) -> bytes:
        """As pre-formatted tail similar to frontend, UTC."""
        return "{} {} {} {}[{}] {}.{}: {}\n".format(
            datetime.fromtimestamp(self.timestamp, tz=timezone.utc).isoformat(),
            self.remote_host or "-", self.hostname or "-", self.application or "-", self.pid or "-",
            self.facility, self.severity, self.message,
        ).encode("utf-8", errors="replace")

    @classmethod
    def from_dict(cls, data: Dict) -> 'SyslogRecord':
        return cls(**data)

    @classmethod
    def from_row(cls, data: List) -> 'SyslogRecord':
        return cls(*data)

    @classmethod
    def fields(cls) -> Tuple[str, ...]:
        return cls.__slots__


class Syslog5424Parser:
    """
    Parser for raw RFC5424 syslog messages into the record datastructure.
    https://datatracker.ietf.org/doc/html/rfc5424
    """

    _header_rx: ClassVar['re.Pattern[bytes]'] = re.compile(
        br'^'
        br'(?:[0-9]+ )?'  # rsyslog '(o)' framing
        br'<(?P<priority>[0-9]{1,3})>'
        br'[0-9]{1,3} '  # version
        br'(?P<timestamp>[!-~]+) '
        br'(?P<hostname>[!-~]{1,255}) '
        br'(?P<application>[!-~]{1,48}) '
        br'(?P<pid>[!-~]{1,128}) '
        br'(?P<mid>[!-~]{1,32}) '
        br'(?:-|(?:\[(?:[^]\\]|\\.)*])+) '  # TODO: parse (certain) structured data; RFC allows multiple SD-ELEMENTs
    )
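
    # Example header the regex accepts (taken from RFC 5424's example messages):
    #   <165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.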

    _reserved: ClassVar[Dict[int, int]] = str.maketrans("\t\f\r\n", "    ")  # control chars make little sense in logs and would break CSV

    severities: ClassVar[List[str]] = [
        "emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"
    ]

    facilities: ClassVar[List[str]] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron", "authpriv", "ftp", "ntp",
        "audit", "alert", "clockd", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"
    ]

    def __init__(self) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)

    def _parse_int(self, value: Optional[str]) -> Optional[int]:
        try:
            return int(value) if value is not None else None
        except Exception as e:
            self._logger.warning(f"Cannot parse integer '{value}': {str(e)}")
            return None

    def _parse_header(self, line: bytes) -> Tuple[bytes, Dict[str, str]]:
        match: Optional[re.Match] = self._header_rx.match(line)
        if match is None:
            raise ValueError("Cannot parse syslog header")

        return line[match.end():], {
            k: v.decode("utf-8", errors="replace")
            for k, v in match.groupdict().items()
            if v and v != b'-'
        }

    def _parse_timestamp(self, timestamp: Optional[str]) -> Optional[float]:
        if timestamp is None:
            return None
        if timestamp.endswith('Z'):
            timestamp = timestamp[:-1] + '+0000'
        try:
            return datetime.strptime(  # NB: %f already accepts the 1-6 fractional digits RFC 5424 allows
                timestamp, '%Y-%m-%dT%H:%M:%S.%f%z' if '.' in timestamp else '%Y-%m-%dT%H:%M:%S%z'
            ).timestamp()
        except (ValueError, OverflowError) as e:
            self._logger.warning(f"Cannot parse timestamp '{timestamp}': {str(e)}")
            return None

    def _parse_message(self, data: bytes) -> str:
        if data.startswith(b'\xEF\xBB\xBF'):  # UTF-8 BOM
            data = data[3:]
        if data.startswith(b' '):  # https://www.rsyslog.com/doc/master/configuration/modules/mmrm1stspace.html
            data = data[1:]
        return data.decode("utf-8", errors="replace").translate(self._reserved).rstrip()

    def _parse_priority(self, priority: Optional[str]) -> Tuple[str, str]:
        if priority is None:
            raise ValueError("No priority")
        try:
            priority_val: int = int(priority)
            return self.severities[priority_val & 0x7], self.facilities[priority_val >> 3]
        except IndexError:
            raise ValueError(f"Cannot parse priority '{priority}'") from None

    def parse(self, line: bytes, remote_host: Optional[str]) -> SyslogRecord:
        self._logger.log(TRACE, line.decode("ascii", errors="replace").rstrip())

        line, header = self._parse_header(line)
        message: str = self._parse_message(line)
        severity, facility = self._parse_priority(header.get('priority', None))
        timestamp: Optional[float] = self._parse_timestamp(header.get('timestamp', None))

        return SyslogRecord(
            facility=facility,
            severity=severity,
            timestamp=timestamp if timestamp is not None else time.time(),
            hostname=header.get('hostname', None),
            application=header.get('application', None),
            pid=self._parse_int(header.get('pid', None)),
            mid=header.get('mid', None),
            data=None,
            message=message,
            remote_host=remote_host,
        )


# endregion
# region QUERY
###


class QueryParseError(ValueError):
    pass


class QueryMatch:
    """Interface for a node of the query tree as parsed from search syntax: record to boolean."""

    @abstractmethod
    def __call__(self, record: SyslogRecord) -> bool:
        raise NotImplementedError


class QueryOperator:
    """Interface for a leaf condition applied to a single field value string."""

    @abstractmethod
    def __call__(self, value: str) -> bool:
        raise NotImplementedError


class QueryFactory:
    """Parse search input into query tree."""

    _GRAMMAR: ClassVar[str] = r"""
        ?start: query
              | noop
        noop:

        ?query: subquery_list_and
              | subquery_list_or

        subquery_list_and: subquery ("AND" subquery)+
        subquery_list_or: subquery ("OR" subquery)*

        ?subquery: query_negated
                 | query_inner
                 | match_negated
                 | match

        query_negated: "NOT" "(" query ")"
        ?query_inner: "(" query ")"

        match_negated: "NOT" match
        match: FIELD ":" condition
        condition: IREGEX
                 | REGEX
                 | QSTRING
                 | STRING

        FIELD: "*" | ("_"|LCASE_LETTER)+
        STRING: /[^ \t\f\r\n"()]/+
        QSTRING: ESCAPED_STRING
        IREGEX: "/" /([^\/\\]|\\.)*/ "/i"
        REGEX: "/" /([^\/\\]|\\.)*/ "/"

        %import common.LCASE_LETTER
        %import common.WORD
        %import common.ESCAPED_STRING
        %import common.WS
        %ignore WS
        """

    class QueryContainsOperator(QueryOperator):
        def __init__(self, target: str) -> None:
            self._target: str = target

        def __call__(self, value: str) -> bool:
            return self._target in value

        def __str__(self) -> str:
            return f'"{self._target}"'

    class QueryRegexOperator(QueryOperator):
        def __init__(self, regex: str, sensitive: bool) -> None:
            try:
                self._regex: re.Pattern = re.compile(regex, flags=0 if sensitive else re.IGNORECASE)
            except re.error as e:
                raise QueryParseError(f"Cannot parse '{regex}': {str(e)}") from None

        def __call__(self, value: str) -> bool:
            return self._regex.search(value) is not None

        def __str__(self) -> str:
            return f"/{self._regex.pattern}/{'i' if re.IGNORECASE & self._regex.flags else ''}"

    class QueryNotMatch(QueryMatch):
        def __init__(self, match: QueryMatch) -> None:
            self._match: QueryMatch = match

        def __call__(self, record: SyslogRecord) -> bool:
            return not self._match(record)

        def __str__(self) -> str:
            return f"<NOT {str(self._match)}>"

    class QueryAndMatch(QueryMatch):
        def __init__(self, matches: List[QueryMatch]) -> None:
            self._matches: List[QueryMatch] = matches

        def __call__(self, record: SyslogRecord) -> bool:
            return len(self._matches) > 0 and all(_(record) for _ in self._matches)  # True -> False upon empty

        def __str__(self) -> str:
            return f"<AND {' '.join(str(_) for _ in self._matches) if len(self._matches) else 'False'}>"

    class QueryOrMatch(QueryMatch):
        def __init__(self, matches: List[QueryMatch]) -> None:
            self._matches: List[QueryMatch] = matches

        def __call__(self, record: SyslogRecord) -> bool:
            return len(self._matches) == 0 or any(_(record) for _ in self._matches)  # False -> True upon empty

        def __str__(self) -> str:
            return f"<OR {' '.join(str(_) for _ in self._matches) if len(self._matches) else 'True'}>"

    class QueryFieldMatch(QueryMatch):
        _short_fields: ClassVar[Dict[str, Optional[str]]] = {
            "*": None,
            "client": "remote_host",
            "host": "hostname",
            "app": "application",
            "msg": "message",
        }

        def __init__(self, key: str, operator: QueryOperator) -> None:
            if key in self._short_fields:
                self._key: Optional[str] = self._short_fields[key]
            elif key in SyslogRecord.fields():
                self._key = key
            else:
                raise KeyError(f"Unknown field '{key}'. "
                               f"Expected one of: {', '.join(SyslogRecord.fields())} "
                               f"(or: {', '.join(self._short_fields.keys())})")
            self._operator: QueryOperator = operator

        @classmethod
        def _get_attr(cls, record: SyslogRecord, attr: str) -> str:
            value: Any = getattr(record, attr, None)
            return value if isinstance(value, str) else "" if value is None else str(value)

        def __call__(self, record: SyslogRecord) -> bool:
            if self._key is None:
                return any(self._operator(self._get_attr(record, _)) for _ in SyslogRecord.fields())
            return self._operator(self._get_attr(record, self._key))

        def __str__(self) -> str:
            return f"<{self._key if self._key is not None else '*'}:{str(self._operator)}>"

    class QueryBuilder(Transformer[Token, QueryMatch]):
        """Transformer for creating a query tree on-the-fly while parsing tokens."""

        @classmethod
        def _unescape(cls, s: str) -> str:
            return re.sub(r"\\(.)", r"\1", s)

        def condition(self, s: Tuple[Token]) -> QueryOperator:
            token: Token = s[0]
            if token.type == "STRING":
                return QueryFactory.QueryContainsOperator(token.value)
            elif token.type == "QSTRING":
                return QueryFactory.QueryContainsOperator(self._unescape(token.value[1:-1]))
            elif token.type == "IREGEX":
                return QueryFactory.QueryRegexOperator(self._unescape(token.value[1:-2]), sensitive=False)
            elif token.type == "REGEX":
                return QueryFactory.QueryRegexOperator(self._unescape(token.value[1:-1]), sensitive=True)
            else:  # TODO: severity enum gt/lt support
                raise QueryParseError(f"Unexpected token of type '{token.type}'")

        def noop(self, s: Tuple[()]) -> QueryMatch:
            return QueryFactory.QueryOrMatch([])

        def match(self, s: Tuple[Token, QueryOperator]) -> QueryMatch:
            return QueryFactory.QueryFieldMatch(s[0].value, s[1])

        def match_negated(self, s: Tuple[QueryMatch]) -> QueryMatch:
            return QueryFactory.QueryNotMatch(s[0])

        def query_negated(self, s: Tuple[QueryMatch]) -> QueryMatch:
            return QueryFactory.QueryNotMatch(s[0])

        def subquery_list_or(self, s: Tuple[QueryMatch, ...]) -> QueryMatch:
            return QueryFactory.QueryOrMatch(list(s)) if len(s) != 1 else s[0]

        def subquery_list_and(self, s: Tuple[QueryMatch, ...]) -> QueryMatch:
            return QueryFactory.QueryAndMatch(list(s)) if len(s) != 1 else s[0]

    _inst: ClassVar[Optional['QueryFactory']] = None  # singleton

    def __init__(self) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._parser: Lark = Lark(self._GRAMMAR, start="start", parser="lalr", transformer=QueryFactory.QueryBuilder())

    @classmethod
    def get_inst(cls) -> 'QueryFactory':
        if cls._inst is None:
            cls._inst = QueryFactory()
        return cls._inst

    def parse(self, query: str) -> QueryMatch:
        try:
            match: QueryMatch = self._parser.parse(query)  # type: ignore
        except QueryParseError:
            raise
        except re.error as e:
            raise QueryParseError(f"Cannot parse regex: {str(e)}") from None
        except (ValueError, KeyError, LarkError) as e:
            raise QueryParseError(f"Cannot parse query: {str(e)}") from None
        else:
            self._logger.debug(str(match))
            return match
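
    # Illustrative use (a sketch, e.g. from a REPL):
    #   match = QueryFactory.get_inst().parse('severity:error AND host:web1')
    #   match(record)  # -> bool for any SyslogRecord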


class Timeline:
    """Counter for timestamps (from records) into fixed-number but variable-width buckets. (Bar-Chart)"""

    def __init__(self, start: float, end: float, samples: int = 250) -> None:
        self._size: int = 0
        self._start: float = start
        self._end: float = max(end, start + 1.0)
        self._samples: int = samples
        self._range: float = self._end - self._start
        self._width: float = self._range / self._samples
        self._scale: float = self._samples / self._range
        self._buckets: Dict[int, int] = {_: 0 for _ in range(self._samples)}  # faster than DefaultDict, similar to List

    def _count(self, timestamp: float, value: int) -> None:
        bucket: float = self._scale * (timestamp - self._start)
        self._buckets[min(max(math.floor(bucket), 0), self._samples - 1)] += value

    def push(self, timestamp: float) -> None:
        self._count(timestamp, 1)
        self._size += 1

    def pop(self, timestamps: List[float]) -> None:
        for timestamp in timestamps:
            self._count(timestamp, -1)
            self._size -= 1

    def to_dict(self) -> Dict:
        return {
            "range": (self._start, self._end, 0, max(self._buckets.values())),
            "x": [self._start + (_ * self._width) + (self._width / 2) for _ in self._buckets.keys()],
            "y": list(self._buckets.values()),
            # "interval": (next((_ for _ in self._buckets.keys() if self._buckets[_]), 0),
            #              next((_ for _ in reversed(self._buckets.keys()) if self._buckets[_]), self._samples - 1)),
        }

    def __len__(self) -> int:
        return self._size
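
    # Bucketing sketch: with start=0.0, end=250.0 and the default samples=250, each
    # bucket is 1s wide, so push(42.7) increments bucket floor(1.0 * (42.7 - 0.0)) == 42.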


class Stats:
    """Count record property values for statistics. (Pie-Chart)"""

    def __init__(self) -> None:
        self._keys: Tuple[str, ...] = ("all",)
        self._record_keys: Tuple[str, ...] = ("facility", "severity", "hostname", "application")
        self._stats: Dict[str, DefaultDict[Optional[str], int]] = {k: defaultdict(int)
                                                                   for k in self._record_keys + self._keys}

    def count_record(self, record: SyslogRecord) -> None:
        for key in self._record_keys:
            self._stats[key][getattr(record, key)] += 1

    def count_value(self, key: str, value: str, count: int = 1) -> None:
        self._stats[key][value] += count

    def to_dict(self) -> Dict:
        return self._stats


@dataclass(frozen=True)
class LogSearch:
    """Actual search task to be done while scanning input records. Built by API endpoint."""

    start: float
    end: float
    offset: int
    limit: Optional[int]
    query: QueryMatch


@dataclass
class LogSearchResult:
    """Search result from scanning input records. Returned by the API."""

    timeline_all: Timeline
    timeline_query: Timeline
    timeline_result: Timeline
    stats: Stats
    offset: int
    total: int = 0
    results: List[SyslogRecord] = field(default_factory=list)

    def to_dict(self) -> Dict:
        return {
            "timeline_all": self.timeline_all.to_dict(),
            "timeline_query": self.timeline_query.to_dict(),
            "timeline_result": self.timeline_result.to_dict(),
            "stats": self.stats.to_dict(),
            "offset": self.offset,
            "total": self.total,
            "results": [_.to_dict() for _ in self.results],
        }


class LogSearchEvaluator:
    """Run the given search on a stream of records, with result builder and offset/limit logic."""

    def __init__(self, query: LogSearch, reverse: bool) -> None:
        self._query: LogSearch = query
        self._reverse: bool = reverse
        self._result: LogSearchResult = LogSearchResult(timeline_all=Timeline(query.start, query.end),
                                                        timeline_query=Timeline(query.start, query.end),
                                                        timeline_result=Timeline(query.start, query.end),
                                                        stats=Stats(),
                                                        offset=query.offset)

    def get_result(self) -> LogSearchResult:
        if self._reverse and self._query.offset > 0:
            self._result.timeline_result.pop([_.timestamp for _ in self._result.results[:self._query.offset]])
            self._result.results = self._result.results[self._query.offset:]
        self._result.stats.count_value("all", "result",
                                       len(self._result.timeline_result))
        self._result.stats.count_value("all", "query",
                                       len(self._result.timeline_query) - len(self._result.timeline_result))
        self._result.stats.count_value("all", "all",
                                       len(self._result.timeline_all) - len(self._result.timeline_query))
        return self._result

    def evaluate(self, record: SyslogRecord) -> None:
        """Feed a candidate read from file."""

        if self._query.start <= record.timestamp <= self._query.end:
            if self._query.query(record):
                if self._query.limit is not None and self._query.limit <= 0:
                    pass
                elif self._reverse:
                    self._result.timeline_result.push(record.timestamp)
                    self._result.results.insert(0, record)
                    if self._query.limit is not None:
                        if len(self._result.results) > self._query.offset + self._query.limit:
                            self._result.timeline_result.pop([self._result.results[-1].timestamp])
                            self._result.results.pop()
                else:
                    if self._query.offset <= self._result.total:
                        if self._query.limit is None or len(self._result.results) < self._query.limit:
                            self._result.timeline_result.push(record.timestamp)
                            self._result.results.append(record)

                self._result.total += 1
                self._result.stats.count_record(record)
                self._result.timeline_query.push(record.timestamp)
            self._result.timeline_all.push(record.timestamp)


# endregion
# region FILTERS
###


class AlertRuleset:
    """Load alert trigger definitions from dedicated JSON configuration file."""

    @dataclass(frozen=True)
    class Alert:
        pos: int
        name: str
        query: QueryMatch
        command: List[Template]
        limit: float

        def to_dict(self) -> Dict:
            return {"name": self.name, "query": str(self.query), "command": [_.template for _ in self.command]}

        def __str__(self) -> str:
            return f"{self.name}: {str(self.query)} -> {' '.join([_.template for _ in self.command])}"

    def __init__(self, ruleset: Optional[Path]) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._alerts: List[AlertRuleset.Alert] = list(self._load_alerts(ruleset)) if ruleset is not None else []
        for alert in self._alerts:
            self._logger.info(str(alert))

    @classmethod
    def _load_alerts(cls, filename: Path) -> Iterator[Alert]:
        with filename.open("r") as fp:
            ruleset: List[Dict[str, Any]] = json.load(fp)
            if not isinstance(ruleset, list) or not all(isinstance(_, dict) for _ in ruleset):
                raise ValueError(f"Cannot parse '{ruleset}': Expecting object list")

            pos: int = 0
            queries: QueryFactory = QueryFactory.get_inst()
            for rule in ruleset:
                if not isinstance(rule["command"], list) or not all(isinstance(_, str) for _ in rule["command"]):
                    raise ValueError(f"Cannot parse '{ruleset}': Expecting command list")
                pos += 1
                yield AlertRuleset.Alert(pos=pos,
                                         name=str(rule.get("name", "")),
                                         query=queries.parse(str(rule["query"])),
                                         command=[Template(str(_)) for _ in rule["command"]],
                                         limit=float(rule.get("limit", 0.0)))
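
    # Illustrative alerts file (a JSON list of rule objects; "command" entries are
    # string.Template strings over record fields, e.g. $hostname or $message):
    #   [{"name": "disk", "query": "severity:crit", "limit": 300,
    #     "command": ["notify-send", "syslog alert", "$hostname: $message"]}]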

    def dump(self) -> List[Dict]:
        return [alert.to_dict() for alert in self._alerts]

    def match(self, record: SyslogRecord) -> Iterator[Alert]:
        for alert in self._alerts:
            if alert.query(record):
                yield alert


class AlertDispatcher(ContextManager['AlertDispatcher']):
    """Check parsed records against the alert ruleset. Trigger commands are executed in a separate thread."""

    class Runnable:
        def __init__(self, logger: logging.Logger, alert: AlertRuleset.Alert, record: SyslogRecord) -> None:
            self._logger: logging.Logger = logger
            self._timeout: float = 60.0
            self._name: str = alert.name
            self._command: List[str] = self._interpolate_command(alert.command, record)

        @classmethod
        def _interpolate_command(cls, command: List[Template], record: SyslogRecord) -> List[str]:
            if len(command) < 1:
                raise ValueError("Empty command")
            values: Dict[str, str] = {k: "" if v is None else str(v) for k, v in record.to_dict().items()}
            return [_.safe_substitute(values) for _ in command]

        def run(self) -> None:
            try:
                _: subprocess.CompletedProcess = subprocess.run(
                    self._command, env=None, timeout=self._timeout,
                    shell=False, check=True, close_fds=True,
                    stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                )
            except (OSError, subprocess.SubprocessError) as e:
                self._logger.error(f"Cannot run alert '{self._name}': {str(e)}")
            else:
                self._logger.info(f"Triggered alert '{self._name}'")

    def __init__(self, alerts: AlertRuleset) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._queue: Queue[Optional[AlertDispatcher.Runnable]] = Queue()
        self._thread: Optional[threading.Thread] = None
        self._alerts: AlertRuleset = alerts
        self._last_seen: DefaultDict[int, float] = defaultdict(float)

    def check(self, record: SyslogRecord) -> None:
        now: float = time.monotonic()
        for alert in self._alerts.match(record):
            if alert.limit > 0.0 and self._last_seen[alert.pos] > now - alert.limit:
                continue
            self._last_seen[alert.pos] = now
            self._queue.put_nowait(AlertDispatcher.Runnable(self._logger, alert, record))

    def _worker(self) -> None:
        while True:
            task: Optional[AlertDispatcher.Runnable] = self._queue.get(block=True)
            try:
                if task is None:
                    break
                task.run()
            finally:
                self._queue.task_done()

    def __enter__(self) -> 'AlertDispatcher':
        assert self._thread is None
        self._thread = threading.Thread(target=self._worker, name=self.__class__.__name__)
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        assert self._thread is not None
        self._logger.debug("Shutting down")
        self._queue.put_nowait(None)  # sentinel
        self._queue.join()
        self._thread.join()
        self._thread = None
        return False


class FilterRuleset:
    """Load filter query definitions from dedicated text file."""

    def __init__(self, ruleset: Optional[Path]) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._filters: List[QueryMatch] = list(self._load_queries(ruleset)) if ruleset is not None else []
        for query in self._filters:
            self._logger.info(str(query))

    @classmethod
    def _load_queries(cls, filename: Path) -> Iterator[QueryMatch]:
        queries: QueryFactory = QueryFactory.get_inst()
        with filename.open("r") as fp:
            for line in fp:
                line = line.strip()
                if line and not line.startswith("#"):
                    yield queries.parse(line)
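
    # Illustrative filters file: one query per line, '#' starts a comment line, e.g.:
    #   # drop noisy cron chatter
    #   app:cron AND severity:info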

    def dump(self) -> List[str]:
        return [str(_) for _ in self._filters]

    def match(self, record: SyslogRecord) -> bool:
        for query in self._filters:
            if query(record):
                return True
        return False


# endregion
# region FILES
###


class Serializer:
    """Conversion of record fields from/into storage format."""

    @classmethod
    @abstractmethod
    def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def decode(cls, line: str) -> SyslogRecord:
        raise NotImplementedError


class CompactJSONSerializer(Serializer):
    """
    Compact tuple representation: whitespace-less JSON lists, one record per line.
    """

    _decoder: ClassVar[json.JSONDecoder] = json.JSONDecoder(strict=False)
    _encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False, check_circular=False,
                                                            allow_nan=False, separators=(',', ':'))

    @classmethod
    def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
        for chunk in cls._encoder.iterencode(record.to_row(), _one_shot=True):
            yield chunk.encode("utf-8", errors="replace")
        yield b"\n"

    @classmethod
    def decode(cls, line: str) -> SyslogRecord:
        try:
            return SyslogRecord.from_row(cls._decoder.raw_decode(line)[0])
        except (UnicodeError, json.JSONDecodeError, ValueError, TypeError) as e:
            raise ValueError(f"Cannot decode JSON: {repr(e)}") from None


class FullJSONSerializer(Serializer):
    """
    Full JSON object representation, which always repeats keys.
    """

    _decoder: ClassVar[json.JSONDecoder] = json.JSONDecoder(strict=False)
    _encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False)

    @classmethod
    def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
        for chunk in cls._encoder.iterencode(record.to_dict(), _one_shot=True):
            yield chunk.encode("utf-8", errors="replace")
        yield b"\n"

    @classmethod
    def decode(cls, line: str) -> SyslogRecord:
        try:
            return SyslogRecord.from_dict(cls._decoder.raw_decode(line)[0])
        except (UnicodeError, json.JSONDecodeError, ValueError, TypeError) as e:
            raise ValueError(f"Cannot decode JSON: {repr(e)}") from None


class CSVSerializer(Serializer):
    """
    Tab-separated CSV dialect supporting None, str, int, and float values.
    Assumes values contain no newlines, tabs, or similar; does not escape other special or Unicode characters.
    """

    @classmethod
    def _encode(cls, values: Tuple) -> Iterator[bytes]:
        for value in values:
            if value is None:
                yield b""  # as we quote all strings, we can distinguish None from an empty string
            elif isinstance(value, (float, int)):
                yield str(value).encode("utf-8", errors="strict")
            elif isinstance(value, str):
                escaped: str = value.replace("\\", "\\\\").replace("\"", "\\\"")
                yield b"".join((b"\"", escaped.encode("utf-8", errors="replace"), b"\""))
            else:
                raise ValueError(f"Unsupported CSV value: '{value}'")

    @classmethod
    def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
        yield b"\t".join(cls._encode(record.to_row()))
        yield b"\n"

    @classmethod
    def _decode(cls, row: str) -> Iterator:
        for value in row.split("\t"):
            if not len(value):
                yield None
            elif value.startswith("\"") and value.endswith("\"") and len(value) >= 2:
                yield value[1:-1].replace("\\\"", "\"").replace("\\\\", "\\")
            else:
                yield float(value) if "." in value else int(value)

    @classmethod
    def decode(cls, row: str) -> SyslogRecord:
        if not row.endswith("\n"):
            raise ValueError("Truncated CSV line")
        return SyslogRecord.from_row(list(cls._decode(row[:-1])))
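
    # Illustrative encoding of the same record as in the JSON example above:
    #   1714521600.0\t"daemon"\t"info"\t"10.0.0.5"\t"web1"\t"cron"\t123\t\t\t"session opened"\n
    # The empty fields between tabs decode back to None, quoted fields to str,
    # and bare numbers to int or float depending on a decimal point.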


class FileCompressor(ContextManager['FileCompressor']):
    """Compress or delete old shard files."""

    def __init__(self, enabled: bool) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._enabled: bool = enabled
        self._buf_size: int = 4096*16
        self._executor: Optional[ThreadPoolExecutor] = None

    def __enter__(self) -> 'FileCompressor':
        assert self._executor is None
        if self._enabled:
            self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=self.__class__.__name__)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None
        return False

    def _fallback(self, file: Path) -> None:
        try:
            file.unlink(missing_ok=True)
        except OSError as e:
            self._logger.warning(f"Cannot delete {file}: {str(e)}")
        else:
            self._logger.info(f"Deleted {file}")

    def _handle(self, file: Path) -> None:
        try:
            temp: Path = file.with_suffix(file.suffix + ".gz.tmp")
            target: Path = file.with_suffix(file.suffix + ".gz")
            try:
                with file.open("rb", buffering=self._buf_size) as in_fp:
                    with gzip.GzipFile(temp, "wb", mtime=0) as out_fp:
                        while True:
                            buf: bytes = in_fp.read(self._buf_size)
                            if not buf:
                                break
                            out_fp.write(buf)
                temp.rename(target)
            except OSError:
                temp.unlink(missing_ok=True)
                target.unlink(missing_ok=True)
                raise
            else:
                file.unlink(missing_ok=True)
                self._logger.info(f"Created {target}")
        except OSError as e:
            self._logger.warning(f"Cannot compress {file}: {str(e)}")
            self._fallback(file)

    def handle(self, file: Path) -> None:
        if self._executor is not None:
            self._executor.submit(self._handle, file)
        else:
            self._fallback(file)


class FileRegistry:
    """Maintain an upper-limit of timestamp-based JSON files/shards."""

    @dataclass(frozen=True)
    class ShardFile:
        timestamp: int
        shard: str
        path: Path

        def __lt__(self, other: 'FileRegistry.ShardFile') -> bool:
            return self.timestamp < other.timestamp if isinstance(other, FileRegistry.ShardFile) else NotImplemented

    def __init__(self, root: Path, fmt: str, json_full: bool,
                 jitter: float, max_shards: int, rotate_cb: Callable[[Path], None]) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._root: Path = Path(root)
        self._jitter_margin: float = jitter * 2.0
        self._json_full: bool = json_full
        self._fmt: str = fmt
        self._shard_format: str = "%d-%m-%Y"  # daily granularity; the index is ordered by timestamp, not by name
        self._shard_glob: str = "*.log.*"
        self._shard_re: re.Pattern = re.compile(r"[0-9]+-[0-9]+-[0-9]+_[0-9]+\.log\.(json|csv)")
        self._max_shards: int = max_shards
        self._rotate_cb: Callable[[Path], None] = rotate_cb
        self._index: List[FileRegistry.ShardFile] = sorted(self._crawl())
        self._lock: threading.Lock = threading.Lock()
        self._logger.info(f"Found {len(self._index)} files in {self._root}")

    def _crawl(self) -> Iterator[ShardFile]:
        for fn in self._root.glob(self._shard_glob):  # type: Path
            if self._shard_re.fullmatch(fn.name) is not None:
                shard, ts = fn.stem.split("_")
                yield FileRegistry.ShardFile(int(ts.split(".")[0]), shard, fn)
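
    # Illustrative shard name for a file created 2024-05-01T00:00:00Z:
    #   "01-05-2024_1714521600.log.json" (shard label, '_', creation epoch, extension)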

    def get_start(self) -> Optional[float]:
        """Smallest timestamp to be expected (from the oldest file)."""
        with self._lock:
            return float(self._index[0].timestamp - self._jitter_margin) if len(self._index) else None

    def get_current(self) -> Path:
        """Current file to write to. Possibly create a new entry and rotate/delete old ones."""
        now: datetime = datetime.now(timezone.utc)
        ts: int = math.floor(now.timestamp())
        shard: str = now.strftime(self._shard_format)
        with self._lock:
            if not len(self._index) or self._index[-1].shard != shard:
                while len(self._index) >= self._max_shards:  # maintenance on old shards whenever we add a new one
                    old: FileRegistry.ShardFile = self._index.pop(0)
                    self._rotate_cb(old.path)

                filename: Path = self._root / Path(f"{shard}_{ts}").with_suffix(f".log.{self._fmt}")
                self._index.append(FileRegistry.ShardFile(ts, shard, filename))
                self._logger.info(f"Creating {filename}")
            return self._index[-1].path

    def get_candidates(self, query_start: float, query_end: float) -> Iterator[Path]:
        """Files to search, which might contain records from the given timestamp interval."""
        results: List[Path] = []
        with self._lock:
            for i in range(len(self._index)):
                entry: FileRegistry.ShardFile = self._index[i]
                file_end: Optional[int] = self._index[i + 1].timestamp if i + 1 < len(self._index) else None
                if file_end is None or query_start <= file_end + self._jitter_margin:
                    if query_end >= entry.timestamp - self._jitter_margin:
                        results.append(entry.path)
        yield from results  # NB: not yielding during lock

    def get_serializer(self, filename: Path) -> Type[Serializer]:
        if filename.suffix == ".json":
            return FullJSONSerializer if self._json_full else CompactJSONSerializer
        elif filename.suffix == ".csv":
            return CSVSerializer
        else:
            raise RuntimeError(f"Unsupported file extension in '{filename}'")


class FileWriter(ContextManager['FileWriter']):
    """Single-thread-based write queue for records to file, as async file i/o is not a real thing in python anyway."""

    class QueueMessage(Enum):
        eof = "eof"  # shutdown sentinel
        flush = "flush"

    def __init__(self, files: FileRegistry, read_only: bool, max_buffered: int, flush_interval: float) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)

        self._enabled: bool = not read_only
        self._max_buffered: int = max_buffered
        self._flush_interval: float = flush_interval
        self._files: FileRegistry = files

        self._queue: Queue[Union[SyslogRecord, FileWriter.QueueMessage]] = Queue()
        self._thread: Optional[threading.Thread] = None

    def _worker(self) -> None:
        last_flush: float = time.monotonic()
        buffer: List[SyslogRecord] = []
        done: bool = False

        while len(buffer) or not done:
            try:
                timeout: float = max(1.0, last_flush + self._flush_interval - time.monotonic())
                record: Union[SyslogRecord, FileWriter.QueueMessage] = self._queue.get(block=True, timeout=timeout)
                if record is FileWriter.QueueMessage.eof:
                    done = True
                elif record is FileWriter.QueueMessage.flush:
                    last_flush = float("-inf")  # force a flush below; 0.0 could misbehave if monotonic time starts small
                else:
                    assert isinstance(record, SyslogRecord)
                    self._logger.log(TRACE, str(record))
                    buffer.append(record)
            except QueueEmpty:
                pass
            else:
                self._queue.task_done()

            now: float = time.monotonic()
            if len(buffer) >= self._max_buffered or last_flush <= now - self._flush_interval or done:
                last_flush = now
                if len(buffer):
                    if self._enabled:
                        self._write(buffer)
                    buffer.clear()
                collect()

    def _write(self, records: List[SyslogRecord]) -> None:
        filename: Path = self._files.get_current()
        try:
            encoder: Type[Serializer] = self._files.get_serializer(filename)
            with filename.open("ab", buffering=4096 * 16) as fp:  # TODO: keep (text) fd open
                for record in records:
                    for chunk in encoder.encode(record):
                        fp.write(chunk)
                fp.flush()
        except (OSError, ValueError, RuntimeError) as e:
            self._logger.error(f"Cannot write to {filename}: {str(e)}")
        else:
            self._logger.log(TRACE, f"Flushed {len(records)} records to {filename}")

    def __enter__(self) -> 'FileWriter':
        assert self._thread is None
        self._thread = threading.Thread(target=self._worker, name=self.__class__.__name__)
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        assert self._thread is not None
        self._logger.debug("Shutting down")
        self._queue.put_nowait(FileWriter.QueueMessage.eof)
        self._queue.join()
        self._thread.join()
        self._thread = None
        return False

    def put(self, record: SyslogRecord) -> None:
        assert self._thread is not None
        self._queue.put_nowait(record)  # TODO: wait, full, and timeout?

    def flush(self) -> None:
        assert self._thread is not None
        self._queue.put_nowait(FileWriter.QueueMessage.flush)


class FileReader(ContextManager['FileReader']):
    """Thread-pool-based searching through files for query matches."""

    def __init__(self, files: FileRegistry, reverse: bool, max_workers: int = 4) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._files: FileRegistry = files
        self._reverse: bool = reverse
        self._max_workers: int = max_workers
        self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        self._executor: Optional[ThreadPoolExecutor] = None

    def __enter__(self) -> 'FileReader':
        assert self._executor is None
        self._executor = ThreadPoolExecutor(max_workers=self._max_workers, thread_name_prefix=self.__class__.__name__)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        assert self._executor is not None
        self._executor.shutdown(wait=True)
        self._executor = None
        return False

    def _search(self, query: LogSearch) -> LogSearchResult:
        start: float = time.perf_counter()
        evaluator: LogSearchEvaluator = LogSearchEvaluator(query, self._reverse)
        for filename in self._files.get_candidates(query.start, query.end):
            try:
                decoder: Type[Serializer] = self._files.get_serializer(filename)
                with filename.open("r", buffering=4096*16, newline="\n", encoding="utf-8") as fp:
                    self._logger.log(TRACE, f"Scanning {filename}")
                    for line in fp:
                        if not line.endswith("\n"):
                            break  # dirty read
                        evaluator.evaluate(decoder.decode(line))
            except (OSError, ValueError) as e:
                self._logger.error(f"Cannot search {filename}: {str(e)}")

        result: LogSearchResult = evaluator.get_result()
        duration: int = math.ceil((time.perf_counter() - start) * 1000)
        self._logger.debug(f"Returning {len(result.results)} of {result.total} results after {duration}ms")
        return result

    async def search(self, query: LogSearch) -> LogSearchResult:
        assert self._executor is not None
        return await asyncio.wrap_future(self._executor.submit(self._search, query))


# endregion
# region SERVER
###


class SyslogServer(AsyncContextManager['SyslogServer']):
    """TCP server for syslog parsing with new record callback to write queue."""

    def __init__(self, exclude_cb: Callable[[SyslogRecord], bool], cb: List[Callable[[SyslogRecord], None]],
                 bind_all: bool = False, port: int = 5140) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._host: str = "0.0.0.0" if bind_all else "127.0.0.1"
        self._port: int = port
        self._server: Optional[asyncio.AbstractServer] = None
        self._parser: Syslog5424Parser = Syslog5424Parser()
        self._exclude_cb: Callable[[SyslogRecord], bool] = exclude_cb
        self._cb: List[Callable[[SyslogRecord], None]] = cb

    async def __aenter__(self) -> 'SyslogServer':
        assert self._server is None
        try:
            # NB: start_server's 'flags' parameter takes AI_* getaddrinfo flags, not a protocol number
            self._server = await asyncio.start_server(self._handler, self._host, self._port,
                                                      family=socket.AF_INET,
                                                      reuse_address=True, reuse_port=True,
                                                      start_serving=True)
        except OSError as e:
            raise RuntimeError(f"Cannot start syslog server: {str(e)}") from None
        else:
            self._logger.info(f"Listening at {self._host}:{self._port}")
            return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        assert self._server is not None
        self._logger.debug("Shutting down")
        self._server.close()
        await self._server.wait_closed()
        self._server = None
        return False

    async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        remote_addr, remote_port = writer.get_extra_info('peername')  # type: str, int
        self._logger.debug(f"New connection from {remote_addr}:{remote_port}")

        while True:
            try:
                data: bytes = await reader.readline()
                if not data.endswith(b"\n"):
                    self._logger.debug(f"EOF from {remote_addr}:{remote_port}")
                    break
                record: SyslogRecord = self._parser.parse(data, remote_addr)
                if not self._exclude_cb(record):
                    for callback in self._cb:
                        callback(record)
            except OSError as e:
                self._logger.warning(f"Cannot read from {remote_addr}: {str(e)}")
                break
            except ValueError as e:
                self._logger.warning(f"Cannot parse syslog line from {remote_addr}: {str(e)}")
                break

        writer.close()
        await writer.wait_closed()


class ApiHandler:
    """Handler methods for API requests from aiohttp webserver."""

    _encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False, separators=(',', ':'),
                                                            check_circular=False, allow_nan=False)
    _query_factory: ClassVar[QueryFactory] = QueryFactory.get_inst()
    _compression_threshold: ClassVar[int] = 10 * 1024

    class JsonResponse(web.Response):
        def __init__(self, status: int, data: Union[Dict, List]) -> None:
            super().__init__(status=status, body=ApiHandler._encoder.encode(data),
                             content_type="application/json", charset="utf-8",
                             zlib_executor_size=ApiHandler._compression_threshold)
            self.enable_compression()

    class StreamedPlainResponse(web.StreamResponse):
        def __init__(self, status: int) -> None:
            super().__init__(status=status)
            self.content_type = "text/plain"
            self.charset = "utf-8"
            self.enable_compression()

        async def write_all(self, request: web.Request, data: Iterator[bytes]) -> web.StreamResponse:
            await self.prepare(request)
            try:
                for chunk in data:
                    await self.write(chunk)
            finally:
                await self.write_eof()
            return self

    class HtmlResponse(web.Response):
        def __init__(self, status: int, data: bytes = b"", etag: Optional[str] = None) -> None:
            super().__init__(status=status, body=data,
                             content_type="text/html", charset="utf-8",
                             zlib_executor_size=ApiHandler._compression_threshold)
            if len(data) >= ApiHandler._compression_threshold:
                self.enable_compression()
            if etag is not None:
                self.headers.update({
                    "ETag": f'"{etag}"',
                    "Cache-Control": "public, must-revalidate, max-age=604800",
                    "Vary": "Accept-Encoding",
                })

    def __init__(self, alerts: AlertRuleset, filters: FilterRuleset,
                 files: FileRegistry, writer: FileWriter, reader: FileReader) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._alerts: AlertRuleset = alerts
        self._filters: FilterRuleset = filters
        self._files: FileRegistry = files
        self._writer: FileWriter = writer
        self._reader: FileReader = reader

    async def _search(self, request: web.Request) -> LogSearchResult:
        query: QueryMatch = self._query_factory.parse(request.query.get("q", ""))
        end: float = float(request.query.get("e", "") or math.ceil(time.time()))
        start: float = float(request.query.get("s", "") or self._files.get_start() or (end - 3600.0))
        offset: int = int(request.query.get("o", "0") or 0)
        limit: Optional[int] = int(request.query["l"]) if request.query.get("l", "") else None

        self._writer.flush()
        return await self._reader.search(query=LogSearch(
            start=min(start, end), end=max(start, end), offset=offset, limit=limit, query=query
        ))

    async def _handle_search(self, request: web.Request) -> web.StreamResponse:
        try:
            result: LogSearchResult = await self._search(request)
            return self.JsonResponse(status=200, data=result.to_dict())
        except (QueryParseError, ValueError, RuntimeError) as e:
            self._logger.warning(str(e))
            return self.JsonResponse(status=200, data={"error": str(e)})
        except Exception as e:
            self._logger.error(repr(e))
            return self.JsonResponse(status=500, data={"error": e.__class__.__name__})

    async def _handle_tail(self, request: web.Request) -> web.StreamResponse:
        try:
            result: LogSearchResult = await self._search(request)
            return await self.StreamedPlainResponse(status=200).write_all(
                request, (record.to_log_line() for record in result.results)
            )
        except (QueryParseError, ValueError, RuntimeError) as e:
            self._logger.warning(str(e))
            return self.JsonResponse(status=200, data={"error": str(e)})
        except Exception as e:
            self._logger.error(repr(e))
            return self.JsonResponse(status=500, data={"error": e.__class__.__name__})

    async def _handle_flush(self, request: web.Request) -> web.Response:
        self._writer.flush()
        return self.HtmlResponse(status=202)

    async def _handle_alerts(self, request: web.Request) -> web.Response:
        return self.JsonResponse(status=200, data=self._alerts.dump())

    async def _handle_filters(self, request: web.Request) -> web.Response:
        return self.JsonResponse(status=200, data=self._filters.dump())

    async def _handle_index(self, request: web.Request) -> web.Response:
        if request.headers.get("ETag", "") == f'"{INDEX_ETAG}"':
            return self.HtmlResponse(status=304, data=b"", etag=INDEX_ETAG)
        return self.HtmlResponse(status=200, data=INDEX_HTML, etag=INDEX_ETAG)

    def get_routes(self) -> List[web.RouteDef]:
        return [
            web.route("GET", "/", self._handle_index),
            web.route("GET", "/api/search", self._handle_search),
            web.route("POST", "/api/search", self._handle_search),
            web.route("GET", "/api/tail", self._handle_tail),
            web.route("POST", "/api/tail", self._handle_tail),
            web.route("GET", "/api/flush", self._handle_flush),
            web.route("POST", "/api/flush", self._handle_flush),
            web.route("GET", "/api/alerts", self._handle_alerts),
            web.route("GET", "/api/filters", self._handle_filters),
        ]
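
    # Illustrative search request (parameters as parsed in _search above):
    #   GET /api/search?q=severity:error&s=1714521600&e=1714525200&o=0&l=50
    # -> {"timeline_all": ..., "timeline_query": ..., "timeline_result": ...,
    #     "stats": ..., "offset": 0, "total": ..., "results": [...]}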


class ApiRunner(AsyncContextManager['ApiRunner']):
    """Run aiohttp webserver for registered API route handlers."""

    def __init__(self, routes: List[web.RouteDef], bind_all: bool = False, port: int = 5141,
                 certfile: Optional[Path] = None, keyfile: Optional[Path] = None,
                 auth: Optional[Tuple[str, str]] = None) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._host: str = "0.0.0.0" if bind_all else "127.0.0.1"
        self._port: int = port
        self._auth: Optional[BasicAuth] = BasicAuth(login=auth[0], password=auth[1]) if auth is not None else None
        self._ssl: Optional[ssl.SSLContext] = None
        if certfile is not None:
            self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            self._ssl.load_cert_chain(certfile=certfile, keyfile=keyfile)

        self._app: web.Application = web.Application(middlewares=[self._auth_middleware])
        self._app.add_routes(routes)
        self._runner = web.AppRunner(self._app)
        self._site: Optional[web.BaseSite] = None

    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler: Callable) -> web.StreamResponse:
        if self._auth is not None:
            try:
                authorization: Optional[str] = request.headers.get(hdrs.AUTHORIZATION)
                auth: Optional[BasicAuth] = BasicAuth.decode(authorization) if authorization is not None else None
            except ValueError:
                return web.Response(status=400)
            if auth is None or not compare_digest(auth.encode(), self._auth.encode()):
                return web.Response(status=401, headers={hdrs.WWW_AUTHENTICATE: f'Basic realm="{NAME}"'})
        return await handler(request)

    async def __aenter__(self) -> 'ApiRunner':
        assert self._site is None
        await self._runner.setup()
        self._site = web.TCPSite(self._runner, self._host, self._port, ssl_context=self._ssl)
        await self._site.start()
        self._logger.info("Listening at {}://{}:{}/ (authentication {})".format(
            "https" if self._ssl is not None else "http",
            self._host, self._port,
            "enabled" if self._auth is not None else "disabled"
        ))
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        assert self._site is not None
        self._logger.debug("Shutting down")
        await self._runner.cleanup()
        self._site = None
        return False


# endregion
# region MAIN
###


class SystemdNotifier(AsyncContextManager[None]):
    """Send ready/stopping commands to the systemd socket, if present."""

    def __init__(self, enabled: bool) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        self._enabled: bool = enabled

        self._notify_path: Optional[str] = os.getenv("NOTIFY_SOCKET") or None
        if self._notify_path is not None and self._notify_path.startswith("@"):  # abstract namespace socket
            self._notify_path = "\0" + self._notify_path[1:]
        if self._enabled and self._notify_path is None:
            self._logger.warning("Cannot get NOTIFY_SOCKET")

    async def _send(self, unix_path: str, command: str) -> bool:
        sock: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        sock.setblocking(False)

        try:
            await self._loop.sock_connect(sock, unix_path)
            await self._loop.sock_sendall(sock, f"{command}\n".encode(encoding="utf-8", errors="strict"))
        except (OSError, UnicodeError) as e:
            self._logger.warning(f"Cannot send {command}: {str(e)}")
            return False
        else:
            self._logger.info(f"Sent {command}")
            return True
        finally:
            sock.close()

    async def __aenter__(self) -> None:
        if self._enabled and self._notify_path is not None:
            await self._send(self._notify_path, "READY=1")

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        if self._enabled and self._notify_path is not None:
            await self._send(self._notify_path, "STOPPING=1")
        return False


class QueuedLogger(ContextManager[None]):
    """Move logging i/o into a separate thread."""

    def __init__(self, logger: logging.Logger) -> None:
        self._logger: logging.Logger = logger
        self._queue: Queue[Optional[logging.LogRecord]] = Queue()
        self._handler: logging.Handler = QueueHandler(self._queue)
        self._listener: QueueListener = QueueListener(self._queue, *logger.handlers)
        self._logger.handlers.clear()
        self._logger.handlers.append(self._handler)

    def __enter__(self) -> None:
        self._listener.start()

    def __exit__(self, *args: Any) -> None:
        self._listener.stop()
        self._queue.join()
        self._handler.close()
        for handler in self._listener.handlers:
            handler.close()
        self._listener.handlers = tuple()


async def wait_for_signal() -> None:
    """Async blocking until shutdown signal interrupt."""

    logger: logging.Logger = logging.getLogger(NAME)
    loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
    shutdown_requested: asyncio.Event = asyncio.Event()

    def _handler(signum: int) -> None:
        logger.debug(f"Received signal {signum}")
        shutdown_requested.set()

    for s in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        loop.add_signal_handler(s, _handler, s)

    await shutdown_requested.wait()
    logger.info("Exiting")


async def run(config: Config) -> int:
    """Main entrypoint."""

    logging.addLevelName(TRACE, "TRACE")
    logging.basicConfig(level=TRACE if config.debug else logging.DEBUG if config.verbose else logging.INFO,
                        format="%(levelname)s %(name)s: %(message)s")
    with QueuedLogger(logging.getLogger()):

        os.umask(0o027)
        alerts: AlertRuleset = AlertRuleset(config.alerts)
        filters: FilterRuleset = FilterRuleset(config.filters)
        with FileCompressor(config.rotate_compress) as rotate_compressor:
            files: FileRegistry = FileRegistry(root=config.data_dir, fmt=config.file_format, json_full=config.json_full,
                                               jitter=config.flush_interval, max_shards=config.max_shards,
                                               rotate_cb=rotate_compressor.handle)

            with FileWriter(files, read_only=config.read_only,
                            max_buffered=config.max_buffered, flush_interval=config.flush_interval) as writer:
                with FileReader(files, reverse=config.reverse) as reader:
                    with AlertDispatcher(alerts) as checker:
                        async with SyslogServer(filters.match, [writer.put, checker.check],
                                                bind_all=config.syslog_bind_all, port=config.syslog_port):
                            api: ApiHandler = ApiHandler(alerts, filters, files, writer, reader)
                            async with ApiRunner(api.get_routes(),
                                                 bind_all=config.http_bind_all, port=config.http_port,
                                                 certfile=config.https_certfile, keyfile=config.https_keyfile,
                                                 auth=config.http_basic_credentials):
                                async with SystemdNotifier(enabled=config.systemd_notify):
                                    await wait_for_signal()

    return 0


# endregion
# region FRONTEND
###


# language=CSS
_INDEX_CSS: str = r"""
    :root {
        --pad: 1em;
        --hpad: 0.5em;
        --border: #dddddd;
        --bg: #eeeeee;
    }

    html, body {
        background-color: var(--bg);
        color: #000000;
        margin: 0;
        padding: 0;
    }
    body, input {
        font-family: monospace;
    }
    form, fieldset {
        border: none;
        padding: 0;
        margin: 0;
    }

    .widget, .plot {
        background-color: #ffffff;
        border: var(--border) 1px solid;
        border-radius: var(--hpad);
        overflow: hidden;
    }
    .element {
        margin: var(--pad);
    }

    #error {
        display: block;
        text-align: center;
        color: #ff0000;
    }
    #error:empty {
        display: none;
    }
    #query-help {
        display: none;
        text-align: center;
    }
    #query-help.visible {
        display: block;
    }

    #searchbar {
        padding: 0 var(--pad);
        position: sticky;
        top: var(--pad);
        z-index: 1;
        box-shadow: 0 0 0.25em 0.25em var(--bg);
    }

    #query-date, #query-line, #error, #query-help {
        width: 100%;
        margin: var(--pad) 0;
    }

    #query-line {
        display: flex;
        gap: var(--pad);
        align-items: stretch;
    }
    #query-line #query {
        flex-grow: 1;
    }
    #query-date {
        display: flex;
        gap: var(--pad);
        flex-wrap: wrap;
        justify-content: center;
    }

    .plot {
        height: 200px;
    }
    #stats {
        display: flex;
        flex-flow: row wrap;
        gap: var(--pad);
    }
    #stats .plot {
        min-width: 250px;
        flex-grow: 1;
        box-sizing: border-box;
    }
    #stats .plot .modebar-container {
        display: none;
    }

    #nav fieldset {
        display: flex;
        flex-wrap: wrap;
        gap: var(--hpad);
        margin: var(--pad);
        align-items: stretch;
    }
    #nav fieldset > :first-child {
        flex-grow: 1;
        display: table;
    }
    #results {
        white-space: nowrap;
        display: table-cell;
        vertical-align: middle;
    }

    #log {
        max-height: 70vh;
        white-space: pre;
        overflow: scroll;
        padding: var(--pad);
    }
    #log p {
        margin: 0;
    }

    .element.empty, .element:empty, #stats.empty, .plot.empty {
        display: none;
    }
    fieldset:disabled span, fieldset:disabled #error {
        color: #aaaaaa;
    }

    #spinner {
        display: none;
        position: absolute;
        bottom: 0; left: 0;
        height: 2px; width: 100%;
        background-color: #8fbbda;
        background-repeat: no-repeat;
        background-image: linear-gradient(#ffffff 0 0), linear-gradient(#ffffff 0 0);
        background-size: 60% 100%;
        animation: css-loaders-progress-16 3s infinite;
    }
    @keyframes css-loaders-progress-16 {
        0%   {background-position:-150% 0,-150% 0}
        66%  {background-position: 250% 0,-150% 0}
        100% {background-position: 250% 0, 250% 0}
    }
"""  # noqa

# language=JS
_INDEX_JS: str = r"""
    "use_strict";
    const $ = document.getElementById.bind(document);

    function log_format(record) {
        return new Date(record.timestamp * 1000).toString().split(" (")[0] + " " +
            (record.remote_host || "-") + " " + (record.hostname || "-") + " " + (record.application || "-") + "[" + (record.pid || "-") + "] " +
            record.facility + "." + record.severity + ": " + record.message + "\n";
    }

    function date_from_offset(offset) {
        let now = new Date();
        now.setTime(now.getTime() - (offset * 1000));
        now.setMinutes(now.getMinutes() - now.getTimezoneOffset());
        return now.toISOString().slice(0,19);
    }

    function date_from_timestamp(ts) {
        let dt = new Date(parseInt(ts) * 1000);
        dt.setMinutes(dt.getMinutes() - dt.getTimezoneOffset());
        return dt.toISOString().slice(0,19);
    }

    function date_to_timestamp(value) {
        return String(Date.parse(value) / 1000);
    }

    function build_table(results) {
        const log = $("log");
        if (results === null) {
            log.classList.add("empty");
            return;
        }

        const fragment = new DocumentFragment();
        for (let i=0; i<results.length; i++) {
            fragment.append(document.createTextNode(log_format(results[i])));
        }

        log.replaceChildren(fragment);
        log.scrollTo(0, 0);
        log.classList.remove("empty");
    }

    function build_nav(data) {
        if (data === null) {
            $("results").textContent = $("offset").value = $("total").value = "";
            $("nav-prev").disabled = $("nav-start").disabled = true;
            $("nav-next").disabled = $("nav-end").disabled = true;
        } else {
            $("results").textContent = data.results.length? String(data.offset + 1) + " - " + String(data.offset + data.results.length) + " / " + String(data.total): String(data.total);
            $("offset").value = data.offset;
            $("total").value = data.total;
            $("nav-prev").disabled = $("nav-start").disabled = data.offset <= 0;
            $("nav-next").disabled = $("nav-end").disabled = data.offset + data.results.length >= data.total;
        }
    }

    function build_plots(stats) {
        const widget = $("stats");
        widget.classList.add("empty");
        if (stats === null) {
            return;
        }

        for (const key in stats) {
            const panel = $("plot-" + key);
            if (Object.keys(stats[key]).length === 0 || Math.max(...Object.values(stats[key])) === 0) {
                panel.classList.add("empty");
                continue;
            } else {
                widget.classList.remove("empty");
                panel.classList.remove("empty");
            }

            const color_discrete_map = {all: "rgb(192, 192, 192)", query: "rgb(143, 187, 218)", result: "rgb(31, 119, 180)"};
            Plotly.react(panel,
                [{
                    type: "pie", textinfo: "label", textposition: "inside",
                    labels: Object.keys(stats[key]), values: Object.values(stats[key]),
                    marker: {
                        colors: (key === "all")?
                        Object.keys(stats[key]).map(label => color_discrete_map[label]):
                        ['rgb(102, 197, 204)', 'rgb(246, 207, 113)', 'rgb(248, 156, 116)', 'rgb(220, 176, 242)', 'rgb(135, 197, 95)', 'rgb(158, 185, 243)', 'rgb(254, 136, 177)', 'rgb(201, 219, 116)', 'rgb(139, 224, 164)', 'rgb(180, 151, 231)', 'rgb(179, 179, 179)'] // Pastel
                    },
                }], {
                    autosize: true, margin: {b: 8, r: 8, l: 8, t: 8}, showlegend: false,
                    paper_bgcolor: "rgba(255,255,255,0)", plot_bgcolor: "rgba(255,255,255,0)",
                    title: {
                        text: panel.dataset.title, font: {size: 12},
                        x: 0, y: 1, xanchor: 'left', yanchor: 'top', pad: {l: 8, t: 8},
                        yref: 'container', automargin: false,
                    },
                }, {responsive: true},
            );
            window.requestAnimationFrame(function(){ Plotly.relayout(panel, {}); });
        }
    }

    function build_timeline(timeline_all, timeline_query, timeline_result) {
        const panel = $("plot-timeline");
        if (timeline_all === null || timeline_query === null || timeline_result === null || timeline_all.x.length === 0) {
            panel.classList.add("empty");
            return;
        } else {
            panel.classList.remove("empty");
        }

        Plotly.react(panel,
            [
                {type: "bar", name: "all", marker: {color: "rgb(192, 192, 192)"}, x: timeline_all.x.map(ts => new Date(ts * 1000.0)), y: timeline_all.y},
                {type: "bar", name: "query", marker: {color: "rgb(143, 187, 218)"}, x: timeline_query.x.map(ts => new Date(ts * 1000.0)), y: timeline_query.y},
                {type: "bar", name: "result", marker: {color: "rgb(31, 119, 180)"}, x: timeline_result.x.map(ts => new Date(ts * 1000.0)), y: timeline_result.y},
            ], {
                barmode: "overlay", bargap: 0,
                autosize: true, margin: {b: 20, r: 8, l: 8, t: 8}, showlegend: true,
                legend: {x: 0, y: 1, xanchor: 'left', yanchor: 'top'},
                paper_bgcolor: "rgba(255,255,255,0)", plot_bgcolor: "rgba(255,255,255,0)",
                xaxis: {
                    range: [new Date(Math.min(timeline_all.range[0], timeline_query.range[0]) * 1000.0), new Date(Math.max(timeline_all.range[1], timeline_query.range[1]) * 1000)],
                    showgrid: false, showline: false, zeroline: false, visible: true
                },
                yaxis: {
                    range: [Math.min(timeline_all.range[2], timeline_query.range[2]), Math.max(timeline_all.range[3], timeline_query.range[3])],
                    showgrid: false, showline: false, zeroline: false, visible: false
                },
            }, {responsive: true},
        );
        window.requestAnimationFrame(function(){ Plotly.relayout(panel, {}); });
    }

    function process(data) {
        build_table(data === null? null: data.results);
        build_nav(data);
        build_timeline(data === null? null: data.timeline_all, data === null? null: data.timeline_query, data === null? null: data.timeline_result);
        build_plots(data === null? null: data.stats);
    }

    function disable(disabled) {
        $("query-fields").disabled = disabled;
        $("nav-fields").disabled = disabled;
        $("query-help").classList.remove("visible");
        $("spinner").style.display = disabled? "block": "none";
    }

    function request(method, url, callback) {
        const xhr = new XMLHttpRequest();
        xhr.withCredentials = true;
        xhr.open(method, url, true);
        xhr.onreadystatechange = function() {
            if (this.readyState === XMLHttpRequest.DONE) {
                const ct = this.getResponseHeader("Content-Type");
                const data = (ct !== null && ct.startsWith("application/json"))? JSON.parse(this.responseText): null;
                callback(data, this.status);
            }
        };
        xhr.send();
    }

    function search(plaintext_tab) {
        const error = $("error");
        const start = $("query-start");
        const end = $("query-end");

        let params = new URLSearchParams();
        params.append("q", $("query").value);
        params.append("s", start.value? date_to_timestamp(start.value): "");
        params.append("e", end.value? date_to_timestamp(end.value): "");
        let page_params = new URLSearchParams();
        page_params.append("_s", $("query-start-sel").value);
        page_params.append("_e", $("query-end-sel").value);
        let search_params = new URLSearchParams();
        search_params.append("l", $("limit").value);  // TODO: restore? replace state?
        search_params.append("o", $("offset").value);
        const http_params = (new URLSearchParams([...params, ...search_params])).toString();
        const history_params = (new URLSearchParams([...params, ...page_params])).toString();

        if (plaintext_tab) {
            window.open("api/tail?" + http_params, '_blank').focus();
            return;
        }

        disable(true);
        request("POST", "api/search?" + http_params, function (data, status) {
            if (history.state != history_params) {
                history.pushState(history_params, null, "#" + history_params);
            } else {
                history.replaceState(history_params, null, "#" + history_params);
            }
            if (data !== null && data.hasOwnProperty("error")) {
                error.textContent = data.error;
                process(null);
            } else if (status !== 200) {
                error.textContent = "Request error, status " + String(status);
                process(null);
            } else if (data === null) {
                error.textContent = "Request error, no data";
                process(null);
            } else {
                error.textContent = "";
                process(data);
            }
            disable(false);
        });
    }

    function do_search(plaintext_tab) {
        $("offset").value = 0;
        $("total").value = 0;
        search(plaintext_tab);
    }

    function do_nav(mode) {
        const offset = $("offset");
        const total = $("total");
        const limit = $("limit");
        if (mode === -2) offset.value = 0;
        else if (mode === -1) offset.value = Math.max(0, parseInt(offset.value) - parseInt(limit.value));
        else if (mode === 0) { if (!total.value || total.value === "0") return; }
        else if (mode === 1) offset.value = parseInt(offset.value) + parseInt(limit.value);
        else if (mode === 2) offset.value = Math.max(0, parseInt(total.value) - parseInt(limit.value));
        search(false);
    }

    function do_date_select(elem) {
        if (elem.id === "query-start-sel") $("query-start").value = (elem.value != -1)? date_from_offset(elem.value): "";
        else if (elem.id === "query-start") $("query-start-sel").value = (elem.value != "")? 0: -1;
        else if (elem.id === "query-end") $("query-end-sel").value = (elem.value != "")? 0: -1;
        else if (elem.id === "query-end-sel") $("query-end").value = (elem.value != -1)? date_from_offset(elem.value): "";
    }

    function restore_search(params) {
        if (params.has("_s") && params.get("_s") != "0") { $("query-start-sel").value = params.get("_s"); do_date_select($("query-start-sel")); }
        else if (params.get("s")) { $("query-start").value = date_from_timestamp(params.get("s")); do_date_select($("query-start")); }
        else { $("query-start-sel").value = $("query-start-sel").dataset.selected; do_date_select($("query-start-sel")); }
        if (params.has("_e") && params.get("_e") != "0") { $("query-end-sel").value = params.get("_e"); do_date_select($("query-end-sel")); }
        else if (params.get("e")) { $("query-end").value = date_from_timestamp(params.get("e")); do_date_select($("query-end")); }
        else { $("query-end-sel").value = $("query-end-sel").dataset.selected; do_date_select($("query-end-sel")); }

        if (params.has("l")) { $("limit").value = params.get("l"); }
        if (params.has("o")) { $("offset").value = params.get("o"); }
        if (params.has("q")) { $("query").value = params.get("q"); }

        process(null);
        disable(false);
    }

    window.addEventListener("popstate", function(event) {
        restore_search(event.state? new URLSearchParams(event.state): new URLSearchParams());
    });

    window.addEventListener("load", function() {
        restore_search(window.location.hash? new URLSearchParams(window.location.hash.substring(1)): new URLSearchParams());
    });
"""  # noqa

# language=XML
_ICON: str = r"""<svg xmlns="http://www.w3.org/2000/svg" height="48" viewBox="0 -960 960 960" width="48"><path fill="#1F77B4" d="M100.001-220.771v-45.383h391.154v45.383H100.001Zm0-203.076v-45.384h189.23v45.384h-189.23Zm0-203.077v-45.383h189.23v45.383h-189.23Zm727.998 406.923-159.23-158.077q-24.077 18.846-52.154 28.654-28.077 9.807-58.154 9.807-75.178 0-128.165-52.538-52.987-52.538-52.987-127.653 0-75.114 53.014-127.653 53.013-52.538 128.229-52.538 75.217 0 128.139 52.538Q739.614-594.922 739.614-520q0 30.077-9.808 58.154-9.807 28.076-29.038 51.768l159.231 158.077-32 32ZM558.226-385q56.793 0 96.399-39.227 39.605-39.226 39.605-95.345 0-56.12-39.37-95.582-39.37-39.461-96.163-39.461t-96.399 39.226q-39.605 39.226-39.605 95.346 0 56.12 39.37 95.581Q501.433-385 558.226-385Z"/></svg>"""  # noqa

# language=HTML
INDEX_HTML: bytes = r"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="content-type" content="text/html; charset=utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=no">
<meta name="robots" content="noindex,nofollow,noarchive">
<meta name="google" content="notranslate">
<title>{}</title>
<link rel="icon" href="data:image/svg+xml;base64,{}" type="image/svg+xml">
<style>{}</style>
</head>
<body>
    <div id="searchbar" class="widget element"><form id="query-form" onsubmit="event.preventDefault(); return false;"><fieldset id="query-fields" disabled>
        <div id="query-date">
            <select onchange="do_date_select(this)" id="query-start-sel" data-selected="86400"><option value="0">custom</option><option value="3600">1 hour ago</option><option value="43200">12 hours ago</option><option value="86400" selected>24 hours ago</option><option value="259200">3 days ago</option><option value="-1">all</option></select>
            <input onchange="do_date_select(this)" id="query-start" type="datetime-local" step="1">
            <input onchange="do_date_select(this)" id="query-end" type="datetime-local" step="1">
            <select onchange="do_date_select(this)" id="query-end-sel" data-selected="-1"><option value="0">custom</option><option value="3600">1 hour ago</option><option value="43200">12 hours ago</option><option value="86400">24 hours ago</option><option value="-1" selected>now</option></select>
        </div>
        <div id="query-line">
            <input id="query" placeholder="" title="" type="search" accesskey="s" name="query" autofocus>
            <input id="search" type="submit" value="&nbsp;🔍&nbsp;" title="execute query" accesskey="r" onclick="do_search(false); return false;">
            <input id="search" type="submit" value="📄" title="get plaintext" onclick="do_search(true); return false;">
            <input type="button" value="❓" title="show help" onclick="document.getElementById('query-help').classList.toggle('visible'); return false;">
        </div>
        <span id="error"></span>
        <span id="query-help">
            <b>Fields:</b> *, facility, severity, remote_host/client, hostname/host, application/app, pid, message/msg<br>
            <b>Values:</b> strings, &quot;quoted strings&quot;, /regular expressions/i<br>
            <b>Operators:</b> AND, OR, NOT, parenthesis<br>
            <b>Examples:</b><br>*:&quot;foo bar&quot;<br>NOT application:kernel<br>application:kernel OR NOT facility:/daemon|kernel/<br>(app:/^cron$/i OR hostname:localhost) AND NOT (NOT message:&quot;init:&quot; OR pid:1)
        </span>
    </fieldset></form><div id="spinner"></div></div>
    <div id="plot-timeline" class="plot widget element empty"></div>
    <div id="stats" class="element empty">
        <div id="plot-all" class="plot plot-pie empty" data-title="Results"></div>
        <div id="plot-facility" class="plot plot-pie empty" data-title="Facility"></div>
        <div id="plot-severity" class="plot plot-pie empty" data-title="Severity"></div>
        <div id="plot-hostname" class="plot plot-pie empty" data-title="Hostname"></div>
        <div id="plot-application" class="plot plot-pie empty" data-title="Application"></div>
    </div>
    <div id="nav" class="widget element"><form id="nav-form" onsubmit="event.preventDefault(); return false;"><fieldset id="nav-fields" disabled>
        <div><span id="results"></span></div>
        <input id="offset" type="hidden">
        <input id="total" type="hidden">
        <input type="submit" id="nav-start" value="⏪" title="first" onclick="do_nav(-2); return false;" disabled>
        <input type="submit" id="nav-prev" value="&nbsp;◀️&nbsp;" title="previous" onclick="do_nav(-1); return false;" disabled>
        <select id="limit" onchange="do_nav(0); return false;"><option value="0">none</option><option value="10">10</option><option value="100">100</option><option value="1000" selected>1000</option><option value="10000">10000</option><option value="">all</option></select>
        <input type="submit" id="nav-next" value="&nbsp;▶️&nbsp;" title="next" onclick="do_nav(1); return false;" disabled>
        <input type="submit" id="nav-end" value="⏩" title="last" onclick="do_nav(2); return false;" disabled>
    </fieldset></form></div>
    <div id="log" class="widget element"></div>
<script>{}</script>
<script>
{}
</script>
</body>
</html>
""".format(NAME, b64encode(_ICON.encode()).decode(), _INDEX_CSS, _INDEX_JS, get_plotlyjs()).encode("utf-8", errors="strict")  # noqa

INDEX_ETAG: str = md5(INDEX_HTML).hexdigest()  # nosec
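
# The conditional-request round trip above can be checked by hand (default port assumed):
#   curl -s -D - -o /dev/null http://127.0.0.1:5141/                                # carries ETag
#   curl -s -D - -o /dev/null -H 'If-None-Match: "<etag>"' http://127.0.0.1:5141/   # now 304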


# endregion
# region ENTRYPOINT
###


def main() -> int:
    try:
        parser = argparse.ArgumentParser(description=__doc__.strip(),
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        Config.add_args(parser)
        config: Config = Config.from_args(parser.parse_args())
    except (ValueError, KeyError) as e:
        print(f"Invalid configuration, {e.__class__.__name__}: {str(e)}", file=sys.stderr)
        return 1
    else:
        return asyncio.run(run(config))


if __name__ == "__main__":
    sys.exit(main())


# endregion