agent_controller.py 10.0 KB


  1. import asyncio
  2. from typing import Optional, Type
  3. from opendevin.controller.agent import Agent
  4. from opendevin.controller.state.state import State
  5. from opendevin.core.config import config
  6. from opendevin.core.exceptions import (
  7. AgentMalformedActionError,
  8. AgentNoActionError,
  9. LLMOutputError,
  10. MaxCharsExceedError,
  11. )
  12. from opendevin.core.logger import opendevin_logger as logger
  13. from opendevin.core.schema import AgentState
  14. from opendevin.events.action import (
  15. Action,
  16. AddTaskAction,
  17. AgentDelegateAction,
  18. AgentFinishAction,
  19. ChangeAgentStateAction,
  20. MessageAction,
  21. ModifyTaskAction,
  22. NullAction,
  23. )
  24. from opendevin.events.event import Event
  25. from opendevin.events.observation import (
  26. AgentDelegateObservation,
  27. AgentStateChangedObservation,
  28. CmdOutputObservation,
  29. ErrorObservation,
  30. NullObservation,
  31. Observation,
  32. )
  33. from opendevin.events.stream import EventSource, EventStream, EventStreamSubscriber
  # Defaults for AgentController.__init__, read once from the global config at
  # import time.
  MAX_ITERATIONS = config.max_iterations  # default cap on agent steps
  MAX_CHARS = config.llm.max_chars  # default cap on state.num_of_chars
  class AgentController:
      """Drive an ``Agent`` one step at a time and react to ``EventStream`` events.

      On construction the controller subscribes ``on_event`` to the event stream
      and schedules a background task that calls ``_step`` in a loop. A step asks
      the agent for its next action, records it and publishes it to the stream.
      Observations that arrive through ``on_event`` are paired with the pending
      action and appended to the state's history.
      """

      id: str
      agent: Agent
      max_iterations: int
      max_chars: int
      event_stream: EventStream
      state: State
      agent_task: Optional[asyncio.Task] = None
      delegate: 'AgentController | None' = None
      _pending_action: Action | None = None

      def __init__(
          self,
          agent: Agent,
          event_stream: EventStream,
          sid: str = 'default',
          max_iterations: int = MAX_ITERATIONS,
          max_chars: int = MAX_CHARS,
          inputs: dict | None = None,
      ):
          """Initializes a new instance of the AgentController class.

          Must be called while an event loop is running, because the step loop
          is scheduled with ``asyncio.create_task``.

          Args:
              agent: The agent instance to control.
              event_stream: The event stream to publish events to.
              sid: The session ID of the agent.
              max_iterations: The maximum number of iterations the agent can run.
              max_chars: The maximum number of characters the agent can output.
              inputs: The initial inputs to the agent.
          """
          self.id = sid
          self.agent = agent
          self.state = State(inputs=inputs or {}, max_iterations=max_iterations)
          self.event_stream = event_stream
          self.event_stream.subscribe(
              EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
          )
          self.max_iterations = max_iterations
          self.max_chars = max_chars
          self.agent_task = asyncio.create_task(self._start_step_loop())

      async def close(self):
          """Cancel the step loop, unsubscribe from the stream and set state STOPPED."""
          if self.agent_task is not None:
              self.agent_task.cancel()
          self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
          await self.set_agent_state_to(AgentState.STOPPED)

      def update_state_before_step(self):
          """Advance the iteration counter ahead of an agent step."""
          self.state.iteration += 1

      def update_state_after_step(self):
          """Clear the per-step ``updated_info`` list once the agent has stepped."""
          self.state.updated_info = []

      async def report_error(self, message: str):
          """Publish ``message`` to the event stream as an ``ErrorObservation``."""
          await self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)

      async def add_history(self, action: Action, observation: Observation):
          """Append an (action, observation) pair to ``history`` and ``updated_info``.

          A pair made of ``NullAction`` and ``NullObservation`` is dropped.
          """
          if isinstance(action, NullAction) and isinstance(observation, NullObservation):
              return
          self.state.history.append((action, observation))
          self.state.updated_info.append((action, observation))

      async def _start_step_loop(self):
          """Call ``_step`` repeatedly until cancelled or an unexpected error occurs.

          Any exception other than cancellation is logged, reported on the event
          stream, moves the agent to ``AgentState.ERROR`` and ends the loop.
          """
          while True:
              try:
                  await self._step()
              except asyncio.CancelledError:
                  logger.info('AgentController task was cancelled')
                  break
              except Exception as e:
                  # exc_info keeps the traceback; the message alone rarely
                  # identifies where the failure happened.
                  logger.error(f'Error while running the agent: {e}', exc_info=True)
                  await self.report_error(
                      'There was an unexpected error while running the agent'
                  )
                  await self.set_agent_state_to(AgentState.ERROR)
                  break
              await asyncio.sleep(0.1)

      async def on_event(self, event: Event):
          """Handle one event from the stream, dispatching on its type.

          Args:
              event: The action or observation delivered by the event stream.
          """
          if isinstance(event, ChangeAgentStateAction):
              await self.set_agent_state_to(event.agent_state)  # type: ignore
          elif isinstance(event, MessageAction):
              if event.source == EventSource.USER:
                  # A user message is recorded and (re)starts the agent.
                  await self.add_history(event, NullObservation(''))
                  if self.get_agent_state() != AgentState.RUNNING:
                      await self.set_agent_state_to(AgentState.RUNNING)
              elif event.source == EventSource.AGENT and event.wait_for_response:
                  await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
          elif isinstance(event, AgentDelegateAction):
              await self.start_delegate(event)
          elif isinstance(event, AddTaskAction):
              self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
          elif isinstance(event, ModifyTaskAction):
              self.state.root_task.set_subtask_state(event.task_id, event.state)
          elif isinstance(event, AgentFinishAction):
              self.state.outputs = event.outputs  # type: ignore[attr-defined]
              await self.set_agent_state_to(AgentState.FINISHED)
          elif isinstance(event, Observation):
              if self._pending_action and self._pending_action.id == event.cause:
                  # This observation answers the pending action; pair them and
                  # let the step loop continue.
                  await self.add_history(self._pending_action, event)
                  self._pending_action = None
              elif isinstance(event, CmdOutputObservation):
                  # Command output with no matching pending action is still recorded.
                  await self.add_history(NullAction(), event)

      def reset_task(self):
          """Reset the underlying agent."""
          self.agent.reset()

      async def set_agent_state_to(self, new_state: AgentState):
          """Switch the agent state and announce the change on the event stream.

          Does nothing beyond logging when ``new_state`` equals the current state.
          Entering ``STOPPED`` or ``ERROR`` also resets the agent.
          """
          logger.info(
              f'Setting agent({type(self.agent).__name__}) state from {self.state.agent_state} to {new_state}'
          )
          if new_state == self.state.agent_state:
              return
          self.state.agent_state = new_state
          if new_state == AgentState.STOPPED or new_state == AgentState.ERROR:
              self.reset_task()
          await self.event_stream.add_event(
              AgentStateChangedObservation('', self.state.agent_state), EventSource.AGENT
          )

      def get_agent_state(self):
          """Returns the current state of the agent task."""
          return self.state.agent_state

      async def start_delegate(self, action: AgentDelegateAction):
          """Create a delegate controller for the agent class named by ``action.agent``.

          The delegate shares this controller's LLM, event stream and limits, and
          receives ``action.inputs`` as its initial inputs.
          """
          AgentCls: Type[Agent] = Agent.get_cls(action.agent)
          agent = AgentCls(llm=self.agent.llm)
          self.delegate = AgentController(
              sid=self.id + '-delegate',
              agent=agent,
              event_stream=self.event_stream,
              max_iterations=self.max_iterations,
              max_chars=self.max_chars,
              inputs=action.inputs,
          )

      async def _step_delegate(self):
          """Advance the delegate one step and collect its outputs once it finishes.

          ``_step`` has no return value, so completion is read from the delegate's
          agent state. Only ``AgentState.FINISHED`` counts as done here; other
          states leave the delegate in place.
          """
          delegate = self.delegate
          if delegate is None:
              return
          await delegate._step()
          if delegate.get_agent_state() != AgentState.FINISHED:
              return
          outputs = delegate.state.outputs if delegate.state else {}
          # The delegate scheduled its own step loop in __init__; stop it so the
          # task does not keep running after the delegate is dropped.
          if delegate.agent_task is not None:
              delegate.agent_task.cancel()
          obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
          await self.event_stream.add_event(obs, EventSource.AGENT)
          self.delegate = None
          # Not read anywhere in this class; kept in case external code relies on it.
          self.delegateAction = None

      async def _step(self):
          """Run a single iteration of the control loop.

          Returns early (after a one-second sleep) while the agent is not RUNNING
          or an action is still awaiting its observation. Otherwise enforces the
          iteration limit, forwards to an active delegate if there is one, and
          finally asks the agent for its next action and publishes it.

          Raises:
              MaxCharsExceedError: If ``state.num_of_chars`` exceeds ``max_chars``.
          """
          if self.get_agent_state() != AgentState.RUNNING:
              logger.debug('waiting for agent to run...')
              await asyncio.sleep(1)
              return

          if self._pending_action:
              logger.debug(f'waiting for pending action: {self._pending_action!s}')
              await asyncio.sleep(1)
              return

          logger.info(f'STEP {self.state.iteration}', extra={'msg_type': 'STEP'})
          if self.state.iteration >= self.max_iterations:
              await self.report_error('Agent reached maximum number of iterations')
              await self.set_agent_state_to(AgentState.ERROR)
              return

          if self.delegate is not None:
              # While a delegate exists, this controller's own agent does not step.
              await self._step_delegate()
              return

          if self.state.num_of_chars > self.max_chars:
              raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)

          self.update_state_before_step()
          action: Action = NullAction()
          try:
              action = self.agent.step(self.state)
              if action is None:
                  raise AgentNoActionError('No action was returned')
          except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
              # Recoverable agent/LLM problems are reported; the loop tries again.
              await self.report_error(str(e))
              return

          logger.info(action, extra={'msg_type': 'ACTION'})
          self.update_state_after_step()

          if action.runnable:
              # History is written later, when the matching observation arrives
              # in on_event.
              self._pending_action = action
          else:
              await self.add_history(action, NullObservation(''))

          if not isinstance(action, NullAction):
              await self.event_stream.add_event(action, EventSource.AGENT)

          if self._is_stuck():
              await self.report_error('Agent got stuck in a loop')
              await self.set_agent_state_to(AgentState.ERROR)

      def get_state(self):
          """Return the controller's ``State`` object."""
          return self.state

      def _is_stuck(self):
          """Return True when the agent (or its delegate) appears to be looping.

          The agent counts as stuck when the last three history entries share the
          same action and their observations are either all ``NullObservation``
          or all ``ErrorObservation``.
          """
          # check if delegate stuck
          if self.delegate and self.delegate._is_stuck():
              return True

          history = self.state.history
          if len(history) < 3:
              return False

          # if the last three (Action, Observation) tuples are too repetitive
          # the agent got stuck in a loop
          same_action = all(
              history[-i][0] == history[-3][0] for i in range(1, 3)
          )
          if not same_action:
              return False

          # it repeats same action, give it a chance, but not if:
          if all(isinstance(history[-i][1], NullObservation) for i in range(1, 4)):
              # same (Action, NullObservation): like 'think' the same thought over and over
              logger.warning('Action, NullObservation loop detected')
              return True
          if all(isinstance(history[-i][1], ErrorObservation) for i in range(1, 4)):
              # (NullAction, ErrorObservation): errors coming from an exception
              # (Action, ErrorObservation): the same action getting an error, even if not necessarily the same error
              logger.warning('Action, ErrorObservation loop detected')
              return True

          return False