Source code for mergecal.calendar_merger
from dataclasses import dataclass, field
from icalendar import Calendar, Component
from x_wr_timezone import to_standard
ComponentId = tuple[str, int, str | None]
[docs]
def calendars_from_ical(data: bytes) -> list[Calendar]:
"""Parse ICS data, returning one Calendar per VCALENDAR component found."""
return Calendar.from_ical(data, multiple=True)
def generate_default_prodid() -> str:
"""Generate a default PRODID."""
return "-//mergecal.org//MergeCal//EN"
@dataclass
class _ComponentTracker:
target: Calendar
seen: set[ComponentId] = field(default_factory=set)
no_uid: list[Component] = field(default_factory=list)
def add(self, component: Component, calendar_color: str | None) -> None:
uid = component.uid
component_id: ComponentId = (
uid,
component.sequence,
component.get("recurrence-id", None),
)
if not uid:
if component in self.no_uid:
return
self.no_uid.append(component)
else:
if component_id in self.seen:
return
self.seen.add(component_id)
if calendar_color and not component.color:
component.color = calendar_color
self.target.add_component(component)
[docs]
class CalendarMerger:
"""Merge multiple calendars into one."""
def __init__(
self,
calendars: list[Calendar],
prodid: str | None = None,
version: str = "2.0",
calscale: str = "GREGORIAN",
method: str | None = None,
):
self.merged_calendar = Calendar()
self.merged_calendar.add("prodid", prodid or generate_default_prodid())
self.merged_calendar.add("version", version)
self.merged_calendar.add("calscale", calscale)
if method:
self.merged_calendar.add("method", method)
self.calendars: list[Calendar] = []
self._merged = False
for calendar in calendars:
self.add_calendar(calendar)
[docs]
def add_calendar(self, calendar: Calendar) -> None:
"""Add a calendar to be merged."""
self.calendars.append(to_standard(calendar, add_timezone_component=True))
[docs]
def merge(self) -> Calendar:
"""Merge the calendars."""
if self._merged:
raise RuntimeError(
"merge() can only be called once; "
"create a new CalendarMerger instance to merge again"
)
self._merged = True
tracker = _ComponentTracker(self.merged_calendar)
tzids: set[str] = set()
for cal in self.calendars:
# .color resolves COLOR then X-APPLE-CALENDAR-COLOR (RFC 7986 ยง5.9)
calendar_color = cal.color
if calendar_color and not self.merged_calendar.color:
self.merged_calendar.color = calendar_color
for timezone in cal.timezones:
if timezone.tz_name not in tzids:
self.merged_calendar.add_component(timezone)
tzids.add(timezone.tz_name)
for component in cal.events + cal.todos + cal.journals:
tracker.add(component, calendar_color)
return self.merged_calendar
[docs]
def merge_calendars(calendars: list[Calendar], **kwargs: object) -> Calendar:
"""Convenience function to merge calendars."""
merger = CalendarMerger(calendars, **kwargs) # type: ignore
return merger.merge()
__all__ = ["CalendarMerger", "calendars_from_ical", "merge_calendars"]