#!/usr/bin/env python3
"""
Minimal ollama UI.
Featuring embedded web frontend, response streaming, wikipedia search tools, and conversation history.
No external dependencies or files are required.
"""
import argparse
import json
import os
import signal
import socket
import sys
import threading
import logging
import logging.config
from urllib.parse import ParseResult, urlparse, urlencode, parse_qs, urljoin
from urllib.request import Request, urlopen
from urllib.error import URLError
from html import escape
from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from http.client import HTTPResponse
from functools import lru_cache
from dataclasses import dataclass, field
from typing import Any, Iterator, Dict, Callable, Iterable, ClassVar, Literal
class ConversationHistory:
"""
Each frontend instance carries a unique token, to tell concurrent or new sessions apart.
For each, the message history is tracked can be re-fed alongside new prompts.
"""
def __init__(self, prompt: str | None = None, max_num: int = 24, max_len: int = 30000) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._hist: dict[str, list[dict]] = {}
self._lock: threading.Lock = threading.Lock()
self._max_num: int = max_num # number of sessions
self._max_len: int = max_len # string length of history (>tokens)
self._system: list[dict] = [{"role": "system", "content": prompt}] if prompt else []
def push(self, token: str | None, messages: list[dict]) -> list[dict]:
if token is None:
return self._system + messages
with self._lock:
# concurrent session limit
while len(self._hist) > self._max_num:
old_token: str = next(iter(self._hist.keys()))
del self._hist[old_token]
self._logger.debug(f"Removing conversation history for '{token}'")
# per-session length limit
while sum(len(_["content"]) for _ in self._hist.get(token, [])) > self._max_len:
self._hist[token].pop(0)
for message in messages:
if token not in self._hist:
self._hist[token] = [message]
self._logger.debug(f"Adding conversation '{token}'")
elif self._hist[token][-1]["role"] == message["role"] and message["role"] in ["user", "assistant"]:
self._hist[token][-1]["content"] += message["content"]
else:
self._hist[token].append(message)
return self._system + self._hist[token]
def clear(self, token: str | None) -> None:
with self._lock:
if token is not None and token in self._hist:
self._logger.warning(f"Clearing conversation history for '{token}'")
del self._hist[token]
class OllamaRequestError(Exception):
    """Raised when an upstream HTTP request (ollama API or Wikipedia) fails or returns an unexpected response."""
    pass
