agent_controller.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import asyncio
  2. from typing import Optional, Type
  3. from agenthub.codeact_agent.codeact_agent import CodeActAgent
  4. from opendevin.controller.action_manager import ActionManager
  5. from opendevin.controller.agent import Agent
  6. from opendevin.controller.state.plan import Plan
  7. from opendevin.controller.state.state import State
  8. from opendevin.core.config import config
  9. from opendevin.core.exceptions import (
  10. AgentMalformedActionError,
  11. AgentNoActionError,
  12. LLMOutputError,
  13. MaxCharsExceedError,
  14. )
  15. from opendevin.core.logger import opendevin_logger as logger
  16. from opendevin.core.schema import AgentState
  17. from opendevin.events.action import (
  18. Action,
  19. AgentDelegateAction,
  20. AgentFinishAction,
  21. AgentRejectAction,
  22. ChangeAgentStateAction,
  23. MessageAction,
  24. NullAction,
  25. )
  26. from opendevin.events.event import Event
  27. from opendevin.events.observation import (
  28. AgentDelegateObservation,
  29. AgentStateChangedObservation,
  30. ErrorObservation,
  31. NullObservation,
  32. Observation,
  33. )
  34. from opendevin.events.stream import EventSource, EventStream, EventStreamSubscriber
  35. from opendevin.runtime import DockerSSHBox
  36. from opendevin.runtime.browser.browser_env import BrowserEnv
