#!/usr/bin/env python3
"""
Syslog server accepting RFC 5424-compliant messages over TCP.
Records are maintained in flat JSON (or CSV) files.
A built-in web interface provides searching with a simple query language and statistics plotting.
"""
import argparse
import asyncio
import gzip
import json
import logging
import math
import os
import re
import signal
import socket
import ssl
import subprocess
import sys
import threading
import time
from abc import abstractmethod
from base64 import b64encode
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, field
from datetime import datetime, timezone
from enum import Enum
from gc import collect
from hashlib import md5
from hmac import compare_digest
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from queue import Queue, Empty as QueueEmpty
from string import Template
from typing import get_args, Dict, Optional, Tuple, ClassVar, Callable, List, Literal, DefaultDict, Any, Iterator, \
Final, Union, Type, AsyncContextManager, ContextManager
from aiohttp import web, BasicAuth, hdrs
from lark import Lark, Transformer, Token, LarkError
from plotly.offline import get_plotlyjs
# region CONFIG
###
NAME: Final[str] = "syslogsearch"
TRACE: Final[int] = logging.DEBUG - 1
@dataclass(frozen=True)
class Config:
"""Settings from corresponding commandline arguments."""
verbose: bool = field(default=False, metadata={"help": "enable verbose logging"})
debug: bool = field(default=False, metadata={"help": "enable even more verbose debug logging"})
read_only: bool = field(default=False, metadata={"help": "don't actually write received records"})
reverse: bool = field(default=False, metadata={"help": "order results and pagination by most recent entry first"})
systemd_notify: bool = field(default=False, metadata={"help": "try to signal systemd readiness"})
syslog_bind_all: bool = field(default=False, metadata={"help": "bind to 0.0.0.0 instead of localhost only"})
syslog_port: int = field(default=5140, metadata={"help": "syslog TCP port to listen on", "metavar": "PORT"})
http_bind_all: bool = field(default=False, metadata={"help": "bind to 0.0.0.0 instead of localhost only"})
http_port: int = field(default=5141, metadata={"help": "HTTP port to listen on", "metavar": "PORT"})
http_basic_auth: bool = field(default=False, metadata={"help": "require basic authentication using credentials "
"from LOGSEARCH_USER and LOGSEARCH_PASS environment "
"variables"})
http_basic_credentials: Optional[Tuple[str, str]] = field(init=False, default=None)
https_certfile: Optional[Path] = field(default=None, metadata={"help": "certificate for HTTPS", "metavar": "PEM"})
https_keyfile: Optional[Path] = field(default=None, metadata={"help": "key file for HTTPS", "metavar": "PEM"})
data_dir: Path = field(default=Path("./"), metadata={"help": "storage directory", "metavar": "DIR"})
flush_interval: float = field(default=60.0, metadata={"help": "write out records this often", "metavar": "SEC"})
max_buffered: int = field(default=1000, metadata={"help": "record limit before early flush", "metavar": "LEN"})
max_shards: int = field(default=15, metadata={"help": "number of 'daily' files to maintain", "metavar": "NUM"})
rotate_compress: bool = field(default=False, metadata={"help": "create .gz files for old shards instead of delete"})
file_format: str = field(default="json", metadata={"choices": ["json", "csv"], "metavar": "EXT",
"help": "format for new files, either 'json' or 'csv'"})
json_full: bool = field(default=False, metadata={"help": "assume full objects instead of tuples in JSON mode"})
filters: Optional[Path] = field(default=None, metadata={"help": "filter query definitions file", "metavar": "TXT"})
alerts: Optional[Path] = field(default=None, metadata={"help": "alert definitions file", "metavar": "JSON"})
def __post_init__(self) -> None:
if self.http_basic_auth:
super().__setattr__("http_basic_credentials", (os.environ["LOGSEARCH_USER"], os.environ["LOGSEARCH_PASS"]))
del os.environ["LOGSEARCH_USER"]
del os.environ["LOGSEARCH_PASS"]
@classmethod
def add_args(cls, p: argparse.ArgumentParser) -> None:
for f in fields(cls):
flag: str = f"--{f.name.replace('_', '-')}"
if f.init:
if f.default is None: # Union[X, NoneType]
p.add_argument(flag, type=get_args(f.type)[0], default=None, **f.metadata)
elif f.type is bool:
p.add_argument(flag, action="store_const", const=not f.default, default=f.default, **f.metadata)
else:
p.add_argument(flag, type=f.type, default=f.default, **f.metadata)
@classmethod
def from_args(cls, args: argparse.Namespace) -> 'Config':
return cls(**{_.name: getattr(args, _.name) for _ in fields(cls) if _.init})
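    # Minimal wiring sketch (illustrative; the actual entry point may differ):
    #   parser = argparse.ArgumentParser(prog=NAME)
    #   Config.add_args(parser)
    #   sys.exit(asyncio.run(run(Config.from_args(parser.parse_args()))))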
# endregion
# region SYSLOG
###
@dataclass(frozen=True)
class SyslogRecord:
"""Main datastructure as parsing result, API response, and JSON file i/o."""
timestamp: float
facility: str
severity: str
remote_host: Optional[str]
hostname: Optional[str]
application: Optional[str]
pid: Optional[int]
mid: Optional[str]
data: Optional[dict]
message: str
__slots__ = ("timestamp", "facility", "severity", "remote_host", "hostname",
"application", "pid", "mid", "data", "message")
def to_dict(self) -> Dict[str, Any]:
"""As API response."""
return {_: getattr(self, _) for _ in self.fields()}
def to_row(self) -> Tuple:
"""In storage format."""
return tuple(getattr(self, _) for _ in self.fields())
def to_log_line(self) -> bytes:
"""As pre-formatted tail similar to frontend, UTC."""
return "{} {} {} {}[{}] {}.{}: {}\n".format(
datetime.fromtimestamp(self.timestamp, tz=timezone.utc).isoformat(),
self.remote_host or "-", self.hostname or "-", self.application or "-", self.pid or "-",
self.facility, self.severity, self.message,
).encode("utf-8", errors="replace")
@classmethod
def from_dict(cls, data: Dict) -> 'SyslogRecord':
return cls(**data)
@classmethod
def from_row(cls, data: List) -> 'SyslogRecord':
return cls(*data)
@classmethod
def fields(cls) -> Tuple[str, ...]:
return cls.__slots__
class Syslog5424Parser:
"""
Parser for raw RFC5424 syslog messages into the record datastructure.
https://datatracker.ietf.org/doc/html/rfc5424
"""
_header_rx: ClassVar['re.Pattern[bytes]'] = re.compile(
br'^'
        br'(?:[0-9]+ )?'  # strip optional octet count (rsyslog '(o)' octet-counted framing)
br'<(?P<priority>[0-9]{1,3})>'
br'[0-9]{1,3} ' # version
br'(?P<timestamp>[!-~]+) '
br'(?P<hostname>[!-~]{1,255}) '
br'(?P<application>[!-~]{1,48}) '
br'(?P<pid>[!-~]{1,128}) '
br'(?P<mid>[!-~]{1,32}) '
br'(?:-|\[(?:[^]\\]|\\.)*]) ' # TODO: parse (certain) structured data
)
    _reserved: Dict[int, int] = str.maketrans("\t\f\r\n", "    ")  # map to spaces: little sense in logs, breaks CSV
severities: ClassVar[List[str]] = [
"emerg", "alert", "crit", "error", "warn", "notice", "info", "debug"
]
facilities: ClassVar[List[str]] = [
"kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron", "authpriv", "ftp", "ntp",
"audit", "alert", "clockd", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"
]
def __init__(self) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
def _parse_int(self, value: Optional[str]) -> Optional[int]:
try:
return int(value) if value is not None else None
except Exception as e:
self._logger.warning(f"Cannot parse integer '{value}': {str(e)}")
return None
def _parse_header(self, line: bytes) -> Tuple[bytes, Dict[str, str]]:
match: Optional[re.Match] = self._header_rx.match(line)
if match is None:
raise ValueError("Cannot parse syslog header")
return line[match.end():], {
k: v.decode("utf-8", errors="replace")
for k, v in match.groupdict().items()
if v and v != b'-'
}
def _parse_timestamp(self, timestamp: Optional[str]) -> Optional[float]:
if timestamp is None:
return None
if timestamp.endswith('Z'):
timestamp = timestamp[:-1] + '+0000'
try:
return datetime.strptime( # TODO: RFC allows non-padded microseconds or milliseconds
timestamp, '%Y-%m-%dT%H:%M:%S.%f%z' if '.' in timestamp else '%Y-%m-%dT%H:%M:%S%z'
).timestamp()
except (ValueError, OverflowError) as e:
self._logger.warning(f"Cannot parse timestamp '{timestamp}': {str(e)}")
return None
def _parse_message(self, data: bytes) -> str:
if data.startswith(b'\xEF\xBB\xBF'): # UTF-8 BOM
data = data[3:]
if data.startswith(b' '): # https://www.rsyslog.com/doc/master/configuration/modules/mmrm1stspace.html
data = data[1:]
return data.decode("utf-8", errors="replace").translate(self._reserved).rstrip()
def _parse_priority(self, priority: Optional[str]) -> Tuple[str, str]:
if priority is None:
raise ValueError("No priority")
try:
priority_val: int = int(priority)
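            # RFC 5424: PRI = facility * 8 + severity, so severity is the low 3 bits and facility the remaining high bits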
return self.severities[priority_val & 0x7], self.facilities[priority_val >> 3]
except IndexError:
raise ValueError(f"Cannot parse priority '{priority}'") from None
def parse(self, line: bytes, remote_host: Optional[str]) -> SyslogRecord:
self._logger.log(TRACE, line.decode("ascii", errors="replace").rstrip())
line, header = self._parse_header(line)
message: str = self._parse_message(line)
severity, facility = self._parse_priority(header.get('priority', None))
timestamp: Optional[float] = self._parse_timestamp(header.get('timestamp', None))
return SyslogRecord(
facility=facility,
severity=severity,
timestamp=timestamp if timestamp is not None else time.time(),
hostname=header.get('hostname', None),
application=header.get('application', None),
pid=self._parse_int(header.get('pid', None)),
mid=header.get('mid', None),
data=None,
message=message,
remote_host=remote_host,
)
# endregion
# region QUERY
###
class QueryParseError(ValueError):
pass
class QueryMatch:
@abstractmethod
def __call__(self, record: SyslogRecord) -> bool:
"""Interface for a query tree as parsed from search syntax: Record to boolean."""
raise NotImplementedError
class QueryOperator:
@abstractmethod
def __call__(self, value: str) -> bool:
raise NotImplementedError
class QueryFactory:
"""Parse search input into query tree."""
_GRAMMAR: ClassVar[str] = r"""
?start: query
| noop
noop:
?query: subquery_list_and
| subquery_list_or
subquery_list_and: subquery ("AND" subquery)+
subquery_list_or: subquery ("OR" subquery)*
?subquery: query_negated
| query_inner
| match_negated
| match
query_negated: "NOT" "(" query ")"
?query_inner: "(" query ")"
match_negated: "NOT" match
match: FIELD ":" condition
condition: IREGEX
| REGEX
| QSTRING
| STRING
FIELD: "*" | ("_"|LCASE_LETTER)+
STRING: /[^ \t\f\r\n"()]/+
QSTRING: ESCAPED_STRING
IREGEX: "/" /([^\/\\]|\\.)*/ "/i"
REGEX: "/" /([^\/\\]|\\.)*/ "/"
%import common.LCASE_LETTER
%import common.WORD
%import common.ESCAPED_STRING
%import common.WS
%ignore WS
"""
class QueryContainsOperator(QueryOperator):
def __init__(self, target: str) -> None:
self._target: str = target
def __call__(self, value: str) -> bool:
return self._target in value
def __str__(self) -> str:
return f'"{self._target}"'
class QueryRegexOperator(QueryOperator):
def __init__(self, regex: str, sensitive: bool) -> None:
try:
self._regex: re.Pattern = re.compile(regex, flags=0 if sensitive else re.IGNORECASE)
except re.error as e:
raise QueryParseError(f"Cannot parse '{regex}': {str(e)}") from None
def __call__(self, value: str) -> bool:
return self._regex.search(value) is not None
def __str__(self) -> str:
return f"/{self._regex.pattern}/{'i' if re.IGNORECASE & self._regex.flags else ''}"
class QueryNotMatch(QueryMatch):
def __init__(self, match: QueryMatch) -> None:
self._match: QueryMatch = match
def __call__(self, record: SyslogRecord) -> bool:
return not self._match(record)
def __str__(self) -> str:
return f"<NOT {str(self._match)}>"
class QueryAndMatch(QueryMatch):
def __init__(self, matches: List[QueryMatch]) -> None:
self._matches: List[QueryMatch] = matches
def __call__(self, record: SyslogRecord) -> bool:
return len(self._matches) > 0 and all(_(record) for _ in self._matches) # True -> False upon empty
def __str__(self) -> str:
return f"<AND {' '.join(str(_) for _ in self._matches) if len(self._matches) else 'False'}>"
class QueryOrMatch(QueryMatch):
def __init__(self, matches: List[QueryMatch]) -> None:
self._matches: List[QueryMatch] = matches
def __call__(self, record: SyslogRecord) -> bool:
return len(self._matches) == 0 or any(_(record) for _ in self._matches) # False -> True upon empty
def __str__(self) -> str:
return f"<OR {' '.join(str(_) for _ in self._matches) if len(self._matches) else 'True'}>"
class QueryFieldMatch(QueryMatch):
_short_fields: ClassVar[Dict[str, Optional[str]]] = {
"*": None,
"client": "remote_host",
"host": "hostname",
"app": "application",
"msg": "message",
}
def __init__(self, key: str, operator: QueryOperator) -> None:
if key in self._short_fields:
self._key: Optional[str] = self._short_fields[key]
elif key in SyslogRecord.fields():
self._key = key
else:
raise KeyError(f"Unknown field '{key}'. "
f"Expected one of: {', '.join(SyslogRecord.fields())} "
f"(or: {', '.join(self._short_fields.keys())})")
self._operator: QueryOperator = operator
@classmethod
def _get_attr(cls, record: SyslogRecord, attr: str) -> str:
value: Any = getattr(record, attr, None)
return value if isinstance(value, str) else "" if value is None else str(value)
def __call__(self, record: SyslogRecord) -> bool:
if self._key is None:
return any(self._operator(self._get_attr(record, _)) for _ in SyslogRecord.fields())
return self._operator(self._get_attr(record, self._key))
def __str__(self) -> str:
return f"<{self._key if self._key is not None else '*'}:{str(self._operator)}>"
class QueryBuilder(Transformer[Token, QueryMatch]):
"""Transformer for creating a query tree on-the-fly while parsing tokens."""
@classmethod
def _unescape(cls, s: str) -> str:
return re.sub(r"\\(.)", r"\1", s)
def condition(self, s: Tuple[Token]) -> QueryOperator:
token: Token = s[0]
if token.type == "STRING":
return QueryFactory.QueryContainsOperator(token.value)
elif token.type == "QSTRING":
return QueryFactory.QueryContainsOperator(self._unescape(token.value[1:-1]))
elif token.type == "IREGEX":
return QueryFactory.QueryRegexOperator(self._unescape(token.value[1:-2]), sensitive=False)
elif token.type == "REGEX":
return QueryFactory.QueryRegexOperator(self._unescape(token.value[1:-1]), sensitive=True)
else: # TODO: severity enum gt/lt support
raise QueryParseError(f"Unexpected token of type '{token.type}'")
def noop(self, s: Tuple[()]) -> QueryMatch:
return QueryFactory.QueryOrMatch([])
def match(self, s: Tuple[Token, QueryOperator]) -> QueryMatch:
return QueryFactory.QueryFieldMatch(s[0].value, s[1])
def match_negated(self, s: Tuple[QueryMatch]) -> QueryMatch:
return QueryFactory.QueryNotMatch(s[0])
def query_negated(self, s: Tuple[QueryMatch]) -> QueryMatch:
return QueryFactory.QueryNotMatch(s[0])
def subquery_list_or(self, s: Tuple[QueryMatch, ...]) -> QueryMatch:
return QueryFactory.QueryOrMatch(list(s)) if len(s) != 1 else s[0]
def subquery_list_and(self, s: Tuple[QueryMatch, ...]) -> QueryMatch:
return QueryFactory.QueryAndMatch(list(s)) if len(s) != 1 else s[0]
_inst: ClassVar[Optional['QueryFactory']] = None # singleton
def __init__(self) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._parser: Lark = Lark(self._GRAMMAR, start="start", parser="lalr", transformer=QueryFactory.QueryBuilder())
@classmethod
def get_inst(cls) -> 'QueryFactory':
if cls._inst is None:
cls._inst = QueryFactory()
return cls._inst
def parse(self, query: str) -> QueryMatch:
try:
match: QueryMatch = self._parser.parse(query) # type: ignore
except QueryParseError:
raise
except re.error as e:
raise QueryParseError(f"Cannot parse regex: {str(e)}") from None
except (ValueError, KeyError, LarkError) as e:
raise QueryParseError(f"Cannot parse query: {str(e)}") from None
else:
self._logger.debug(str(match))
return match
class Timeline:
"""Counter for timestamps (from records) into fixed-number but variable-width buckets. (Bar-Chart)"""
def __init__(self, start: float, end: float, samples: int = 250) -> None:
self._size: int = 0
self._start: float = start
self._end: float = max(end, start + 1.0)
self._samples: int = samples
self._range: float = self._end - self._start
self._width: float = self._range / self._samples
self._scale: float = self._samples / self._range
self._buckets: Dict[int, int] = {_: 0 for _ in range(self._samples)} # faster than DefaultDict, similar to List
def _count(self, timestamp: float, value: int) -> None:
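        # map the timestamp onto a bucket index via the precomputed scale; clamp so out-of-range values land in the edge buckets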
bucket: float = self._scale * (timestamp - self._start)
self._buckets[min(max(math.floor(bucket), 0), self._samples - 1)] += value
def push(self, timestamp: float) -> None:
self._count(timestamp, 1)
self._size += 1
def pop(self, timestamps: List[float]) -> None:
for timestamp in timestamps:
self._count(timestamp, -1)
self._size -= 1
def to_dict(self) -> Dict:
return {
"range": (self._start, self._end, 0, max(self._buckets.values())),
"x": [self._start + (_ * self._width) + (self._width / 2) for _ in self._buckets.keys()],
"y": list(self._buckets.values()),
# "interval": (next((_ for _ in self._buckets.keys() if self._buckets[_]), 0),
# next((_ for _ in reversed(self._buckets.keys()) if self._buckets[_]), self._samples - 1)),
}
def __len__(self) -> int:
return self._size
class Stats:
"""Count record property values for statistics. (Pie-Chart)"""
def __init__(self) -> None:
self._keys: Tuple[str, ...] = ("all",)
self._record_keys: Tuple[str, ...] = ("facility", "severity", "hostname", "application")
self._stats: Dict[str, DefaultDict[Optional[str], int]] = {k: defaultdict(int)
for k in self._record_keys + self._keys}
def count_record(self, record: SyslogRecord) -> None:
for key in self._record_keys:
self._stats[key][getattr(record, key)] += 1
def count_value(self, key: str, value: str, count: int = 1) -> None:
self._stats[key][value] += count
def to_dict(self) -> Dict:
return self._stats
@dataclass(frozen=True)
class LogSearch:
"""Actual search task to be done while scanning input records. Built by API endpoint."""
start: float
end: float
offset: int
limit: Optional[int]
query: QueryMatch
@dataclass
class LogSearchResult:
"""Search result from scanning input records. Returned by the API."""
timeline_all: Timeline
timeline_query: Timeline
timeline_result: Timeline
stats: Stats
offset: int
total: int = 0
results: List[SyslogRecord] = field(default_factory=list)
def to_dict(self) -> Dict:
return {
"timeline_all": self.timeline_all.to_dict(),
"timeline_query": self.timeline_query.to_dict(),
"timeline_result": self.timeline_result.to_dict(),
"stats": self.stats.to_dict(),
"offset": self.offset,
"total": self.total,
"results": [_.to_dict() for _ in self.results],
}
class LogSearchEvaluator:
"""Run the given search on a stream of records, with result builder and offset/limit logic."""
def __init__(self, query: LogSearch, reverse: bool) -> None:
self._query: LogSearch = query
self._reverse: bool = reverse
self._result: LogSearchResult = LogSearchResult(timeline_all=Timeline(query.start, query.end),
timeline_query=Timeline(query.start, query.end),
timeline_result=Timeline(query.start, query.end),
stats=Stats(),
offset=query.offset)
def get_result(self) -> LogSearchResult:
if self._reverse and self._query.offset > 0:
self._result.timeline_result.pop([_.timestamp for _ in self._result.results[:self._query.offset]])
self._result.results = self._result.results[self._query.offset:]
self._result.stats.count_value("all", "result",
len(self._result.timeline_result))
self._result.stats.count_value("all", "query",
len(self._result.timeline_query) - len(self._result.timeline_result))
self._result.stats.count_value("all", "all",
len(self._result.timeline_all) - len(self._result.timeline_query))
return self._result
def evaluate(self, record: SyslogRecord) -> None:
"""Feed a candidate read from file."""
if self._query.start <= record.timestamp <= self._query.end:
if self._query.query(record):
if self._query.limit is not None and self._query.limit <= 0:
pass
elif self._reverse:
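                    # newest-first mode: prepend the record and drop the oldest overflow beyond offset + limit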
self._result.timeline_result.push(record.timestamp)
self._result.results.insert(0, record)
if self._query.limit is not None:
if len(self._result.results) > self._query.offset + self._query.limit:
self._result.timeline_result.pop([self._result.results[-1].timestamp])
self._result.results.pop()
else:
if self._query.offset <= self._result.total:
if self._query.limit is None or len(self._result.results) < self._query.limit:
self._result.timeline_result.push(record.timestamp)
self._result.results.append(record)
self._result.total += 1
self._result.stats.count_record(record)
self._result.timeline_query.push(record.timestamp)
self._result.timeline_all.push(record.timestamp)
# endregion
# region FILTERS
###
class AlertRuleset:
"""Load alert trigger definitions from dedicated JSON configuration file."""
@dataclass(frozen=True)
class Alert:
pos: int
name: str
query: QueryMatch
command: List[Template]
limit: float
def to_dict(self) -> Dict:
return {"name": self.name, "query": str(self.query), "command": [_.template for _ in self.command]}
def __str__(self) -> str:
return f"{self.name}: {str(self.query)} -> {' '.join([_.template for _ in self.command])}"
def __init__(self, ruleset: Optional[Path]) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._alerts: List[AlertRuleset.Alert] = list(self._load_alerts(ruleset)) if ruleset is not None else []
for alert in self._alerts:
self._logger.info(str(alert))
@classmethod
def _load_alerts(cls, filename: Path) -> Iterator[Alert]:
with filename.open("r") as fp:
ruleset: List[Dict[str, Any]] = json.load(fp)
if not isinstance(ruleset, list) or not all(isinstance(_, dict) for _ in ruleset):
raise ValueError(f"Cannot parse '{ruleset}': Expecting object list")
pos: int = 0
queries: QueryFactory = QueryFactory.get_inst()
for rule in ruleset:
if not isinstance(rule["command"], list) or not all(isinstance(_, str) for _ in rule["command"]):
raise ValueError(f"Cannot parse '{ruleset}': Expecting command list")
pos += 1
yield AlertRuleset.Alert(pos=pos,
name=str(rule.get("name", "")),
query=queries.parse(str(rule["query"])),
command=[Template(str(_)) for _ in rule["command"]],
limit=float(rule.get("limit", 0.0)))
def dump(self) -> List[Dict]:
return [alert.to_dict() for alert in self._alerts]
def match(self, record: SyslogRecord) -> Iterator[Alert]:
for alert in self._alerts:
if alert.query(record):
yield alert
class AlertDispatcher(ContextManager['AlertDispatcher']):
"""Check parsed records against the alert ruleset. Trigger commands are executed in a separate thread."""
class Runnable:
def __init__(self, logger: logging.Logger, alert: AlertRuleset.Alert, record: SyslogRecord) -> None:
self._logger: logging.Logger = logger
self._timeout: float = 60.0
self._name: str = alert.name
self._command: List[str] = self._interpolate_command(alert.command, record)
@classmethod
def _interpolate_command(cls, command: List[Template], record: SyslogRecord) -> List[str]:
if len(command) < 1:
raise ValueError("Empty command")
values: Dict[str, str] = {k: "" if v is None else str(v) for k, v in record.to_dict().items()}
return [_.safe_substitute(values) for _ in command]
def run(self) -> None:
try:
_: subprocess.CompletedProcess = subprocess.run(
self._command, env=None, timeout=self._timeout,
shell=False, check=True, close_fds=True,
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
except (OSError, subprocess.SubprocessError) as e:
self._logger.error(f"Cannot run alert '{self._name}': {str(e)}")
else:
self._logger.info(f"Triggered alert '{self._name}'")
def __init__(self, alerts: AlertRuleset) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._queue: Queue[Optional[AlertDispatcher.Runnable]] = Queue()
self._thread: Optional[threading.Thread] = None
self._alerts: AlertRuleset = alerts
self._last_seen: DefaultDict[int, float] = defaultdict(float)
def check(self, record: SyslogRecord) -> None:
now: float = time.monotonic()
for alert in self._alerts.match(record):
if alert.limit > 0.0 and self._last_seen[alert.pos] > now - alert.limit:
continue
self._last_seen[alert.pos] = now
self._queue.put_nowait(AlertDispatcher.Runnable(self._logger, alert, record))
def _worker(self) -> None:
while True:
task: Optional[AlertDispatcher.Runnable] = self._queue.get(block=True)
try:
if task is None:
break
task.run()
finally:
self._queue.task_done()
def __enter__(self) -> 'AlertDispatcher':
assert self._thread is None
self._thread = threading.Thread(target=self._worker, name=self.__class__.__name__)
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
assert self._thread is not None
self._logger.debug("Shutting down")
self._queue.put_nowait(None) # sentinel
self._queue.join()
self._thread.join()
self._thread = None
return False
class FilterRuleset:
"""Load filter query definitions from dedicated text file."""
def __init__(self, ruleset: Optional[Path]) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._filters: List[QueryMatch] = list(self._load_queries(ruleset)) if ruleset is not None else []
for query in self._filters:
self._logger.info(str(query))
@classmethod
def _load_queries(cls, filename: Path) -> Iterator[QueryMatch]:
queries: QueryFactory = QueryFactory.get_inst()
with filename.open("r") as fp:
for line in fp:
line = line.strip()
if line and not line.startswith("#"):
yield queries.parse(line)
def dump(self) -> List[str]:
return [str(_) for _ in self._filters]
def match(self, record: SyslogRecord) -> bool:
for query in self._filters:
if query(record):
return True
return False
# endregion
# region FILES
###
class Serializer:
"""Conversion of record fields from/into storage format."""
@classmethod
@abstractmethod
def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
raise NotImplementedError
@classmethod
@abstractmethod
def decode(cls, line: str) -> SyslogRecord:
raise NotImplementedError
class CompactJSONSerializer(Serializer):
"""
    Most compact representation: record tuples as whitespace-less JSON arrays.
"""
_decoder: ClassVar[json.JSONDecoder] = json.JSONDecoder(strict=False)
_encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False, check_circular=False,
allow_nan=False, separators=(',', ':'))
@classmethod
def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
for chunk in cls._encoder.iterencode(record.to_row(), _one_shot=True):
yield chunk.encode("utf-8", errors="replace")
yield b"\n"
@classmethod
def decode(cls, line: str) -> SyslogRecord:
try:
return SyslogRecord.from_row(cls._decoder.raw_decode(line)[0])
except (UnicodeError, json.JSONDecodeError, ValueError, TypeError) as e:
raise ValueError(f"Cannot decode JSON: {repr(e)}") from None
class FullJSONSerializer(Serializer):
"""
    Full JSON object representation, which repeats the field names in every line.
"""
_decoder: ClassVar[json.JSONDecoder] = json.JSONDecoder(strict=False)
_encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False)
@classmethod
def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
for chunk in cls._encoder.iterencode(record.to_dict(), _one_shot=True):
yield chunk.encode("utf-8", errors="replace")
yield b"\n"
@classmethod
def decode(cls, line: str) -> SyslogRecord:
try:
return SyslogRecord.from_dict(cls._decoder.raw_decode(line)[0])
except (UnicodeError, json.JSONDecodeError, ValueError, TypeError) as e:
raise ValueError(f"Cannot decode JSON: {repr(e)}") from None
class CSVSerializer(Serializer):
"""
    Tab-separated CSV dialect supporting None, str, int, and float values.
    Assumes values contain no newlines, tabs, or similar characters. Does not escape any other special or Unicode characters.
"""
@classmethod
def _encode(cls, values: Tuple) -> Iterator[bytes]:
for value in values:
if value is None:
yield b"" # as we quote all strings, we can distinguish None from an empty string
elif isinstance(value, (float, int)):
yield str(value).encode("utf-8", errors="strict")
elif isinstance(value, str):
escaped: str = value.replace("\\", "\\\\").replace("\"", "\\\"")
yield b"".join((b"\"", escaped.encode("utf-8", errors="replace"), b"\""))
else:
raise ValueError(f"Unsupported CSV value: '{value}'")
@classmethod
def encode(cls, record: SyslogRecord) -> Iterator[bytes]:
yield b"\t".join(cls._encode(record.to_row()))
yield b"\n"
@classmethod
def _decode(cls, row: str) -> Iterator:
for value in row.split("\t"):
if not len(value):
yield None
elif value.startswith("\"") and value.endswith("\"") and len(value) >= 2:
yield value[1:-1].replace("\\\"", "\"").replace("\\\\", "\\")
else:
yield float(value) if "." in value else int(value)
@classmethod
def decode(cls, row: str) -> SyslogRecord:
if not row.endswith("\n"):
raise ValueError("Truncated CSV line")
return SyslogRecord.from_row(list(cls._decode(row[:-1])))
class FileCompressor(ContextManager['FileCompressor']):
"""Compress or delete old shard files."""
def __init__(self, enabled: bool) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._enabled: bool = enabled
self._buf_size: int = 4096*16
self._executor: Optional[ThreadPoolExecutor] = None
def __enter__(self) -> 'FileCompressor':
assert self._executor is None
if self._enabled:
self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=self.__class__.__name__)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
return False
def _fallback(self, file: Path) -> None:
try:
file.unlink(missing_ok=True)
except OSError as e:
self._logger.warning(f"Cannot delete {file}: {str(e)}")
else:
self._logger.info(f"Deleted {file}")
def _handle(self, file: Path) -> None:
try:
temp: Path = file.with_suffix(file.suffix + ".gz.tmp")
target: Path = file.with_suffix(file.suffix + ".gz")
try:
with file.open("rb", buffering=self._buf_size) as in_fp:
with gzip.GzipFile(temp, "wb", mtime=0) as out_fp:
while True:
buf: bytes = in_fp.read(self._buf_size)
if not buf:
break
out_fp.write(buf)
temp.rename(target)
except OSError:
temp.unlink(missing_ok=True)
target.unlink(missing_ok=True)
raise
else:
file.unlink(missing_ok=True)
self._logger.info(f"Created {target}")
except OSError as e:
self._logger.warning(f"Cannot compress {file}: {str(e)}")
self._fallback(file)
def handle(self, file: Path) -> None:
if self._executor is not None:
self._executor.submit(self._handle, file)
else:
self._fallback(file)
class FileRegistry:
"""Maintain an upper-limit of timestamp-based JSON files/shards."""
@dataclass(frozen=True)
class ShardFile:
timestamp: int
shard: str
path: Path
def __lt__(self, other: 'FileRegistry.ShardFile') -> bool:
return self.timestamp < other.timestamp if isinstance(other, FileRegistry.ShardFile) else NotImplemented
def __init__(self, root: Path, fmt: str, json_full: bool,
jitter: float, max_shards: int, rotate_cb: Callable[[Path], None]) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._root: Path = Path(root)
self._jitter_margin: float = jitter * 2.0
self._json_full: bool = json_full
self._fmt: str = fmt
        self._shard_format: str = "%Y-%m-%d"  # daily, lexically sortable
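        # resulting file names follow "<shard>_<creation epoch>.log.<ext>", e.g. "2024-02-01_1706745600.log.json"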
self._shard_glob: str = "*.log.*"
self._shard_re: re.Pattern = re.compile(r"[0-9]+-[0-9]+-[0-9]+_[0-9]+\.log\.(json|csv)")
self._max_shards: int = max_shards
self._rotate_cb: Callable[[Path], None] = rotate_cb
self._index: List[FileRegistry.ShardFile] = sorted(self._crawl())
self._lock: threading.Lock = threading.Lock()
self._logger.info(f"Found {len(self._index)} files in {self._root}")
def _crawl(self) -> Iterator[ShardFile]:
for fn in self._root.glob(self._shard_glob): # type: Path
if self._shard_re.fullmatch(fn.name) is not None:
shard, ts = fn.stem.split("_")
yield FileRegistry.ShardFile(int(ts.split(".")[0]), shard, fn)
def get_start(self) -> Optional[float]:
"""Smallest timestamp to be expected (from the oldest file)."""
with self._lock:
return float(self._index[0].timestamp - self._jitter_margin) if len(self._index) else None
def get_current(self) -> Path:
"""Current file to write to. Possibly create a new entry and rotate/delete old ones."""
now: datetime = datetime.now(timezone.utc)
ts: int = math.floor(now.timestamp())
shard: str = now.strftime(self._shard_format)
with self._lock:
if not len(self._index) or self._index[-1].shard != shard:
while len(self._index) >= self._max_shards: # maintenance on old shards whenever we add a new one
old: FileRegistry.ShardFile = self._index.pop(0)
self._rotate_cb(old.path)
filename: Path = self._root / Path(f"{shard}_{ts}").with_suffix(f".log.{self._fmt}")
self._index.append(FileRegistry.ShardFile(ts, shard, filename))
self._logger.info(f"Creating {filename}")
return self._index[-1].path
def get_candidates(self, query_start: float, query_end: float) -> Iterator[Path]:
"""Files to search, which might contain records from the given timestamp interval."""
results: List[Path] = []
with self._lock:
for i in range(len(self._index)):
entry: FileRegistry.ShardFile = self._index[i]
file_end: Optional[int] = self._index[i + 1].timestamp if i + 1 < len(self._index) else None
if file_end is None or query_start <= file_end + self._jitter_margin:
if query_end >= entry.timestamp - self._jitter_margin:
results.append(entry.path)
yield from results # NB: not yielding during lock
def get_serializer(self, filename: Path) -> Type[Serializer]:
if filename.suffix == ".json":
return FullJSONSerializer if self._json_full else CompactJSONSerializer
elif filename.suffix == ".csv":
return CSVSerializer
else:
raise RuntimeError(f"Unsupported file extension in '{filename}'")
class FileWriter(ContextManager['FileWriter']):
"""Single-thread-based write queue for records to file, as async file i/o is not a real thing in python anyway."""
class QueueMessage(Enum):
eof = "eof" # shutdown sentinel
flush = "flush"
def __init__(self, files: FileRegistry, read_only: bool, max_buffered: int, flush_interval: float) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._enabled: bool = not read_only
self._max_buffered: int = max_buffered
self._flush_interval: float = flush_interval
self._files: FileRegistry = files
self._queue: Queue[Union[SyslogRecord, FileWriter.QueueMessage]] = Queue()
self._thread: Optional[threading.Thread] = None
def _worker(self) -> None:
last_flush: float = time.monotonic()
buffer: List[SyslogRecord] = []
done: bool = False
while len(buffer) or not done:
try:
timeout: float = max(1.0, last_flush + self._flush_interval - time.monotonic())
record: Union[SyslogRecord, FileWriter.QueueMessage] = self._queue.get(block=True, timeout=timeout)
if record is FileWriter.QueueMessage.eof:
done = True
elif record is FileWriter.QueueMessage.flush:
last_flush = 0.0
else:
assert isinstance(record, SyslogRecord)
self._logger.log(TRACE, str(record))
buffer.append(record)
except QueueEmpty:
pass
else:
self._queue.task_done()
now: float = time.monotonic()
if len(buffer) >= self._max_buffered or last_flush <= now - self._flush_interval or done:
last_flush = now
if len(buffer):
if self._enabled:
self._write(buffer)
buffer.clear()
collect()
def _write(self, records: List[SyslogRecord]) -> None:
filename: Path = self._files.get_current()
try:
encoder: Type[Serializer] = self._files.get_serializer(filename)
with filename.open("ab", buffering=4096 * 16) as fp: # TODO: keep (text) fd open
for record in records:
for chunk in encoder.encode(record):
fp.write(chunk)
fp.flush()
except (OSError, ValueError, RuntimeError) as e:
self._logger.error(f"Cannot write to {filename}: {str(e)}")
else:
self._logger.log(TRACE, f"Flushed {len(records)} records to {filename}")
def __enter__(self) -> 'FileWriter':
assert self._thread is None
self._thread = threading.Thread(target=self._worker, name=self.__class__.__name__)
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
assert self._thread is not None
self._logger.debug("Shutting down")
self._queue.put_nowait(FileWriter.QueueMessage.eof)
self._queue.join()
self._thread.join()
self._thread = None
return False
def put(self, record: SyslogRecord) -> None:
assert self._thread is not None
self._queue.put_nowait(record) # TODO: wait, full, and timeout?
def flush(self) -> None:
assert self._thread is not None
self._queue.put_nowait(FileWriter.QueueMessage.flush)
class FileReader(ContextManager['FileReader']):
"""Thread-pool-based searching through files for query matches."""
def __init__(self, files: FileRegistry, reverse: bool, max_workers: int = 4) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._files: FileRegistry = files
self._reverse: bool = reverse
self._max_workers: int = max_workers
self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
self._executor: Optional[ThreadPoolExecutor] = None
def __enter__(self) -> 'FileReader':
assert self._executor is None
self._executor = ThreadPoolExecutor(max_workers=self._max_workers, thread_name_prefix=self.__class__.__name__)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
assert self._executor is not None
self._executor.shutdown(wait=True)
self._executor = None
return False
def _search(self, query: LogSearch) -> LogSearchResult:
start: float = time.perf_counter()
evaluator: LogSearchEvaluator = LogSearchEvaluator(query, self._reverse)
for filename in self._files.get_candidates(query.start, query.end):
try:
decoder: Type[Serializer] = self._files.get_serializer(filename)
with filename.open("r", buffering=4096*16, newline="\n", encoding="utf-8") as fp:
self._logger.log(TRACE, f"Scanning {filename}")
for line in fp:
if not line.endswith("\n"):
break # dirty read
evaluator.evaluate(decoder.decode(line))
except (OSError, ValueError) as e:
self._logger.error(f"Cannot search {filename}: {str(e)}")
result: LogSearchResult = evaluator.get_result()
duration: int = math.ceil((time.perf_counter() - start) * 1000)
self._logger.debug(f"Returning {len(result.results)} of {result.total} results after {duration}ms")
return result
async def search(self, query: LogSearch) -> LogSearchResult:
assert self._executor is not None
return await asyncio.wrap_future(self._executor.submit(self._search, query))
# endregion
# region SERVER
###
class SyslogServer(AsyncContextManager['SyslogServer']):
"""TCP server for syslog parsing with new record callback to write queue."""
def __init__(self, exclude_cb: Callable[[SyslogRecord], bool], cb: List[Callable[[SyslogRecord], None]],
bind_all: bool = False, port: int = 5140) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._host: str = "0.0.0.0" if bind_all else "127.0.0.1"
self._port: int = port
self._server: Optional[asyncio.AbstractServer] = None
self._parser: Syslog5424Parser = Syslog5424Parser()
self._exclude_cb: Callable[[SyslogRecord], bool] = exclude_cb
self._cb: List[Callable[[SyslogRecord], None]] = cb
async def __aenter__(self) -> 'SyslogServer':
try:
assert self._server is None
            self._server = await asyncio.start_server(self._handler, self._host, self._port,
                                                      family=socket.AF_INET,
                                                      reuse_address=True, reuse_port=True,
                                                      start_serving=True)
except OSError as e:
raise RuntimeError(f"Cannot start syslog server: {str(e)}") from None
else:
self._logger.info(f"Listening at {self._host}:{self._port}")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
assert self._server is not None
self._logger.debug("Shutting down")
self._server.close()
await self._server.wait_closed()
self._server = None
return False
async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
remote_addr, remote_port = writer.get_extra_info('peername') # type: str, int
self._logger.debug(f"New connection from {remote_addr}:{remote_port}")
while True:
try:
data: bytes = await reader.readline()
if not data.endswith(b"\n"):
self._logger.debug(f"EOF from {remote_addr}:{remote_port}")
break
record: SyslogRecord = self._parser.parse(data, remote_addr)
if not self._exclude_cb(record):
for callback in self._cb:
callback(record)
except OSError as e:
self._logger.warning(f"Cannot read from {remote_addr}: {str(e)}")
break
except ValueError as e:
self._logger.warning(f"Cannot parse syslog line from {remote_addr}: {str(e)}")
break
writer.close()
await writer.wait_closed()
class ApiHandler:
"""Handler methods for API requests from aiohttp webserver."""
_encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False, separators=(',', ':'),
check_circular=False, allow_nan=False)
_query_factory: ClassVar[QueryFactory] = QueryFactory.get_inst()
_compression_threshold: ClassVar[int] = 10 * 1024
class JsonResponse(web.Response):
def __init__(self, status: int, data: Union[Dict, List]) -> None:
super().__init__(status=status, body=ApiHandler._encoder.encode(data),
content_type="application/json", charset="utf-8",
zlib_executor_size=ApiHandler._compression_threshold)
self.enable_compression()
class StreamedPlainResponse(web.StreamResponse):
def __init__(self, status: int) -> None:
super().__init__(status=status)
self.content_type = "text/plain"
self.charset = "utf-8"
self.enable_compression()
async def write_all(self, request: web.Request, data: Iterator[bytes]) -> web.StreamResponse:
await self.prepare(request)
try:
for chunk in data:
await self.write(chunk)
finally:
await self.write_eof()
return self
class HtmlResponse(web.Response):
def __init__(self, status: int, data: bytes = b"", etag: Optional[str] = None) -> None:
super().__init__(status=status, body=data,
content_type="text/html", charset="utf-8",
zlib_executor_size=ApiHandler._compression_threshold)
if len(data) >= ApiHandler._compression_threshold:
self.enable_compression()
if etag is not None:
self.headers.update({
"ETag": f'"{etag}"',
"Cache-Control": "public, must-revalidate, max-age=604800",
"Vary": "Accept-Encoding",
})
def __init__(self, alerts: AlertRuleset, filters: FilterRuleset,
files: FileRegistry, writer: FileWriter, reader: FileReader) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._alerts: AlertRuleset = alerts
self._filters: FilterRuleset = filters
self._files: FileRegistry = files
self._writer: FileWriter = writer
self._reader: FileReader = reader
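    # Search endpoints take query-string parameters: q (query), s/e (start/end as epoch seconds),
    # o (offset), l (limit). Illustrative request: GET /api/search?q=severity:error&o=0&l=100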
async def _search(self, request: web.Request) -> LogSearchResult:
query: QueryMatch = self._query_factory.parse(request.query.get("q", ""))
end: float = float(request.query.get("e", "") or math.ceil(time.time()))
start: float = float(request.query.get("s", "") or self._files.get_start() or (end - 3600.0))
offset: int = int(request.query.get("o", "0") or 0)
limit: Optional[int] = int(request.query["l"]) if request.query.get("l", "") else None
self._writer.flush()
return await self._reader.search(query=LogSearch(
start=min(start, end), end=max(start, end), offset=offset, limit=limit, query=query
))
async def _handle_search(self, request: web.Request) -> web.StreamResponse:
try:
result: LogSearchResult = await self._search(request)
return self.JsonResponse(status=200, data=result.to_dict())
except (QueryParseError, ValueError, RuntimeError) as e:
self._logger.warning(str(e))
return self.JsonResponse(status=200, data={"error": str(e)})
except Exception as e:
self._logger.error(repr(e))
return self.JsonResponse(status=500, data={"error": e.__class__.__name__})
async def _handle_tail(self, request: web.Request) -> web.StreamResponse:
try:
result: LogSearchResult = await self._search(request)
return await self.StreamedPlainResponse(status=200).write_all(
request, (record.to_log_line() for record in result.results)
)
except (QueryParseError, ValueError, RuntimeError) as e:
self._logger.warning(str(e))
return self.JsonResponse(status=200, data={"error": str(e)})
except Exception as e:
self._logger.error(repr(e))
return self.JsonResponse(status=500, data={"error": e.__class__.__name__})
async def _handle_flush(self, request: web.Request) -> web.Response:
self._writer.flush()
return self.HtmlResponse(status=202)
async def _handle_alerts(self, request: web.Request) -> web.Response:
return self.JsonResponse(status=200, data=self._alerts.dump())
async def _handle_filters(self, request: web.Request) -> web.Response:
return self.JsonResponse(status=200, data=self._filters.dump())
async def _handle_index(self, request: web.Request) -> web.Response:
if request.headers.get("ETag", "") == f'"{INDEX_ETAG}"':
return self.HtmlResponse(status=304, data=b"", etag=INDEX_ETAG)
return self.HtmlResponse(status=200, data=INDEX_HTML, etag=INDEX_ETAG)
def get_routes(self) -> List[web.RouteDef]:
return [
web.route("GET", "/", self._handle_index),
web.route("GET", "/api/search", self._handle_search),
web.route("POST", "/api/search", self._handle_search),
web.route("GET", "/api/tail", self._handle_tail),
web.route("POST", "/api/tail", self._handle_tail),
web.route("GET", "/api/flush", self._handle_flush),
web.route("POST", "/api/flush", self._handle_flush),
web.route("GET", "/api/alerts", self._handle_alerts),
web.route("GET", "/api/filters", self._handle_filters),
]
class ApiRunner(AsyncContextManager['ApiRunner']):
"""Run aiohttp webserver for registered API route handlers."""
def __init__(self, routes: List[web.RouteDef], bind_all: bool = False, port: int = 5141,
certfile: Optional[Path] = None, keyfile: Optional[Path] = None,
auth: Optional[Tuple[str, str]] = None) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._host: str = "0.0.0.0" if bind_all else "127.0.0.1"
self._port: int = port
self._auth: Optional[BasicAuth] = BasicAuth(login=auth[0], password=auth[1]) if auth is not None else None
self._ssl: Optional[ssl.SSLContext] = None
if certfile is not None:
self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self._ssl.load_cert_chain(certfile=certfile, keyfile=keyfile)
self._app: web.Application = web.Application(middlewares=[self._auth_middleware])
self._app.add_routes(routes)
self._runner = web.AppRunner(self._app)
self._site: Optional[web.BaseSite] = None
@web.middleware
async def _auth_middleware(self, request: web.Request, handler) -> web.Response:
if self._auth is not None:
try:
authorization: Optional[str] = request.headers.get(hdrs.AUTHORIZATION)
auth: Optional[BasicAuth] = BasicAuth.decode(authorization) if authorization is not None else None
except ValueError:
return web.Response(status=400)
            if auth is None or not compare_digest(auth.encode(), self._auth.encode()):
return web.Response(status=401, headers={hdrs.WWW_AUTHENTICATE: f'Basic realm="{NAME}"'})
return await handler(request)
async def __aenter__(self) -> 'ApiRunner':
assert self._site is None
await self._runner.setup()
self._site = web.TCPSite(self._runner, self._host, self._port, ssl_context=self._ssl)
await self._site.start()
self._logger.info("Listening at {}://{}:{}/ (authentication {})".format(
"https" if self._ssl is not None else "http",
self._host, self._port,
"enabled" if self._auth is not None else "disabled"
))
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
assert self._site is not None
self._logger.debug("Shutting down")
await self._runner.cleanup()
self._site = None
return False
# endregion
# region MAIN
###
class SystemdNotifier(AsyncContextManager[None]):
"""Send ready/stopping commands to the systemd socket, if present."""
def __init__(self, enabled: bool) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
self._enabled: bool = enabled
self._notify_path: Optional[str] = os.getenv("NOTIFY_SOCKET") or None
if self._notify_path is not None and self._notify_path.startswith("@"): # abstract namespace socket
self._notify_path = "\0" + self._notify_path[1:]
if self._enabled and self._notify_path is None:
self._logger.warning("Cannot get NOTIFY_SOCKET")
async def _send(self, unix_path: str, command: str) -> bool:
sock: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.setblocking(False)
try:
await self._loop.sock_connect(sock, unix_path)
await self._loop.sock_sendall(sock, f"{command}\n".encode(encoding="utf-8", errors="strict"))
except (OSError, UnicodeError) as e:
self._logger.warning(f"Cannot send {command}: {str(e)}")
return False
else:
self._logger.info(f"Sent {command}")
return True
finally:
sock.close()
async def __aenter__(self) -> None:
if self._enabled and self._notify_path is not None:
await self._send(self._notify_path, "READY=1")
async def __aexit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
if self._enabled and self._notify_path is not None:
await self._send(self._notify_path, "STOPPING=1")
return False
class QueuedLogger(ContextManager[None]):
"""Move logging i/o into a separate thread."""
def __init__(self, logger: logging.Logger) -> None:
self._logger: logging.Logger = logger
self._queue: Queue[Optional[logging.LogRecord]] = Queue()
self._handler: logging.Handler = QueueHandler(self._queue)
self._listener: QueueListener = QueueListener(self._queue, *logger.handlers)
self._logger.handlers.clear()
self._logger.handlers.append(self._handler)
def __enter__(self) -> None:
self._listener.start()
def __exit__(self, *args: Any) -> None:
self._listener.stop()
self._queue.join()
self._handler.close()
for handler in self._listener.handlers:
handler.close()
self._listener.handlers = tuple()
async def wait_for_signal() -> None:
"""Async blocking until shutdown signal interrupt."""
logger: logging.Logger = logging.getLogger(NAME)
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
shutdown_requested: asyncio.Event = asyncio.Event()
def _handler(signum: int) -> None:
logger.debug(f"Received signal {signum}")
shutdown_requested.set()
for s in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
loop.add_signal_handler(s, _handler, s)
await shutdown_requested.wait()
logger.info("Exiting")
async def run(config: Config) -> int:
"""Main entrypoint."""
logging.addLevelName(TRACE, "TRACE")
logging.basicConfig(level=TRACE if config.debug else logging.DEBUG if config.verbose else logging.INFO,
format="%(levelname)s %(name)s: %(message)s")
with QueuedLogger(logging.getLogger()):
os.umask(0o027)
alerts: AlertRuleset = AlertRuleset(config.alerts)
filters: FilterRuleset = FilterRuleset(config.filters)
with FileCompressor(config.rotate_compress) as rotate_compressor:
files: FileRegistry = FileRegistry(root=config.data_dir, fmt=config.file_format, json_full=config.json_full,
jitter=config.flush_interval, max_shards=config.max_shards,
rotate_cb=rotate_compressor.handle)
with FileWriter(files, read_only=config.read_only,
max_buffered=config.max_buffered, flush_interval=config.flush_interval) as writer:
with FileReader(files, reverse=config.reverse) as reader:
with AlertDispatcher(alerts) as checker:
async with SyslogServer(filters.match, [writer.put, checker.check],
bind_all=config.syslog_bind_all, port=config.syslog_port):
api: ApiHandler = ApiHandler(alerts, filters, files, writer, reader)
async with ApiRunner(api.get_routes(),
bind_all=config.http_bind_all, port=config.http_port,
certfile=config.https_certfile, keyfile=config.https_keyfile,
auth=config.http_basic_credentials):
async with SystemdNotifier(enabled=config.systemd_notify):
await wait_for_signal()
return 0
# endregion
# region FRONTEND
###
# language=CSS
_INDEX_CSS: str = r"""
:root {
--pad: 1em;
--hpad: 0.5em;
--border: #dddddd;
--bg: #eeeeee;
}
html, body {
background-color: var(--bg);
color: #000000;
margin: 0;
padding: 0;
}
body, input {
font-family: monospace;
}
form, fieldset {
border: none;
padding: 0;
margin: 0;
}
.widget, .plot {
background-color: #ffffff;
border: var(--border) 1px solid;
border-radius: var(--hpad);
overflow: hidden;
}
.element {
margin: var(--pad);
}
#error {
display: block;
text-align: center;
color: #ff0000;
}
#error:empty {
display: none;
}
#query-help {
display: none;
text-align: center;
}
#query-help.visible {
display: block;
}
#searchbar {
padding: 0 var(--pad);
position: sticky;
top: var(--pad);
z-index: 1;
box-shadow: 0 0 0.25em 0.25em var(--bg);
}
#query-date, #query-line, #error, #query-help {
width: 100%;
margin: var(--pad) 0;
}
#query-line {
display: flex;
gap: var(--pad);
align-items: stretch;
}
#query-line #query {
flex-grow: 1;
}
#query-date {
display: flex;
gap: var(--pad);
flex-wrap: wrap;
justify-content: center;
}
.plot {
height: 200px;
}
#stats {
display: flex;
flex-flow: row wrap;
gap: var(--pad);
}
#stats .plot {
min-width: 250px;
flex-grow: 1;
box-sizing: border-box;
}
#stats .plot .modebar-container {
display: none;
}
#nav fieldset {
display: flex;
flex-wrap: wrap;
gap: var(--hpad);
margin: var(--pad);
align-items: stretch;
}
#nav fieldset > :first-child {
flex-grow: 1;
display: table;
}
#results {
white-space: nowrap;
display: table-cell;
vertical-align: middle;
}
#log {
max-height: 70vh;
white-space: pre;
overflow: scroll;
padding: var(--pad);
}
#log p {
margin: 0;
}
.element.empty, .element:empty, #stats.empty, .plot.empty {
display: none;
}
fieldset:disabled span, fieldset:disabled #error {
color: #aaaaaa;
}
#spinner {
display: none;
position: absolute;
bottom: 0; left: 0;
height: 2px; width: 100%;
background-color: #8fbbda;
background-repeat: no-repeat;
background-image: linear-gradient(#ffffff 0 0), linear-gradient(#ffffff 0 0);
background-size: 60% 100%;
animation: css-loaders-progress-16 3s infinite;
}
@keyframes css-loaders-progress-16 {
0% {background-position:-150% 0,-150% 0}
66% {background-position: 250% 0,-150% 0}
100% {background-position: 250% 0, 250% 0}
}
""" # noqa
# language=JS
_INDEX_JS: str = r"""
"use_strict";
const $ = document.getElementById.bind(document);
function log_format(record) {
return new Date(record.timestamp * 1000).toString().split(" (")[0] + " " +
(record.remote_host || "-") + " " + (record.hostname || "-") + " " + (record.application || "-") + "[" + (record.pid || "-") + "] " +
record.facility + "." + record.severity + ": " + record.message + "\n";
}
function date_from_offset(offset) {
let now = new Date();
now.setTime(now.getTime() - (offset * 1000));
now.setMinutes(now.getMinutes() - now.getTimezoneOffset());
return now.toISOString().slice(0,19);
}
function date_from_timestamp(ts) {
let dt = new Date(parseInt(ts) * 1000);
dt.setMinutes(dt.getMinutes() - dt.getTimezoneOffset());
return dt.toISOString().slice(0,19);
}
function date_to_timestamp(value) {
return String(Date.parse(value) / 1000);
}
function build_table(results) {
const log = $("log");
if (results === null) {
log.classList.add("empty")
return;
}
const fragment = new DocumentFragment();
for (let i=0; i<results.length; i++) {
fragment.append(document.createTextNode(log_format(results[i])));
}
log.replaceChildren(fragment);
log.scrollTo(0, 0);
log.classList.remove("empty")
}
function build_nav(data) {
if (data === null) {
$("results").textContent = $("offset").value = $("total").value = "";
$("nav-prev").disabled = $("nav-start").disabled = true;
$("nav-next").disabled = $("nav-end").disabled = true;
} else {
$("results").textContent = data.results.length? String(data.offset + 1) + " - " + String(data.offset + data.results.length) + " / " + String(data.total): String(data.total);
$("offset").value = data.offset;
$("total").value = data.total;
$("nav-prev").disabled = $("nav-start").disabled = data.offset <= 0;
$("nav-next").disabled = $("nav-end").disabled = data.offset + data.results.length >= data.total;
}
}
function build_plots(stats) {
const widget = $("stats");
widget.classList.add("empty");
if (stats === null) {
return;
}
for (const key in stats) {
const panel = $("plot-" + key);
if (Object.keys(stats[key]).length === 0 || Math.max(...Object.values(stats[key])) === 0) {
panel.classList.add("empty");
continue;
} else {
widget.classList.remove("empty");
panel.classList.remove("empty");
}
const color_discrete_map = {all: "rgb(192, 192, 192)", query: "rgb(143, 187, 218)", result: "rgb(31, 119, 180)"};
Plotly.react(panel,
[{
type: "pie", textinfo: "label", textposition: "inside",
labels: Object.keys(stats[key]), values: Object.values(stats[key]),
marker: {
colors: (key === "all")?
Object.keys(stats[key]).map(label => color_discrete_map[label]):
['rgb(102, 197, 204)', 'rgb(246, 207, 113)', 'rgb(248, 156, 116)', 'rgb(220, 176, 242)', 'rgb(135, 197, 95)', 'rgb(158, 185, 243)', 'rgb(254, 136, 177)', 'rgb(201, 219, 116)', 'rgb(139, 224, 164)', 'rgb(180, 151, 231)', 'rgb(179, 179, 179)'] // Pastel
},
}], {
autosize: true, margin: {b: 8, r: 8, l: 8, t: 8}, showlegend: false,
paper_bgcolor: "rgba(255,255,255,0)", plot_bgcolor: "rgba(255,255,255,0)",
title: {
text: panel.dataset.title, font: {size: 12},
x: 0, y: 1, xanchor: 'left', 'yanchor': 'top', pad: {l: 8, t: 8},
yref: 'container', automargin: false,
},
}, {responsive: true},
);
window.requestAnimationFrame(function(){ Plotly.relayout(panel, {}); });
}
}
function build_timeline(timeline_all, timeline_query, timeline_result) {
const panel = $("plot-timeline");
if (timeline_all === null || timeline_query === null || timeline_result === null || timeline_all.x.length === 0) {
panel.classList.add("empty");
return;
} else {
panel.classList.remove("empty");
}
Plotly.react(panel,
[
{type: "bar", name: "all", marker: {color: "rgb(192, 192, 192)"}, x: timeline_all.x.map(ts => new Date(ts * 1000.0)), y: timeline_all.y},
{type: "bar", name: "query", marker: {color: "rgb(143, 187, 218)"}, x: timeline_query.x.map(ts => new Date(ts * 1000.0)), y: timeline_query.y},
{type: "bar", name: "result", marker: {color: "rgb(31, 119, 180)"}, x: timeline_result.x.map(ts => new Date(ts * 1000.0)), y: timeline_result.y},
], {
barmode: "overlay", bargap: 0,
autosize: true, margin: {b: 20, r: 8, l: 8, t: 8}, showlegend: true,
legend: {x: 0, y: 1, xanchor: 'left', yanchor: 'top'},
paper_bgcolor: "rgba(255,255,255,0)", plot_bgcolor: "rgba(255,255,255,0)",
xaxis: {
range: [new Date(Math.min(timeline_all.range[0], timeline_query.range[0]) * 1000.0), new Date(Math.max(timeline_all.range[1], timeline_query.range[1]) * 1000)],
showgrid: false, showline: false, zeroline: false, visible: true
},
yaxis: {
range: [Math.min(timeline_all.range[2], timeline_query.range[2]), Math.max(timeline_all.range[3], timeline_query.range[3])],
showgrid: false, showline: false, zeroline: false, visible: false
},
}, {responsive: true},
);
window.requestAnimationFrame(function(){ Plotly.relayout(panel, {}); });
}
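// Apply a search response (or null to clear the page) to the log view, navigation, timeline and pie charts.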
function process(data) {
build_table(data === null? null: data.results);
build_nav(data);
build_timeline(data === null? null: data.timeline_all, data === null? null: data.timeline_query, data === null? null: data.timeline_result);
build_plots(data === null? null: data.stats);
}
function disable(disabled) {
$("query-fields").disabled = disabled;
$("nav-fields").disabled = disabled;
$("query-help").classList.remove("visible");
$("spinner").style.display = disabled? "block": "none";
}
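// Small XMLHttpRequest wrapper; the callback receives the parsed JSON body (or null) and the HTTP status code.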
function request(method, url, callback) {
const xhr = new XMLHttpRequest();
xhr.withCredentials = true;
xhr.open(method, url, true);
xhr.onreadystatechange = function() {
if (this.readyState === XMLHttpRequest.DONE) {
const ct = this.getResponseHeader("Content-Type");
const data = (ct !== null && ct.startsWith("application/json"))? JSON.parse(this.responseText): null;
callback(data, this.status);
}
}
xhr.send();
}
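// Run a search: the query and date bounds go into both the API call and the history entry,
// the date-preset selections only into the history entry, and limit/offset only into the API call.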
function search(plaintext_tab) {
const error = $("error");
const start = $("query-start");
const end = $("query-end");
let params = new URLSearchParams();
params.append("q", $("query").value);
params.append("s", start.value? date_to_timestamp(start.value): "");
params.append("e", end.value? date_to_timestamp(end.value): "");
let page_params = new URLSearchParams();
page_params.append("_s", $("query-start-sel").value);
page_params.append("_e", $("query-end-sel").value);
let search_params = new URLSearchParams();
search_params.append("l", $("limit").value); // TODO: restore? replace state?
search_params.append("o", $("offset").value);
const http_params = (new URLSearchParams([...params, ...search_params])).toString();
const history_params = (new URLSearchParams([...params, ...page_params])).toString();
if (plaintext_tab) {
const win = window.open("api/tail?" + http_params, '_blank');
if (win !== null) win.focus();
return;
}
disable(true);
request("POST", "api/search?" + http_params, function (data, status) {
if (history.state != history_params) {
history.pushState(history_params, null, "#" + history_params);
} else {
history.replaceState(history_params, null, "#" + history_params);
}
if (data !== null && data.hasOwnProperty("error")) {
error.textContent = data.error;
process(null);
} else if (status !== 200) {
error.textContent = "Request error, status " + String(status);
process(null);
} else if (data === null) {
error.textContent = "Request error, no data";
process(null);
} else {
error.textContent = "";
process(data);
}
disable(false);
});
}
function do_search(plaintext_tab) {
$("offset").value = 0;
$("total").value = 0;
search(plaintext_tab);
}
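// Pagination: mode -2 = first page, -1 = previous, 0 = re-run with current offset (limit changed), 1 = next, 2 = last.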
function do_nav(mode) {
const offset = $("offset");
const total = $("total");
const limit = $("limit");
if (mode === -2) offset.value = 0;
else if (mode === -1) offset.value = Math.max(0, parseInt(offset.value) - parseInt(limit.value));
else if (mode === 0) { if (!total.value || total.value === "0") return; }
else if (mode === 1) offset.value = parseInt(offset.value) + parseInt(limit.value);
else if (mode === 2) offset.value = Math.max(0, parseInt(total.value) - parseInt(limit.value));
search(false);
}
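// Keep each date-preset <select> and its datetime-local input in sync: picking a preset fills the input,
// editing the input switches the preset to "custom" (or to "all"/"now" when cleared).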
function do_date_select(elem) {
if (elem.id === "query-start-sel") $("query-start").value = (elem.value != -1)? date_from_offset(elem.value): "";
else if (elem.id === "query-start") $("query-start-sel").value = (elem.value != "")? 0: -1;
else if (elem.id === "query-end") $("query-end-sel").value = (elem.value != "")? 0: -1;
else if (elem.id === "query-end-sel") $("query-end").value = (elem.value != -1)? date_from_offset(elem.value): "";
}
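// Restore the form state from URLSearchParams taken from a history entry or the location hash,
// then clear the view and re-enable the form.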
function restore_search(params) {
if (params.has("_s") && params.get("_s") != "0") { $("query-start-sel").value = params.get("_s"); do_date_select($("query-start-sel")); }
else if (params.get("s")) { $("query-start").value = date_from_timestamp(params.get("s")); do_date_select($("query-start")); }
else { $("query-start-sel").value = $("query-start-sel").dataset.selected; do_date_select($("query-start-sel")); }
if (params.has("_e") && params.get("_e") != "0") { $("query-end-sel").value = params.get("_e"); do_date_select($("query-end-sel")); }
else if (params.get("e")) { $("query-end").value = date_from_timestamp(params.get("e")); do_date_select($("query-end")); }
else { $("query-end-sel").value = $("query-end-sel").dataset.selected; do_date_select($("query-end-sel")); }
if (params.has("l")) { $("limit").value = params.get("l"); }
if (params.has("o")) { $("offset").value = params.get("o"); }
if (params.has("q")) { $("query").value = params.get("q"); }
process(null);
disable(false);
}
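// Re-apply search state on back/forward navigation and on initial page load (from the URL hash).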
window.addEventListener("popstate", function(event) {
restore_search(event.state? new URLSearchParams(event.state): new URLSearchParams());
});
window.addEventListener("load", function() {
restore_search(window.location.hash? new URLSearchParams(window.location.hash.slice(1)): new URLSearchParams());
});
""" # noqa
# language=XML
_ICON: str = r"""<svg xmlns="http://www.w3.org/2000/svg" height="48" viewBox="0 -960 960 960" width="48"><path fill="#1F77B4" d="M100.001-220.771v-45.383h391.154v45.383H100.001Zm0-203.076v-45.384h189.23v45.384h-189.23Zm0-203.077v-45.383h189.23v45.383h-189.23Zm727.998 406.923-159.23-158.077q-24.077 18.846-52.154 28.654-28.077 9.807-58.154 9.807-75.178 0-128.165-52.538-52.987-52.538-52.987-127.653 0-75.114 53.014-127.653 53.013-52.538 128.229-52.538 75.217 0 128.139 52.538Q739.614-594.922 739.614-520q0 30.077-9.808 58.154-9.807 28.076-29.038 51.768l159.231 158.077-32 32ZM558.226-385q56.793 0 96.399-39.227 39.605-39.226 39.605-95.345 0-56.12-39.37-95.582-39.37-39.461-96.163-39.461t-96.399 39.226q-39.605 39.226-39.605 95.346 0 56.12 39.37 95.581Q501.433-385 558.226-385Z"/></svg>""" # noqa
# language=HTML
INDEX_HTML: bytes = r"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="content-type" content="text/html; charset=utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1,user-scalable=no">
<meta name="robots" content="noindex,nofollow,noarchive">
<meta name="google" content="notranslate">
<title>{}</title>
<link rel="icon" href="data:image/svg+xml;base64,{}" type="image/svg+xml">
<style>{}</style>
</head>
<body>
<div id="searchbar" class="widget element"><form id="query-form" onsubmit="event.preventDefault(); return false;"><fieldset id="query-fields" disabled>
<div id="query-date">
<select onchange="do_date_select(this)" id="query-start-sel" data-selected="86400"><option value="0">custom</option><option value="3600">1 hour ago</option><option value="43200">12 hours ago</option><option value="86400" selected>24 hours ago</option><option value="259200">3 days ago</option><option value="-1">all</option></select>
<input onchange="do_date_select(this)" id="query-start" type="datetime-local" step="1">
<input onchange="do_date_select(this)" id="query-end" type="datetime-local" step="1">
<select onchange="do_date_select(this)" id="query-end-sel" data-selected="-1"><option value="0">custom</option><option value="3600">1 hour ago</option><option value="43200">12 hours ago</option><option value="86400">24 hours ago</option><option value="-1" selected>now</option></select>
</div>
<div id="query-line">
<input id="query" placeholder="" title="" type="search" accesskey="s" name="query" autofocus>
<input id="search" type="submit" value=" 🔍 " title="execute query" accesskey="r" onclick="do_search(false); return false;">
<input id="search" type="submit" value="📄" title="get plaintext" onclick="do_search(true); return false;">
<input type="button" value="❓" title="show help" onclick="document.getElementById('query-help').classList.toggle('visible'); return false;">
</div>
<span id="error"></span>
<span id="query-help">
<b>Fields:</b> *, facility, severity, remote_host/client, hostname/host, application/app, pid, message/msg<br>
<b>Values:</b> strings, "quoted strings", /regular expressions/i<br>
<b>Operators:</b> AND, OR, NOT, parenthesis<br>
<b>Examples:</b><br>*:"foo bar"<br>NOT application:kernel<br>application:kernel OR NOT facility:/daemon|kernel/<br>(app:/^cron$/i OR hostname:localhost) AND NOT (NOT message:"init:" OR pid:1)
</span>
</fieldset></form><div id="spinner"></div></div>
<div id="plot-timeline" class="plot widget element empty"></div>
<div id="stats" class="element empty">
<div id="plot-all" class="plot plot-pie empty" data-title="Results"></div>
<div id="plot-facility" class="plot plot-pie empty" data-title="Facility"></div>
<div id="plot-severity" class="plot plot-pie empty" data-title="Severity"></div>
<div id="plot-hostname" class="plot plot-pie empty" data-title="Hostname"></div>
<div id="plot-application" class="plot plot-pie empty" data-title="Application"></div>
</div>
<div id="nav" class="widget element"><form id="nav-form" onsubmit="event.preventDefault(); return false;"><fieldset id="nav-fields" disabled>
<div><span id="results"></span></div>
<input id="offset" type="hidden">
<input id="total" type="hidden">
<input type="submit" id="nav-start" value="⏪" title="first" onclick="do_nav(-2); return false;" disabled>
<input type="submit" id="nav-prev" value=" ◀️ " title="previous" onclick="do_nav(-1); return false;" disabled>
<select id="limit" onchange="do_nav(0); return false;"><option value="0">none</option><option value="10">10</option><option value="100">100</option><option value="1000" selected>1000</option><option value="10000">10000</option><option value="">all</option></select>
<input type="submit" id="nav-next" value=" ▶️ " title="next" onclick="do_nav(1); return false;" disabled>
<input type="submit" id="nav-end" value="⏩" title="last" onclick="do_nav(2); return false;" disabled>
</fieldset></form></div>
<div id="log" class="widget element"></div>
<script>{}</script>
<script>
{}
</script>
</body>
</html>
""".format(NAME, b64encode(_ICON.encode()).decode(), _INDEX_CSS, _INDEX_JS, get_plotlyjs()).encode("utf-8", errors="strict") # noqa
INDEX_ETAG: str = md5(INDEX_HTML).hexdigest() # nosec
# endregion
# region ENTRYPOINT
###
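# Parse the commandline into a Config and hand off to the asynchronous run() entrypoint.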
def main() -> int:
try:
parser = argparse.ArgumentParser(description=__doc__.strip(),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
Config.add_args(parser)
config: Config = Config.from_args(parser.parse_args())
except (ValueError, KeyError) as e:
print(f"Invalid configuration, {e.__class__.__name__}: {str(e)}", file=sys.stderr)
return 1
else:
return asyncio.run(run(config))
if __name__ == "__main__":
sys.exit(main())
# endregion