class Tools:
"""
Wikipedia API calls as additional tools for the model to retrieve authoritative information.
"""
def __init__(self, language: str | None = "en") -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._language: str | None = language or None
self._ua: str = "Mozilla/5.0 (compatible; ollama-tool; +hackitu.de)"
def tools(self) -> dict[str, dict]:
return { # TODO: parse docstring and/or inspect signature with validated jsonschema
"wikipedia_search": {
"description": "Search Wikipedia for articles that match the given title",
"parameters": {
"title": "Term to search for",
}
},
"wikipedia_fetch": {
"description": "Retrieve the definition from the given Wikipedia link",
"parameters": {
"title": "Article to fetch for further details",
}
},
} if self._language is not None else {}
@lru_cache(maxsize=32)
def _call(self, name: str, **kwargs: Any) -> str:
try:
return getattr(self, f"_tool_{name}")(**kwargs)
except Exception as e:
self._logger.error(f"Tool '{name}': {str(e)}")
return str(e)
def call(self, name: str, arguments: dict[str, Any], **kwargs: Any) -> tuple[str, str]:
return name, self._call(name, **arguments)
def _request_api(self, params: dict[str, str]) -> Any:
assert self._language is not None
url: str = f"https://{self._language}.wikipedia.org/w/api.php?" + urlencode(params | {"format": "json"})
try:
response: HTTPResponse
with urlopen(Request(method="GET", url=url, headers={"User-Agent": self._ua}), timeout=10.0) as response:
if response.status != 200:
raise OllamaRequestError(f"Response status {response.status}: {url}")
elif (ct := response.headers.get("content-type", "")) != "application/json; charset=utf-8":
raise OllamaRequestError(f"Unexpected response type '{ct}': {url}")
else:
self._logger.info(f"{url} HTTP {response.status}")
return json.loads(response.read())
except (OSError, URLError, ValueError) as e:
raise OllamaRequestError(f"Request failed with {str(e)}: {url}") from None
def _api_opensearch(self, query: str) -> Iterator[tuple[str, str]]:
"""https://www.mediawiki.org/wiki/API:Opensearch"""
response: list = self._request_api({
"search": query,
"action": "opensearch", "namespace": "0",
"redirects": "resolve", "limit": "10",
})
for i, title in enumerate(response[1]):
yield title, response[3][i]
def _api_extract(self, query: str) -> Iterator[tuple[str, str]]:
"""https://www.mediawiki.org/wiki/Extension:TextExtracts#API"""
response: dict = self._request_api({
"titles": query,
"action": "query", "prop": "extracts",
"exintro": "1", "explaintext": "1",
})
for result in response["query"]["pages"].values():
if "pageid" in result:
yield result["title"], result["extract"]
def _tool_wikipedia_search(self, title: str) -> str:
return f"\n## \"{title}\"\n\n" + "".join([
f" * [{heading.replace(']', ']')}]({url.replace(')', ')')})\n"
for heading, url in self._api_opensearch(title)
])
def _tool_wikipedia_fetch(self, title: str) -> str:
return "".join([
f"\n## {heading}\n\n{extract.strip()}\n"
for heading, extract in self._api_extract(title)
])
class OllamaClient:
    """
    Minimal Ollama HTTP JSON client with streaming and tool calling support.

    Streams /api/chat NDJSON responses, records exchanged messages in the
    ConversationHistory, and recursively feeds tool call results back to the
    model until no further tool calls are requested.
    """
    # shared, pre-configured (de)serializers for all requests
    _decoder: ClassVar[json.JSONDecoder] = json.JSONDecoder()
    _encoder: ClassVar[json.JSONEncoder] = json.JSONEncoder(ensure_ascii=False, check_circular=False, allow_nan=False, sort_keys=False)

    def __init__(self, base_url: str, model: str, history: ConversationHistory, tools: Tools, timeout: float = 30.0) -> None:
        """
        :param base_url: ollama API base url, e.g. http://127.0.0.1:11434/
        :param model: model name as listed by /api/tags
        :param history: per-session conversation store
        :param tools: tool implementations to advertise and dispatch
        :param timeout: per-request socket timeout in seconds
        """
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._base_url: str = base_url
        self._model: str = model
        self._timeout: float = timeout
        self._history: ConversationHistory = history
        self._tools: Tools = tools
        self._tools_schema: list[dict] = [{  # https://docs.ollama.com/capabilities/tool-calling#python
            "type": "function",
            "function": {
                "name": name,
                "description": tool["description"],
                "parameters": {
                    "type": "object",
                    "required": list(tool["parameters"].keys()),
                    "properties": {k: {"type": "string", "description": v} for k, v in tool["parameters"].items()}
                }
            }
        } for name, tool in self._tools.tools().items()]

    def _request(self, method: str, url: str, expect_ct: str, data: bytes | None) -> Iterator[bytes]:
        """Yield raw response lines, validating status and content type.

        :raises OllamaRequestError: on non-200 status, content-type mismatch, or transport failure
        """
        url = urljoin(self._base_url, url)
        try:
            response: HTTPResponse
            with urlopen(Request(method=method, url=url, data=data), timeout=self._timeout) as response:  # nosec
                # NOTE: former separate 404/501 branch raised the identical message -- merged
                if response.status != 200:
                    raise OllamaRequestError(f"Response status {response.status}: {url}")
                elif (ct := response.headers.get("content-type", "")) != expect_ct:
                    raise OllamaRequestError(f"Unexpected response type '{ct}': {url}")
                else:
                    self._logger.info(f"{method} {url} HTTP {response.status}")
                    while (line := response.readline()):  # XXX: suboptimal implementation when .chunked
                        yield line
        except (OSError, URLError) as e:
            raise OllamaRequestError(f"Request failed with {str(e)}: {url}") from None

    def _request_json(self, method: str, url: str, data: dict | None = None) -> dict:
        """Single-document JSON request/response."""
        return self._decoder.decode(b"".join(self._request(
            method, url, "application/json; charset=utf-8",
            data=self._encoder.encode(data).encode() if data is not None else None
        )).decode(encoding="utf-8", errors="surrogatepass"))

    def _request_ndjson(self, method: str, url: str, data: dict | None = None) -> Iterator[dict]:
        """Streamed newline-delimited JSON request/response, one document per line."""
        body: bytes | None = self._encoder.encode(data).encode() if data is not None else None
        for line in self._request(method, url, "application/x-ndjson", data=body):
            yield self._decoder.decode(line.decode(encoding="utf-8", errors="surrogatepass"))

    def check(self, token: str | None) -> str:
        """Start new sessions by checking connectivity and whether the model exists."""
        models: list[str] = [_["name"] for _ in self._request_json("GET", "/api/tags")["models"]]
        if self._model not in models:
            raise OllamaRequestError(f"Model '{self._model}' not found in: {', '.join(models)}")
        self._history.clear(token)
        return self._model

    def _chat(self, token: str | None, messages: list[dict]) -> Iterator[dict]:
        """Call the chat endpoint with message i/o."""
        query: dict = {
            "model": self._model,
            "messages": self._history.push(token, messages),
            "tools": self._tools_schema,
        }
        for chunk in self._request_ndjson("POST", "/api/chat", query):
            if "message" in chunk:
                # record every streamed chunk; the history merges consecutive same-role chunks
                self._history.push(token, [chunk["message"]])
                yield chunk["message"]

    def _generate(self, token: str | None, messages: list[dict]) -> Iterator[str]:
        """Yield response text, executing requested tool calls and recursing on their output."""
        message_count: int = 0
        tool_calls: list[dict] = []
        for message in self._chat(token, messages):
            message_count += 1
            if message["content"]:
                yield message["content"]
            if "tool_calls" in message:
                tool_calls.extend(message["tool_calls"])
        self._logger.debug(f"Exchanged {len(messages)}/{message_count} message chunks")
        tool_results: list[dict] = []
        for tool_call in tool_calls:
            name, content = self._tools.call(**tool_call["function"])
            tool_results.append({"role": "tool", "tool_name": name, "content": content})
            if content:
                yield content
        if tool_results:  # recurse with tool output as the next round of input
            yield "\n----\n"  # <hr> to tell tools/model apart TODO: split message bubbles
            yield from self._generate(token, tool_results)

    def generate(self, token: str | None, prompt: str) -> Iterator[str]:
        """Stream the model's response text for a single user prompt."""
        yield from self._generate(token, [{"role": "user", "content": prompt}])