# Default value for AgentController's `max_iterations` argument, read from the global config.
MAX_ITERATIONS = config.max_iterations
# Default value for AgentController's `max_chars` argument, read from the LLM section of the config.
MAX_CHARS = config.llm.max_chars
  39. class AgentController:
  40. id: str
  41. agent: Agent
  42. max_iterations: int
  43. action_manager: ActionManager
  44. browser: BrowserEnv
  45. event_stream: EventStream
  46. agent_task: Optional[asyncio.Task] = None
  47. delegate: 'AgentController | None' = None
  48. state: State | None = None
  49. _agent_state: AgentState = AgentState.LOADING
  50. _cur_step: int = 0
  51. def __init__(
  52. self,
  53. agent: Agent,
  54. event_stream: EventStream,
  55. sid: str = 'default',
  56. max_iterations: int = MAX_ITERATIONS,
  57. max_chars: int = MAX_CHARS,
  58. ):
  59. """Initializes a new instance of the AgentController class.
  60. Args:
  61. agent: The agent instance to control.
  62. sid: The session ID of the agent.
  63. max_iterations: The maximum number of iterations the agent can run.
  64. max_chars: The maximum number of characters the agent can output.
  65. """
  66. self.id = sid
  67. self.agent = agent
  68. self.event_stream = event_stream
  69. self.event_stream.subscribe(
  70. EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
  71. )
  72. self.max_iterations = max_iterations
  73. self.action_manager = ActionManager(self.id)
  74. self.max_chars = max_chars
  75. # Initialize agent-required plugins for sandbox (if any)
  76. self.action_manager.init_sandbox_plugins(agent.sandbox_plugins)
  77. # Initialize browser environment
  78. self.browser = BrowserEnv()
  79. if isinstance(agent, CodeActAgent) and not isinstance(
  80. self.action_manager.sandbox, DockerSSHBox
  81. ):
  82. logger.warning(
  83. 'CodeActAgent requires DockerSSHBox as sandbox! Using other sandbox that are not stateful (LocalBox, DockerExecBox) will not work properly.'
  84. )
  85. async def close(self):
  86. if self.agent_task is not None:
  87. self.agent_task.cancel()
  88. self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
  89. self.action_manager.sandbox.close()
  90. self.browser.close()
  91. await self.set_agent_state_to(AgentState.STOPPED)
  92. def update_state_for_step(self, i):
  93. if self.state is None:
  94. return
  95. self.state.iteration = i
  96. self.state.background_commands_obs = self.action_manager.get_background_obs()
  97. def update_state_after_step(self):
  98. if self.state is None:
  99. return
  100. self.state.updated_info = []
  101. async def add_error_to_history(self, message: str):
  102. await self.add_history(NullAction(), ErrorObservation(message))
  103. async def add_history(
  104. self, action: Action, observation: Observation, add_to_stream=True
  105. ):
  106. if self.state is None:
  107. raise ValueError('Added history while state was None')
  108. if not isinstance(action, Action):
  109. raise TypeError(
  110. f'action must be an instance of Action, got {type(action).__name__} instead'
  111. )
  112. if not isinstance(observation, Observation):
  113. raise TypeError(
  114. f'observation must be an instance of Observation, got {type(observation).__name__} instead'
  115. )
  116. self.state.history.append((action, observation))
  117. self.state.updated_info.append((action, observation))
  118. if add_to_stream:
  119. await self.event_stream.add_event(action, EventSource.AGENT)
  120. await self.event_stream.add_event(observation, EventSource.AGENT)
  121. async def _run(self):
  122. if self.state is None:
  123. return
  124. if self._agent_state != AgentState.RUNNING:
  125. raise ValueError('Task is not in running state')
  126. for i in range(self._cur_step, self.max_iterations):
  127. self._cur_step = i
  128. try:
  129. finished = await self.step(i)
  130. if finished:
  131. await self.set_agent_state_to(AgentState.FINISHED)
  132. break
  133. except Exception:
  134. logger.error('Error in loop', exc_info=True)
  135. await self.set_agent_state_to(AgentState.ERROR)
  136. await self.add_error_to_history(
  137. 'Oops! Something went wrong while completing your task. You can check the logs for more info.'
  138. )
  139. break
  140. if self._is_stuck():
  141. logger.info('Loop detected, stopping task')
  142. await self.set_agent_state_to(AgentState.ERROR)
  143. await self.add_error_to_history(
  144. 'I got stuck into a loop, the task has stopped.'
  145. )
  146. break
  147. await asyncio.sleep(
  148. 0.001
  149. ) # Give back control for a tick, so other async stuff can run
  150. final_state = self.get_agent_state()
  151. if final_state == AgentState.RUNNING:
  152. await self.set_agent_state_to(AgentState.PAUSED)
  153. async def setup_task(self, task: str, inputs: dict = {}):
  154. """Sets up the agent controller with a task."""
  155. await self.set_agent_state_to(AgentState.INIT)
  156. self.state = State(Plan(task))
  157. self.state.inputs = inputs
  158. async def on_event(self, event: Event):
  159. if isinstance(event, ChangeAgentStateAction):
  160. await self.set_agent_state_to(event.agent_state) # type: ignore
  161. elif isinstance(event, MessageAction) and event.source == EventSource.USER:
  162. await self.add_history(event, NullObservation(''), add_to_stream=False)
  163. if self.get_agent_state() == AgentState.AWAITING_USER_INPUT:
  164. await self.set_agent_state_to(AgentState.RUNNING)
  165. async def reset_task(self):
  166. if self.agent_task is not None:
  167. self.agent_task.cancel()
  168. self.state = None
  169. self._cur_step = 0
  170. self.agent.reset()
  171. async def set_agent_state_to(self, new_state: AgentState):
  172. logger.info(f'Setting agent state from {self._agent_state} to {new_state}')
  173. if new_state == self._agent_state:
  174. return
  175. self._agent_state = new_state
  176. if new_state == AgentState.RUNNING:
  177. self.agent_task = asyncio.create_task(self._run())
  178. elif (
  179. new_state == AgentState.PAUSED
  180. or new_state == AgentState.AWAITING_USER_INPUT
  181. ):
  182. self._cur_step += 1
  183. if self.agent_task is not None:
  184. self.agent_task.cancel()
  185. elif new_state == AgentState.STOPPED:
  186. await self.reset_task()
  187. elif new_state == AgentState.FINISHED:
  188. await self.reset_task()
  189. await self.event_stream.add_event(
  190. AgentStateChangedObservation('', self._agent_state), EventSource.AGENT
  191. )
  192. def get_agent_state(self):
  193. """Returns the current state of the agent task."""
  194. return self._agent_state
  195. async def start_delegate(self, action: AgentDelegateAction):
  196. AgentCls: Type[Agent] = Agent.get_cls(action.agent)
  197. agent = AgentCls(llm=self.agent.llm)
  198. self.delegate = AgentController(
  199. sid=self.id + '-delegate',
  200. agent=agent,
  201. event_stream=self.event_stream,
  202. max_iterations=self.max_iterations,
  203. max_chars=self.max_chars,
  204. )
  205. task = action.inputs.get('task') or ''
  206. await self.delegate.setup_task(task, action.inputs)
  207. async def step(self, i: int) -> bool:
  208. if self.state is None:
  209. raise ValueError('No task to run')
  210. if self.delegate is not None:
  211. delegate_done = await self.delegate.step(i)
  212. if delegate_done:
  213. outputs = self.delegate.state.outputs if self.delegate.state else {}
  214. obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
  215. await self.add_history(NullAction(), obs)
  216. self.delegate = None
  217. self.delegateAction = None
  218. return False
  219. logger.info(f'STEP {i}', extra={'msg_type': 'STEP'})
  220. if i == 0:
  221. logger.info(self.state.plan.main_goal, extra={'msg_type': 'PLAN'})
  222. if self.state.num_of_chars > self.max_chars:
  223. raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)
  224. log_obs = self.action_manager.get_background_obs()
  225. for obs in log_obs:
  226. await self.add_history(NullAction(), obs)
  227. logger.info(obs, extra={'msg_type': 'BACKGROUND LOG'})
  228. self.update_state_for_step(i)
  229. action: Action = NullAction()
  230. observation: Observation = NullObservation('')
  231. try:
  232. action = self.agent.step(self.state)
  233. if action is None:
  234. raise AgentNoActionError('No action was returned')
  235. except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
  236. observation = ErrorObservation(str(e))
  237. logger.info(action, extra={'msg_type': 'ACTION'})
  238. self.update_state_after_step()
  239. if isinstance(action, MessageAction) and action.wait_for_response:
  240. # FIXME: remove this once history is managed outside the agent controller
  241. await self.add_history(action, NullObservation(''))
  242. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  243. return False
  244. finished = isinstance(action, AgentFinishAction) or isinstance(
  245. action, AgentRejectAction
  246. )
  247. if finished:
  248. self.state.outputs = action.outputs # type: ignore[attr-defined]
  249. logger.info(action, extra={'msg_type': 'INFO'})
  250. return True
  251. if isinstance(observation, NullObservation):
  252. observation = await self.action_manager.run_action(action, self)
  253. if not isinstance(observation, NullObservation):
  254. logger.info(observation, extra={'msg_type': 'OBSERVATION'})
  255. await self.add_history(action, observation)
  256. return False
  257. def get_state(self):
  258. return self.state
  259. def _is_stuck(self):
  260. if (
  261. self.state is None
  262. or self.state.history is None
  263. or len(self.state.history) < 3
  264. ):
  265. return False
  266. # if the last three (Action, Observation) tuples are too repetitive
  267. # the agent got stuck in a loop
  268. if all(
  269. [
  270. self.state.history[-i][0] == self.state.history[-3][0]
  271. for i in range(1, 3)
  272. ]
  273. ):
  274. # it repeats same action, give it a chance, but not if:
  275. if all(
  276. isinstance(self.state.history[-i][1], NullObservation)
  277. for i in range(1, 4)
  278. ):
  279. # same (Action, NullObservation): like 'think' the same thought over and over
  280. logger.debug('Action, NullObservation loop detected')
  281. return True
  282. elif all(
  283. isinstance(self.state.history[-i][1], ErrorObservation)
  284. for i in range(1, 4)
  285. ):
  286. # (NullAction, ErrorObservation): errors coming from an exception
  287. # (Action, ErrorObservation): the same action getting an error, even if not necessarily the same error
  288. logger.debug('Action, ErrorObservation loop detected')
  289. return True
  290. return False