from typing import Optional, Tuple, AsyncIterator
from xml.sax.saxutils import escape
from .error import XmlError
try:
import lxml.etree as ET
except ModuleNotFoundError:
import xml.etree.ElementTree as ET
class DavParser:
    """
    NextCloud-specific DAV XML request builder and response parsing.

    Mostly reverse-engineered minimal payload from the Tasks Web App.

    The ``parse_*`` class methods consume a chunked response body and stream their results,
    the ``get_*`` class methods return request bodies as bytes.
    """

    @staticmethod
    def _text_or_none(node: Optional[ET.Element]) -> Optional[str]:
        """Text of the given element, or None if the element is missing or has no (or empty) text."""
        return node.text if node is not None and node.text else None

    @staticmethod
    def _xml_text(value: str) -> bytes:
        """Escape a string for use as XML character data and encode it as UTF-8."""
        return escape(value).encode("utf-8", errors="strict")

    @classmethod
    async def _iterparse(cls, content: AsyncIterator[bytes]) -> AsyncIterator[ET.Element]:
        """
        Incrementally parse a chunked XML body, yielding each element as soon as its end tag was read.

        :param content: asynchronous source of raw body chunks
        :return: parsed elements, in the order their end tags occur
        :raises XmlError: if the parser reports an error for the data fed so far
        """
        parser: ET.XMLPullParser = ET.XMLPullParser(("end",))
        try:
            async for chunk in content:
                try:
                    parser.feed(chunk)
                    for _, elem in parser.read_events():
                        yield elem
                except ET.ParseError as e:
                    raise XmlError(f"Cannot parse XML response: {str(e)}") from None
        finally:
            # Also reached when the consumer stops early or an error is propagating, in which case the
            # document may be incomplete and closing can fail. Errors raised only by close() are ignored.
            try:
                parser.close()
            except ET.ParseError:
                pass

    @classmethod
    async def _parse_propstat(cls,
                              elements: AsyncIterator[ET.Element],
                              code: int = 200) -> AsyncIterator[Tuple[str, ET.Element]]:
        """
        Select the ``propstat`` elements with a given status code from a stream of parsed elements.

        Only ``{DAV:}response`` elements that carry a non-empty ``{DAV:}href`` are considered.

        :param elements: parsed elements, e.g. from :meth:`_iterparse`
        :param code: status code that has to occur in the ``{DAV:}status`` text of a ``propstat``
        :return: pairs of response href text and matching ``propstat`` element; the element is to be
                 evaluated before requesting the next pair, as its response is cleared afterwards
        """
        wanted: str = str(code)
        async for response in elements:
            if response.tag != "{DAV:}response":
                continue
            href: Optional[ET.Element] = response.find("{DAV:}href")
            if href is not None and href.text:
                for propstat in response.findall("{DAV:}propstat"):
                    status: Optional[ET.Element] = propstat.find("{DAV:}status")
                    # The status text is expected to look like "HTTP/1.1 200 OK". Compare whitespace-separated
                    # tokens instead of searching for " 200 ", such that a status without reason phrase or
                    # delimited by tabs/newlines is recognized, too.
                    if status is not None and status.text is not None and wanted in status.text.split():
                        yield href.text, propstat
            # Release the processed subtree, the response is not needed anymore.
            response.clear()

    @classmethod
    async def parse_propstat_for_status(cls, content: AsyncIterator[bytes]) -> AsyncIterator[str]:
        """
        Response hrefs with success statuscode.

        :param content: asynchronous source of raw body chunks
        :return: href text, once per ``propstat`` with status 200
        :raises XmlError: if the body cannot be parsed
        """
        async for href, _ in cls._parse_propstat(cls._iterparse(content)):
            yield href

    @classmethod
    async def parse_for_user_principal(cls, content: AsyncIterator[bytes]) -> AsyncIterator[str]:
        """
        Response to the current user principal, containing hrefs.

        :param content: asynchronous source of raw body chunks
        :return: non-empty href texts found below ``current-user-principal`` of successful ``propstat`` elements
        :raises XmlError: if the body cannot be parsed
        """
        async for _, propstat in cls._parse_propstat(cls._iterparse(content)):
            href: Optional[ET.Element] = propstat.find("./{DAV:}prop/{DAV:}current-user-principal/{DAV:}href")
            if href is not None and href.text:
                yield href.text

    @classmethod
    async def parse_for_calendar_list(cls, content: AsyncIterator[bytes]) \
            -> AsyncIterator[Tuple[str, Optional[str], Optional[str]]]:
        """
        Calendar (task list) href, displayname, and color.

        Entries whose ``resourcetype`` does not contain a CalDAV ``calendar`` element are skipped.

        :param content: asynchronous source of raw body chunks
        :return: tuples of href, display name or None, and color or None
        :raises XmlError: if the body cannot be parsed
        """
        async for href, propstat in cls._parse_propstat(cls._iterparse(content)):
            if propstat.find("./{DAV:}prop/{DAV:}resourcetype/{urn:ietf:params:xml:ns:caldav}calendar") is None:
                continue
            name: Optional[str] = cls._text_or_none(propstat.find("./{DAV:}prop/{DAV:}displayname"))
            color: Optional[str] = cls._text_or_none(
                propstat.find("./{DAV:}prop/{http://apple.com/ns/ical/}calendar-color"))
            yield href, name, color

    @classmethod
    async def parse_for_calendars(cls, content: AsyncIterator[bytes]) -> AsyncIterator[Tuple[str, Optional[str], str]]:
        """
        Task href, etag, and data.

        Entries are skipped unless their content type is exactly
        ``text/calendar; charset=utf-8; component=vtodo`` and their ``calendar-data`` is non-empty.

        :param content: asynchronous source of raw body chunks
        :return: tuples of href, etag or None, and calendar data
        :raises XmlError: if the body cannot be parsed
        """
        async for href, propstat in cls._parse_propstat(cls._iterparse(content)):
            getetag: Optional[ET.Element] = propstat.find("./{DAV:}prop/{DAV:}getetag")
            getcontenttype: Optional[ET.Element] = propstat.find("./{DAV:}prop/{DAV:}getcontenttype")
            if getcontenttype is None or getcontenttype.text != "text/calendar; charset=utf-8; component=vtodo":
                continue
            calendar: Optional[ET.Element] = propstat.find("./{DAV:}prop/{urn:ietf:params:xml:ns:caldav}calendar-data")
            if calendar is None or not calendar.text:
                continue
            yield href, cls._text_or_none(getetag), calendar.text

    @classmethod
    async def parse_for_calendar(cls, content: AsyncIterator[bytes]) -> Optional[Tuple[str, Optional[str], str]]:
        """
        Task href, etag, and data.

        :param content: asynchronous source of raw body chunks
        :return: the first entry :meth:`parse_for_calendars` gives, or None if there is no such entry
        :raises XmlError: if the body cannot be parsed up to the first entry
        """
        async for href, etag, caldav in cls.parse_for_calendars(content):
            # Only the first task is of interest, the remaining body is not evaluated.
            return href, etag, caldav
        return None

    @classmethod
    def get_user_principal(cls) -> bytes:
        """Request current user principal hrefs."""
        # language=XML
        return b"""<x0:propfind xmlns:x0="DAV:">
<x0:prop>
<x0:current-user-principal/>
</x0:prop>
</x0:propfind>"""

    @classmethod
    def get_propfind_calendar_list(cls) -> bytes:
        """Request calendar (task list) properties: resource type, display name, and color."""
        # language=XML
        return b"""<x0:propfind xmlns:x0="DAV:" xmlns:x1="http://apple.com/ns/ical/">
<x0:prop>
<x0:resourcetype/><x0:displayname/><x1:calendar-color/>
</x0:prop>
</x0:propfind>"""

    @classmethod
    def get_propfind_calendar(cls) -> bytes:
        """Request task properties: content type, etag, and calendar data."""
        # language=XML
        return b"""<x0:propfind xmlns:x0="DAV:" xmlns:x1="urn:ietf:params:xml:ns:caldav">
<x0:prop>
<x0:getcontenttype/><x0:getetag/><x1:calendar-data/>
</x0:prop>
</x0:propfind>"""

    @classmethod
    def get_report_calendars(cls, completed_filter: Optional[bool]) -> bytes:
        """
        Request tasks of a specific calendar.

        :param completed_filter: None to not filter at all, True to request only tasks with a "completed"
                                 property defined, False to request only tasks without that property
        :return: request body of the calendar query
        """
        if completed_filter is None:
            task_filter: bytes = b''
        elif completed_filter:  # language=XML
            task_filter = b'<x1:prop-filter name="completed"><x1:is-defined/></x1:prop-filter>'
        else:  # language=XML
            task_filter = b'<x1:prop-filter name="completed"><x1:is-not-defined/></x1:prop-filter>'
        # language=XML
        return b"""<x1:calendar-query xmlns:x0="DAV:" xmlns:x1="urn:ietf:params:xml:ns:caldav">
<x0:prop>
<x0:getcontenttype/><x0:getetag/><x1:calendar-data/>
</x0:prop>
<x1:filter>
<x1:comp-filter name="VCALENDAR">
<x1:comp-filter name="VTODO">
%b
</x1:comp-filter>
</x1:comp-filter>
</x1:filter>
</x1:calendar-query>""" % task_filter

    @classmethod
    def get_mkcol_calendar(cls, name: str, color: str) -> bytes:
        """
        Request creating a calendar.

        :param name: display name, escaped for XML by this method
        :param color: calendar color, escaped for XML by this method
        :return: request body creating a calendar collection with VTODO as supported component
        """
        # language=XML
        return b"""<x0:mkcol xmlns:x0="DAV:"
xmlns:x1="urn:ietf:params:xml:ns:caldav"
xmlns:x2="http://apple.com/ns/ical/">
<x0:set><x0:prop>
<x0:displayname>%b</x0:displayname>
<x2:calendar-color>%b</x2:calendar-color>
<x0:resourcetype>
<x0:collection/><x1:calendar/>
</x0:resourcetype>
<x1:supported-calendar-component-set>
<x1:comp name="VTODO"/>
</x1:supported-calendar-component-set>
</x0:prop></x0:set>
</x0:mkcol>""" % (cls._xml_text(name), cls._xml_text(color))

    @classmethod
    def get_property_update(cls, name: str, color: str) -> bytes:
        """
        Request a calendar update.

        :param name: new display name, escaped for XML by this method
        :param color: new calendar color, escaped for XML by this method
        :return: request body setting display name and color
        """
        # language=XML
        return b"""<x0:propertyupdate xmlns:x0="DAV:" xmlns:x1="http://apple.com/ns/ical/">
<x0:set><x0:prop>
<x0:displayname>%b</x0:displayname>
<x1:calendar-color>%b</x1:calendar-color>
</x0:prop></x0:set>
</x0:propertyupdate>""" % (cls._xml_text(name), cls._xml_text(color))