@dataclass
class HttpRequest:
    """Parsed inbound HTTP request as handed to the external handler callback."""
    method: str  # "GET" or "POST"
    path: str  # URL path component only (query string already split off)
    query: dict[str, str]  # first value per query-string key
    body: bytes = b""  # raw request body, empty when no Content-Length was given
@dataclass
class HttpResponse:
    """Response container: a static bytes body or an iterable of chunks to stream."""
    code: int  # HTTP status code
    headers: dict[str, str] = field(default_factory=dict)  # additional response headers
    body: bytes | Iterable[bytes] = b""  # bytes get Content-Length, iterables are streamed
class OllamaUiHTTPServer(ThreadingHTTPServer):
    """
    HTTP server with systemd socket support that delegates to external Handler class.
    """

    class RequestHandler(BaseHTTPRequestHandler):
        """
        Read and stream back bodies for accepted requests. Log to Logger.
        """
        server: 'OllamaUiHTTPServer'  # narrow the inherited attribute to the owning server type
        protocol_version = "HTTP/1.1"  # keep-alive; requires Content-Length or explicit framing below
        # how iterable (streamed) response bodies are written out, see _handle()
        _response_mode: Literal['chunked', 'stream', 'full'] = "stream"

        def do_GET(self) -> None:
            self._handle("GET")

        def do_POST(self) -> None:
            self._handle("POST")

        def _handle(self, method: str) -> None:
            """Read the request body, delegate to the server, then write status, headers, and body."""
            try:
                try:
                    content_length: int = int(self.headers.get("Content-Length", "0"))
                    post_data: bytes = self.rfile.read(content_length)
                except (ValueError, OSError) as e:
                    self.server.logger.warning(f"Cannot read request body: {str(e)}")
                    response = HttpResponse(400, {"X-Exception": e.__class__.__name__})
                else:
                    response = self.server.handle(method, self.path, post_data)
                self.send_response(response.code)
                self.send_header("Cache-Control", "no-cache")
                for header, value in response.headers.items():
                    self.send_header(header.title(), value)
                if isinstance(response.body, bytes):
                    # in-memory body: plain response with known length
                    self.send_header("Content-Length", str(len(response.body)))
                    self.end_headers()
                    self.wfile.write(response.body)
                elif self._response_mode == "chunked":  # explicit transfer encoding
                    self.send_header("Transfer-Encoding", "chunked")
                    self.end_headers()
                    for chunk in response.body:
                        if chunk:  # an empty chunk would prematurely terminate the chunked stream
                            self.wfile.write(b"".join(("{:X}\r\n".format(len(chunk)).encode(), chunk, b"\r\n")))
                    self.wfile.write(b"0\r\n\r\n")  # final zero-length chunk
                elif self._response_mode == "stream":  # should be sufficient for local connections
                    self.send_header("Connection", "close")  # no framing, so the connection end marks EOF
                    self.end_headers()
                    for chunk in response.body:
                        self.wfile.write(chunk)
                else:  # wait for complete response for max compatibility
                    body: bytes = b"".join(response.body)
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
            except OSError:
                pass  # client disconnected mid-response; nothing sensible left to do

        def log_request(self, code: HTTPStatus | int | str = '-', size: int | str = '-') -> None:
            # successful requests only at DEBUG to keep INFO output quiet
            if isinstance(code, HTTPStatus):
                code = code.value
            self._log(logging.DEBUG if code == 200 else logging.INFO,
                      '"%s" %s %s',
                      self.requestline, str(code), str(size))

        def log_message(self, format: str, *args: Any) -> None:
            self._log(logging.WARNING, format, *args)

        def _log(self, level: int, format: str, *args: Any) -> None:
            # mimic the BaseHTTPRequestHandler stderr format, routed through logging instead
            self.server.logger.log(level, "%s - - [%s] %s",
                                   self.address_string(), self.log_date_time_string(), format % args)

    def __init__(self,
                 server_address: tuple[str, int], systemd: bool,
                 handler: Callable[[HttpRequest], HttpResponse]) -> None:
        """
        :param server_address: host/port to bind to (ignored when a systemd socket is inherited)
        :param systemd: use the inherited systemd activation socket, if exactly one was passed
        :param handler: external request handler callback
        """
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        # only honor systemd mode when exactly one listening fd was handed over
        self._systemd: bool = systemd and os.getenv("LISTEN_FDS", "") == "1"
        self._handler: Callable[[HttpRequest], HttpResponse] = handler
        super().__init__(server_address, self.RequestHandler, bind_and_activate=True)

    def server_bind(self) -> None:
        """Bind normally, or adopt the inherited systemd socket instead."""
        if not self._systemd:
            super().server_bind()
        else:
            self.socket.close()  # discard the freshly created socket in favor of the inherited one
            self.socket = socket.fromfd(3, self.address_family, self.socket_type)  # SD_LISTEN_FDS_START
            self.server_address = self.socket.getsockname()
            self.logger.info("Obtained systemd socket")

    def server_activate(self) -> None:
        """Start listening, or just report readiness for an already-listening systemd socket."""
        if not self._systemd:
            super().server_activate()
        else:
            self._sd_notify(self._sd_notify_path(), b"READY=1\n")

    def _sd_notify_path(self) -> str | None:
        """Resolve the sd_notify socket path from the environment, if any."""
        notify_path: str | None = os.getenv("NOTIFY_SOCKET") or None
        if notify_path is not None and notify_path.startswith("@"):  # abstract namespace socket
            notify_path = "\0" + notify_path[1:]
        if notify_path is None:
            self.logger.warning("Cannot get NOTIFY_SOCKET")
        return notify_path

    def _sd_notify(self, notify_path: str | None, msg: bytes) -> None:
        """Best-effort datagram to the systemd notify socket; failures are only logged."""
        if notify_path is None:
            return
        else:
            sock: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            sock.setblocking(False)
            try:
                sock.connect(notify_path)
                sock.sendall(msg)
            except (OSError, UnicodeError) as e:
                self.logger.warning(f"Cannot write to NOTIFY_SOCKET: {str(e)}")
            else:
                self.logger.debug("Sent READY status")
            finally:
                sock.close()

    def handle(self, method: str, path: str, body: bytes) -> HttpResponse:
        """RequestHandler callback, calling external Handler class."""
        try:
            parsed: ParseResult = urlparse(path)
            path = parsed.path
            query: dict[str, str] = {k: v[0] for k, v in parse_qs(parsed.query).items()}
        except ValueError as e:
            self.logger.warning(f"Cannot parse request: {str(e)}")
            return HttpResponse(400, {"X-Exception": e.__class__.__name__})
        try:
            return self._handler(HttpRequest(method, path, query, body))
        except Exception as e:
            # last-resort boundary: any handler failure becomes a 500 instead of killing the thread
            self.logger.error(f"Cannot handle request: {str(e)}")
            return HttpResponse(500, {"X-Exception": e.__class__.__name__})

    def serve(self) -> bool:
        """Run until signal."""
        shutdown_requested: threading.Event = threading.Event()

        def _handler(signum: int, frame: Any) -> None:
            shutdown_requested.set()
        signal.signal(signal.SIGINT, _handler)
        signal.signal(signal.SIGTERM, _handler)
        # serve in a worker thread so the main thread can block on the shutdown event
        thread: threading.Thread = threading.Thread(target=self.serve_forever)
        thread.start()
        self.logger.info("Serving on {}:{}".format(*self.server_address))
        shutdown_requested.wait()
        self.logger.info("Shutting down")
        self.shutdown()
        self.server_close()
        thread.join()
        return True

    @classmethod
    def run(cls, localhost: bool, port: int, systemd: bool, handler: Callable[[HttpRequest], HttpResponse]) -> bool:
        """Construct and serve until shutdown; wraps any startup error into RuntimeError."""
        try:
            httpd: OllamaUiHTTPServer = OllamaUiHTTPServer(
                server_address=("127.0.0.1" if localhost else "0.0.0.0", port),  # nosec
                systemd=systemd,
                handler=handler,
            )
        except Exception as e:
            raise RuntimeError(str(e)) from None
        else:
            return httpd.serve()
