| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import asyncio
- import json
- from datetime import datetime
- from enum import Enum
- from typing import Callable, Iterable
- from opendevin.core.logger import opendevin_logger as logger
- from opendevin.events.serialization.event import event_from_dict, event_to_dict
- from opendevin.storage import FileStore, get_file_store
- from .event import Event
- from .serialization.event import EventSource
- class EventStreamSubscriber(str, Enum):
- AGENT_CONTROLLER = 'agent_controller'
- SERVER = 'server'
- RUNTIME = 'runtime'
- MAIN = 'main'
- TEST = 'test'
- class EventStream:
- sid: str
- _subscribers: dict[str, Callable]
- _cur_id: int
- _lock: asyncio.Lock
- _file_store: FileStore
- def __init__(self, sid: str):
- self.sid = sid
- self._file_store = get_file_store()
- self._subscribers = {}
- self._cur_id = 0
- self._lock = asyncio.Lock()
- self._reinitialize_from_file_store()
- def _reinitialize_from_file_store(self):
- events = self._file_store.list(f'sessions/{self.sid}/events')
- for event_str in events:
- id = self._get_id_from_filename(event_str)
- if id >= self._cur_id:
- self._cur_id = id + 1
- def _get_filename_for_id(self, id: int) -> str:
- return f'sessions/{self.sid}/events/{id}.json'
- def _get_id_from_filename(self, filename: str) -> int:
- return int(filename.split('/')[-1].split('.')[0])
- def get_events(self, start_id=0, end_id=None) -> Iterable[Event]:
- events = self._file_store.list(f'sessions/{self.sid}/events')
- for event_str in events:
- id = self._get_id_from_filename(event_str)
- if start_id <= id and (end_id is None or id <= end_id):
- event = self.get_event(id)
- yield event
- def get_event(self, id: int) -> Event:
- filename = self._get_filename_for_id(id)
- content = self._file_store.read(filename)
- data = json.loads(content)
- return event_from_dict(data)
- def subscribe(self, id: EventStreamSubscriber, callback: Callable):
- if id in self._subscribers:
- raise ValueError('Subscriber already exists: ' + id)
- else:
- self._subscribers[id] = callback
- def unsubscribe(self, id: EventStreamSubscriber):
- if id not in self._subscribers:
- logger.warning('Subscriber not found during unsubscribe: ' + id)
- else:
- del self._subscribers[id]
- # TODO: make this not async
- async def add_event(self, event: Event, source: EventSource):
- async with self._lock:
- event._id = self._cur_id # type: ignore [attr-defined]
- self._cur_id += 1
- event._timestamp = datetime.now() # type: ignore [attr-defined]
- event._source = source # type: ignore [attr-defined]
- data = event_to_dict(event)
- self._file_store.write(self._get_filename_for_id(event.id), json.dumps(data))
- for key, fn in self._subscribers.items():
- await fn(event)
|