import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass
from pathlib import PurePath
from typing import Optional, Dict, Callable, Awaitable, DefaultDict, Iterable, List, AsyncIterator
from uuid import UUID, uuid4
from nextcloud_tasks_api import TaskList, TaskFile, NextcloudTasksApi, ApiError
from nextcloud_tasks_api.ical import Task, ICalParser
from .status import StatusBar
@dataclass
class TaskRef:
uid: str
etag: str
href: PurePath
ical: Task
@classmethod
def from_task_file(cls, task: TaskFile) -> 'TaskRef':
ical: Task = Task(task.content)
uid, etag = ical.uid, task.etag
if uid is None or etag is None:
raise ValueError("Empty task UID")
return cls(uid=uid, etag=etag, href=task.href, ical=ical)
class TaskUpdateQueue:
def __init__(self) -> None:
self._queue: asyncio.Queue[Optional[Task]] = asyncio.Queue()
self._queued: DefaultDict[Optional[str], int] = defaultdict(int)
self._closed: bool = False
def put(self, task: Task) -> None:
self._queued[task.uid] += 1
self._queue.put_nowait(task)
async def join(self) -> None:
await self._queue.join()
async def join_uid(self, uid: Optional[str]) -> None:
while self._queued[uid] > 0:
await self._queue.join()
async def get_multi(self) -> List[Task]:
if self._closed:
return []
task: Optional[Task] = await self._queue.get()
if task is None:
self._closed = True
return []
tasks: List[Task] = [task]
while True:
try:
task = self._queue.get_nowait()
except asyncio.QueueEmpty:
break
if task is None:
self._closed = True
break
else:
tasks.append(task)
return tasks
def tasks_done(self, tasks: List[Task]) -> None:
for task in tasks:
self._queued[task.uid] -= 1
self._queue.task_done()
async def close(self) -> None:
self._queue.put_nowait(None)
await self._queue.join()
def closed(self) -> None:
# self._queued.clear()
self._queue.task_done() # acknowledge last None
self._closed = False
class TaskUpdater:
def __init__(self, api: NextcloudTasksApi, status: StatusBar, cb: Callable[[TaskRef], None]) -> None:
self._logger: logging.Logger = logging.getLogger("API")
self._api: NextcloudTasksApi = api
self._status: StatusBar = status
self._queue: TaskUpdateQueue = TaskUpdateQueue()
self._daemon: Optional[asyncio.Task] = None
self._task_list: Optional[TaskList] = None
self._task_refs: Dict[str, TaskRef] = {}
self._lock: asyncio.Lock = asyncio.Lock() # write task refs, API, status sync; needed by worker to drain queue
self._callback: Callable[[TaskRef], None] = cb
@property
def current_list(self) -> Optional[TaskList]:
return self._task_list
def _add(self, task: TaskFile) -> Optional[TaskRef]:
try:
ref: TaskRef = TaskRef.from_task_file(task)
self._task_refs[ref.uid] = ref
return ref
except ValueError as e:
self._logger.error(f"Cannot parse {task.href}: {str(e)}")
return None
async def join(self) -> None:
await self._queue.join()
async def get_lists(self) -> AsyncIterator[TaskList]:
with self._status.busy_ctx():
async with self._lock:
with self._status.sync_ctx():
try:
async for task_list in self._api.get_lists():
yield task_list
except ApiError as e:
self._logger.error(str(e))
async def create_list(self, name: str) -> Optional[TaskList]:
with self._status.busy_ctx():
async with self._lock:
with self._status.sync_ctx():
try:
return await self._api.create_list(name)
except ApiError as e:
self._logger.error(str(e))
return None
async def delete_current_list(self, delete_list: TaskList) -> None:
with self._status.busy_ctx():
task_list: Optional[TaskList] = self._task_list
if task_list is not None and task_list is delete_list:
await self.reset(None)
async with self._lock:
with self._status.sync_ctx():
try:
await self._api.delete_list(task_list)
except ApiError as e:
self._logger.error(str(e))
async def reset(self, task_list: Optional[TaskList]) -> None:
daemon, self._daemon = self._daemon, None
if daemon is not None:
await self._queue.close() # joined
await daemon
async with self._lock:
self._status.active = False
self._task_refs.clear()
if task_list is not None:
with self._status.sync_ctx():
try:
async for task in self._api.get_list(task_list):
self._add(task)
except ApiError as e:
self._logger.error(str(e))
self._task_list = None
self._task_refs.clear()
else:
self._task_list = task_list
self._daemon = asyncio.create_task(self._worker())
self._status.active = True
async def list_tasks(self) -> Iterable[TaskRef]:
async with self._lock:
if self._daemon is None or self._task_list is None:
return []
return self._task_refs.values()
async def _worker(self) -> None:
while True:
tasks: List[Task] = await self._queue.get_multi()
unique_tasks: Dict[str, Task] = {_.uid: _ for _ in tasks}
if not tasks:
break
with self._status.busy_ctx():
async with self._lock:
with self._status.sync_ctx():
await self._update(list(unique_tasks.values()))
for uid in unique_tasks.keys():
self._callback(self._task_refs[uid])
self._queue.tasks_done(tasks)
self._queue.closed()
async def _update(self, tasks: List[Task]) -> None:
refs: List[TaskRef] = [self._task_refs[_.uid] for _ in tasks]
futures: List[Awaitable[TaskFile]] = [
self._api.update_task(TaskFile(href=refs[i].href, etag=refs[i].etag, content=tasks[i].to_string()))
for i in range(len(tasks))
]
for task in await asyncio.gather(*futures, return_exceptions=True):
if isinstance(task, BaseException):
self._logger.error(str(task))
else:
self._add(task) # parse again, just in case
def set(self, task: Task) -> None:
assert self._daemon is not None and self._task_list is not None
self._queue.put(task)
async def get(self, uid: str) -> Task:
"""most recent state, no changes pending"""
assert self._daemon is not None and self._task_list is not None
with self._status.busy_ctx():
await self._queue.join_uid(uid)
return self._task_refs[uid].ical.copy() # NB: no lock
async def create_task(self, parent_uid: Optional[str], summaries: List[str]) -> AsyncIterator[Task]:
assert self._daemon is not None and self._task_list is not None
with self._status.busy_ctx():
await self._queue.join()
async with self._lock:
with self._status.sync_ctx():
futures: List[Awaitable[TaskFile]] = []
for summary in summaries:
uid: UUID = uuid4()
ical: Task = Task(ICalParser.create(uid))
ical.summary = summary
ical.related_to = parent_uid
futures.append(self._api.create_task(self._task_list, ical.to_string(), uid))
tasks = await asyncio.gather(*futures, return_exceptions=True)
for task in tasks:
if isinstance(task, BaseException):
self._logger.error(str(task))
else:
ref: Optional[TaskRef] = self._add(task)
if ref is not None:
yield ref.ical.copy()
async def delete_task(self, uid: List[List[str]]) -> None:
assert self._daemon is not None and self._task_list is not None
with self._status.busy_ctx():
await self._queue.join()
async with self._lock:
with self._status.sync_ctx():
for uid_list in uid:
tasks: List[TaskFile] = [TaskFile(self._task_refs[_].href, self._task_refs[_].etag, "")
for _ in uid_list if _ in self._task_refs]
for e in await asyncio.gather(*[self._api.delete_task(_) for _ in tasks],
return_exceptions=True):
if isinstance(e, BaseException):
self._logger.error(str(e))
for _ in uid_list:
if _ in self._task_refs:
del self._task_refs[_]