Source code for mergecal.calendar_merger

from dataclasses import dataclass, field

from icalendar import Calendar, Component
from x_wr_timezone import to_standard

ComponentId = tuple[str, int, str | None]


[docs] def calendars_from_ical(data: bytes) -> list[Calendar]: """Parse ICS data, returning one Calendar per VCALENDAR component found.""" return Calendar.from_ical(data, multiple=True)
def generate_default_prodid() -> str: """Generate a default PRODID.""" return "-//mergecal.org//MergeCal//EN" @dataclass class _ComponentTracker: target: Calendar seen: set[ComponentId] = field(default_factory=set) no_uid: list[Component] = field(default_factory=list) def add(self, component: Component, calendar_color: str | None) -> None: uid = component.uid component_id: ComponentId = ( uid, component.sequence, component.get("recurrence-id", None), ) if not uid: if component in self.no_uid: return self.no_uid.append(component) else: if component_id in self.seen: return self.seen.add(component_id) if calendar_color and not component.color: component.color = calendar_color self.target.add_component(component)
[docs] class CalendarMerger: """Merge multiple calendars into one.""" def __init__( self, calendars: list[Calendar], prodid: str | None = None, version: str = "2.0", calscale: str = "GREGORIAN", method: str | None = None, ): self.merged_calendar = Calendar() self.merged_calendar.add("prodid", prodid or generate_default_prodid()) self.merged_calendar.add("version", version) self.merged_calendar.add("calscale", calscale) if method: self.merged_calendar.add("method", method) self.calendars: list[Calendar] = [] self._merged = False for calendar in calendars: self.add_calendar(calendar)
[docs] def add_calendar(self, calendar: Calendar) -> None: """Add a calendar to be merged.""" self.calendars.append(to_standard(calendar, add_timezone_component=True))
[docs] def merge(self) -> Calendar: """Merge the calendars.""" if self._merged: raise RuntimeError( "merge() can only be called once; " "create a new CalendarMerger instance to merge again" ) self._merged = True tracker = _ComponentTracker(self.merged_calendar) tzids: set[str] = set() for cal in self.calendars: # .color resolves COLOR then X-APPLE-CALENDAR-COLOR (RFC 7986 ยง5.9) calendar_color = cal.color if calendar_color and not self.merged_calendar.color: self.merged_calendar.color = calendar_color for timezone in cal.timezones: if timezone.tz_name not in tzids: self.merged_calendar.add_component(timezone) tzids.add(timezone.tz_name) for component in cal.events + cal.todos + cal.journals: tracker.add(component, calendar_color) return self.merged_calendar
[docs] def merge_calendars(calendars: list[Calendar], **kwargs: object) -> Calendar: """Convenience function to merge calendars.""" merger = CalendarMerger(calendars, **kwargs) # type: ignore return merger.merge()
__all__ = ["CalendarMerger", "calendars_from_ical", "merge_calendars"]