agent.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import asyncio
  2. from typing import Dict, List, Optional
  3. from opendevin import config
  4. from opendevin.action import (
  5. Action,
  6. NullAction,
  7. )
  8. from opendevin.agent import Agent
  9. from opendevin.controller import AgentController
  10. from opendevin.llm.llm import LLM
  11. from opendevin.logger import opendevin_logger as logger
  12. from opendevin.observation import NullObservation, Observation, UserMessageObservation
  13. from opendevin.schema import ActionType, ConfigType, TaskState, TaskStateAction
  14. from opendevin.server.session import session_manager
  # Task-state tables, keyed by the requested TaskStateAction.
  #
  # VALID_TASK_STATE_MAP: current task states from which the requested action
  # is actually carried out (AgentUnit.set_task_state moves the controller to
  # the new state).
  VALID_TASK_STATE_MAP: Dict[TaskStateAction, List[TaskState]] = {
      TaskStateAction.PAUSE: [
          TaskState.RUNNING,
      ],
      TaskStateAction.RESUME: [
          TaskState.PAUSED,
      ],
      TaskStateAction.STOP: [
          TaskState.RUNNING,
          TaskState.PAUSED,
      ],
  }

  # IGNORED_TASK_STATE_MAP: current task states for which the requested action
  # performs no transition; AgentUnit.set_task_state only re-notifies the
  # current state in that case.
  IGNORED_TASK_STATE_MAP: Dict[TaskStateAction, List[TaskState]] = {
      TaskStateAction.PAUSE: [
          TaskState.INIT,
          TaskState.PAUSED,
          TaskState.STOPPED,
          TaskState.FINISHED,
      ],
      TaskStateAction.RESUME: [
          TaskState.INIT,
          TaskState.RUNNING,
          TaskState.STOPPED,
          TaskState.FINISHED,
      ],
      TaskStateAction.STOP: [
          TaskState.INIT,
          TaskState.STOPPED,
          TaskState.FINISHED,
      ],
  }

  # TASK_STATE_ACTION_MAP: the task state each action leads to.
  TASK_STATE_ACTION_MAP: Dict[TaskStateAction, TaskState] = {
      TaskStateAction.START: TaskState.RUNNING,
      TaskStateAction.PAUSE: TaskState.PAUSED,
      TaskStateAction.RESUME: TaskState.RUNNING,
      TaskStateAction.STOP: TaskState.STOPPED,
  }
  42. class AgentUnit:
  43. """Represents a session with an agent.
  44. Attributes:
  45. controller: The AgentController instance for controlling the agent.
  46. agent_task: The task representing the agent's execution.
  47. """
  48. sid: str
  49. controller: Optional[AgentController] = None
  50. agent_task: Optional[asyncio.Task] = None
  51. def __init__(self, sid):
  52. """Initializes a new instance of the Session class."""
  53. self.sid = sid
  54. async def send_error(self, message):
  55. """Sends an error message to the client.
  56. Args:
  57. message: The error message to send.
  58. """
  59. await session_manager.send_error(self.sid, message)
  60. async def send_message(self, message):
  61. """Sends a message to the client.
  62. Args:
  63. message: The message to send.
  64. """
  65. await session_manager.send_message(self.sid, message)
  66. async def send(self, data):
  67. """Sends data to the client.
  68. Args:
  69. data: The data to send.
  70. """
  71. await session_manager.send(self.sid, data)
  72. async def dispatch(self, action: str | None, data: dict):
  73. """Dispatches actions to the agent from the client."""
  74. if action is None:
  75. await self.send_error('Invalid action')
  76. return
  77. match action:
  78. case ActionType.INIT:
  79. await self.create_controller(data)
  80. case ActionType.START:
  81. await self.start_task(data)
  82. case ActionType.USER_MESSAGE:
  83. await self.send_user_message(data)
  84. case ActionType.CHANGE_TASK_STATE:
  85. task_state_action = data.get('args', {}).get('task_state_action', None)
  86. if task_state_action is None:
  87. await self.send_error('No task state action specified.')
  88. return
  89. await self.set_task_state(TaskStateAction(task_state_action))
  90. case ActionType.CHAT:
  91. if self.controller is None:
  92. await self.send_error('No agent started. Please wait a second...')
  93. return
  94. self.controller.add_history(
  95. NullAction(), UserMessageObservation(data['message'])
  96. )
  97. case _:
  98. await self.send_error("I didn't recognize this action:" + action)
  99. def get_arg_or_default(self, _args: dict, key: ConfigType) -> str:
  100. """Gets an argument from the args dictionary or the default value.
  101. Args:
  102. _args: The args dictionary.
  103. key: The key to get.
  104. Returns:
  105. The value of the key or the default value.
  106. """
  107. return _args.get(key, config.get(key))
  108. async def create_controller(self, start_event: dict):
  109. """Creates an AgentController instance.
  110. Args:
  111. start_event: The start event data (optional).
  112. """
  113. args = {
  114. key: value
  115. for key, value in start_event.get('args', {}).items()
  116. if value != ''
  117. } # remove empty values, prevent FE from sending empty strings
  118. agent_cls = self.get_arg_or_default(args, ConfigType.AGENT)
  119. model = self.get_arg_or_default(args, ConfigType.LLM_MODEL)
  120. api_key = self.get_arg_or_default(args, ConfigType.LLM_API_KEY)
  121. api_base = config.get(ConfigType.LLM_BASE_URL)
  122. max_iterations = self.get_arg_or_default(args, ConfigType.MAX_ITERATIONS)
  123. max_chars = self.get_arg_or_default(args, ConfigType.MAX_CHARS)
  124. logger.info(f'Creating agent {agent_cls} using LLM {model}')
  125. llm = LLM(model=model, api_key=api_key, base_url=api_base)
  126. try:
  127. self.controller = AgentController(
  128. sid=self.sid,
  129. agent=Agent.get_cls(agent_cls)(llm),
  130. max_iterations=int(max_iterations),
  131. max_chars=int(max_chars),
  132. callbacks=[self.on_agent_event],
  133. )
  134. except Exception as e:
  135. logger.exception(f'Error creating controller: {e}')
  136. await self.send_error(
  137. 'Error creating controller. Please check Docker is running and visit `https://opendevin.github.io/OpenDevin/modules/usage/troubleshooting` for more debugging information..'
  138. )
  139. return
  140. await self.init_done()
  141. async def init_done(self):
  142. if self.controller is None:
  143. await self.send_error('No agent started.')
  144. return
  145. await self.send(
  146. {
  147. 'action': ActionType.INIT,
  148. 'message': 'Control loop started.',
  149. }
  150. )
  151. await self.controller.notify_task_state_changed()
  152. async def start_task(self, start_event):
  153. """Starts a task for the agent.
  154. Args:
  155. start_event: The start event data.
  156. """
  157. task = start_event['args']['task']
  158. if self.controller is None:
  159. await self.send_error('No agent started. Please wait a second...')
  160. return
  161. try:
  162. if self.agent_task:
  163. self.agent_task.cancel()
  164. self.agent_task = asyncio.create_task(
  165. self.controller.start(task), name='agent start task loop'
  166. )
  167. except Exception as e:
  168. await self.send_error(f'Error during task loop: {e}')
  169. async def send_user_message(self, data: dict):
  170. if not self.agent_task or not self.controller:
  171. await self.send_error('No agent started.')
  172. return
  173. await self.controller.add_user_message(
  174. UserMessageObservation(data['args']['message'])
  175. )
  176. async def set_task_state(self, new_state_action: TaskStateAction):
  177. """Sets the state of the agent task."""
  178. if self.controller is None:
  179. await self.send_error('No agent started.')
  180. return
  181. cur_state = self.controller.get_task_state()
  182. new_state = TASK_STATE_ACTION_MAP.get(new_state_action)
  183. if new_state is None:
  184. await self.send_error('Invalid task state action.')
  185. return
  186. if cur_state in VALID_TASK_STATE_MAP.get(new_state_action, []):
  187. await self.controller.set_task_state_to(new_state)
  188. elif cur_state in IGNORED_TASK_STATE_MAP.get(new_state_action, []):
  189. # notify once again.
  190. await self.controller.notify_task_state_changed()
  191. return
  192. else:
  193. await self.send_error('Current task state not recognized.')
  194. return
  195. if new_state_action == TaskStateAction.RESUME:
  196. if self.agent_task:
  197. self.agent_task.cancel()
  198. self.agent_task = asyncio.create_task(
  199. self.controller.resume(), name='agent resume task loop'
  200. )
  201. async def on_agent_event(self, event: Observation | Action):
  202. """Callback function for agent events.
  203. Args:
  204. event: The agent event (Observation or Action).
  205. """
  206. if isinstance(event, NullAction):
  207. return
  208. if isinstance(event, NullObservation):
  209. return
  210. await self.send(event.to_dict())
  211. def close(self):
  212. if self.agent_task:
  213. self.agent_task.cancel()
  214. if self.controller is not None:
  215. self.controller.action_manager.sandbox.close()