# agent_controller.py
  1. import asyncio
  2. import traceback
  3. from typing import Optional, Type
  4. from opendevin.controller.agent import Agent
  5. from opendevin.controller.state.state import TRAFFIC_CONTROL_STATE, State
  6. from opendevin.core.config import config
  7. from opendevin.core.exceptions import (
  8. LLMMalformedActionError,
  9. LLMNoActionError,
  10. LLMResponseError,
  11. )
  12. from opendevin.core.logger import opendevin_logger as logger
  13. from opendevin.core.schema import AgentState
  14. from opendevin.events import EventSource, EventStream, EventStreamSubscriber
  15. from opendevin.events.action import (
  16. Action,
  17. AddTaskAction,
  18. AgentDelegateAction,
  19. AgentFinishAction,
  20. AgentRejectAction,
  21. ChangeAgentStateAction,
  22. MessageAction,
  23. ModifyTaskAction,
  24. NullAction,
  25. )
  26. from opendevin.events.action.commands import CmdKillAction
  27. from opendevin.events.event import Event
  28. from opendevin.events.observation import (
  29. AgentDelegateObservation,
  30. AgentStateChangedObservation,
  31. CmdOutputObservation,
  32. ErrorObservation,
  33. NullObservation,
  34. Observation,
  35. )
  36. MAX_ITERATIONS = config.max_iterations
  37. MAX_BUDGET_PER_TASK = config.max_budget_per_task
  38. # note: RESUME is only available on web GUI
  39. TRAFFIC_CONTROL_REMINDER = (
  40. "Please click on resume button if you'd like to continue, or start a new task."
  41. )
  42. class AgentController:
  43. id: str
  44. agent: Agent
  45. max_iterations: int
  46. event_stream: EventStream
  47. state: State
  48. agent_task: Optional[asyncio.Task] = None
  49. parent: 'AgentController | None' = None
  50. delegate: 'AgentController | None' = None
  51. _pending_action: Action | None = None
  52. def __init__(
  53. self,
  54. agent: Agent,
  55. event_stream: EventStream,
  56. sid: str = 'default',
  57. max_iterations: int = MAX_ITERATIONS,
  58. max_budget_per_task: float | None = MAX_BUDGET_PER_TASK,
  59. initial_state: State | None = None,
  60. is_delegate: bool = False,
  61. ):
  62. """Initializes a new instance of the AgentController class.
  63. Args:
  64. agent: The agent instance to control.
  65. event_stream: The event stream to publish events to.
  66. sid: The session ID of the agent.
  67. max_iterations: The maximum number of iterations the agent can run.
  68. max_budget_per_task: The maximum budget (in USD) allowed per task, beyond which the agent will stop.
  69. initial_state: The initial state of the controller.
  70. is_delegate: Whether this controller is a delegate.
  71. """
  72. self._step_lock = asyncio.Lock()
  73. self.id = sid
  74. self.agent = agent
  75. # subscribe to the event stream
  76. self.event_stream = event_stream
  77. self.event_stream.subscribe(
  78. EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, append=is_delegate
  79. )
  80. # state from the previous session, state from a parent agent, or a fresh state
  81. self._set_initial_state(
  82. state=initial_state,
  83. max_iterations=max_iterations,
  84. )
  85. self.max_budget_per_task = max_budget_per_task
  86. if not is_delegate:
  87. self.agent_task = asyncio.create_task(self._start_step_loop())
  88. async def close(self):
  89. if self.agent_task is not None:
  90. self.agent_task.cancel()
  91. await self.set_agent_state_to(AgentState.STOPPED)
  92. self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER)
  93. def update_state_before_step(self):
  94. self.state.iteration += 1
  95. async def update_state_after_step(self):
  96. self.state.updated_info = []
  97. # update metrics especially for cost
  98. self.state.metrics = self.agent.llm.metrics
  99. async def report_error(self, message: str, exception: Exception | None = None):
  100. """
  101. This error will be reported to the user and sent to the LLM next step, in the hope it can self-correct.
  102. This method should be called for a particular type of errors:
  103. - the string message should be user-friendly, it will be shown in the UI
  104. - an ErrorObservation can be sent to the LLM by the agent, with the exception message, so it can self-correct next time
  105. """
  106. self.state.last_error = message
  107. if exception:
  108. self.state.last_error += f': {exception}'
  109. await self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)
  110. async def add_history(self, action: Action, observation: Observation):
  111. if isinstance(action, NullAction) and isinstance(observation, NullObservation):
  112. return
  113. self.state.history.append((action, observation))
  114. self.state.updated_info.append((action, observation))
  115. async def _start_step_loop(self):
  116. logger.info(f'[Agent Controller {self.id}] Starting step loop...')
  117. while True:
  118. try:
  119. await self._step()
  120. except asyncio.CancelledError:
  121. logger.info('AgentController task was cancelled')
  122. break
  123. except Exception as e:
  124. traceback.print_exc()
  125. logger.error(f'Error while running the agent: {e}')
  126. logger.error(traceback.format_exc())
  127. await self.report_error(
  128. 'There was an unexpected error while running the agent', exception=e
  129. )
  130. await self.set_agent_state_to(AgentState.ERROR)
  131. break
  132. await asyncio.sleep(0.1)
  133. async def on_event(self, event: Event):
  134. if isinstance(event, ChangeAgentStateAction):
  135. await self.set_agent_state_to(event.agent_state) # type: ignore
  136. elif isinstance(event, MessageAction):
  137. if event.source == EventSource.USER:
  138. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  139. await self.add_history(event, NullObservation(''))
  140. if self.get_agent_state() != AgentState.RUNNING:
  141. await self.set_agent_state_to(AgentState.RUNNING)
  142. elif event.source == EventSource.AGENT and event.wait_for_response:
  143. logger.info(event, extra={'msg_type': 'ACTION'})
  144. await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  145. elif isinstance(event, AgentDelegateAction):
  146. await self.start_delegate(event)
  147. elif isinstance(event, AddTaskAction):
  148. self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
  149. elif isinstance(event, ModifyTaskAction):
  150. self.state.root_task.set_subtask_state(event.task_id, event.state)
  151. elif isinstance(event, AgentFinishAction):
  152. self.state.outputs = event.outputs # type: ignore[attr-defined]
  153. await self.set_agent_state_to(AgentState.FINISHED)
  154. elif isinstance(event, AgentRejectAction):
  155. self.state.outputs = event.outputs # type: ignore[attr-defined]
  156. await self.set_agent_state_to(AgentState.REJECTED)
  157. elif isinstance(event, Observation):
  158. if self._pending_action and self._pending_action.id == event.cause:
  159. await self.add_history(self._pending_action, event)
  160. self._pending_action = None
  161. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  162. elif isinstance(event, CmdOutputObservation):
  163. await self.add_history(NullAction(), event)
  164. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  165. elif isinstance(event, AgentDelegateObservation):
  166. await self.add_history(NullAction(), event)
  167. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  168. elif isinstance(event, ErrorObservation):
  169. await self.add_history(NullAction(), event)
  170. logger.info(event, extra={'msg_type': 'OBSERVATION'})
  171. def reset_task(self):
  172. self.agent.reset()
  173. async def set_agent_state_to(self, new_state: AgentState):
  174. logger.debug(
  175. f'[Agent Controller {self.id}] Setting agent({self.agent.name}) state from {self.state.agent_state} to {new_state}'
  176. )
  177. if new_state == self.state.agent_state:
  178. return
  179. if (
  180. self.state.agent_state == AgentState.PAUSED
  181. and new_state == AgentState.RUNNING
  182. and self.state.traffic_control_state == TRAFFIC_CONTROL_STATE.THROTTLING
  183. ):
  184. # user intends to interrupt traffic control and let the task resume temporarily
  185. self.state.traffic_control_state = TRAFFIC_CONTROL_STATE.PAUSED
  186. self.state.agent_state = new_state
  187. if new_state == AgentState.STOPPED or new_state == AgentState.ERROR:
  188. self.reset_task()
  189. await self.event_stream.add_event(
  190. AgentStateChangedObservation('', self.state.agent_state), EventSource.AGENT
  191. )
  192. if new_state == AgentState.INIT and self.state.resume_state:
  193. await self.set_agent_state_to(self.state.resume_state)
  194. self.state.resume_state = None
  195. def get_agent_state(self):
  196. """Returns the current state of the agent task."""
  197. if self.delegate is not None:
  198. return self.delegate.get_agent_state()
  199. return self.state.agent_state
  200. async def start_delegate(self, action: AgentDelegateAction):
  201. AgentCls: Type[Agent] = Agent.get_cls(action.agent)
  202. agent = AgentCls(llm=self.agent.llm)
  203. state = State(
  204. inputs=action.inputs or {},
  205. iteration=0,
  206. max_iterations=self.state.max_iterations,
  207. delegate_level=self.state.delegate_level + 1,
  208. # metrics should be shared between parent and child
  209. metrics=self.state.metrics,
  210. )
  211. logger.info(f'[Agent Controller {self.id}]: start delegate')
  212. self.delegate = AgentController(
  213. sid=self.id + '-delegate',
  214. agent=agent,
  215. event_stream=self.event_stream,
  216. max_iterations=self.state.max_iterations,
  217. max_budget_per_task=self.max_budget_per_task,
  218. initial_state=state,
  219. is_delegate=True,
  220. )
  221. await self.delegate.set_agent_state_to(AgentState.RUNNING)
  222. async def _step(self):
  223. logger.debug(f'[Agent Controller {self.id}] Entering step method')
  224. if self.get_agent_state() != AgentState.RUNNING:
  225. await asyncio.sleep(1)
  226. return
  227. if self._pending_action:
  228. logger.debug(
  229. f'[Agent Controller {self.id}] waiting for pending action: {self._pending_action}'
  230. )
  231. await asyncio.sleep(1)
  232. return
  233. if self.delegate is not None:
  234. logger.debug(f'[Agent Controller {self.id}] Delegate not none, awaiting...')
  235. assert self.delegate != self
  236. await self.delegate._step()
  237. logger.debug(f'[Agent Controller {self.id}] Delegate step done')
  238. assert self.delegate is not None
  239. delegate_state = self.delegate.get_agent_state()
  240. if delegate_state == AgentState.ERROR:
  241. # close the delegate upon error
  242. await self.delegate.close()
  243. self.delegate = None
  244. self.delegateAction = None
  245. await self.report_error('Delegator agent encounters an error')
  246. return
  247. delegate_done = delegate_state in (AgentState.FINISHED, AgentState.REJECTED)
  248. if delegate_done:
  249. logger.info(
  250. f'[Agent Controller {self.id}] Delegate agent has finished execution'
  251. )
  252. # retrieve delegate result
  253. outputs = self.delegate.state.outputs if self.delegate.state else {}
  254. # close delegate controller: we must close the delegate controller before adding new events
  255. await self.delegate.close()
  256. # update delegate result observation
  257. # TODO: replace this with AI-generated summary (#2395)
  258. formatted_output = ', '.join(
  259. f'{key}: {value}' for key, value in outputs.items()
  260. )
  261. content = (
  262. f'{self.delegate.agent.name} finishes task with {formatted_output}'
  263. )
  264. obs: Observation = AgentDelegateObservation(
  265. outputs=outputs, content=content
  266. )
  267. # clean up delegate status
  268. self.delegate = None
  269. self.delegateAction = None
  270. await self.event_stream.add_event(obs, EventSource.AGENT)
  271. return
  272. logger.info(
  273. f'{self.agent.name} LEVEL {self.state.delegate_level} STEP {self.state.iteration}',
  274. extra={'msg_type': 'STEP'},
  275. )
  276. if self.state.iteration >= self.state.max_iterations:
  277. if self.state.traffic_control_state == TRAFFIC_CONTROL_STATE.PAUSED:
  278. logger.info(
  279. 'Hitting traffic control, temporarily resume upon user request'
  280. )
  281. self.state.traffic_control_state = TRAFFIC_CONTROL_STATE.NORMAL
  282. else:
  283. self.state.traffic_control_state = TRAFFIC_CONTROL_STATE.THROTTLING
  284. await self.report_error(
  285. f'Agent reached maximum number of iterations, task paused. {TRAFFIC_CONTROL_REMINDER}'
  286. )
  287. await self.set_agent_state_to(AgentState.PAUSED)
  288. return
  289. elif self.max_budget_per_task is not None:
  290. current_cost = self.state.metrics.accumulated_cost
  291. if current_cost > self.max_budget_per_task:
  292. if self.state.traffic_control_state == TRAFFIC_CONTROL_STATE.PAUSED:
  293. logger.info(
  294. 'Hitting traffic control, temporarily resume upon user request'
  295. )
  296. self.state.traffic_control_state = TRAFFIC_CONTROL_STATE.NORMAL
  297. else:
  298. self.state.traffic_control_state = TRAFFIC_CONTROL_STATE.THROTTLING
  299. await self.report_error(
  300. f'Task budget exceeded. Current cost: {current_cost:.2f}, Max budget: {self.max_budget_per_task:.2f}, task paused. {TRAFFIC_CONTROL_REMINDER}'
  301. )
  302. await self.set_agent_state_to(AgentState.PAUSED)
  303. return
  304. self.update_state_before_step()
  305. action: Action = NullAction()
  306. try:
  307. action = self.agent.step(self.state)
  308. if action is None:
  309. raise LLMNoActionError('No action was returned')
  310. except (LLMMalformedActionError, LLMNoActionError, LLMResponseError) as e:
  311. # report to the user
  312. # and send the underlying exception to the LLM for self-correction
  313. await self.report_error(str(e))
  314. return
  315. logger.info(action, extra={'msg_type': 'ACTION'})
  316. if action.runnable:
  317. self._pending_action = action
  318. else:
  319. await self.add_history(action, NullObservation(''))
  320. if not isinstance(action, NullAction):
  321. await self.event_stream.add_event(action, EventSource.AGENT)
  322. await self.update_state_after_step()
  323. if self._is_stuck():
  324. await self.report_error('Agent got stuck in a loop')
  325. await self.set_agent_state_to(AgentState.ERROR)
  326. def get_state(self):
  327. return self.state
  328. def _set_initial_state(
  329. self, state: State | None, max_iterations: int = MAX_ITERATIONS
  330. ):
  331. # state from the previous session, state from a parent agent, or a new state
  332. # note that this is called twice when restoring a previous session, first with state=None
  333. if state is None:
  334. self.state = State(inputs={}, max_iterations=max_iterations)
  335. else:
  336. self.state = state
  337. def _is_stuck(self):
  338. # check if delegate stuck
  339. if self.delegate and self.delegate._is_stuck():
  340. return True
  341. # filter out MessageAction with source='user' from history
  342. filtered_history = [
  343. _tuple
  344. for _tuple in self.state.history
  345. if not (
  346. isinstance(_tuple[0], MessageAction)
  347. and _tuple[0].source == EventSource.USER
  348. )
  349. ]
  350. if len(filtered_history) < 3:
  351. return False
  352. # FIXME rewrite this to be more readable
  353. # Scenario 1: the same (Action, Observation) loop
  354. # 3 pairs of (action, observation) to stop the agent
  355. last_three_tuples = filtered_history[-3:]
  356. if all(
  357. # (Action, Observation) tuples
  358. # compare the last action to the last three actions
  359. self._eq_no_pid(last_three_tuples[-1][0], _tuple[0])
  360. for _tuple in last_three_tuples
  361. ) and all(
  362. # compare the last observation to the last three observations
  363. self._eq_no_pid(last_three_tuples[-1][1], _tuple[1])
  364. for _tuple in last_three_tuples
  365. ):
  366. logger.warning('Action, Observation loop detected')
  367. return True
  368. if len(filtered_history) < 4:
  369. return False
  370. last_four_tuples = filtered_history[-4:]
  371. # Scenario 2: (action, error) pattern, not necessary identical error
  372. # 4 pairs of (action, error) to stop the agent
  373. if all(
  374. self._eq_no_pid(last_four_tuples[-1][0], _tuple[0])
  375. for _tuple in last_four_tuples
  376. ):
  377. # It repeats the same action, give it a chance, but not if:
  378. if all(
  379. isinstance(_tuple[1], ErrorObservation) for _tuple in last_four_tuples
  380. ):
  381. logger.warning('Action, ErrorObservation loop detected')
  382. return True
  383. # check if the agent repeats the same (Action, Observation)
  384. # every other step in the last six tuples
  385. # step1 = step3 = step5
  386. # step2 = step4 = step6
  387. if len(filtered_history) >= 6:
  388. last_six_tuples = filtered_history[-6:]
  389. if (
  390. # this pattern is every other step, like:
  391. # (action_1, obs_1), (action_2, obs_2), (action_1, obs_1), (action_2, obs_2),...
  392. self._eq_no_pid(last_six_tuples[-1][0], last_six_tuples[-3][0])
  393. and self._eq_no_pid(last_six_tuples[-1][0], last_six_tuples[-5][0])
  394. and self._eq_no_pid(last_six_tuples[-2][0], last_six_tuples[-4][0])
  395. and self._eq_no_pid(last_six_tuples[-2][0], last_six_tuples[-6][0])
  396. and self._eq_no_pid(last_six_tuples[-1][1], last_six_tuples[-3][1])
  397. and self._eq_no_pid(last_six_tuples[-1][1], last_six_tuples[-5][1])
  398. and self._eq_no_pid(last_six_tuples[-2][1], last_six_tuples[-4][1])
  399. and self._eq_no_pid(last_six_tuples[-2][1], last_six_tuples[-6][1])
  400. ):
  401. logger.warning('Action, Observation pattern detected')
  402. return True
  403. return False
  404. def __repr__(self):
  405. return (
  406. f'AgentController(id={self.id}, agent={self.agent!r}, '
  407. f'event_stream={self.event_stream!r}, '
  408. f'state={self.state!r}, agent_task={self.agent_task!r}, '
  409. f'delegate={self.delegate!r}, _pending_action={self._pending_action!r})'
  410. )
  411. def _eq_no_pid(self, obj1, obj2):
  412. if isinstance(obj1, CmdOutputObservation) and isinstance(
  413. obj2, CmdOutputObservation
  414. ):
  415. # for loop detection, ignore command_id, which is the pid
  416. return obj1.command == obj2.command and obj1.exit_code == obj2.exit_code
  417. elif isinstance(obj1, CmdKillAction) and isinstance(obj2, CmdKillAction):
  418. # for loop detection, ignore command_id, which is the pid
  419. return obj1.thought == obj2.thought
  420. else:
  421. # this is the default comparison
  422. return obj1 == obj2