class TokenBuffer:
"""
Transform input tokens to HTTP output stream.
With more buffering involved than 'words', could already try to interpret markdown for innerHTML.
"""
@classmethod
def translate(cls, slop: Iterator[str]) -> Iterator[bytes]:
buffer: str = ""
for token in slop:
buffer += token
bound: int = max(buffer.rfind(" "), buffer.rfind("\n"))
if bound > 3:
token, buffer = buffer[:bound], buffer[bound:]
yield token.encode()
yield buffer.rstrip().encode()
class Handlers:
    """Routing of requests, handling actual functionality."""

    def __init__(self, client: OllamaClient) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._client: OllamaClient = client

    def _index(self, r: HttpRequest) -> HttpResponse:
        # static single-page frontend
        return HttpResponse(
            code=200,
            headers={"Content-Type": "text/html; charset=utf-8"},
            body=_INDEX_HTML,
        )

    def _check(self, r: HttpRequest) -> HttpResponse:
        # connectivity/model probe, also resets the session's history
        checked: str = escape(self._client.check(r.query.get("t")))
        return HttpResponse(
            code=200,
            headers={"Content-Type": "text/plain; charset=utf-8"},
            body=b"`" + checked.encode() + b"`",
        )

    def _query(self, r: HttpRequest) -> HttpResponse:
        # streamed model answer; disregard /?q= as index
        stream: Iterable[bytes] = TokenBuffer.translate(self._client.generate(r.query.get("t"), r.body.decode()))
        return HttpResponse(
            code=200,
            headers={"Content-Type": "text/plain; charset=utf-8"},
            body=stream,
        )

    def handle(self, r: HttpRequest) -> HttpResponse:
        """Dispatch by path; upstream failures map to 502."""
        routes: dict[str, Callable[[HttpRequest], HttpResponse]] = {
            "/": self._index,
            "/check": self._check,
            "/q": self._query,
        }
        try:
            route = routes.get(r.path)
            if route is None:
                return HttpResponse(404)
            return route(r)
        except OllamaRequestError as e:
            self._logger.error(str(e))
            return HttpResponse(502, {"X-Exception": e.__class__.__name__})
