|
|
@@ -2,13 +2,14 @@ import asyncio
|
|
|
import json
|
|
|
from datetime import datetime
|
|
|
from enum import Enum
|
|
|
-from typing import Callable
|
|
|
+from typing import Callable, Iterable
|
|
|
|
|
|
from opendevin.core.logger import opendevin_logger as logger
|
|
|
from opendevin.events.serialization.event import event_from_dict, event_to_dict
|
|
|
from opendevin.storage import FileStore, get_file_store
|
|
|
|
|
|
-from .event import Event, EventSource
|
|
|
+from .event import Event
|
|
|
+from .serialization.event import EventSource
|
|
|
|
|
|
|
|
|
class EventStreamSubscriber(str, Enum):
|
|
|
@@ -22,7 +23,7 @@ class EventStreamSubscriber(str, Enum):
|
|
|
class EventStream:
|
|
|
sid: str
|
|
|
_subscribers: dict[str, Callable]
|
|
|
- _events: list[Event]
|
|
|
+ _cur_id: int
|
|
|
_lock: asyncio.Lock
|
|
|
_file_store: FileStore
|
|
|
|
|
|
@@ -30,22 +31,36 @@ class EventStream:
|
|
|
self.sid = sid
|
|
|
self._file_store = get_file_store()
|
|
|
self._subscribers = {}
|
|
|
- self._events = []
|
|
|
+ self._cur_id = 0
|
|
|
self._lock = asyncio.Lock()
|
|
|
+ self._reinitialize_from_file_store()
|
|
|
|
|
|
- def _get_filename_for_event(self, event: Event):
|
|
|
- # TODO: change to .id once that prop is in
|
|
|
- return f'sessions/{self.sid}/events/{event._id}.json' # type: ignore [attr-defined]
|
|
|
+ def _reinitialize_from_file_store(self):
|
|
|
+ events = self._file_store.list(f'sessions/{self.sid}/events')
|
|
|
+ for event_str in events:
|
|
|
+ id = self._get_id_from_filename(event_str)
|
|
|
+ if id >= self._cur_id:
|
|
|
+ self._cur_id = id + 1
|
|
|
|
|
|
- async def _rehydrate(self):
|
|
|
- async with self._lock:
|
|
|
- self._events = []
|
|
|
- events = self._file_store.list(f'sessions/{self.sid}/events')
|
|
|
- for event_str in events:
|
|
|
- content = self._file_store.read(event_str)
|
|
|
- data = json.loads(content)
|
|
|
- event = event_from_dict(data)
|
|
|
- self._events.append(event)
|
|
|
+ def _get_filename_for_id(self, id: int) -> str:
|
|
|
+ return f'sessions/{self.sid}/events/{id}.json'
|
|
|
+
|
|
|
+ def _get_id_from_filename(self, filename: str) -> int:
|
|
|
+ return int(filename.split('/')[-1].split('.')[0])
|
|
|
+
|
|
|
+ def get_events(self, start_id=0, end_id=None) -> Iterable[Event]:
|
|
|
+ events = self._file_store.list(f'sessions/{self.sid}/events')
|
|
|
+ for event_str in events:
|
|
|
+ id = self._get_id_from_filename(event_str)
|
|
|
+ if start_id <= id and (end_id is None or id <= end_id):
|
|
|
+ event = self.get_event(id)
|
|
|
+ yield event
|
|
|
+
|
|
|
+ def get_event(self, id: int) -> Event:
|
|
|
+ filename = self._get_filename_for_id(id)
|
|
|
+ content = self._file_store.read(filename)
|
|
|
+ data = json.loads(content)
|
|
|
+ return event_from_dict(data)
|
|
|
|
|
|
def subscribe(self, id: EventStreamSubscriber, callback: Callable):
|
|
|
if id in self._subscribers:
|
|
|
@@ -62,12 +77,11 @@ class EventStream:
|
|
|
# TODO: make this not async
|
|
|
async def add_event(self, event: Event, source: EventSource):
|
|
|
async with self._lock:
|
|
|
- event._id = len(self._events) # type: ignore [attr-defined]
|
|
|
- event._timestamp = datetime.now() # type: ignore [attr-defined]
|
|
|
- event._source = source # type: ignore [attr-defined]
|
|
|
- self._file_store.write(
|
|
|
- self._get_filename_for_event(event), json.dumps(event_to_dict(event))
|
|
|
- )
|
|
|
- self._events.append(event)
|
|
|
+ event._id = self._cur_id # type: ignore [attr-defined]
|
|
|
+ self._cur_id += 1
|
|
|
+ event._timestamp = datetime.now() # type: ignore [attr-defined]
|
|
|
+ event._source = source # type: ignore [attr-defined]
|
|
|
+ data = event_to_dict(event)
|
|
|
+ self._file_store.write(self._get_filename_for_id(event.id), json.dumps(data))
|
|
|
for key, fn in self._subscribers.items():
|
|
|
await fn(event)
|