agent_controller.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. import asyncio
  2. import traceback
  3. from typing import Type
  4. from opendevin.controller.agent import Agent
  5. from opendevin.controller.state.state import State, TrafficControlState
  6. from opendevin.controller.stuck import StuckDetector
  7. from opendevin.core.config import AgentConfig, LLMConfig
  8. from opendevin.core.exceptions import (
  9. LLMMalformedActionError,
  10. LLMNoActionError,
  11. LLMResponseError,
  12. )
  13. from opendevin.core.logger import opendevin_logger as logger
  14. from opendevin.core.schema import AgentState
  15. from opendevin.events import EventSource, EventStream, EventStreamSubscriber
  16. from opendevin.events.action import (
  17. Action,
  18. ActionConfirmationStatus,
  19. AddTaskAction,
  20. AgentDelegateAction,
  21. AgentFinishAction,
  22. AgentRejectAction,
  23. ChangeAgentStateAction,
  24. CmdRunAction,
  25. IPythonRunCellAction,
  26. MessageAction,
  27. ModifyTaskAction,
  28. NullAction,
  29. )
  30. from opendevin.events.event import Event
  31. from opendevin.events.observation import (
  32. AgentDelegateObservation,
  33. AgentStateChangedObservation,
  34. CmdOutputObservation,
  35. ErrorObservation,
  36. Observation,
  37. )
  38. from opendevin.llm.llm import LLM
  39. # note: RESUME is only available on web GUI
  40. TRAFFIC_CONTROL_REMINDER = (
  41. "Please click on resume button if you'd like to continue, or start a new task."
  42. )
  43. class AgentController:
  44. id: str
  45. agent: Agent
  46. max_iterations: int
  47. event_stream: EventStream
  48. state: State
  49. confirmation_mode: bool
  50. agent_to_llm_config: dict[str, LLMConfig]
  51. agent_configs: dict[str, AgentConfig]
  52. agent_task: asyncio.Task | None = None
  53. parent: 'AgentController | None' = None
  54. delegate: 'AgentController | None' = None
  55. _pending_action: Action | None = None
  56. def __init__(
  57. self,
  58. agent: Agent,
  59. event_stream: EventStream,
  60. max_iterations: int,
  61. max_budget_per_task: float | None = None,
  62. agent_to_llm_config: dict[str, LLMConfig] | None = None,
  63. agent_configs: dict[str, AgentConfig] | None = None,
  64. sid: str = 'default',
  65. confirmation_mode: bool = False,
  66. initial_state: State | None = None,
  67. is_delegate: bool = False,
  68. headless_mode: bool = True,
  69. ):
  70. """Initializes a new instance of the AgentController class.
  71. Args:
  72. agent: The agent instance to control.
  73. event_stream: The event stream to publish events to.
  74. max_iterations: The maximum number of iterations the agent can run.
  75. max_budget_per_task: The maximum budget (in USD) allowed per task, beyond which the agent will stop.
  76. agent_to_llm_config: A dictionary mapping agent names to LLM configurations in the case that
  77. we delegate to a different agent.
  78. agent_configs: A dictionary mapping agent names to agent configurations in the case that
  79. we delegate to a different agent.
  80. sid: The session ID of the agent.
  81. initial_state: The initial state of the controller.
  82. is_delegate: Whether this controller is a delegate.
  83. headless_mode: Whether the agent is run in headless mode.
  84. """
  85. self._step_lock = asyncio.Lock()
  86. self.id = sid
  87. self.agent = agent
  88. self.headless_mode = headless_mode
  89. # subscribe to the event stream
  90. self.event_stream = event_stream
  91. self.event_stream.subscribe(
  92. EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, append=is_delegate
  93. )
  94. # state from the previous session, state from a parent agent, or a fresh state
  95. self.set_initial_state(
  96. state=initial_state,
  97. max_iterations=max_iterations,
  98. confirmation_mode=confirmation_mode,
  99. )
  100. self.max_budget_per_task = max_budget_per_task
  101. self.agent_to_llm_config = agent_to_llm_config if agent_to_llm_config else {}
  102. self.agent_configs = agent_configs if agent_configs else {}
  103. # stuck helper
  104. self._stuck_detector = StuckDetector(self.state)
  105. if not is_delegate:
  106. self.agent_task = asyncio.create_task(self._start_step_loop())
  107. async def close(self):
  108. if self.agent_task is not None:
  109. self.agent_task.cancel()
  110. await self.set_agent_state_to(AgentState.STOPPED)
  111. self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
  112. def update_state_before_step(self):
  113. self.state.iteration += 1
  114. self.state.local_iteration += 1
  115. async def update_state_after_step(self):
  116. # update metrics especially for cost
  117. self.state.local_metrics = self.agent.llm.metrics
  118. async def report_error(self, message: str, exception: Exception | None = None):
  119. """This error will be reported to the user and sent to the LLM next step, in the hope it can self-correct.
  120. This method should be called for a particular type of errors, which have:
  121. - a user-friendly message, which will be shown in the chat box. This should not be a raw exception message.
  122. - an ErrorObservation that can be sent to the LLM by the agent, with the exception message, so it can self-correct next time.
  123. """
  124. self.state.last_error = message
  125. if exception:
  126. self.state.last_error += f': {exception}'
  127. self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)
  128. async def _start_step_loop(self):
  129. logger.info(f'[Agent Controller {self.id}] Starting step loop...')
  130. while True:
  131. try:
  132. await self._step()
  133. except asyncio.CancelledError:
  134. logger.info('AgentController task was cancelled')
  135. break
  136. except Exception as e:
  137. traceback.print_exc()
  138. logger.error(f'Error while running the agent: {e}')
  139. logger.error(traceback.format_exc())
  140. await self.report_error(
  141. 'There was an unexpected error while running the agent', exception=e
  142. )
  143. await self.set_agent_state_to(AgentState.ERROR)
  144. break
  145. await asyncio.sleep(0.1)
  146. async def on_event(self, event: Event):
  147. if isinstance(event, ChangeAgentStateAction):
  148. await self.set_agent_state_to(event.agent_state) # type: ignore
  149. elif isinstance(event, MessageAction):
  150. if event.source == EventSource.USER:
  151. logger.info(
  152. event,
  153. extra={'msg_type': 'ACTION', 'event_source': EventSource.USER},
  154. )
  155. if self.get_agent_state() != AgentState.RUNNING:
  156. await self.set_agent_state_to(AgentState.RUNNING)
  157. elif event.source == EventSource.AGENT and event.wait_for_response:
  158. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  159. elif isinstance(event, AgentDelegateAction):
  160. await self.start_delegate(event)
  161. elif isinstance(event, AddTaskAction):
  162. self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
  163. elif isinstance(event, ModifyTaskAction):
  164. self.state.root_task.set_subtask_state(event.task_id, event.state)
  165. elif isinstance(event, AgentFinishAction):
  166. self.state.outputs = event.outputs
  167. self.state.metrics.merge(self.state.local_metrics)
  168. await self.set_agent_state_to(AgentState.FINISHED)
  169. elif isinstance(event, AgentRejectAction):
  170. self.state.outputs = event.outputs
  171. self.state.metrics.merge(self.state.local_metrics)
  172. await self.set_agent_state_to(AgentState.REJECTED)
  173. elif isinstance(event, Observation):
  174. if (
  175. self._pending_action
  176. and hasattr(self._pending_action, 'is_confirmed')
  177. and self._pending_action.is_confirmed
  178. == ActionConfirmationStatus.AWAITING_CONFIRMATION
  179. ):
  180. return
  181. if self._pending_action and self._pending_action.id == event.cause:
  182. self._pending_action = None
  183. if self.state.agent_state == AgentState.USER_CONFIRMED:
  184. await self.set_agent_state_to(AgentState.RUNNING)
  185. if self.state.agent_state == AgentState.USER_REJECTED:
  186. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  187. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  188. elif isinstance(event, CmdOutputObservation):
  189. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  190. elif isinstance(event, AgentDelegateObservation):
  191. self.state.history.on_event(event)
  192. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  193. elif isinstance(event, ErrorObservation):
  194. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  195. def reset_task(self):
  196. self.almost_stuck = 0
  197. self.agent.reset()
  198. async def set_agent_state_to(self, new_state: AgentState):
  199. logger.debug(
  200. f'[Agent Controller {self.id}] Setting agent({self.agent.name}) state from {self.state.agent_state} to {new_state}'
  201. )
  202. if new_state == self.state.agent_state:
  203. return
  204. if (
  205. self.state.agent_state == AgentState.PAUSED
  206. and new_state == AgentState.RUNNING
  207. and self.state.traffic_control_state == TrafficControlState.THROTTLING
  208. ):
  209. # user intends to interrupt traffic control and let the task resume temporarily
  210. self.state.traffic_control_state = TrafficControlState.PAUSED
  211. self.state.agent_state = new_state
  212. if new_state == AgentState.STOPPED or new_state == AgentState.ERROR:
  213. self.reset_task()
  214. if self._pending_action is not None and (
  215. new_state == AgentState.USER_CONFIRMED
  216. or new_state == AgentState.USER_REJECTED
  217. ):
  218. if hasattr(self._pending_action, 'thought'):
  219. self._pending_action.thought = '' # type: ignore[union-attr]
  220. if new_state == AgentState.USER_CONFIRMED:
  221. self._pending_action.is_confirmed = ActionConfirmationStatus.CONFIRMED # type: ignore[attr-defined]
  222. else:
  223. self._pending_action.is_confirmed = ActionConfirmationStatus.REJECTED # type: ignore[attr-defined]
  224. self.event_stream.add_event(self._pending_action, EventSource.AGENT)
  225. self.event_stream.add_event(
  226. AgentStateChangedObservation('', self.state.agent_state), EventSource.AGENT
  227. )
  228. if new_state == AgentState.INIT and self.state.resume_state:
  229. await self.set_agent_state_to(self.state.resume_state)
  230. self.state.resume_state = None
  231. def get_agent_state(self):
  232. """Returns the current state of the agent task."""
  233. return self.state.agent_state
  234. async def start_delegate(self, action: AgentDelegateAction):
  235. agent_cls: Type[Agent] = Agent.get_cls(action.agent)
  236. agent_config = self.agent_configs.get(action.agent, self.agent.config)
  237. llm_config = self.agent_to_llm_config.get(action.agent, self.agent.llm.config)
  238. llm = LLM(config=llm_config)
  239. delegate_agent = agent_cls(llm=llm, config=agent_config)
  240. state = State(
  241. inputs=action.inputs or {},
  242. local_iteration=0,
  243. iteration=self.state.iteration,
  244. max_iterations=self.state.max_iterations,
  245. delegate_level=self.state.delegate_level + 1,
  246. # global metrics should be shared between parent and child
  247. metrics=self.state.metrics,
  248. )
  249. logger.info(
  250. f'[Agent Controller {self.id}]: start delegate, creating agent {delegate_agent.name} using LLM {llm}'
  251. )
  252. self.delegate = AgentController(
  253. sid=self.id + '-delegate',
  254. agent=delegate_agent,
  255. event_stream=self.event_stream,
  256. max_iterations=self.state.max_iterations,
  257. max_budget_per_task=self.max_budget_per_task,
  258. agent_to_llm_config=self.agent_to_llm_config,
  259. agent_configs=self.agent_configs,
  260. initial_state=state,
  261. is_delegate=True,
  262. headless_mode=self.headless_mode,
  263. )
  264. await self.delegate.set_agent_state_to(AgentState.RUNNING)
  265. async def _step(self) -> None:
  266. if self.get_agent_state() != AgentState.RUNNING:
  267. await asyncio.sleep(1)
  268. return
  269. if self._pending_action:
  270. logger.debug(
  271. f'[Agent Controller {self.id}] waiting for pending action: {self._pending_action}'
  272. )
  273. await asyncio.sleep(1)
  274. return
  275. if self.delegate is not None:
  276. logger.debug(f'[Agent Controller {self.id}] Delegate not none, awaiting...')
  277. assert self.delegate != self
  278. await self.delegate._step()
  279. logger.debug(f'[Agent Controller {self.id}] Delegate step done')
  280. assert self.delegate is not None
  281. delegate_state = self.delegate.get_agent_state()
  282. logger.debug(
  283. f'[Agent Controller {self.id}] Delegate state: {delegate_state}'
  284. )
  285. if delegate_state == AgentState.ERROR:
  286. # close the delegate upon error
  287. await self.delegate.close()
  288. self.delegate = None
  289. self.delegateAction = None
  290. await self.report_error('Delegator agent encounters an error')
  291. return
  292. delegate_done = delegate_state in (AgentState.FINISHED, AgentState.REJECTED)
  293. if delegate_done:
  294. logger.info(
  295. f'[Agent Controller {self.id}] Delegate agent has finished execution'
  296. )
  297. # retrieve delegate result
  298. outputs = self.delegate.state.outputs if self.delegate.state else {}
  299. # update iteration that shall be shared across agents
  300. self.state.iteration = self.delegate.state.iteration
  301. # close delegate controller: we must close the delegate controller before adding new events
  302. await self.delegate.close()
  303. # update delegate result observation
  304. # TODO: replace this with AI-generated summary (#2395)
  305. formatted_output = ', '.join(
  306. f'{key}: {value}' for key, value in outputs.items()
  307. )
  308. content = (
  309. f'{self.delegate.agent.name} finishes task with {formatted_output}'
  310. )
  311. obs: Observation = AgentDelegateObservation(
  312. outputs=outputs, content=content
  313. )
  314. # clean up delegate status
  315. self.delegate = None
  316. self.delegateAction = None
  317. self.event_stream.add_event(obs, EventSource.AGENT)
  318. return
  319. logger.info(
  320. f'{self.agent.name} LEVEL {self.state.delegate_level} LOCAL STEP {self.state.local_iteration} GLOBAL STEP {self.state.iteration}',
  321. extra={'msg_type': 'STEP'},
  322. )
  323. if self.state.iteration >= self.state.max_iterations:
  324. if self.state.traffic_control_state == TrafficControlState.PAUSED:
  325. logger.info(
  326. 'Hitting traffic control, temporarily resume upon user request'
  327. )
  328. self.state.traffic_control_state = TrafficControlState.NORMAL
  329. else:
  330. self.state.traffic_control_state = TrafficControlState.THROTTLING
  331. if self.headless_mode:
  332. # set to ERROR state if running in headless mode
  333. # since user cannot resume on the web interface
  334. await self.report_error(
  335. 'Agent reached maximum number of iterations in headless mode, task stopped.'
  336. )
  337. await self.set_agent_state_to(AgentState.ERROR)
  338. else:
  339. await self.report_error(
  340. f'Agent reached maximum number of iterations, task paused. {TRAFFIC_CONTROL_REMINDER}'
  341. )
  342. await self.set_agent_state_to(AgentState.PAUSED)
  343. return
  344. elif self.max_budget_per_task is not None:
  345. current_cost = self.state.metrics.accumulated_cost
  346. if current_cost > self.max_budget_per_task:
  347. if self.state.traffic_control_state == TrafficControlState.PAUSED:
  348. logger.info(
  349. 'Hitting traffic control, temporarily resume upon user request'
  350. )
  351. self.state.traffic_control_state = TrafficControlState.NORMAL
  352. else:
  353. self.state.traffic_control_state = TrafficControlState.THROTTLING
  354. if self.headless_mode:
  355. # set to ERROR state if running in headless mode
  356. # there is no way to resume
  357. await self.report_error(
  358. f'Task budget exceeded. Current cost: {current_cost:.2f}, max budget: {self.max_budget_per_task:.2f}, task stopped.'
  359. )
  360. await self.set_agent_state_to(AgentState.ERROR)
  361. else:
  362. await self.report_error(
  363. f'Task budget exceeded. Current cost: {current_cost:.2f}, Max budget: {self.max_budget_per_task:.2f}, task paused. {TRAFFIC_CONTROL_REMINDER}'
  364. )
  365. await self.set_agent_state_to(AgentState.PAUSED)
  366. return
  367. self.update_state_before_step()
  368. action: Action = NullAction()
  369. try:
  370. action = self.agent.step(self.state)
  371. if action is None:
  372. raise LLMNoActionError('No action was returned')
  373. except (LLMMalformedActionError, LLMNoActionError, LLMResponseError) as e:
  374. # report to the user
  375. # and send the underlying exception to the LLM for self-correction
  376. await self.report_error(str(e))
  377. return
  378. if action.runnable:
  379. if self.state.confirmation_mode and (
  380. type(action) is CmdRunAction or type(action) is IPythonRunCellAction
  381. ):
  382. action.is_confirmed = ActionConfirmationStatus.AWAITING_CONFIRMATION
  383. self._pending_action = action
  384. if not isinstance(action, NullAction):
  385. if (
  386. hasattr(action, 'is_confirmed')
  387. and action.is_confirmed
  388. == ActionConfirmationStatus.AWAITING_CONFIRMATION
  389. ):
  390. await self.set_agent_state_to(AgentState.AWAITING_USER_CONFIRMATION)
  391. self.event_stream.add_event(action, EventSource.AGENT)
  392. await self.update_state_after_step()
  393. logger.info(action, extra={'msg_type': 'ACTION'})
  394. if self._is_stuck():
  395. await self.report_error('Agent got stuck in a loop')
  396. await self.set_agent_state_to(AgentState.ERROR)
  397. def get_state(self):
  398. return self.state
  399. def set_initial_state(
  400. self,
  401. state: State | None,
  402. max_iterations: int,
  403. confirmation_mode: bool = False,
  404. ):
  405. # state from the previous session, state from a parent agent, or a new state
  406. # note that this is called twice when restoring a previous session, first with state=None
  407. if state is None:
  408. self.state = State(
  409. inputs={},
  410. max_iterations=max_iterations,
  411. confirmation_mode=confirmation_mode,
  412. )
  413. else:
  414. self.state = state
  415. # when restored from a previous session, the State object will have history, start_id, and end_id
  416. # connect it to the event stream
  417. self.state.history.set_event_stream(self.event_stream)
  418. # if start_id was not set in State, we're starting fresh, at the top of the stream
  419. start_id = self.state.start_id
  420. if start_id == -1:
  421. start_id = self.event_stream.get_latest_event_id() + 1
  422. else:
  423. logger.debug(f'AgentController {self.id} restoring from event {start_id}')
  424. # make sure history is in sync
  425. self.state.start_id = start_id
  426. self.state.history.start_id = start_id
  427. # if there was an end_id saved in State, set it in history
  428. # currently not used, later useful for delegates
  429. if self.state.end_id > -1:
  430. self.state.history.end_id = self.state.end_id
  431. def _is_stuck(self):
  432. # check if delegate stuck
  433. if self.delegate and self.delegate._is_stuck():
  434. return True
  435. return self._stuck_detector.is_stuck()
  436. def __repr__(self):
  437. return (
  438. f'AgentController(id={self.id}, agent={self.agent!r}, '
  439. f'event_stream={self.event_stream!r}, '
  440. f'state={self.state!r}, agent_task={self.agent_task!r}, '
  441. f'delegate={self.delegate!r}, _pending_action={self._pending_action!r})'
  442. )