eventstream_runtime.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. import atexit
  2. import os
  3. import tempfile
  4. import threading
  5. from functools import lru_cache
  6. from pathlib import Path
  7. from typing import Callable
  8. from zipfile import ZipFile
  9. import docker
  10. import requests
  11. import tenacity
  12. from openhands.core.config import AppConfig
  13. from openhands.core.logger import DEBUG
  14. from openhands.core.logger import openhands_logger as logger
  15. from openhands.events import EventStream
  16. from openhands.events.action import (
  17. ActionConfirmationStatus,
  18. BrowseInteractiveAction,
  19. BrowseURLAction,
  20. CmdRunAction,
  21. FileEditAction,
  22. FileReadAction,
  23. FileWriteAction,
  24. IPythonRunCellAction,
  25. )
  26. from openhands.events.action.action import Action
  27. from openhands.events.observation import (
  28. ErrorObservation,
  29. NullObservation,
  30. Observation,
  31. UserRejectObservation,
  32. )
  33. from openhands.events.serialization import event_to_dict, observation_from_dict
  34. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  35. from openhands.runtime.base import (
  36. Runtime,
  37. RuntimeDisconnectedError,
  38. RuntimeNotFoundError,
  39. )
  40. from openhands.runtime.builder import DockerRuntimeBuilder
  41. from openhands.runtime.impl.eventstream.containers import remove_all_containers
  42. from openhands.runtime.plugins import PluginRequirement
  43. from openhands.runtime.utils import find_available_tcp_port
  44. from openhands.runtime.utils.log_streamer import LogStreamer
  45. from openhands.runtime.utils.request import send_request
  46. from openhands.runtime.utils.runtime_build import build_runtime_image
  47. from openhands.utils.async_utils import call_sync_from_async
  48. from openhands.utils.tenacity_stop import stop_if_should_exit
# Name prefix for runtime containers; EventStreamRuntime names its container
# as this prefix followed by the session ID.
CONTAINER_NAME_PREFIX = 'openhands-runtime-'


def remove_all_runtime_containers():
    """Call ``remove_all_containers`` with ``CONTAINER_NAME_PREFIX``."""
    remove_all_containers(CONTAINER_NAME_PREFIX)


