agent_controller.py 10 KB


  1. import asyncio
  2. import traceback
  3. from typing import Optional, Type
  4. from opendevin.controller.agent import Agent
  5. from opendevin.controller.state.state import State
  6. from opendevin.core.config import config
  7. from opendevin.core.exceptions import (
  8. AgentMalformedActionError,
  9. AgentNoActionError,
  10. LLMOutputError,
  11. MaxCharsExceedError,
  12. )
  13. from opendevin.core.logger import opendevin_logger as logger
  14. from opendevin.core.schema import AgentState
  15. from opendevin.events.action import (
  16. Action,
  17. AddTaskAction,
  18. AgentDelegateAction,
  19. AgentFinishAction,
  20. ChangeAgentStateAction,
  21. MessageAction,
  22. ModifyTaskAction,
  23. NullAction,
  24. )
  25. from opendevin.events.event import Event
  26. from opendevin.events.observation import (
  27. AgentDelegateObservation,
  28. AgentStateChangedObservation,
  29. CmdOutputObservation,
  30. ErrorObservation,
  31. NullObservation,
  32. Observation,
  33. )
  34. from opendevin.events.stream import EventSource, EventStream, EventStreamSubscriber
  35. MAX_ITERATIONS = config.max_iterations
  36. MAX_CHARS = config.llm.max_chars
  37. class AgentController:
  38. id: str
  39. agent: Agent
  40. max_iterations: int
  41. event_stream: EventStream
  42. state: State
  43. agent_task: Optional[asyncio.Task] = None
  44. delegate: 'AgentController | None' = None
  45. _pending_action: Action | None = None
  46. def __init__(
  47. self,
  48. agent: Agent,
  49. event_stream: EventStream,
  50. sid: str = 'default',
  51. max_iterations: int = MAX_ITERATIONS,
  52. max_chars: int = MAX_CHARS,
  53. inputs: dict | None = None,
  54. ):
  55. """Initializes a new instance of the AgentController class.
  56. Args:
  57. agent: The agent instance to control.
  58. event_stream: The event stream to publish events to.
  59. sid: The session ID of the agent.
  60. max_iterations: The maximum number of iterations the agent can run.
  61. max_chars: The maximum number of characters the agent can output.
  62. inputs: The initial inputs to the agent.
  63. """
  64. self.id = sid
  65. self.agent = agent
  66. self.state = State(inputs=inputs or {}, max_iterations=max_iterations)
  67. self.event_stream = event_stream
  68. self.event_stream.subscribe(
  69. EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
  70. )
  71. self.max_iterations = max_iterations
  72. self.max_chars = max_chars
  73. self.agent_task = asyncio.create_task(self._start_step_loop())
  74. async def close(self):
  75. if self.agent_task is not None:
  76. self.agent_task.cancel()
  77. self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
  78. await self.set_agent_state_to(AgentState.STOPPED)
  79. def update_state_before_step(self):
  80. self.state.iteration += 1
  81. def update_state_after_step(self):
  82. self.state.updated_info = []
  83. async def report_error(self, message: str, exception: Exception | None = None):
  84. self.state.error = message
  85. if exception:
  86. self.state.error += f': {str(exception)}'
  87. await self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)
  88. async def add_history(self, action: Action, observation: Observation):
  89. if isinstance(action, NullAction) and isinstance(observation, NullObservation):
  90. return
  91. self.state.history.append((action, observation))
  92. self.state.updated_info.append((action, observation))
  93. async def _start_step_loop(self):
  94. while True:
  95. try:
  96. await self._step()
  97. except asyncio.CancelledError:
  98. logger.info('AgentController task was cancelled')
  99. break
  100. except Exception as e:
  101. traceback.print_exc()
  102. logger.error(f'Error while running the agent: {e}')
  103. await self.report_error(
  104. 'There was an unexpected error while running the agent', exception=e
  105. )
  106. await self.set_agent_state_to(AgentState.ERROR)
  107. break
  108. await asyncio.sleep(0.1)
  109. async def on_event(self, event: Event):
  110. if isinstance(event, ChangeAgentStateAction):
  111. await self.set_agent_state_to(event.agent_state) # type: ignore
  112. elif isinstance(event, MessageAction):
  113. if event.source == EventSource.USER:
  114. await self.add_history(event, NullObservation(''))
  115. if self.get_agent_state() != AgentState.RUNNING:
  116. await self.set_agent_state_to(AgentState.RUNNING)
  117. elif event.source == EventSource.AGENT and event.wait_for_response:
  118. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  119. elif isinstance(event, AgentDelegateAction):
  120. await self.start_delegate(event)
  121. elif isinstance(event, AddTaskAction):
  122. self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
  123. elif isinstance(event, ModifyTaskAction):
  124. self.state.root_task.set_subtask_state(event.task_id, event.state)
  125. elif isinstance(event, AgentFinishAction):
  126. self.state.outputs = event.outputs # type: ignore[attr-defined]
  127. await self.set_agent_state_to(AgentState.FINISHED)
  128. elif isinstance(event, Observation):
  129. if self._pending_action and self._pending_action.id == event.cause:
  130. await self.add_history(self._pending_action, event)
  131. self._pending_action = None
  132. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  133. elif isinstance(event, CmdOutputObservation):
  134. await self.add_history(NullAction(), event)
  135. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  136. def reset_task(self):
  137. self.agent.reset()
  138. async def set_agent_state_to(self, new_state: AgentState):
  139. logger.info(
  140. f'Setting agent({type(self.agent).__name__}) state from {self.state.agent_state} to {new_state}'
  141. )
  142. if new_state == self.state.agent_state:
  143. return
  144. self.state.agent_state = new_state
  145. if new_state == AgentState.STOPPED or new_state == AgentState.ERROR:
  146. self.reset_task()
  147. await self.event_stream.add_event(
  148. AgentStateChangedObservation('', self.state.agent_state), EventSource.AGENT
  149. )
  150. def get_agent_state(self):
  151. """Returns the current state of the agent task."""
  152. return self.state.agent_state
  153. async def start_delegate(self, action: AgentDelegateAction):
  154. AgentCls: Type[Agent] = Agent.get_cls(action.agent)
  155. agent = AgentCls(llm=self.agent.llm)
  156. self.delegate = AgentController(
  157. sid=self.id + '-delegate',
  158. agent=agent,
  159. event_stream=self.event_stream,
  160. max_iterations=self.max_iterations,
  161. max_chars=self.max_chars,
  162. inputs=action.inputs,
  163. )
  164. async def _step(self):
  165. if self.get_agent_state() != AgentState.RUNNING:
  166. logger.debug('waiting for agent to run...')
  167. await asyncio.sleep(1)
  168. return
  169. if self._pending_action:
  170. logger.debug('waiting for pending action: ' + str(self._pending_action))
  171. await asyncio.sleep(1)
  172. return
  173. logger.info(f'STEP {self.state.iteration}', extra={'msg_type': 'STEP'})
  174. if self.state.iteration >= self.max_iterations:
  175. await self.report_error('Agent reached maximum number of iterations')
  176. await self.set_agent_state_to(AgentState.ERROR)
  177. return
  178. if self.delegate is not None:
  179. delegate_done = await self.delegate._step()
  180. if delegate_done:
  181. outputs = self.delegate.state.outputs if self.delegate.state else {}
  182. obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
  183. await self.event_stream.add_event(obs, EventSource.AGENT)
  184. self.delegate = None
  185. self.delegateAction = None
  186. return
  187. if self.state.num_of_chars > self.max_chars:
  188. raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)
  189. self.update_state_before_step()
  190. action: Action = NullAction()
  191. try:
  192. action = self.agent.step(self.state)
  193. if action is None:
  194. raise AgentNoActionError('No action was returned')
  195. except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
  196. await self.report_error(str(e))
  197. return
  198. logger.info(action, extra={'msg_type': 'ACTION'})
  199. self.update_state_after_step()
  200. if action.runnable:
  201. self._pending_action = action
  202. else:
  203. await self.add_history(action, NullObservation(''))
  204. if not isinstance(action, NullAction):
  205. await self.event_stream.add_event(action, EventSource.AGENT)
  206. if self._is_stuck():
  207. await self.report_error('Agent got stuck in a loop')
  208. await self.set_agent_state_to(AgentState.ERROR)
  209. def get_state(self):
  210. return self.state
  211. def _is_stuck(self):
  212. # check if delegate stuck
  213. if self.delegate and self.delegate._is_stuck():
  214. return True
  215. if len(self.state.history) < 3:
  216. return False
  217. # if the last three (Action, Observation) tuples are too repetitive
  218. # the agent got stuck in a loop
  219. if all(
  220. [
  221. self.state.history[-i][0] == self.state.history[-3][0]
  222. for i in range(1, 3)
  223. ]
  224. ):
  225. # it repeats same action, give it a chance, but not if:
  226. if all(
  227. isinstance(self.state.history[-i][1], NullObservation)
  228. for i in range(1, 4)
  229. ):
  230. # same (Action, NullObservation): like 'think' the same thought over and over
  231. logger.warning('Action, NullObservation loop detected')
  232. return True
  233. elif all(
  234. isinstance(self.state.history[-i][1], ErrorObservation)
  235. for i in range(1, 4)
  236. ):
  237. # (NullAction, ErrorObservation): errors coming from an exception
  238. # (Action, ErrorObservation): the same action getting an error, even if not necessarily the same error
  239. logger.warning('Action, ErrorObservation loop detected')
  240. return True
  241. return False