runtime.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. import asyncio
  2. import os
  3. import tempfile
  4. import uuid
  5. from zipfile import ZipFile
  6. import aiohttp
  7. import docker
  8. import tenacity
  9. from opendevin.core.config import AppConfig
  10. from opendevin.core.logger import opendevin_logger as logger
  11. from opendevin.events import EventStream
  12. from opendevin.events.action import (
  13. BrowseInteractiveAction,
  14. BrowseURLAction,
  15. CmdRunAction,
  16. FileReadAction,
  17. FileWriteAction,
  18. IPythonRunCellAction,
  19. )
  20. from opendevin.events.action.action import Action
  21. from opendevin.events.observation import (
  22. ErrorObservation,
  23. NullObservation,
  24. Observation,
  25. )
  26. from opendevin.events.serialization import event_to_dict, observation_from_dict
  27. from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
  28. from opendevin.runtime.builder import DockerRuntimeBuilder
  29. from opendevin.runtime.plugins import PluginRequirement
  30. from opendevin.runtime.runtime import Runtime
  31. from opendevin.runtime.utils import find_available_tcp_port
  32. from opendevin.runtime.utils.runtime_build import build_runtime_image
  33. class EventStreamRuntime(Runtime):
    """This runtime subscribes to the event stream.
    When it receives an event, it sends the event to the od-runtime-client, which runs inside the Docker environment.
    """
  37. container_name_prefix = 'opendevin-sandbox-'
  38. def __init__(
  39. self,
  40. config: AppConfig,
  41. event_stream: EventStream,
  42. sid: str = 'default',
  43. plugins: list[PluginRequirement] | None = None,
  44. container_image: str | None = None,
  45. ):
  46. super().__init__(
  47. config, event_stream, sid, plugins
  48. ) # will initialize the event stream
  49. self._port = find_available_tcp_port()
  50. self.api_url = f'http://{self.config.sandbox.api_hostname}:{self._port}'
  51. self.session: aiohttp.ClientSession | None = None
  52. self.instance_id = (
  53. sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
  54. )
  55. # TODO: We can switch to aiodocker when `get_od_sandbox_image` is updated to use aiodocker
  56. self.docker_client: docker.DockerClient = self._init_docker_client()
  57. self.container_image = (
  58. self.config.sandbox.container_image
  59. if container_image is None
  60. else container_image
  61. )
  62. self.container_name = self.container_name_prefix + self.instance_id
  63. self.container = None
  64. self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time
  65. self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
  66. logger.debug(f'EventStreamRuntime `{sid}` config:\n{self.config}')
  67. async def ainit(self, env_vars: dict[str, str] | None = None):
  68. if self.config.sandbox.od_runtime_extra_deps:
  69. logger.info(
  70. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.od_runtime_extra_deps}'
  71. )
  72. self.container_image = build_runtime_image(
  73. self.container_image,
  74. self.runtime_builder,
  75. extra_deps=self.config.sandbox.od_runtime_extra_deps,
  76. )
  77. self.container = await self._init_container(
  78. self.sandbox_workspace_dir,
  79. mount_dir=self.config.workspace_mount_path,
  80. plugins=self.plugins,
  81. )
  82. # MUST call super().ainit() to initialize both default env vars
  83. # AND the ones in env vars!
  84. await super().ainit(env_vars)
  85. logger.info(
  86. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
  87. )
  88. logger.info(f'Container initialized with env vars: {env_vars}')
  89. @staticmethod
  90. def _init_docker_client() -> docker.DockerClient:
  91. try:
  92. return docker.from_env()
  93. except Exception as ex:
  94. logger.error(
  95. 'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
  96. )
  97. raise ex
  98. @tenacity.retry(
  99. stop=tenacity.stop_after_attempt(5),
  100. wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
  101. )
  102. async def _init_container(
  103. self,
  104. sandbox_workspace_dir: str,
  105. mount_dir: str | None = None,
  106. plugins: list[PluginRequirement] | None = None,
  107. ):
  108. try:
  109. logger.info(
  110. f'Starting container with image: {self.container_image} and name: {self.container_name}'
  111. )
  112. plugin_arg = ''
  113. if plugins is not None and len(plugins) > 0:
  114. plugin_arg = (
  115. f'--plugins {" ".join([plugin.name for plugin in plugins])} '
  116. )
  117. network_mode: str | None = None
  118. port_mapping: dict[str, int] | None = None
  119. if self.config.sandbox.use_host_network:
  120. network_mode = 'host'
  121. logger.warn(
  122. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
  123. )
  124. else:
  125. port_mapping = {f'{self._port}/tcp': self._port}
  126. if mount_dir is not None:
  127. volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
  128. logger.info(f'Mount dir: {sandbox_workspace_dir}')
  129. else:
  130. logger.warn(
  131. 'Mount dir is not set, will not mount the workspace directory to the container.'
  132. )
  133. volumes = None
  134. if self.config.sandbox.browsergym_eval_env is not None:
  135. browsergym_arg = (
  136. f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
  137. )
  138. else:
  139. browsergym_arg = ''
  140. container = self.docker_client.containers.run(
  141. self.container_image,
  142. command=(
  143. f'/opendevin/miniforge3/bin/mamba run --no-capture-output -n base '
  144. 'PYTHONUNBUFFERED=1 poetry run '
  145. f'python -u -m opendevin.runtime.client.client {self._port} '
  146. f'--working-dir {sandbox_workspace_dir} '
  147. f'{plugin_arg}'
  148. f'--username {"opendevin" if self.config.run_as_devin else "root"} '
  149. f'--user-id {self.config.sandbox.user_id} '
  150. f'{browsergym_arg}'
  151. ),
  152. network_mode=network_mode,
  153. ports=port_mapping,
  154. working_dir='/opendevin/code/',
  155. name=self.container_name,
  156. detach=True,
  157. environment={'DEBUG': 'true'} if self.config.debug else None,
  158. volumes=volumes,
  159. )
  160. logger.info(f'Container started. Server url: {self.api_url}')
  161. return container
  162. except Exception as e:
  163. logger.error('Failed to start container')
  164. logger.exception(e)
  165. await self.close(close_client=False)
  166. raise e
  167. async def _ensure_session(self):
  168. await asyncio.sleep(1)
  169. if self.session is None or self.session.closed:
  170. self.session = aiohttp.ClientSession()
  171. return self.session
  172. @tenacity.retry(
  173. stop=tenacity.stop_after_attempt(10),
  174. wait=tenacity.wait_exponential(multiplier=2, min=10, max=60),
  175. )
  176. async def _wait_until_alive(self):
  177. logger.info('Reconnecting session')
  178. container = self.docker_client.containers.get(self.container_name)
  179. # get logs
  180. _logs = container.logs(tail=10).decode('utf-8').split('\n')
  181. # add indent
  182. _logs = '\n'.join([f' |{log}' for log in _logs])
  183. logger.info(
  184. '\n'
  185. + '-' * 30
  186. + 'Container logs (last 10 lines):'
  187. + '-' * 30
  188. + f'\n{_logs}'
  189. + '\n'
  190. + '-' * 90
  191. )
  192. async with aiohttp.ClientSession() as session:
  193. async with session.get(f'{self.api_url}/alive') as response:
  194. if response.status == 200:
  195. return
  196. else:
  197. msg = f'Action execution API is not alive. Response: {response}'
  198. logger.error(msg)
  199. raise RuntimeError(msg)
  200. @property
  201. def sandbox_workspace_dir(self):
  202. return self.config.workspace_mount_path_in_sandbox
  203. async def close(self, close_client: bool = True):
  204. if self.session is not None and not self.session.closed:
  205. await self.session.close()
  206. containers = self.docker_client.containers.list(all=True)
  207. for container in containers:
  208. try:
  209. if container.name.startswith(self.container_name_prefix):
  210. logs = container.logs(tail=1000).decode('utf-8')
  211. logger.debug(
  212. f'==== Container logs ====\n{logs}\n==== End of container logs ===='
  213. )
  214. container.remove(force=True)
  215. except docker.errors.NotFound:
  216. pass
  217. if close_client:
  218. self.docker_client.close()
  219. async def run_action(self, action: Action) -> Observation:
  220. # set timeout to default if not set
  221. if action.timeout is None:
  222. action.timeout = self.config.sandbox.timeout
  223. async with self.action_semaphore:
  224. if not action.runnable:
  225. return NullObservation('')
  226. action_type = action.action # type: ignore[attr-defined]
  227. if action_type not in ACTION_TYPE_TO_CLASS:
  228. return ErrorObservation(f'Action {action_type} does not exist.')
  229. if not hasattr(self, action_type):
  230. return ErrorObservation(
  231. f'Action {action_type} is not supported in the current runtime.'
  232. )
  233. logger.info('Awaiting session')
  234. session = await self._ensure_session()
  235. await self._wait_until_alive()
  236. assert action.timeout is not None
  237. try:
  238. logger.info('Executing command')
  239. async with session.post(
  240. f'{self.api_url}/execute_action',
  241. json={'action': event_to_dict(action)},
  242. timeout=action.timeout,
  243. ) as response:
  244. if response.status == 200:
  245. output = await response.json()
  246. obs = observation_from_dict(output)
  247. obs._cause = action.id # type: ignore[attr-defined]
  248. return obs
  249. else:
  250. error_message = await response.text()
  251. logger.error(f'Error from server: {error_message}')
  252. obs = ErrorObservation(
  253. f'Command execution failed: {error_message}'
  254. )
  255. except asyncio.TimeoutError:
  256. logger.error('No response received within the timeout period.')
  257. obs = ErrorObservation('Command execution timed out')
  258. except Exception as e:
  259. logger.error(f'Error during command execution: {e}')
  260. obs = ErrorObservation(f'Command execution failed: {str(e)}')
  261. return obs
  262. async def run(self, action: CmdRunAction) -> Observation:
  263. return await self.run_action(action)
  264. async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  265. return await self.run_action(action)
  266. async def read(self, action: FileReadAction) -> Observation:
  267. return await self.run_action(action)
  268. async def write(self, action: FileWriteAction) -> Observation:
  269. return await self.run_action(action)
  270. async def browse(self, action: BrowseURLAction) -> Observation:
  271. return await self.run_action(action)
  272. async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  273. return await self.run_action(action)
  274. # ====================================================================
  275. # Implement these methods (for file operations) in the subclass
  276. # ====================================================================
  277. async def copy_to(
  278. self, host_src: str, sandbox_dest: str, recursive: bool = False
  279. ) -> None:
  280. if not os.path.exists(host_src):
  281. raise FileNotFoundError(f'Source file {host_src} does not exist')
  282. session = await self._ensure_session()
  283. await self._wait_until_alive()
  284. try:
  285. if recursive:
  286. # For recursive copy, create a zip file
  287. with tempfile.NamedTemporaryFile(
  288. suffix='.zip', delete=False
  289. ) as temp_zip:
  290. temp_zip_path = temp_zip.name
  291. with ZipFile(temp_zip_path, 'w') as zipf:
  292. for root, _, files in os.walk(host_src):
  293. for file in files:
  294. file_path = os.path.join(root, file)
  295. arcname = os.path.relpath(
  296. file_path, os.path.dirname(host_src)
  297. )
  298. zipf.write(file_path, arcname)
  299. upload_data = {'file': open(temp_zip_path, 'rb')}
  300. else:
  301. # For single file copy
  302. upload_data = {'file': open(host_src, 'rb')}
  303. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  304. async with session.post(
  305. f'{self.api_url}/upload_file', data=upload_data, params=params
  306. ) as response:
  307. if response.status == 200:
  308. return
  309. else:
  310. error_message = await response.text()
  311. raise Exception(f'Copy operation failed: {error_message}')
  312. except asyncio.TimeoutError:
  313. raise TimeoutError('Copy operation timed out')
  314. except Exception as e:
  315. raise RuntimeError(f'Copy operation failed: {str(e)}')
  316. finally:
  317. if recursive:
  318. os.unlink(temp_zip_path)
  319. logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}')
  320. async def list_files(self, path: str | None = None) -> list[str]:
  321. """List files in the sandbox.
  322. If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
  323. """
  324. session = await self._ensure_session()
  325. await self._wait_until_alive()
  326. try:
  327. data = {}
  328. if path is not None:
  329. data['path'] = path
  330. async with session.post(
  331. f'{self.api_url}/list_files', json=data
  332. ) as response:
  333. if response.status == 200:
  334. response_json = await response.json()
  335. assert isinstance(response_json, list)
  336. return response_json
  337. else:
  338. error_message = await response.text()
  339. raise Exception(f'List files operation failed: {error_message}')
  340. except asyncio.TimeoutError:
  341. raise TimeoutError('List files operation timed out')
  342. except Exception as e:
  343. raise RuntimeError(f'List files operation failed: {str(e)}')