# Registered at import time so the cleanup above runs at interpreter exit.
atexit.register(remove_all_runtime_containers)
  53. class EventStreamRuntime(Runtime):
    """This runtime subscribes to the event stream.

    When it receives an event, it sends the event to the runtime client, which runs inside the Docker environment.
  56. Args:
  57. config (AppConfig): The application configuration.
  58. event_stream (EventStream): The event stream to subscribe to.
  59. sid (str, optional): The session ID. Defaults to 'default'.
  60. plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
  61. env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
  62. """
  63. # Need to provide this method to allow inheritors to init the Runtime
  64. # without initting the EventStreamRuntime.
  65. def init_base_runtime(
  66. self,
  67. config: AppConfig,
  68. event_stream: EventStream,
  69. sid: str = 'default',
  70. plugins: list[PluginRequirement] | None = None,
  71. env_vars: dict[str, str] | None = None,
  72. status_callback: Callable | None = None,
  73. attach_to_existing: bool = False,
  74. headless_mode: bool = True,
  75. ):
  76. super().__init__(
  77. config,
  78. event_stream,
  79. sid,
  80. plugins,
  81. env_vars,
  82. status_callback,
  83. attach_to_existing,
  84. headless_mode,
  85. )
  86. def __init__(
  87. self,
  88. config: AppConfig,
  89. event_stream: EventStream,
  90. sid: str = 'default',
  91. plugins: list[PluginRequirement] | None = None,
  92. env_vars: dict[str, str] | None = None,
  93. status_callback: Callable | None = None,
  94. attach_to_existing: bool = False,
  95. headless_mode: bool = True,
  96. ):
  97. self.config = config
  98. self._host_port = 30000 # initial dummy value
  99. self._container_port = 30001 # initial dummy value
  100. self._vscode_url: str | None = None # initial dummy value
  101. self._runtime_initialized: bool = False
  102. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  103. self.session = requests.Session()
  104. self.status_callback = status_callback
  105. self.docker_client: docker.DockerClient = self._init_docker_client()
  106. self.base_container_image = self.config.sandbox.base_container_image
  107. self.runtime_container_image = self.config.sandbox.runtime_container_image
  108. self.container_name = CONTAINER_NAME_PREFIX + sid
  109. self.container = None
  110. self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
  111. self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
  112. # Buffer for container logs
  113. self.log_streamer: LogStreamer | None = None
  114. self.init_base_runtime(
  115. config,
  116. event_stream,
  117. sid,
  118. plugins,
  119. env_vars,
  120. status_callback,
  121. attach_to_existing,
  122. headless_mode,
  123. )
  124. # Log runtime_extra_deps after base class initialization so self.sid is available
  125. if self.config.sandbox.runtime_extra_deps:
  126. self.log(
  127. 'debug',
  128. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
  129. )
  130. async def connect(self):
  131. self.send_status_message('STATUS$STARTING_RUNTIME')
  132. try:
  133. await call_sync_from_async(self._attach_to_container)
  134. except docker.errors.NotFound as e:
  135. if self.attach_to_existing:
  136. self.log(
  137. 'error',
  138. f'Container {self.container_name} not found.',
  139. )
  140. raise e
  141. if self.runtime_container_image is None:
  142. if self.base_container_image is None:
  143. raise ValueError(
  144. 'Neither runtime container image nor base container image is set'
  145. )
  146. self.send_status_message('STATUS$STARTING_CONTAINER')
  147. self.runtime_container_image = build_runtime_image(
  148. self.base_container_image,
  149. self.runtime_builder,
  150. platform=self.config.sandbox.platform,
  151. extra_deps=self.config.sandbox.runtime_extra_deps,
  152. force_rebuild=self.config.sandbox.force_rebuild_runtime,
  153. extra_build_args=self.config.sandbox.runtime_extra_build_args,
  154. )
  155. self.log(
  156. 'info', f'Starting runtime with image: {self.runtime_container_image}'
  157. )
  158. await call_sync_from_async(self._init_container)
  159. self.log(
  160. 'info',
  161. f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
  162. )
  163. self.log_streamer = LogStreamer(self.container, self.log)
  164. if not self.attach_to_existing:
  165. self.log('info', f'Waiting for client to become ready at {self.api_url}...')
  166. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  167. await call_sync_from_async(self._wait_until_alive)
  168. if not self.attach_to_existing:
  169. self.log('info', 'Runtime is ready.')
  170. if not self.attach_to_existing:
  171. await call_sync_from_async(self.setup_initial_env)
  172. self.log(
  173. 'debug',
  174. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}. VSCode URL: {self.vscode_url}',
  175. )
  176. if not self.attach_to_existing:
  177. self.send_status_message(' ')
  178. self._runtime_initialized = True
  179. @staticmethod
  180. @lru_cache(maxsize=1)
  181. def _init_docker_client() -> docker.DockerClient:
  182. try:
  183. return docker.from_env()
  184. except Exception as ex:
  185. logger.error(
  186. 'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
  187. )
  188. raise ex
  189. def _init_container(self):
  190. self.log('debug', 'Preparing to start container...')
  191. self.send_status_message('STATUS$PREPARING_CONTAINER')
  192. plugin_arg = ''
  193. if self.plugins is not None and len(self.plugins) > 0:
  194. plugin_arg = (
  195. f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
  196. )
  197. self._host_port = self._find_available_port()
  198. self._container_port = (
  199. self._host_port
  200. ) # in future this might differ from host port
  201. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  202. use_host_network = self.config.sandbox.use_host_network
  203. network_mode: str | None = 'host' if use_host_network else None
  204. port_mapping: dict[str, list[dict[str, str]]] | None = (
  205. None
  206. if use_host_network
  207. else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
  208. )
  209. if use_host_network:
  210. self.log(
  211. 'warn',
  212. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
  213. )
  214. # Combine environment variables
  215. environment = {
  216. 'port': str(self._container_port),
  217. 'PYTHONUNBUFFERED': 1,
  218. }
  219. if self.config.debug or DEBUG:
  220. environment['DEBUG'] = 'true'
  221. if self.vscode_enabled:
  222. # vscode is on port +1 from container port
  223. if isinstance(port_mapping, dict):
  224. port_mapping[f'{self._container_port + 1}/tcp'] = [
  225. {'HostPort': str(self._host_port + 1)}
  226. ]
  227. self.log('debug', f'Workspace Base: {self.config.workspace_base}')
  228. if (
  229. self.config.workspace_mount_path is not None
  230. and self.config.workspace_mount_path_in_sandbox is not None
  231. ):
  232. # e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
  233. volumes = {
  234. self.config.workspace_mount_path: {
  235. 'bind': self.config.workspace_mount_path_in_sandbox,
  236. 'mode': 'rw',
  237. }
  238. }
  239. logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
  240. else:
  241. logger.debug(
  242. 'Mount dir is not set, will not mount the workspace directory to the container'
  243. )
  244. volumes = None
  245. self.log(
  246. 'debug',
  247. f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
  248. )
  249. if self.config.sandbox.browsergym_eval_env is not None:
  250. browsergym_arg = (
  251. f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
  252. )
  253. else:
  254. browsergym_arg = ''
  255. try:
  256. self.container = self.docker_client.containers.run(
  257. self.runtime_container_image,
  258. command=(
  259. f'/openhands/micromamba/bin/micromamba run -n openhands '
  260. f'poetry run '
  261. f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
  262. f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
  263. f'{plugin_arg}'
  264. f'--username {"openhands" if self.config.run_as_openhands else "root"} '
  265. f'--user-id {self.config.sandbox.user_id} '
  266. f'{browsergym_arg}'
  267. ),
  268. network_mode=network_mode,
  269. ports=port_mapping,
  270. working_dir='/openhands/code/', # do not change this!
  271. name=self.container_name,
  272. detach=True,
  273. environment=environment,
  274. volumes=volumes,
  275. )
  276. self.log('debug', f'Container started. Server url: {self.api_url}')
  277. self.send_status_message('STATUS$CONTAINER_STARTED')
  278. except docker.errors.APIError as e:
  279. if '409' in str(e):
  280. self.log(
  281. 'warning',
  282. f'Container {self.container_name} already exists. Removing...',
  283. )
  284. remove_all_containers(self.container_name)
  285. return self._init_container()
  286. else:
  287. self.log(
  288. 'error',
  289. f'Error: Instance {self.container_name} FAILED to start container!\n',
  290. )
  291. except Exception as e:
  292. self.log(
  293. 'error',
  294. f'Error: Instance {self.container_name} FAILED to start container!\n',
  295. )
  296. self.log('error', str(e))
  297. self.close()
  298. raise e
  299. def _attach_to_container(self):
  300. self._container_port = 0
  301. self.container = self.docker_client.containers.get(self.container_name)
  302. for port in self.container.attrs['NetworkSettings']['Ports']: # type: ignore
  303. self._container_port = int(port.split('/')[0])
  304. break
  305. self._host_port = self._container_port
  306. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  307. self.log(
  308. 'debug',
  309. f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
  310. )
  311. @tenacity.retry(
  312. stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
  313. retry=tenacity.retry_if_exception_type(
  314. (ConnectionError, requests.exceptions.ConnectionError)
  315. ),
  316. reraise=True,
  317. wait=tenacity.wait_fixed(2),
  318. )
  319. def _wait_until_alive(self):
  320. try:
  321. container = self.docker_client.containers.get(self.container_name)
  322. if container.status == 'exited':
  323. raise RuntimeDisconnectedError(
  324. f'Container {self.container_name} has exited.'
  325. )
  326. except docker.errors.NotFound:
  327. raise RuntimeNotFoundError(f'Container {self.container_name} not found.')
  328. if not self.log_streamer:
  329. raise RuntimeError('Runtime client is not ready.')
  330. with send_request(
  331. self.session,
  332. 'GET',
  333. f'{self.api_url}/alive',
  334. timeout=5,
  335. ):
  336. pass
  337. def close(self, rm_all_containers: bool | None = None):
  338. """Closes the EventStreamRuntime and associated objects
  339. Parameters:
  340. - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
  341. """
  342. if self.log_streamer:
  343. self.log_streamer.close()
  344. if self.session:
  345. self.session.close()
  346. if rm_all_containers is None:
  347. rm_all_containers = self.config.sandbox.rm_all_containers
  348. if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
  349. return
  350. close_prefix = (
  351. CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
  352. )
  353. remove_all_containers(close_prefix)
  354. def run_action(self, action: Action) -> Observation:
  355. if isinstance(action, FileEditAction):
  356. return self.edit(action)
  357. # set timeout to default if not set
  358. if action.timeout is None:
  359. action.timeout = self.config.sandbox.timeout
  360. with self.action_semaphore:
  361. if not action.runnable:
  362. return NullObservation('')
  363. if (
  364. hasattr(action, 'confirmation_state')
  365. and action.confirmation_state
  366. == ActionConfirmationStatus.AWAITING_CONFIRMATION
  367. ):
  368. return NullObservation('')
  369. action_type = action.action # type: ignore[attr-defined]
  370. if action_type not in ACTION_TYPE_TO_CLASS:
  371. raise ValueError(f'Action {action_type} does not exist.')
  372. if not hasattr(self, action_type):
  373. return ErrorObservation(
  374. f'Action {action_type} is not supported in the current runtime.',
  375. error_id='AGENT_ERROR$BAD_ACTION',
  376. )
  377. if (
  378. getattr(action, 'confirmation_state', None)
  379. == ActionConfirmationStatus.REJECTED
  380. ):
  381. return UserRejectObservation(
  382. 'Action has been rejected by the user! Waiting for further user input.'
  383. )
  384. assert action.timeout is not None
  385. try:
  386. with send_request(
  387. self.session,
  388. 'POST',
  389. f'{self.api_url}/execute_action',
  390. json={'action': event_to_dict(action)},
  391. # wait a few more seconds to get the timeout error from client side
  392. timeout=action.timeout + 5,
  393. ) as response:
  394. output = response.json()
  395. obs = observation_from_dict(output)
  396. obs._cause = action.id # type: ignore[attr-defined]
  397. except requests.Timeout:
  398. raise RuntimeError(
  399. f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
  400. )
  401. return obs
    def run(self, action: CmdRunAction) -> Observation:
        """Execute a command action by delegating to ``run_action``."""
        return self.run_action(action)

    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        """Execute an IPython cell action by delegating to ``run_action``."""
        return self.run_action(action)

    def read(self, action: FileReadAction) -> Observation:
        """Execute a file-read action by delegating to ``run_action``."""
        return self.run_action(action)

    def write(self, action: FileWriteAction) -> Observation:
        """Execute a file-write action by delegating to ``run_action``."""
        return self.run_action(action)

    def browse(self, action: BrowseURLAction) -> Observation:
        """Execute a browse-URL action by delegating to ``run_action``."""
        return self.run_action(action)

    def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
        """Execute an interactive browse action by delegating to ``run_action``."""
        return self.run_action(action)
  414. # ====================================================================
  415. # Implement these methods (for file operations) in the subclass
  416. # ====================================================================
  417. def copy_to(
  418. self, host_src: str, sandbox_dest: str, recursive: bool = False
  419. ) -> None:
  420. if not os.path.exists(host_src):
  421. raise FileNotFoundError(f'Source file {host_src} does not exist')
  422. try:
  423. if recursive:
  424. # For recursive copy, create a zip file
  425. with tempfile.NamedTemporaryFile(
  426. suffix='.zip', delete=False
  427. ) as temp_zip:
  428. temp_zip_path = temp_zip.name
  429. with ZipFile(temp_zip_path, 'w') as zipf:
  430. for root, _, files in os.walk(host_src):
  431. for file in files:
  432. file_path = os.path.join(root, file)
  433. arcname = os.path.relpath(
  434. file_path, os.path.dirname(host_src)
  435. )
  436. zipf.write(file_path, arcname)
  437. upload_data = {'file': open(temp_zip_path, 'rb')}
  438. else:
  439. # For single file copy
  440. upload_data = {'file': open(host_src, 'rb')}
  441. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  442. with send_request(
  443. self.session,
  444. 'POST',
  445. f'{self.api_url}/upload_file',
  446. files=upload_data,
  447. params=params,
  448. timeout=300,
  449. ):
  450. pass
  451. except requests.Timeout:
  452. raise TimeoutError('Copy operation timed out')
  453. except Exception as e:
  454. raise RuntimeError(f'Copy operation failed: {str(e)}')
  455. finally:
  456. if recursive:
  457. os.unlink(temp_zip_path)
  458. self.log(
  459. 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
  460. )
  461. def list_files(self, path: str | None = None) -> list[str]:
  462. """List files in the sandbox.
  463. If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
  464. """
  465. try:
  466. data = {}
  467. if path is not None:
  468. data['path'] = path
  469. with send_request(
  470. self.session,
  471. 'POST',
  472. f'{self.api_url}/list_files',
  473. json=data,
  474. timeout=10,
  475. ) as response:
  476. response_json = response.json()
  477. assert isinstance(response_json, list)
  478. return response_json
  479. except requests.Timeout:
  480. raise TimeoutError('List files operation timed out')
  481. def copy_from(self, path: str) -> Path:
  482. """Zip all files in the sandbox and return as a stream of bytes."""
  483. try:
  484. params = {'path': path}
  485. with send_request(
  486. self.session,
  487. 'GET',
  488. f'{self.api_url}/download_files',
  489. params=params,
  490. stream=True,
  491. timeout=30,
  492. ) as response:
  493. temp_file = tempfile.NamedTemporaryFile(delete=False)
  494. for chunk in response.iter_content(chunk_size=8192):
  495. if chunk: # filter out keep-alive new chunks
  496. temp_file.write(chunk)
  497. return Path(temp_file.name)
  498. except requests.Timeout:
  499. raise TimeoutError('Copy operation timed out')
  500. def _is_port_in_use_docker(self, port):
  501. containers = self.docker_client.containers.list()
  502. for container in containers:
  503. container_ports = container.ports
  504. if str(port) in str(container_ports):
  505. return True
  506. return False
  507. def _find_available_port(self, max_attempts=5):
  508. port = 39999
  509. for _ in range(max_attempts):
  510. port = find_available_tcp_port(30000, 39999)
  511. if not self._is_port_in_use_docker(port):
  512. return port
  513. # If no port is found after max_attempts, return the last tried port
  514. return port
  515. @property
  516. def vscode_url(self) -> str | None:
  517. if self.vscode_enabled and self._runtime_initialized:
  518. if (
  519. hasattr(self, '_vscode_url') and self._vscode_url is not None
  520. ): # cached value
  521. return self._vscode_url
  522. with send_request(
  523. self.session,
  524. 'GET',
  525. f'{self.api_url}/vscode/connection_token',
  526. timeout=10,
  527. ) as response:
  528. response_json = response.json()
  529. assert isinstance(response_json, dict)
  530. if response_json['token'] is None:
  531. return None
  532. self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
  533. self.log(
  534. 'debug',
  535. f'VSCode URL: {self._vscode_url}',
  536. )
  537. return self._vscode_url
  538. else:
  539. return None