def _main(localhost: bool, port: int, systemd: bool, ollama_url: str, model: str,
          system_prompt: str | None, tool_lang: str | None) -> int:
    """Wire up history, tools, client and handlers, then serve; return the process exit code."""
    try:
        history = ConversationHistory(prompt=system_prompt)
        tools = Tools(language=tool_lang)
        client = OllamaClient(ollama_url, model, history, tools)
        served: bool = OllamaUiHTTPServer.run(localhost, port, systemd, Handlers(client).handle)
    except RuntimeError as e:
        logging.getLogger(None).error(str(e))
        return 1
    return 0 if served else 1
def _setup_logging(debug: bool) -> None:
logging.raiseExceptions = False
logging.logThreads = True
logging.logMultiprocessing = False
logging.logProcesses = False
logging.config.dictConfig({
'version': 1,
'formatters': {'standard': {
'format': '%(levelname)s %(name)s: %(message)s',
}},
'handlers': {'default': {
'formatter': 'standard',
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stderr',
}},
'loggers': {'': {
'handlers': ['default'],
'level': 'DEBUG' if debug else 'INFO',
'propagate': False,
}},
})
def main() -> int:
    """CLI entry point: parse arguments, configure logging, and run the server."""
    flag: dict = {"action": 'store_const', "const": True, "default": False}
    spec: list[tuple[str, dict]] = [  # flag name -> add_argument keyword options, in help order
        ('--verbose', dict(flag, help='enable debug logging')),
        ('--ollama-url', dict(default='http://127.0.0.1:11434/', help='ollama API base url', metavar="URL")),
        ('--model', dict(default='llama3.1:latest', help='ollama model to use', metavar="NAME")),
        ('--system', dict(default=None, help='custom system prompt', metavar="PROMPT")),
        ('--tool-lang', dict(default=None, help='wikipedia language such as "en" or "de"', metavar="LANG")),
        ('--localhost', dict(flag, help='bind to localhost only')),
        ('--port', dict(type=int, default=8080, help='port to bind to')),
        ('--systemd', dict(flag, help='use inherited socket for systemd activation')),
    ]
    parser = argparse.ArgumentParser(description=__doc__.strip(),
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    for name, options in spec:
        parser.add_argument(name, **options)
    opts = parser.parse_args()
    _setup_logging(opts.verbose)
    return _main(opts.localhost, opts.port, opts.systemd, opts.ollama_url, opts.model, opts.system, opts.tool_lang)
# language=JS
_INDEX_JS: str = r"""
"use strict";
const conversation_token = (
Math.random().toString(16).substring(2) +
Math.random().toString(16).substring(2)
).substring(0, 12);
function sloppy_markdown(text) {
text = text
.replace(/\r\n/g, "\n").replace(/\r/g, "\n")
.replace(/&/g, "&")
.replace(/</g, "<").replace(/>/g, ">")
.replace(/"/g, """).replace(/'/g, "'");
text = text.replace(/^---+$/gm, "<hr>");
text = text.replace(/^##+([^#\n]+)#*$/gm, "<h3>$1</h3>");
text = text.replace(/^[ \t]*[*+-]+ +(.*)$/gm, "<ul><li>$1</li></ul>");
text = text.replace(/^[ \t]*([0-9]+[.)] +.*)$/gm, "\n<ul><li>$1</li></ul>\n");
text = text.replace(/\n+```+([^\n]+)?\n*([^`]+)\n+```\n+/g, "\n\n<pre><code title=\"$1\">$2</code></pre>\n\n");
text = text.replace(/\*\*+([^*\n]+)\*\*+/g, "<strong>$1</strong>");
text = text.replace(/`+([^`\n]+)`+/g, "<code>$1</code>");
text = text.replace(/\[([^\]]+)\]\((https?:\/\/)([^/)]+)([^)]*)\)/g, "<a href=\"$2$3$4\">$1 <em>($3)</em></a>");
text = text.replace(/([^>\n])\n([^<\n])/g, "$1<br>$2");
text = text.replace(/([^>\n])\n+([^<\n])/g, "$1<br><br>$2");
return text;
}
function question_cleanup(text) {
return sloppy_markdown(text)
.replace(/<([a-zA-Z-]+)>/g, "<em title=\"$1\">")
.replace(/<\/[a-zA-Z-]+>/g, "</em>");
}
function add_bubble(cls, html) {
const msg = document.createElement("div");
msg.classList.add(cls);
msg.innerHTML = html;
document.getElementById("convo").appendChild(msg);
msg.scrollIntoView();
return msg;
}
function add_question(text) {
return add_bubble("question", question_cleanup(text));
}
function add_answer(text) {
return add_bubble("answer", sloppy_markdown(text));
}
async function add_answer_stream(readable_stream) {
const msg = add_answer("");
const decoder = new TextDecoder("utf-8", {fatal: true});
let buffer = "";
for await (const chunk of readable_stream) {
try {
buffer += decoder.decode(chunk, {stream: false});
} catch (TypeError) {
buffer += decoder.decode(chunk, {stream: true});
}
msg.innerHTML = sloppy_markdown(buffer);
}
buffer += decoder.decode(new Uint8Array(), {stream: false});
msg.innerHTML = sloppy_markdown(buffer);
msg.scrollIntoView();
return msg;
}
function set_busy(is_busy) {
document.getElementsByTagName("fieldset")[0].disabled = !!is_busy;
document.getElementById("spinner").style.opacity = is_busy? 1: 0;
}
function send_query(form, query=null) {
if (query === null) {
if (!form.reportValidity()) return false;
query = form.getElementsByTagName("textarea")[0].value.trim();
}
set_busy(true);
add_question(query);
fetch(form.getAttribute("action") + "?t=" + conversation_token, {
method: form.getAttribute("method"),
headers: {"Content-Type": "text/plain"},
body: query.trim()
}).then(response => {
if (!response.ok) throw new Error(`Response status: ${response.status}`);
add_answer_stream(response.body).then(msg => {
form.reset();
}).catch(error => {
add_answer(String(error));
}).finally(() => {
set_busy(false);
})
}).catch(error => {
add_answer(String(error));
set_busy(false);
});
return false;
}
function send_check() {
const page_params = new URLSearchParams(window.location.search);
const page_query = page_params.get("q");
set_busy(true);
fetch("check" + "?t=" + conversation_token, {
method: "POST",
headers: {"Content-Type": "text/plain"},
}).then(response => {
if (!response.ok) throw new Error(`Response status: ${response.status}`);
return response.text();
}).then(text => {
add_answer(text);
if (page_query) {
send_query(document.getElementById("form"), page_query);
} else {
set_busy(false);
}
}).catch(error => {
add_answer(String(error));
set_busy(false);
});
}
document.getElementById("q").addEventListener("keypress", e => {
if (e.key === "Enter" && e.ctrlKey) {
e.preventDefault();
send_query(document.getElementById("form"));
}
});
send_check();
"""
# language=CSS
# Embedded stylesheet served inline from _INDEX_HTML; sizing via custom properties,
# with dark-mode and small-screen overrides through media queries.
_INDEX_CSS: str = r"""
:root {
--font-size: 100%;
--text-color: #15141a;
--main-bg-color: #ffffff;
--highlight-color: #145ba6;
--ui-bg-color: #f0f0f4;
--box-bg-color-q: #eeeeef;
--box-bg-color-a: #f8f8f9;
--border-radius: 0.5rem;
--pad: 1rem;
--pad-s: 0.5rem;
--input-height: 20vh;
}
@media (prefers-color-scheme: dark) {
:root {
--text-color: #f8f9f9;
--main-bg-color: #1c1b22;
--ui-bg-color: #33323a;
--box-bg-color-q: #2c2b32;
--box-bg-color-a: #222128;
}
}
@media (max-width: 600px) {
:root {
--font-size: 0.9rem;
--pad: 0.8rem;
--pad-s: 0.4rem;
--input-height: 10vh;
}
}
* {
box-sizing: border-box;
}
html, body {
width: 100%;
height: 100%;
padding: 0;
margin: 0;
background-color: var(--main-bg-color);
color: var(--text-color);
font-size: var(--font-size);
font-family: sans-serif;
}
body {
display: grid;
grid-template-rows: 1fr var(--input-height);
}
fieldset {
padding: 0;
margin: 0;
border: none;
height: 100%;
display: grid;
grid-template-columns: 1fr 3rem;
justify-items: stretch;
gap: var(--pad-s);
}
form {
position: relative;
padding: var(--pad);
padding-top: var(--pad-s);
margin: 0;
border: none;
}
form textarea, form input {
outline: none;
border: none;
margin: 0;
transition: background-color 0.1s;
background-color: var(--ui-bg-color);
color: var(--text-color);
border-radius: var(--border-radius);
font-size: var(--font-size);
font-family: sans-serif;
}
form textarea:disabled, form input:disabled {
background-color: var(--box-bg-color-a);
}
form textarea {
padding: var(--pad);
resize: none;
min-height: 0;
min-width: 0;
}
form input {
cursor: pointer;
}
div#convo {
display: flex;
flex-direction: column;
gap: var(--pad);
padding-top: var(--pad);
padding-bottom: var(--pad-s);
overflow-y: scroll;
scroll-behavior: smooth;
}
div#convo hr, div#convo h3 {
margin: 1em 0 1em 0;
}
div#convo > div > :first-child {
margin-top: auto;
}
div#convo .question, div#convo .answer {
border-radius: var(--border-radius);
padding: var(--pad);
overflow-wrap: anywhere;
}
div#convo pre {
white-space: pre-wrap;
}
div#convo a {
color: inherit;
text-decoration: underline;
}
div#convo hr {
height: 0;
border: 0;
border-top: 1px solid var(--text-color);
}
div#convo .question {
align-self: flex-end;
margin: 0 var(--pad) 0 10vw;
background-color: var(--box-bg-color-q);
}
div#convo .answer {
align-self: flex-start;
margin: 0 10vw 0 var(--pad);
background-color: var(--box-bg-color-a);
}
#spinner {
opacity: 0;
display: block;
position: absolute;
top: -1px; left: 0;
height: 2px; width: 100%;
background-color: var(--highlight-color);
background-repeat: no-repeat;
background-image: linear-gradient(var(--main-bg-color) 0 0), linear-gradient(var(--main-bg-color) 0 0);
background-size: 60% 100%;
animation: css-loaders-progress-16 3s infinite;
transition: opacity 0.1s;
}
@keyframes css-loaders-progress-16 {
0% {background-position:-150% 0,-150% 0}
66% {background-position: 250% 0,-150% 0}
100% {background-position: 250% 0, 250% 0}
}
"""
# language=HTML
# Single-file frontend page: inlines the CSS and JS constants above via f-string
# interpolation and is served pre-encoded as the response body for "/".
_INDEX_HTML: bytes = fr"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<meta name="robots" content="noindex,nofollow,nosnippet,notranslate,noarchive">
<meta name="theme-color" content="#ffffff">
<meta name="theme-color" content="#1c1b22" media="(prefers-color-scheme: dark)">
<title>Ollama</title>
<link rel="icon" href="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAACXBIWXMAAAsTAAALEwEAmpwYAAAAAXNSR0IArs4c6QAAAARnQU1BAACxjwv8YQUAAAFfSURBVHgBfVPBcYMwEDx7XAAdmBLowLgCU4JdgXEFDBUQKiBUAFQAHUAH8OUFX17K7QU5igLZmR2E7m61dwgiC0opn5kwe/WDlpkxXdoDBx0UTtOkiqJQNuq6Vn0vmhB3topxirrf70IUmMW+7wsNR44pkOgIksqyVEEQiFAYhm9Bz/NMUwlqT2tfoemobVsahoGyLJP3x+NBruvaXYdcWx15EekdPpnmeZZkngPxiUKsz+ezxJBjICDdu7aPIe4BMWMOQA8HninpOA7FcUxd18nz9XrJmmdBG3BP9g5sXi4XqqqKbreb7OV5TnyyxGzAwWAL6GRNaTYIxB1ooINA9cvTOu3n8ynWwSiK3u1ZLjp9dQXWd96ENUT3eDgcGlZKIQfrTdPQHhAzWki5dpCVvsqW+uZnxA1VxlXGDIiV0Nh1WZaU/gFOH8cROde15i/QF/NTGRdMff/aHxiBnf8FaBDEF08szE0AAAAASUVORK5CYII=">
<style>{_INDEX_CSS}</style>
</head>
<body>
<div id="convo"></div>
<form method="POST" action="q" onsubmit="send_query(this); return false;" id="form">
<div id="spinner"></div>
<fieldset>
<textarea id="q" placeholder="Ask AI" autocomplete="off" autofocus required></textarea>
<input type="submit" value="➤" title="Send (Ctrl+Enter)">
</fieldset>
</form>
<script>{_INDEX_JS}</script>
</body>
</html>
""".encode()  # noqa
if __name__ == "__main__":
    # exit code: 0 on clean shutdown, 1 on startup failure
    sys.exit(main())