# state.py (6.7 KB)
  1. import base64
  2. import pickle
  3. from dataclasses import dataclass, field
  4. from enum import Enum
  5. from typing import Any
  6. from opendevin.controller.state.task import RootTask
  7. from opendevin.core.logger import opendevin_logger as logger
  8. from opendevin.core.metrics import Metrics
  9. from opendevin.core.schema import AgentState
  10. from opendevin.events.action import (
  11. MessageAction,
  12. )
  13. from opendevin.events.action.agent import AgentFinishAction
  14. from opendevin.memory.history import ShortTermHistory
  15. from opendevin.storage.files import FileStore
  16. class TrafficControlState(str, Enum):
  17. # default state, no rate limiting
  18. NORMAL = 'normal'
  19. # task paused due to traffic control
  20. THROTTLING = 'throttling'
  21. # traffic control is temporarily paused
  22. PAUSED = 'paused'
  23. RESUMABLE_STATES = [
  24. AgentState.RUNNING,
  25. AgentState.PAUSED,
  26. AgentState.AWAITING_USER_INPUT,
  27. AgentState.FINISHED,
  28. ]
  29. @dataclass
  30. class State:
  31. """
  32. OpenDevin is a multi-agentic system.
  33. A `task` is an end-to-end conversation between OpenDevin (the whole sytem) and the
  34. user, which might involve one or more inputs from the user. It starts with
  35. an initial input (typically a task statement) from the user, and ends with either
  36. a `AgentFinishAction` initiated by the agent, or an error.
  37. A `subtask` is an end-to-end conversation between an agent and the user, or
  38. another agent. If a `task` is conducted by a single agent, then it's also a `subtask`
  39. itself. Otherwise, a `task` consists of multiple `subtasks`, each executed by
  40. one agent.
  41. A `State` is a mutable object associated with a `subtask`. It includes several
  42. mutable and immutable fields, among which `iteration` is shared across
  43. subtasks.
  44. For example, considering a task from the user: `tell me how many GitHub stars
  45. OpenDevin repo has`. Let's assume the default agent is CodeActAgent.
  46. -- TASK STARTS (SUBTASK 0 STARTS) --
  47. DELEGATE_LEVEL 0, ITERATION 0, LOCAL_ITERATION 0
  48. CodeActAgent: I should request help from BrowsingAgent
  49. -- DELEGATE STARTS (SUBTASK 1 STARTS) --
  50. DELEGATE_LEVEL 1, ITERATION 1, LOCAL_ITERATION 0
  51. BrowsingAgent: Let me find the answer on GitHub
  52. DELEGATE_LEVEL 1, ITERATION 2, LOCAL_ITERATION 1
  53. BrowsingAgent: I found the answer, let me convey the result and finish
  54. -- DELEGATE ENDS (SUBTASK 1 ENDS) --
  55. DELEGATE_LEVEL 0, ITERATION 3, LOCAL_ITERATION 1
  56. CodeActAgent: I got the answer from BrowsingAgent, let me convey the result
  57. and finish
  58. -- TASK ENDS (SUBTASK 0 ENDS) --
  59. Note how ITERATION counter is shared across agents, while LOCAL_ITERATION
  60. is local to each subtask.
  61. """
  62. root_task: RootTask = field(default_factory=RootTask)
  63. # global iteration for the current task
  64. iteration: int = 0
  65. # local iteration for the current subtask
  66. local_iteration: int = 0
  67. # max number of iterations for the current task
  68. max_iterations: int = 100
  69. confirmation_mode: bool = False
  70. history: ShortTermHistory = field(default_factory=ShortTermHistory)
  71. inputs: dict = field(default_factory=dict)
  72. outputs: dict = field(default_factory=dict)
  73. last_error: str | None = None
  74. agent_state: AgentState = AgentState.LOADING
  75. resume_state: AgentState | None = None
  76. traffic_control_state: TrafficControlState = TrafficControlState.NORMAL
  77. # global metrics for the current task
  78. metrics: Metrics = field(default_factory=Metrics)
  79. # local metrics for the current subtask
  80. local_metrics: Metrics = field(default_factory=Metrics)
  81. # root agent has level 0, and every delegate increases the level by one
  82. delegate_level: int = 0
  83. # start_id and end_id track the range of events in history
  84. start_id: int = -1
  85. end_id: int = -1
  86. almost_stuck: int = 0
  87. # NOTE: This will never be used by the controller, but it can be used by different
  88. # evaluation tasks to store extra data needed to track the progress/state of the task.
  89. extra_data: dict[str, Any] = field(default_factory=dict)
  90. def save_to_session(self, sid: str, file_store: FileStore):
  91. pickled = pickle.dumps(self)
  92. logger.debug(f'Saving state to session {sid}:{self.agent_state}')
  93. encoded = base64.b64encode(pickled).decode('utf-8')
  94. try:
  95. file_store.write(f'sessions/{sid}/agent_state.pkl', encoded)
  96. except Exception as e:
  97. logger.error(f'Failed to save state to session: {e}')
  98. raise e
  99. @staticmethod
  100. def restore_from_session(sid: str, file_store: FileStore) -> 'State':
  101. try:
  102. encoded = file_store.read(f'sessions/{sid}/agent_state.pkl')
  103. pickled = base64.b64decode(encoded)
  104. state = pickle.loads(pickled)
  105. except Exception as e:
  106. logger.error(f'Failed to restore state from session: {e}')
  107. raise e
  108. # update state
  109. if state.agent_state in RESUMABLE_STATES:
  110. state.resume_state = state.agent_state
  111. else:
  112. state.resume_state = None
  113. # don't carry last_error anymore after restore
  114. state.last_error = None
  115. # first state after restore
  116. state.agent_state = AgentState.LOADING
  117. return state
  118. def __getstate__(self):
  119. state = self.__dict__.copy()
  120. # save the relevant data from recent history
  121. # so that we can restore it when the state is restored
  122. if 'history' in state:
  123. state['start_id'] = state['history'].start_id
  124. state['end_id'] = state['history'].end_id
  125. # don't save history object itself
  126. state.pop('history', None)
  127. return state
  128. def __setstate__(self, state):
  129. self.__dict__.update(state)
  130. # recreate the history object
  131. if not hasattr(self, 'history'):
  132. self.history = ShortTermHistory()
  133. # restore the relevant data in history from the state
  134. self.history.start_id = self.start_id
  135. self.history.end_id = self.end_id
  136. # remove the restored data from the state if any
  137. def get_current_user_intent(self):
  138. """Returns the latest user message and image(if provided) that appears after a FinishAction, or the first (the task) if nothing was finished yet."""
  139. last_user_message = None
  140. last_user_message_image_urls: list[str] | None = []
  141. for event in self.history.get_events(reverse=True):
  142. if isinstance(event, MessageAction) and event.source == 'user':
  143. last_user_message = event.content
  144. last_user_message_image_urls = event.images_urls
  145. elif isinstance(event, AgentFinishAction):
  146. if last_user_message is not None:
  147. return last_user_message
  148. return last_user_message, last_user_message_image_urls