agent.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. import asyncio
  2. from typing import Dict, List, Optional
  3. from opendevin import config
  4. from opendevin.action import (
  5. Action,
  6. NullAction,
  7. )
  8. from opendevin.agent import Agent
  9. from opendevin.controller import AgentController
  10. from opendevin.llm.llm import LLM
  11. from opendevin.logger import opendevin_logger as logger
  12. from opendevin.observation import NullObservation, Observation, UserMessageObservation
  13. from opendevin.schema import ActionType, ConfigType, TaskState, TaskStateAction
  14. from opendevin.server.session import session_manager
  # Maps each requested task-state action to the current task states from which
  # that action is actually applied (set_task_state moves the controller to the
  # target state only when the current state is listed here).
  VALID_TASK_STATE_MAP: Dict[TaskStateAction, List[TaskState]] = {
      TaskStateAction.PAUSE: [TaskState.RUNNING],
      TaskStateAction.RESUME: [TaskState.PAUSED],
      TaskStateAction.STOP: [
          TaskState.RUNNING,
          TaskState.PAUSED,
          TaskState.AWAITING_USER_INPUT,
      ],
  }
  # Maps each requested task-state action to the current task states in which the
  # request is treated as a no-op: set_task_state does not change the state and
  # only re-sends the current task state to the client.
  IGNORED_TASK_STATE_MAP: Dict[TaskStateAction, List[TaskState]] = {
      TaskStateAction.PAUSE: [
          TaskState.INIT,
          TaskState.PAUSED,
          TaskState.STOPPED,
          TaskState.FINISHED,
          TaskState.AWAITING_USER_INPUT,
      ],
      TaskStateAction.RESUME: [
          TaskState.INIT,
          TaskState.RUNNING,
          TaskState.STOPPED,
          TaskState.FINISHED,
          TaskState.AWAITING_USER_INPUT,
      ],
      TaskStateAction.STOP: [
          TaskState.INIT,
          TaskState.STOPPED,
          TaskState.FINISHED,
      ],
  }
  # The task state a controller is moved to when the corresponding action is applied.
  TASK_STATE_ACTION_MAP: Dict[TaskStateAction, TaskState] = {
      TaskStateAction.START: TaskState.RUNNING,
      TaskStateAction.PAUSE: TaskState.PAUSED,
      TaskStateAction.RESUME: TaskState.RUNNING,
      TaskStateAction.STOP: TaskState.STOPPED,
  }
  48. class AgentUnit:
  49. """Represents a session with an agent.
  50. Attributes:
  51. controller: The AgentController instance for controlling the agent.
  52. agent_task: The task representing the agent's execution.
  53. """
  54. sid: str
  55. controller: Optional[AgentController] = None
  56. agent_task: Optional[asyncio.Task] = None
  57. def __init__(self, sid):
  58. """Initializes a new instance of the Session class."""
  59. self.sid = sid
  60. async def send_error(self, message):
  61. """Sends an error message to the client.
  62. Args:
  63. message: The error message to send.
  64. """
  65. await session_manager.send_error(self.sid, message)
  66. async def send_message(self, message):
  67. """Sends a message to the client.
  68. Args:
  69. message: The message to send.
  70. """
  71. await session_manager.send_message(self.sid, message)
  72. async def send(self, data):
  73. """Sends data to the client.
  74. Args:
  75. data: The data to send.
  76. """
  77. await session_manager.send(self.sid, data)
  78. async def dispatch(self, action: str | None, data: dict):
  79. """Dispatches actions to the agent from the client."""
  80. if action is None:
  81. await self.send_error('Invalid action')
  82. return
  83. match action:
  84. case ActionType.INIT:
  85. await self.create_controller(data)
  86. case ActionType.START:
  87. await self.start_task(data)
  88. case ActionType.USER_MESSAGE:
  89. await self.send_user_message(data)
  90. case ActionType.CHANGE_TASK_STATE:
  91. task_state_action = data.get('args', {}).get('task_state_action', None)
  92. if task_state_action is None:
  93. await self.send_error('No task state action specified.')
  94. return
  95. await self.set_task_state(TaskStateAction(task_state_action))
  96. case ActionType.CHAT:
  97. if self.controller is None:
  98. await self.send_error('No agent started. Please wait a second...')
  99. return
  100. self.controller.add_history(
  101. NullAction(), UserMessageObservation(data['message'])
  102. )
  103. case _:
  104. await self.send_error("I didn't recognize this action:" + action)
  105. def get_arg_or_default(self, _args: dict, key: ConfigType) -> str:
  106. """Gets an argument from the args dictionary or the default value.
  107. Args:
  108. _args: The args dictionary.
  109. key: The key to get.
  110. Returns:
  111. The value of the key or the default value.
  112. """
  113. return _args.get(key, config.get(key))
  114. async def create_controller(self, start_event: dict):
  115. """Creates an AgentController instance.
  116. Args:
  117. start_event: The start event data (optional).
  118. """
  119. args = {
  120. key: value
  121. for key, value in start_event.get('args', {}).items()
  122. if value != ''
  123. } # remove empty values, prevent FE from sending empty strings
  124. agent_cls = self.get_arg_or_default(args, ConfigType.AGENT)
  125. model = self.get_arg_or_default(args, ConfigType.LLM_MODEL)
  126. api_key = self.get_arg_or_default(args, ConfigType.LLM_API_KEY)
  127. api_base = config.get(ConfigType.LLM_BASE_URL)
  128. max_iterations = self.get_arg_or_default(args, ConfigType.MAX_ITERATIONS)
  129. max_chars = self.get_arg_or_default(args, ConfigType.MAX_CHARS)
  130. logger.info(f'Creating agent {agent_cls} using LLM {model}')
  131. llm = LLM(model=model, api_key=api_key, base_url=api_base)
  132. try:
  133. self.controller = AgentController(
  134. sid=self.sid,
  135. agent=Agent.get_cls(agent_cls)(llm),
  136. max_iterations=int(max_iterations),
  137. max_chars=int(max_chars),
  138. callbacks=[self.on_agent_event],
  139. )
  140. except Exception as e:
  141. logger.exception(f'Error creating controller: {e}')
  142. await self.send_error(
  143. 'Error creating controller. Please check Docker is running and visit `https://opendevin.github.io/OpenDevin/modules/usage/troubleshooting` for more debugging information..'
  144. )
  145. return
  146. await self.init_done()
  147. async def init_done(self):
  148. if self.controller is None:
  149. await self.send_error('No agent started.')
  150. return
  151. await self.send(
  152. {
  153. 'action': ActionType.INIT,
  154. 'message': 'Control loop started.',
  155. }
  156. )
  157. await self.controller.notify_task_state_changed()
  158. async def start_task(self, start_event):
  159. """Starts a task for the agent.
  160. Args:
  161. start_event: The start event data.
  162. """
  163. task = start_event['args']['task']
  164. if self.controller is None:
  165. await self.send_error('No agent started. Please wait a second...')
  166. return
  167. try:
  168. if self.agent_task:
  169. self.agent_task.cancel()
  170. self.agent_task = asyncio.create_task(
  171. self.controller.start(task), name='agent start task loop'
  172. )
  173. except Exception as e:
  174. await self.send_error(f'Error during task loop: {e}')
  175. async def send_user_message(self, data: dict):
  176. if not self.agent_task or not self.controller:
  177. await self.send_error('No agent started.')
  178. return
  179. await self.controller.add_user_message(
  180. UserMessageObservation(data['args']['message'])
  181. )
  182. async def set_task_state(self, new_state_action: TaskStateAction):
  183. """Sets the state of the agent task."""
  184. if self.controller is None:
  185. await self.send_error('No agent started.')
  186. return
  187. cur_state = self.controller.get_task_state()
  188. new_state = TASK_STATE_ACTION_MAP.get(new_state_action)
  189. if new_state is None:
  190. await self.send_error('Invalid task state action.')
  191. return
  192. if cur_state in VALID_TASK_STATE_MAP.get(new_state_action, []):
  193. await self.controller.set_task_state_to(new_state)
  194. elif cur_state in IGNORED_TASK_STATE_MAP.get(new_state_action, []):
  195. # notify once again.
  196. await self.controller.notify_task_state_changed()
  197. return
  198. else:
  199. await self.send_error('Current task state not recognized.')
  200. return
  201. if new_state_action == TaskStateAction.RESUME:
  202. if self.agent_task:
  203. self.agent_task.cancel()
  204. self.agent_task = asyncio.create_task(
  205. self.controller.resume(), name='agent resume task loop'
  206. )
  207. async def on_agent_event(self, event: Observation | Action):
  208. """Callback function for agent events.
  209. Args:
  210. event: The agent event (Observation or Action).
  211. """
  212. if isinstance(event, NullAction):
  213. return
  214. if isinstance(event, NullObservation):
  215. return
  216. await self.send(event.to_dict())
  217. def close(self):
  218. if self.agent_task:
  219. self.agent_task.cancel()
  220. if self.controller is not None:
  221. self.controller.action_manager.sandbox.close()