runtime.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. import asyncio
  2. import uuid
  3. from typing import Optional
  4. import aiohttp
  5. import docker
  6. import tenacity
  7. from opendevin.core.config import config
  8. from opendevin.core.logger import opendevin_logger as logger
  9. from opendevin.events import EventSource, EventStream
  10. from opendevin.events.action import (
  11. BrowseInteractiveAction,
  12. BrowseURLAction,
  13. CmdRunAction,
  14. FileReadAction,
  15. FileWriteAction,
  16. IPythonRunCellAction,
  17. )
  18. from opendevin.events.action.action import Action
  19. from opendevin.events.event import Event
  20. from opendevin.events.observation import (
  21. ErrorObservation,
  22. NullObservation,
  23. Observation,
  24. )
  25. from opendevin.events.serialization import event_to_dict, observation_from_dict
  26. from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
  27. from opendevin.runtime.plugins import (
  28. AgentSkillsRequirement,
  29. JupyterRequirement,
  30. PluginRequirement,
  31. )
  32. from opendevin.runtime.runtime import Runtime
  33. from opendevin.runtime.utils import find_available_tcp_port
  34. from opendevin.runtime.utils.runtime_build import build_runtime_image
  35. class EventStreamRuntime(Runtime):
  36. """This runtime will subscribe the event stream.
  37. When receive an event, it will send the event to od-runtime-client which run inside the docker environment.
  38. """
  39. container_name_prefix = 'opendevin-sandbox-'
  40. def __init__(
  41. self,
  42. event_stream: EventStream,
  43. sid: str = 'default',
  44. container_image: str | None = None,
  45. plugins: list[PluginRequirement] | None = None,
  46. ):
  47. super().__init__(event_stream, sid) # will initialize the event stream
  48. self._port = find_available_tcp_port()
  49. self.api_url = f'http://localhost:{self._port}'
  50. self.session: Optional[aiohttp.ClientSession] = None
  51. self.instance_id = (
  52. sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
  53. )
  54. # TODO: We can switch to aiodocker when `get_od_sandbox_image` is updated to use aiodocker
  55. self.docker_client: docker.DockerClient = self._init_docker_client()
  56. self.container_image = (
  57. config.sandbox.container_image
  58. if container_image is None
  59. else container_image
  60. )
  61. self.container_name = self.container_name_prefix + self.instance_id
  62. self.plugins = plugins if plugins is not None else []
  63. self.container = None
  64. self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time
  65. async def ainit(self):
  66. self.container_image = build_runtime_image(
  67. self.container_image,
  68. self.docker_client,
  69. # NOTE: You can need set DEBUG=true to update the source code
  70. # inside the container. This is useful when you want to test/debug the
  71. # latest code in the runtime docker container.
  72. update_source_code=config.debug,
  73. )
  74. self.container = await self._init_container(
  75. self.sandbox_workspace_dir,
  76. mount_dir=config.workspace_mount_path,
  77. plugins=self.plugins,
  78. )
  79. @staticmethod
  80. def _init_docker_client() -> docker.DockerClient:
  81. try:
  82. return docker.from_env()
  83. except Exception as ex:
  84. logger.error(
  85. 'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
  86. )
  87. raise ex
  88. @tenacity.retry(
  89. stop=tenacity.stop_after_attempt(5),
  90. wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
  91. )
  92. async def _init_container(
  93. self,
  94. sandbox_workspace_dir: str,
  95. mount_dir: str = config.workspace_mount_path,
  96. plugins: list[PluginRequirement] | None = None,
  97. ):
  98. try:
  99. logger.info(
  100. f'Starting container with image: {self.container_image} and name: {self.container_name}'
  101. )
  102. if plugins is None:
  103. plugins = []
  104. plugin_names = ' '.join([plugin.name for plugin in plugins])
  105. container = self.docker_client.containers.run(
  106. self.container_image,
  107. command=(
  108. f'/opendevin/miniforge3/bin/mamba run --no-capture-output -n base '
  109. 'PYTHONUNBUFFERED=1 poetry run '
  110. f'python -u -m opendevin.runtime.client.client {self._port} '
  111. f'--working-dir {sandbox_workspace_dir} '
  112. f'--plugins {plugin_names}'
  113. ),
  114. network_mode='host',
  115. working_dir='/opendevin/code/',
  116. name=self.container_name,
  117. detach=True,
  118. volumes={mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}},
  119. )
  120. logger.info(f'Container started. Server url: {self.api_url}')
  121. return container
  122. except Exception as e:
  123. logger.error('Failed to start container')
  124. logger.exception(e)
  125. await self.close(close_client=False)
  126. raise e
  127. async def _ensure_session(self):
  128. if self.session is None or self.session.closed:
  129. self.session = aiohttp.ClientSession()
  130. return self.session
  131. @tenacity.retry(
  132. stop=tenacity.stop_after_attempt(5),
  133. wait=tenacity.wait_exponential(multiplier=2, min=4, max=600),
  134. )
  135. async def _wait_until_alive(self):
  136. async with aiohttp.ClientSession() as session:
  137. async with session.get(f'{self.api_url}/alive') as response:
  138. if response.status == 200:
  139. return
  140. else:
  141. logger.error(
  142. f'Action execution API is not alive. Response: {response}'
  143. )
  144. raise RuntimeError(
  145. f'Action execution API is not alive. Response: {response}'
  146. )
  147. @property
  148. def sandbox_workspace_dir(self):
  149. return config.workspace_mount_path_in_sandbox
  150. async def close(self, close_client: bool = True):
  151. if self.session is not None and not self.session.closed:
  152. await self.session.close()
  153. containers = self.docker_client.containers.list(all=True)
  154. for container in containers:
  155. try:
  156. if container.name.startswith(self.container_name_prefix):
  157. logs = container.logs(tail=1000).decode('utf-8')
  158. logger.debug(
  159. f'==== Container logs ====\n{logs}\n==== End of container logs ===='
  160. )
  161. container.remove(force=True)
  162. except docker.errors.NotFound:
  163. pass
  164. if close_client:
  165. self.docker_client.close()
  166. async def on_event(self, event: Event) -> None:
  167. logger.info(f'EventStreamRuntime: on_event triggered: {event}')
  168. if isinstance(event, Action):
  169. logger.info(event, extra={'msg_type': 'ACTION'})
  170. observation = await self.run_action(event)
  171. # observation._cause = event.id # type: ignore[attr-defined]
  172. logger.info(observation, extra={'msg_type': 'OBSERVATION'})
  173. source = event.source if event.source else EventSource.AGENT
  174. await self.event_stream.add_event(observation, source)
  175. async def run_action(self, action: Action, timeout: int = 600) -> Observation:
  176. async with self.action_semaphore:
  177. if not action.runnable:
  178. return NullObservation('')
  179. action_type = action.action # type: ignore[attr-defined]
  180. if action_type not in ACTION_TYPE_TO_CLASS:
  181. return ErrorObservation(f'Action {action_type} does not exist.')
  182. if not hasattr(self, action_type):
  183. return ErrorObservation(
  184. f'Action {action_type} is not supported in the current runtime.'
  185. )
  186. session = await self._ensure_session()
  187. await self._wait_until_alive()
  188. try:
  189. async with session.post(
  190. f'{self.api_url}/execute_action',
  191. json={'action': event_to_dict(action)},
  192. timeout=timeout,
  193. ) as response:
  194. if response.status == 200:
  195. output = await response.json()
  196. obs = observation_from_dict(output)
  197. obs._cause = action.id # type: ignore[attr-defined]
  198. return obs
  199. else:
  200. error_message = await response.text()
  201. logger.error(f'Error from server: {error_message}')
  202. obs = ErrorObservation(
  203. f'Command execution failed: {error_message}'
  204. )
  205. except asyncio.TimeoutError:
  206. logger.error('No response received within the timeout period.')
  207. obs = ErrorObservation('Command execution timed out')
  208. except Exception as e:
  209. logger.error(f'Error during command execution: {e}')
  210. obs = ErrorObservation(f'Command execution failed: {str(e)}')
  211. obs._parent = action.id # type: ignore[attr-defined]
  212. return obs
  213. async def run(self, action: CmdRunAction) -> Observation:
  214. return await self.run_action(action)
  215. async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  216. return await self.run_action(action)
  217. async def read(self, action: FileReadAction) -> Observation:
  218. return await self.run_action(action)
  219. async def write(self, action: FileWriteAction) -> Observation:
  220. return await self.run_action(action)
  221. async def browse(self, action: BrowseURLAction) -> Observation:
  222. return await self.run_action(action)
  223. async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  224. return await self.run_action(action)
  225. ############################################################################
  226. # Keep the same with other runtimes
  227. ############################################################################
  228. def get_working_directory(self):
  229. raise NotImplementedError(
  230. 'This method is not implemented in the runtime client.'
  231. )
  232. ############################################################################
  233. # Initialization work inside sandbox image
  234. ############################################################################
  235. # init_runtime_tools direcctly do as what Runtime do
  236. # Do in the od_runtime_client
  237. # Overwrite the init_sandbox_plugins
  238. def init_sandbox_plugins(self, plugins: list[PluginRequirement]) -> None:
  239. pass
  240. async def test_run_command():
  241. sid = 'test'
  242. cli_session = 'main' + ('_' + sid if sid else '')
  243. event_stream = EventStream(cli_session)
  244. runtime = EventStreamRuntime(event_stream)
  245. await runtime.ainit()
  246. await runtime.run_action(CmdRunAction('ls -l'))
  247. async def test_event_stream():
  248. sid = 'test'
  249. cli_session = 'main' + ('_' + sid if sid else '')
  250. event_stream = EventStream(cli_session)
  251. runtime = EventStreamRuntime(
  252. event_stream,
  253. sid,
  254. 'ubuntu:22.04',
  255. plugins=[JupyterRequirement(), AgentSkillsRequirement()],
  256. )
  257. await runtime.ainit()
  258. # Test run command
  259. action_cmd = CmdRunAction(command='ls -l')
  260. logger.info(action_cmd, extra={'msg_type': 'ACTION'})
  261. logger.info(await runtime.run_action(action_cmd), extra={'msg_type': 'OBSERVATION'})
  262. # Test run ipython
  263. test_code = "print('Hello, `World`!\\n')"
  264. action_ipython = IPythonRunCellAction(code=test_code)
  265. logger.info(action_ipython, extra={'msg_type': 'ACTION'})
  266. logger.info(
  267. await runtime.run_action(action_ipython), extra={'msg_type': 'OBSERVATION'}
  268. )
  269. # Test read file (file should not exist)
  270. action_read = FileReadAction(path='hello.sh')
  271. logger.info(action_read, extra={'msg_type': 'ACTION'})
  272. logger.info(
  273. await runtime.run_action(action_read), extra={'msg_type': 'OBSERVATION'}
  274. )
  275. # Test write file
  276. action_write = FileWriteAction(content='echo "Hello, World!"', path='hello.sh')
  277. logger.info(action_write, extra={'msg_type': 'ACTION'})
  278. logger.info(
  279. await runtime.run_action(action_write), extra={'msg_type': 'OBSERVATION'}
  280. )
  281. # Test read file (file should exist)
  282. action_read = FileReadAction(path='hello.sh')
  283. logger.info(action_read, extra={'msg_type': 'ACTION'})
  284. logger.info(
  285. await runtime.run_action(action_read), extra={'msg_type': 'OBSERVATION'}
  286. )
  287. # Test browse
  288. action_browse = BrowseURLAction(url='https://google.com')
  289. logger.info(action_browse, extra={'msg_type': 'ACTION'})
  290. logger.info(
  291. await runtime.run_action(action_browse), extra={'msg_type': 'OBSERVATION'}
  292. )
  293. await runtime.close()
# Script entry point: run the end-to-end event stream test when this module
# is executed directly.
if __name__ == '__main__':
    asyncio.run(test_event_stream())