state.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import base64
  2. import pickle
  3. from dataclasses import dataclass, field
  4. from enum import Enum
  5. from opendevin.controller.state.task import RootTask
  6. from opendevin.core.logger import opendevin_logger as logger
  7. from opendevin.core.metrics import Metrics
  8. from opendevin.core.schema import AgentState
  9. from opendevin.events.action import (
  10. MessageAction,
  11. )
  12. from opendevin.events.action.agent import AgentFinishAction
  13. from opendevin.memory.history import ShortTermHistory
  14. from opendevin.storage import get_file_store
  class TrafficControlState(str, Enum):
      """Traffic-control (rate-limiting) status tracked for a task.

      Members mix in `str`, so each one compares equal to its plain string value.
      """

      NORMAL = 'normal'  # default state, no rate limiting
      THROTTLING = 'throttling'  # task paused due to traffic control
      PAUSED = 'paused'  # traffic control is temporarily paused
  # Agent states that a restored session is allowed to resume from.
  # `State.restore_from_session` copies the saved `agent_state` into
  # `resume_state` only when it is one of these; otherwise `resume_state` is None.
  RESUMABLE_STATES = [
      AgentState.RUNNING,
      AgentState.PAUSED,
      AgentState.AWAITING_USER_INPUT,
      AgentState.FINISHED,
  ]
  @dataclass
  class State:
      """
      OpenDevin is a multi-agentic system.

      A `task` is an end-to-end conversation between OpenDevin (the whole system) and the
      user, which might involve one or more inputs from the user. It starts with
      an initial input (typically a task statement) from the user, and ends with either
      a `AgentFinishAction` initiated by the agent, or an error.

      A `subtask` is an end-to-end conversation between an agent and the user, or
      another agent. If a `task` is conducted by a single agent, then it's also a `subtask`
      itself. Otherwise, a `task` consists of multiple `subtasks`, each executed by
      one agent.

      A `State` is a mutable object associated with a `subtask`. It includes several
      mutable and immutable fields, among which `iteration` is shared across
      subtasks.

      For example, considering a task from the user: `tell me how many GitHub stars
      OpenDevin repo has`. Let's assume the default agent is CodeActAgent.

      -- TASK STARTS (SUBTASK 0 STARTS) --

      DELEGATE_LEVEL 0, ITERATION 0, LOCAL_ITERATION 0
      CodeActAgent: I should request help from BrowsingAgent

          -- DELEGATE STARTS (SUBTASK 1 STARTS) --

          DELEGATE_LEVEL 1, ITERATION 1, LOCAL_ITERATION 0
          BrowsingAgent: Let me find the answer on GitHub

          DELEGATE_LEVEL 1, ITERATION 2, LOCAL_ITERATION 1
          BrowsingAgent: I found the answer, let me convey the result and finish

          -- DELEGATE ENDS (SUBTASK 1 ENDS) --

      DELEGATE_LEVEL 0, ITERATION 3, LOCAL_ITERATION 1
      CodeActAgent: I got the answer from BrowsingAgent, let me convey the result
      and finish

      -- TASK ENDS (SUBTASK 0 ENDS) --

      Note how ITERATION counter is shared across agents, while LOCAL_ITERATION
      is local to each subtask.

      Pickling note: `__getstate__` drops the `history` object and keeps only its
      `start_id` / `end_id`; `__setstate__` rebuilds an empty `ShortTermHistory`
      carrying those ids.
      """

      root_task: RootTask = field(default_factory=RootTask)
      # global iteration for the current task
      iteration: int = 0
      # local iteration for the current subtask
      local_iteration: int = 0
      # max number of iterations for the current task
      max_iterations: int = 100
      confirmation_mode: bool = False
      history: ShortTermHistory = field(default_factory=ShortTermHistory)
      inputs: dict = field(default_factory=dict)
      outputs: dict = field(default_factory=dict)
      last_error: str | None = None
      agent_state: AgentState = AgentState.LOADING
      resume_state: AgentState | None = None
      traffic_control_state: TrafficControlState = TrafficControlState.NORMAL
      # global metrics for the current task
      # (built per instance: a plain `Metrics()` default would be created once at
      # class definition and silently shared by every State)
      metrics: Metrics = field(default_factory=Metrics)
      # local metrics for the current subtask
      local_metrics: Metrics = field(default_factory=Metrics)
      # root agent has level 0, and every delegate increases the level by one
      delegate_level: int = 0
      # start_id and end_id track the range of events in history
      start_id: int = -1
      end_id: int = -1
      almost_stuck: int = 0

      def save_to_session(self, sid: str) -> None:
          """Pickle this state, base64-encode it and write it to the file store.

          Args:
              sid: Session id; the payload is written to
                  `sessions/{sid}/agent_state.pkl`.

          Raises:
              Exception: Any serialization or storage failure is logged and
                  re-raised unchanged.
          """
          fs = get_file_store()
          logger.debug(f'Saving state to session {sid}:{self.agent_state}')
          try:
              # Serialization lives inside the try so that a pickling failure is
              # logged the same way as a storage failure.
              pickled = pickle.dumps(self)
              encoded = base64.b64encode(pickled).decode('utf-8')
              fs.write(f'sessions/{sid}/agent_state.pkl', encoded)
          except Exception as e:
              logger.error(f'Failed to save state to session: {e}')
              raise

      @staticmethod
      def restore_from_session(sid: str) -> 'State':
          """Load the state previously written by `save_to_session`.

          The returned state always has `agent_state` set to `AgentState.LOADING`.
          The state it was saved in is kept in `resume_state` when it is one of
          `RESUMABLE_STATES`; otherwise `resume_state` is None.

          Args:
              sid: Session id; the payload is read from
                  `sessions/{sid}/agent_state.pkl`.

          Returns:
              The unpickled `State`.

          Raises:
              Exception: Any read, decode or unpickle failure is logged and
                  re-raised unchanged.
          """
          fs = get_file_store()
          try:
              encoded = fs.read(f'sessions/{sid}/agent_state.pkl')
              pickled = base64.b64decode(encoded)
              # SECURITY: pickle.loads can execute arbitrary code contained in the
              # payload. This is only safe while the session file store is trusted
              # and cannot be written to by untrusted parties.
              state = pickle.loads(pickled)
          except Exception as e:
              logger.error(f'Failed to restore state from session: {e}')
              raise

          # remember where to resume from, then restart in LOADING
          if state.agent_state in RESUMABLE_STATES:
              state.resume_state = state.agent_state
          else:
              state.resume_state = None
          state.agent_state = AgentState.LOADING
          return state

      def __getstate__(self) -> dict:
          """Return the picklable attribute dict, without the history object.

          Only the history's `start_id` and `end_id` are kept, so the event range
          can be restored by `__setstate__`.
          """
          state = self.__dict__.copy()

          # don't save the history object itself, only the range it covers
          history = state.pop('history', None)
          if history is not None:
              state['start_id'] = history.start_id
              state['end_id'] = history.end_id
          return state

      def __setstate__(self, state: dict) -> None:
          """Restore attributes from `state` and rebuild the history object."""
          self.__dict__.update(state)

          # recreate the history object (it is never part of the pickled data)
          if not hasattr(self, 'history'):
              self.history = ShortTermHistory()

          # restore the event range the history covered when the state was saved
          self.history.start_id = self.start_id
          self.history.end_id = self.end_id

      def get_current_user_intent(self) -> str | None:
          """Return the user message that opened the current exchange.

          History is scanned from newest to oldest. Every user `MessageAction`
          overwrites the candidate, so when an `AgentFinishAction` is reached the
          candidate is the first user message sent after that finish, and it is
          returned. A finish action with no user message after it is skipped. If
          the scan runs out of events, the oldest user message (the task) is
          returned, or None when the history holds no user message at all.
          """
          last_user_message = None
          for event in self.history.get_events(reverse=True):
              if isinstance(event, MessageAction) and event.source == 'user':
                  last_user_message = event.content
              elif isinstance(event, AgentFinishAction):
                  if last_user_message is not None:
                      return last_user_message
          return last_user_message