| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- import asyncio
- from typing import Callable, List, Type
- from opendevin import config
- from opendevin.action import (
- Action,
- AgentFinishAction,
- AgentDelegateAction,
- NullAction,
- )
- from opendevin.observation import (
- Observation,
- AgentErrorObservation,
- AgentDelegateObservation,
- NullObservation,
- )
- from opendevin.agent import Agent
- from opendevin.exceptions import AgentMalformedActionError, AgentNoActionError, MaxCharsExceedError
- from opendevin.logger import opendevin_logger as logger
- from opendevin.plan import Plan
- from opendevin.state import State
- from opendevin.action.tasks import TaskStateChangedAction
- from opendevin.schema import TaskState
- from opendevin.controller.action_manager import ActionManager
- MAX_ITERATIONS = config.get('MAX_ITERATIONS')
- MAX_CHARS = config.get('MAX_CHARS')
- class AgentController:
- id: str
- agent: Agent
- max_iterations: int
- action_manager: ActionManager
- callbacks: List[Callable]
- delegate: 'AgentController | None' = None
- state: State | None = None
- _task_state: TaskState = TaskState.INIT
- _cur_step: int = 0
- def __init__(
- self,
- agent: Agent,
- inputs: dict = {},
- sid: str = 'default',
- max_iterations: int = MAX_ITERATIONS,
- max_chars: int = MAX_CHARS,
- callbacks: List[Callable] = [],
- ):
- self.id = sid
- self.agent = agent
- self.max_iterations = max_iterations
- self.action_manager = ActionManager(self.id)
- self.max_chars = max_chars
- self.callbacks = callbacks
- # Initialize agent-required plugins for sandbox (if any)
- self.action_manager.init_sandbox_plugins(agent.sandbox_plugins)
- def update_state_for_step(self, i):
- if self.state is None:
- return
- self.state.iteration = i
- self.state.background_commands_obs = self.action_manager.get_background_obs()
- def update_state_after_step(self):
- if self.state is None:
- return
- self.state.updated_info = []
- def add_history(self, action: Action, observation: Observation):
- if self.state is None:
- return
- if not isinstance(action, Action):
- raise TypeError(
- f'action must be an instance of Action, got {type(action).__name__} instead'
- )
- if not isinstance(observation, Observation):
- raise TypeError(
- f'observation must be an instance of Observation, got {type(observation).__name__} instead'
- )
- self.state.history.append((action, observation))
- self.state.updated_info.append((action, observation))
- async def _run(self):
- if self.state is None:
- return
- if self._task_state != TaskState.RUNNING:
- raise ValueError('Task is not in running state')
- for i in range(self._cur_step, self.max_iterations):
- self._cur_step = i
- try:
- finished = await self.step(i)
- if finished:
- self._task_state = TaskState.FINISHED
- except Exception as e:
- logger.error('Error in loop', exc_info=True)
- raise e
- if self._task_state == TaskState.FINISHED:
- logger.info('Task finished by agent')
- await self.reset_task()
- break
- elif self._task_state == TaskState.STOPPED:
- logger.info('Task stopped by user')
- await self.reset_task()
- break
- elif self._task_state == TaskState.PAUSED:
- logger.info('Task paused')
- self._cur_step = i + 1
- await self.notify_task_state_changed()
- break
- if self._is_stuck():
- logger.info('Loop detected, stopping task')
- observation = AgentErrorObservation('I got stuck into a loop, the task has stopped.')
- await self._run_callbacks(observation)
- await self.set_task_state_to(TaskState.STOPPED)
- break
- async def setup_task(self, task: str, inputs: dict = {}):
- """Sets up the agent controller with a task.
- """
- self._task_state = TaskState.RUNNING
- await self.notify_task_state_changed()
- self.state = State(Plan(task))
- self.state.inputs = inputs
- async def start(self, task: str):
- """Starts the agent controller with a task.
- If task already run before, it will continue from the last step.
- """
- await self.setup_task(task)
- await self._run()
- async def resume(self):
- if self.state is None:
- raise ValueError('No task to resume')
- self._task_state = TaskState.RUNNING
- await self.notify_task_state_changed()
- await self._run()
- async def reset_task(self):
- self.state = None
- self._cur_step = 0
- self._task_state = TaskState.INIT
- self.agent.reset()
- await self.notify_task_state_changed()
- async def set_task_state_to(self, state: TaskState):
- self._task_state = state
- if state == TaskState.STOPPED:
- await self.reset_task()
- logger.info(f'Task state set to {state}')
- def get_task_state(self):
- """Returns the current state of the agent task."""
- return self._task_state
- async def notify_task_state_changed(self):
- await self._run_callbacks(TaskStateChangedAction(self._task_state))
- async def start_delegate(self, action: AgentDelegateAction):
- AgentCls: Type[Agent] = Agent.get_cls(action.agent)
- agent = AgentCls(llm=self.agent.llm)
- self.delegate = AgentController(
- sid=self.id + '-delegate',
- agent=agent,
- max_iterations=self.max_iterations,
- max_chars=self.max_chars,
- callbacks=self.callbacks,
- )
- task = action.inputs.get('task') or ''
- await self.delegate.setup_task(task, action.inputs)
- async def step(self, i: int) -> bool:
- if self.state is None:
- raise ValueError('No task to run')
- if self.delegate is not None:
- delegate_done = await self.delegate.step(i)
- if delegate_done:
- outputs = self.delegate.state.outputs if self.delegate.state else {}
- obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
- self.add_history(NullAction(), obs)
- self.delegate = None
- self.delegateAction = None
- return False
- logger.info(f'STEP {i}', extra={'msg_type': 'STEP'})
- logger.info(self.state.plan.main_goal, extra={'msg_type': 'PLAN'})
- if self.state.num_of_chars > self.max_chars:
- raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)
- log_obs = self.action_manager.get_background_obs()
- for obs in log_obs:
- self.add_history(NullAction(), obs)
- await self._run_callbacks(obs)
- logger.info(obs, extra={'msg_type': 'BACKGROUND LOG'})
- self.update_state_for_step(i)
- action: Action = NullAction()
- observation: Observation = NullObservation('')
- try:
- action = self.agent.step(self.state)
- if action is None:
- raise AgentNoActionError('No action was returned')
- except (AgentMalformedActionError, AgentNoActionError) as e:
- observation = AgentErrorObservation(str(e))
- logger.info(action, extra={'msg_type': 'ACTION'})
- self.update_state_after_step()
- await self._run_callbacks(action)
- finished = isinstance(action, AgentFinishAction)
- if finished:
- self.state.outputs = action.outputs # type: ignore[attr-defined]
- logger.info(action, extra={'msg_type': 'INFO'})
- return True
- if isinstance(observation, NullObservation):
- observation = await self.action_manager.run_action(action, self)
- if not isinstance(observation, NullObservation):
- logger.info(observation, extra={'msg_type': 'OBSERVATION'})
- self.add_history(action, observation)
- await self._run_callbacks(observation)
- return False
- async def _run_callbacks(self, event):
- if event is None:
- return
- for callback in self.callbacks:
- idx = self.callbacks.index(callback)
- try:
- await callback(event)
- except Exception as e:
- logger.exception(f'Callback error: {e}, idx: {idx}')
- await asyncio.sleep(
- 0.001
- ) # Give back control for a tick, so we can await in callbacks
- def get_state(self):
- return self.state
- def _is_stuck(self):
- if self.state is None or self.state.history is None or len(self.state.history) < 3:
- return False
- # if the last three (Action, Observation) tuples are too repetitive
- # the agent got stuck in a loop
- if all(
- [self.state.history[-i][0] == self.state.history[-3][0] for i in range(1, 3)]
- ):
- # it repeats same action, give it a chance, but not if:
- if (all
- (isinstance(self.state.history[-i][1], NullObservation) for i in range(1, 4))):
- # same (Action, NullObservation): like 'think' the same thought over and over
- logger.debug('Action, NullObservation loop detected')
- return True
- elif (all
- (isinstance(self.state.history[-i][1], AgentErrorObservation) for i in range(1, 4))):
- # (NullAction, AgentErrorObservation): errors coming from an exception
- # (Action, AgentErrorObservation): the same action getting an error, even if not necessarily the same error
- logger.debug('Action, AgentErrorObservation loop detected')
- return True
- return False
|