# runtime.py
  1. import asyncio
  2. import os
  3. import tempfile
  4. import uuid
  5. from typing import Any, Optional
  6. from zipfile import ZipFile
  7. import aiohttp
  8. import docker
  9. import tenacity
  10. from opendevin.core.config import AppConfig
  11. from opendevin.core.logger import opendevin_logger as logger
  12. from opendevin.events import EventSource, EventStream
  13. from opendevin.events.action import (
  14. BrowseInteractiveAction,
  15. BrowseURLAction,
  16. CmdRunAction,
  17. FileReadAction,
  18. FileWriteAction,
  19. IPythonRunCellAction,
  20. )
  21. from opendevin.events.action.action import Action
  22. from opendevin.events.event import Event
  23. from opendevin.events.observation import (
  24. ErrorObservation,
  25. NullObservation,
  26. Observation,
  27. )
  28. from opendevin.events.serialization import event_to_dict, observation_from_dict
  29. from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
  30. from opendevin.runtime.plugins import PluginRequirement
  31. from opendevin.runtime.runtime import Runtime
  32. from opendevin.runtime.utils import find_available_tcp_port
  33. from opendevin.runtime.utils.runtime_build import build_runtime_image
  34. class EventStreamRuntime(Runtime):
  35. """This runtime will subscribe the event stream.
  36. When receive an event, it will send the event to od-runtime-client which run inside the docker environment.
  37. """
  38. container_name_prefix = 'opendevin-sandbox-'
  39. def __init__(
  40. self,
  41. config: AppConfig,
  42. event_stream: EventStream,
  43. sid: str = 'default',
  44. plugins: list[PluginRequirement] | None = None,
  45. container_image: str | None = None,
  46. ):
  47. super().__init__(
  48. config, event_stream, sid, plugins
  49. ) # will initialize the event stream
  50. self._port = find_available_tcp_port()
  51. self.api_url = f'http://localhost:{self._port}'
  52. self.session: Optional[aiohttp.ClientSession] = None
  53. self.instance_id = (
  54. sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
  55. )
  56. # TODO: We can switch to aiodocker when `get_od_sandbox_image` is updated to use aiodocker
  57. self.docker_client: docker.DockerClient = self._init_docker_client()
  58. self.container_image = (
  59. self.config.sandbox.container_image
  60. if container_image is None
  61. else container_image
  62. )
  63. self.container_name = self.container_name_prefix + self.instance_id
  64. self.plugins = plugins if plugins is not None else []
  65. self.container = None
  66. self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time
  67. async def ainit(self, env_vars: dict[str, str] | None = None):
  68. self.container_image = build_runtime_image(
  69. self.container_image,
  70. self.docker_client,
  71. # NOTE: You can need set DEBUG=true to update the source code
  72. # inside the container. This is useful when you want to test/debug the
  73. # latest code in the runtime docker container.
  74. update_source_code=self.config.sandbox.update_source_code,
  75. )
  76. self.container = await self._init_container(
  77. self.sandbox_workspace_dir,
  78. mount_dir=self.config.workspace_mount_path,
  79. plugins=self.plugins,
  80. )
  81. # MUST call super().ainit() to initialize both default env vars
  82. # AND the ones in env vars!
  83. await super().ainit(env_vars)
  84. @staticmethod
  85. def _init_docker_client() -> docker.DockerClient:
  86. try:
  87. return docker.from_env()
  88. except Exception as ex:
  89. logger.error(
  90. 'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
  91. )
  92. raise ex
  93. @tenacity.retry(
  94. stop=tenacity.stop_after_attempt(5),
  95. wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
  96. )
  97. async def _init_container(
  98. self,
  99. sandbox_workspace_dir: str,
  100. mount_dir: str | None = None,
  101. plugins: list[PluginRequirement] | None = None,
  102. ):
  103. try:
  104. logger.info(
  105. f'Starting container with image: {self.container_image} and name: {self.container_name}'
  106. )
  107. if plugins is None:
  108. plugins = []
  109. plugin_names = ' '.join([plugin.name for plugin in plugins])
  110. network_mode: str | None = None
  111. port_mapping: dict[str, int] | None = None
  112. if self.config.sandbox.use_host_network:
  113. network_mode = 'host'
  114. logger.warn(
  115. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
  116. )
  117. else:
  118. port_mapping = {f'{self._port}/tcp': self._port}
  119. if mount_dir is not None:
  120. volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
  121. else:
  122. logger.warn(
  123. 'Mount dir is not set, will not mount the workspace directory to the container.'
  124. )
  125. volumes = None
  126. container = self.docker_client.containers.run(
  127. self.container_image,
  128. command=(
  129. f'/opendevin/miniforge3/bin/mamba run --no-capture-output -n base '
  130. 'PYTHONUNBUFFERED=1 poetry run '
  131. f'python -u -m opendevin.runtime.client.client {self._port} '
  132. f'--working-dir {sandbox_workspace_dir} '
  133. f'--plugins {plugin_names} '
  134. f'--username {"opendevin" if self.config.run_as_devin else "root"} '
  135. f'--user-id {self.config.sandbox.user_id}'
  136. ),
  137. network_mode=network_mode,
  138. ports=port_mapping,
  139. working_dir='/opendevin/code/',
  140. name=self.container_name,
  141. detach=True,
  142. environment={'DEBUG': 'true'} if self.config.debug else None,
  143. volumes=volumes,
  144. )
  145. logger.info(f'Container started. Server url: {self.api_url}')
  146. return container
  147. except Exception as e:
  148. logger.error('Failed to start container')
  149. logger.exception(e)
  150. await self.close(close_client=False)
  151. raise e
  152. async def _ensure_session(self):
  153. if self.session is None or self.session.closed:
  154. self.session = aiohttp.ClientSession()
  155. return self.session
  156. @tenacity.retry(
  157. stop=tenacity.stop_after_attempt(10),
  158. wait=tenacity.wait_exponential(multiplier=2, min=4, max=600),
  159. )
  160. async def _wait_until_alive(self):
  161. async with aiohttp.ClientSession() as session:
  162. async with session.get(f'{self.api_url}/alive') as response:
  163. if response.status == 200:
  164. return
  165. else:
  166. logger.error(
  167. f'Action execution API is not alive. Response: {response}'
  168. )
  169. raise RuntimeError(
  170. f'Action execution API is not alive. Response: {response}'
  171. )
  172. @property
  173. def sandbox_workspace_dir(self):
  174. return self.config.workspace_mount_path_in_sandbox
  175. async def close(self, close_client: bool = True):
  176. if self.session is not None and not self.session.closed:
  177. await self.session.close()
  178. containers = self.docker_client.containers.list(all=True)
  179. for container in containers:
  180. try:
  181. if container.name.startswith(self.container_name_prefix):
  182. logs = container.logs(tail=1000).decode('utf-8')
  183. logger.debug(
  184. f'==== Container logs ====\n{logs}\n==== End of container logs ===='
  185. )
  186. container.remove(force=True)
  187. except docker.errors.NotFound:
  188. pass
  189. if close_client:
  190. self.docker_client.close()
  191. async def copy_to(
  192. self, host_src: str, sandbox_dest: str, recursive: bool = False
  193. ) -> dict[str, Any]:
  194. if not os.path.exists(host_src):
  195. raise FileNotFoundError(f'Source file {host_src} does not exist')
  196. session = await self._ensure_session()
  197. await self._wait_until_alive()
  198. try:
  199. if recursive:
  200. # For recursive copy, create a zip file
  201. with tempfile.NamedTemporaryFile(
  202. suffix='.zip', delete=False
  203. ) as temp_zip:
  204. temp_zip_path = temp_zip.name
  205. with ZipFile(temp_zip_path, 'w') as zipf:
  206. for root, _, files in os.walk(host_src):
  207. for file in files:
  208. file_path = os.path.join(root, file)
  209. arcname = os.path.relpath(
  210. file_path, os.path.dirname(host_src)
  211. )
  212. zipf.write(file_path, arcname)
  213. upload_data = {'file': open(temp_zip_path, 'rb')}
  214. else:
  215. # For single file copy
  216. upload_data = {'file': open(host_src, 'rb')}
  217. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  218. async with session.post(
  219. f'{self.api_url}/upload_file', data=upload_data, params=params
  220. ) as response:
  221. if response.status == 200:
  222. return await response.json()
  223. else:
  224. error_message = await response.text()
  225. raise Exception(f'Copy operation failed: {error_message}')
  226. except asyncio.TimeoutError:
  227. raise TimeoutError('Copy operation timed out')
  228. except Exception as e:
  229. raise RuntimeError(f'Copy operation failed: {str(e)}')
  230. finally:
  231. if recursive:
  232. os.unlink(temp_zip_path)
  233. async def on_event(self, event: Event) -> None:
  234. logger.info(f'EventStreamRuntime: on_event triggered: {event}')
  235. if isinstance(event, Action):
  236. logger.info(event, extra={'msg_type': 'ACTION'})
  237. observation = await self.run_action(event)
  238. observation._cause = event.id # type: ignore[attr-defined]
  239. logger.info(observation, extra={'msg_type': 'OBSERVATION'})
  240. source = event.source if event.source else EventSource.AGENT
  241. await self.event_stream.add_event(observation, source)
  242. async def run_action(self, action: Action, timeout: int = 600) -> Observation:
  243. async with self.action_semaphore:
  244. if not action.runnable:
  245. return NullObservation('')
  246. action_type = action.action # type: ignore[attr-defined]
  247. if action_type not in ACTION_TYPE_TO_CLASS:
  248. return ErrorObservation(f'Action {action_type} does not exist.')
  249. if not hasattr(self, action_type):
  250. return ErrorObservation(
  251. f'Action {action_type} is not supported in the current runtime.'
  252. )
  253. session = await self._ensure_session()
  254. await self._wait_until_alive()
  255. try:
  256. async with session.post(
  257. f'{self.api_url}/execute_action',
  258. json={'action': event_to_dict(action)},
  259. timeout=timeout,
  260. ) as response:
  261. if response.status == 200:
  262. output = await response.json()
  263. obs = observation_from_dict(output)
  264. obs._cause = action.id # type: ignore[attr-defined]
  265. return obs
  266. else:
  267. error_message = await response.text()
  268. logger.error(f'Error from server: {error_message}')
  269. obs = ErrorObservation(
  270. f'Command execution failed: {error_message}'
  271. )
  272. except asyncio.TimeoutError:
  273. logger.error('No response received within the timeout period.')
  274. obs = ErrorObservation('Command execution timed out')
  275. except Exception as e:
  276. logger.error(f'Error during command execution: {e}')
  277. obs = ErrorObservation(f'Command execution failed: {str(e)}')
  278. return obs
  279. async def run(self, action: CmdRunAction) -> Observation:
  280. return await self.run_action(action)
  281. async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  282. return await self.run_action(action)
  283. async def read(self, action: FileReadAction) -> Observation:
  284. return await self.run_action(action)
  285. async def write(self, action: FileWriteAction) -> Observation:
  286. return await self.run_action(action)
  287. async def browse(self, action: BrowseURLAction) -> Observation:
  288. return await self.run_action(action)
  289. async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  290. return await self.run_action(action)
  291. ############################################################################
  292. # Keep the same with other runtimes
  293. ############################################################################
  294. def get_working_directory(self):
  295. raise NotImplementedError(
  296. 'This method is not implemented in the runtime client.'
  297. )