agent_controller.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import asyncio
  2. from typing import Optional, Type
  3. from agenthub.codeact_agent.codeact_agent import CodeActAgent
  4. from opendevin.controller.agent import Agent
  5. from opendevin.controller.state.plan import Plan
  6. from opendevin.controller.state.state import State
  7. from opendevin.core.config import config
  8. from opendevin.core.exceptions import (
  9. AgentMalformedActionError,
  10. AgentNoActionError,
  11. LLMOutputError,
  12. MaxCharsExceedError,
  13. )
  14. from opendevin.core.logger import opendevin_logger as logger
  15. from opendevin.core.schema import AgentState
  16. from opendevin.events.action import (
  17. Action,
  18. AddTaskAction,
  19. AgentDelegateAction,
  20. AgentFinishAction,
  21. ChangeAgentStateAction,
  22. MessageAction,
  23. ModifyTaskAction,
  24. NullAction,
  25. )
  26. from opendevin.events.event import Event
  27. from opendevin.events.observation import (
  28. AgentDelegateObservation,
  29. AgentStateChangedObservation,
  30. ErrorObservation,
  31. NullObservation,
  32. Observation,
  33. )
  34. from opendevin.events.stream import EventSource, EventStream, EventStreamSubscriber
  35. from opendevin.runtime import DockerSSHBox
  36. from opendevin.runtime.runtime import Runtime
  37. from opendevin.runtime.server.runtime import ServerRuntime
  38. MAX_ITERATIONS = config.max_iterations
  39. MAX_CHARS = config.llm.max_chars
  40. class AgentController:
  41. id: str
  42. agent: Agent
  43. max_iterations: int
  44. runtime: Runtime
  45. event_stream: EventStream
  46. agent_task: Optional[asyncio.Task] = None
  47. delegate: 'AgentController | None' = None
  48. state: State | None = None
  49. _agent_state: AgentState = AgentState.LOADING
  50. _cur_step: int = 0
  51. def __init__(
  52. self,
  53. agent: Agent,
  54. event_stream: EventStream,
  55. sid: str = 'default',
  56. max_iterations: int = MAX_ITERATIONS,
  57. max_chars: int = MAX_CHARS,
  58. ):
  59. """Initializes a new instance of the AgentController class.
  60. Args:
  61. agent: The agent instance to control.
  62. sid: The session ID of the agent.
  63. max_iterations: The maximum number of iterations the agent can run.
  64. max_chars: The maximum number of characters the agent can output.
  65. """
  66. self.id = sid
  67. self.agent = agent
  68. self.event_stream = event_stream
  69. self.event_stream.subscribe(
  70. EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
  71. )
  72. self.max_iterations = max_iterations
  73. self.runtime = ServerRuntime(self.id)
  74. self.max_chars = max_chars
  75. # Initialize agent-required plugins for sandbox (if any)
  76. self.runtime.init_sandbox_plugins(agent.sandbox_plugins)
  77. if isinstance(agent, CodeActAgent) and not isinstance(
  78. self.runtime.sandbox, DockerSSHBox
  79. ):
  80. logger.warning(
  81. 'CodeActAgent requires DockerSSHBox as sandbox! Using other sandbox that are not stateful (LocalBox, DockerExecBox) will not work properly.'
  82. )
  83. async def close(self):
  84. if self.agent_task is not None:
  85. self.agent_task.cancel()
  86. self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
  87. self.runtime.sandbox.close()
  88. self.runtime.browser.close()
  89. await self.set_agent_state_to(AgentState.STOPPED)
  90. def update_state_for_step(self, i):
  91. if self.state is None:
  92. return
  93. self.state.iteration = i
  94. self.state.background_commands_obs = self.runtime.get_background_obs()
  95. def update_state_after_step(self):
  96. if self.state is None:
  97. return
  98. self.state.updated_info = []
  99. async def add_error_to_history(self, message: str):
  100. await self.add_history(NullAction(), ErrorObservation(message))
  101. async def add_history(
  102. self, action: Action, observation: Observation, add_to_stream=True
  103. ):
  104. if self.state is None:
  105. raise ValueError('Added history while state was None')
  106. if not isinstance(action, Action):
  107. raise TypeError(
  108. f'action must be an instance of Action, got {type(action).__name__} instead'
  109. )
  110. if not isinstance(observation, Observation):
  111. raise TypeError(
  112. f'observation must be an instance of Observation, got {type(observation).__name__} instead'
  113. )
  114. self.state.history.append((action, observation))
  115. self.state.updated_info.append((action, observation))
  116. if add_to_stream:
  117. await self.event_stream.add_event(action, EventSource.AGENT)
  118. await self.event_stream.add_event(observation, EventSource.AGENT)
  119. async def _run(self):
  120. if self.state is None:
  121. return
  122. if self._agent_state != AgentState.RUNNING:
  123. raise ValueError('Task is not in running state')
  124. for i in range(self._cur_step, self.max_iterations):
  125. self._cur_step = i
  126. try:
  127. finished = await self.step(i)
  128. if finished:
  129. await self.set_agent_state_to(AgentState.FINISHED)
  130. break
  131. except Exception:
  132. logger.error('Error in loop', exc_info=True)
  133. await self.set_agent_state_to(AgentState.ERROR)
  134. await self.add_error_to_history(
  135. 'Oops! Something went wrong while completing your task. You can check the logs for more info.'
  136. )
  137. break
  138. if self._is_stuck():
  139. logger.info('Loop detected, stopping task')
  140. await self.set_agent_state_to(AgentState.ERROR)
  141. await self.add_error_to_history(
  142. 'I got stuck into a loop, the task has stopped.'
  143. )
  144. break
  145. await asyncio.sleep(
  146. 0.001
  147. ) # Give back control for a tick, so other async stuff can run
  148. final_state = self.get_agent_state()
  149. if final_state == AgentState.RUNNING:
  150. await self.set_agent_state_to(AgentState.PAUSED)
  151. async def setup_task(self, task: str, inputs: dict = {}):
  152. """Sets up the agent controller with a task."""
  153. await self.set_agent_state_to(AgentState.INIT)
  154. self.state = State(Plan(task))
  155. self.state.inputs = inputs
  156. async def on_event(self, event: Event):
  157. if isinstance(event, ChangeAgentStateAction):
  158. await self.set_agent_state_to(event.agent_state) # type: ignore
  159. elif isinstance(event, MessageAction) and event.source == EventSource.USER:
  160. await self.add_history(event, NullObservation(''), add_to_stream=False)
  161. if self.get_agent_state() == AgentState.AWAITING_USER_INPUT:
  162. await self.set_agent_state_to(AgentState.RUNNING)
  163. async def reset_task(self):
  164. if self.agent_task is not None:
  165. self.agent_task.cancel()
  166. self.state = None
  167. self._cur_step = 0
  168. self.agent.reset()
  169. async def set_agent_state_to(self, new_state: AgentState):
  170. logger.info(f'Setting agent({type(self.agent).__name__}) state from {self._agent_state} to {new_state}')
  171. if new_state == self._agent_state:
  172. return
  173. self._agent_state = new_state
  174. if new_state == AgentState.RUNNING:
  175. self.agent_task = asyncio.create_task(self._run())
  176. elif (
  177. new_state == AgentState.PAUSED
  178. or new_state == AgentState.AWAITING_USER_INPUT
  179. ):
  180. self._cur_step += 1
  181. if self.agent_task is not None:
  182. self.agent_task.cancel()
  183. elif new_state == AgentState.STOPPED or new_state == AgentState.ERROR or new_state == AgentState.FINISHED:
  184. await self.reset_task()
  185. await self.event_stream.add_event(
  186. AgentStateChangedObservation('', self._agent_state), EventSource.AGENT
  187. )
  188. def get_agent_state(self):
  189. """Returns the current state of the agent task."""
  190. return self._agent_state
  191. async def start_delegate(self, action: AgentDelegateAction):
  192. AgentCls: Type[Agent] = Agent.get_cls(action.agent)
  193. agent = AgentCls(llm=self.agent.llm)
  194. self.delegate = AgentController(
  195. sid=self.id + '-delegate',
  196. agent=agent,
  197. event_stream=self.event_stream,
  198. max_iterations=self.max_iterations,
  199. max_chars=self.max_chars,
  200. )
  201. task = action.inputs.get('task') or ''
  202. await self.delegate.setup_task(task, action.inputs)
  203. async def step(self, i: int) -> bool:
  204. if self.state is None:
  205. raise ValueError('No task to run')
  206. if self.delegate is not None:
  207. delegate_done = await self.delegate.step(i)
  208. if delegate_done:
  209. outputs = self.delegate.state.outputs if self.delegate.state else {}
  210. obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
  211. await self.add_history(NullAction(), obs)
  212. self.delegate = None
  213. self.delegateAction = None
  214. return False
  215. logger.info(f'STEP {i}', extra={'msg_type': 'STEP'})
  216. if i == 0:
  217. logger.info(self.state.plan.main_goal, extra={'msg_type': 'PLAN'})
  218. if self.state.num_of_chars > self.max_chars:
  219. raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)
  220. log_obs = self.runtime.get_background_obs()
  221. for obs in log_obs:
  222. await self.add_history(NullAction(), obs)
  223. logger.info(obs, extra={'msg_type': 'BACKGROUND LOG'})
  224. self.update_state_for_step(i)
  225. action: Action = NullAction()
  226. observation: Observation = NullObservation('')
  227. try:
  228. action = self.agent.step(self.state)
  229. if action is None:
  230. raise AgentNoActionError('No action was returned')
  231. except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
  232. observation = ErrorObservation(str(e))
  233. logger.info(action, extra={'msg_type': 'ACTION'})
  234. self.update_state_after_step()
  235. if isinstance(action, AgentFinishAction):
  236. self.state.outputs = action.outputs # type: ignore[attr-defined]
  237. logger.info(action, extra={'msg_type': 'INFO'})
  238. return True
  239. elif isinstance(action, MessageAction) and action.wait_for_response:
  240. # FIXME: remove this once history is managed outside the agent controller
  241. await self.add_history(action, NullObservation(''))
  242. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  243. return False
  244. elif isinstance(action, AgentDelegateAction):
  245. await self.start_delegate(action)
  246. elif isinstance(action, AddTaskAction):
  247. self.state.plan.add_subtask(action.parent, action.goal, action.subtasks)
  248. elif isinstance(action, ModifyTaskAction):
  249. self.state.plan.set_subtask_state(action.id, action.state)
  250. elif not isinstance(observation, ErrorObservation):
  251. observation = await self.runtime.run_action(action)
  252. if not isinstance(observation, NullObservation):
  253. logger.info(observation, extra={'msg_type': 'OBSERVATION'})
  254. await self.add_history(action, observation)
  255. return False
  256. def get_state(self):
  257. return self.state
  258. def _is_stuck(self):
  259. # check if delegate stuck
  260. if self.delegate and self.delegate._is_stuck():
  261. return True
  262. if (
  263. self.state is None
  264. or self.state.history is None
  265. or len(self.state.history) < 3
  266. ):
  267. return False
  268. # if the last three (Action, Observation) tuples are too repetitive
  269. # the agent got stuck in a loop
  270. if all(
  271. [
  272. self.state.history[-i][0] == self.state.history[-3][0]
  273. for i in range(1, 3)
  274. ]
  275. ):
  276. # it repeats same action, give it a chance, but not if:
  277. if all(
  278. isinstance(self.state.history[-i][1], NullObservation)
  279. for i in range(1, 4)
  280. ):
  281. # same (Action, NullObservation): like 'think' the same thought over and over
  282. logger.debug('Action, NullObservation loop detected')
  283. return True
  284. elif all(
  285. isinstance(self.state.history[-i][1], ErrorObservation)
  286. for i in range(1, 4)
  287. ):
  288. # (NullAction, ErrorObservation): errors coming from an exception
  289. # (Action, ErrorObservation): the same action getting an error, even if not necessarily the same error
  290. logger.debug('Action, ErrorObservation loop detected')
  291. return True
  292. return False