runtime.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. import os
  2. import tempfile
  3. import threading
  4. import uuid
  5. from typing import Callable
  6. from zipfile import ZipFile
  7. import docker
  8. import requests
  9. import tenacity
  10. from openhands.core.config import AppConfig
  11. from openhands.core.logger import DEBUG
  12. from openhands.core.logger import openhands_logger as logger
  13. from openhands.events import EventStream
  14. from openhands.events.action import (
  15. ActionConfirmationStatus,
  16. BrowseInteractiveAction,
  17. BrowseURLAction,
  18. CmdRunAction,
  19. FileReadAction,
  20. FileWriteAction,
  21. IPythonRunCellAction,
  22. )
  23. from openhands.events.action.action import Action
  24. from openhands.events.observation import (
  25. ErrorObservation,
  26. NullObservation,
  27. Observation,
  28. UserRejectObservation,
  29. )
  30. from openhands.events.serialization import event_to_dict, observation_from_dict
  31. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  32. from openhands.runtime.builder import DockerRuntimeBuilder
  33. from openhands.runtime.plugins import PluginRequirement
  34. from openhands.runtime.runtime import Runtime
  35. from openhands.runtime.utils import find_available_tcp_port
  36. from openhands.runtime.utils.runtime_build import build_runtime_image
  37. from openhands.utils.tenacity_stop import stop_if_should_exit
  38. class LogBuffer:
  39. """Synchronous buffer for Docker container logs.
  40. This class provides a thread-safe way to collect, store, and retrieve logs
  41. from a Docker container. It uses a list to store log lines and provides methods
  42. for appending, retrieving, and clearing logs.
  43. """
  44. def __init__(self, container: docker.models.containers.Container):
  45. self.client_ready = False
  46. self.init_msg = 'Runtime client initialized.'
  47. self.buffer: list[str] = []
  48. self.lock = threading.Lock()
  49. self._stop_event = threading.Event()
  50. self.log_generator = container.logs(stream=True, follow=True)
  51. self.log_stream_thread = threading.Thread(target=self.stream_logs)
  52. self.log_stream_thread.daemon = True
  53. self.log_stream_thread.start()
  54. def append(self, log_line: str):
  55. with self.lock:
  56. self.buffer.append(log_line)
  57. def get_and_clear(self) -> list[str]:
  58. with self.lock:
  59. logs = list(self.buffer)
  60. self.buffer.clear()
  61. return logs
  62. def stream_logs(self):
  63. """
  64. Stream logs from the Docker container in a separate thread.
  65. This method runs in its own thread to handle the blocking
  66. operation of reading log lines from the Docker SDK's synchronous generator.
  67. """
  68. try:
  69. for log_line in self.log_generator:
  70. if self._stop_event.is_set():
  71. break
  72. if log_line:
  73. decoded_line = log_line.decode('utf-8').rstrip()
  74. self.append(decoded_line)
  75. if self.init_msg in decoded_line:
  76. self.client_ready = True
  77. except Exception as e:
  78. logger.error(f'Error streaming docker logs: {e}')
  79. def __del__(self):
  80. if self.log_stream_thread.is_alive():
  81. logger.warn(
  82. "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown."
  83. )
  84. self.close(timeout=5)
  85. def close(self, timeout: float = 5.0):
  86. self._stop_event.set()
  87. self.log_stream_thread.join(timeout)
  88. class EventStreamRuntime(Runtime):
  89. """This runtime will subscribe the event stream.
  90. When receive an event, it will send the event to runtime-client which run inside the docker environment.
  91. From the sid also an instance_id is generated in combination with a UID.
  92. Args:
  93. config (AppConfig): The application configuration.
  94. event_stream (EventStream): The event stream to subscribe to.
  95. sid (str, optional): The session ID. Defaults to 'default'.
  96. plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
  97. env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
  98. """
  99. container_name_prefix = 'openhands-sandbox-'
  100. def __init__(
  101. self,
  102. config: AppConfig,
  103. event_stream: EventStream,
  104. sid: str = 'default',
  105. plugins: list[PluginRequirement] | None = None,
  106. env_vars: dict[str, str] | None = None,
  107. status_message_callback: Callable | None = None,
  108. ):
  109. self.config = config
  110. self._host_port = 30000 # initial dummy value
  111. self._container_port = 30001 # initial dummy value
  112. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  113. self.session = requests.Session()
  114. self.instance_id = (
  115. sid + '_' + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
  116. )
  117. self.status_message_callback = status_message_callback
  118. self.send_status_message('STATUS$STARTING_RUNTIME')
  119. self.docker_client: docker.DockerClient = self._init_docker_client()
  120. self.base_container_image = self.config.sandbox.base_container_image
  121. self.runtime_container_image = self.config.sandbox.runtime_container_image
  122. self.container_name = self.container_name_prefix + self.instance_id
  123. self.container = None
  124. self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
  125. self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
  126. logger.debug(f'EventStreamRuntime `{self.instance_id}`')
  127. # Buffer for container logs
  128. self.log_buffer: LogBuffer | None = None
  129. if self.config.sandbox.runtime_extra_deps:
  130. logger.debug(
  131. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}'
  132. )
  133. self.skip_container_logs = (
  134. os.environ.get('SKIP_CONTAINER_LOGS', 'false').lower() == 'true'
  135. )
  136. if self.runtime_container_image is None:
  137. if self.base_container_image is None:
  138. raise ValueError(
  139. 'Neither runtime container image nor base container image is set'
  140. )
  141. logger.info('Preparing container, this might take a few minutes...')
  142. self.send_status_message('STATUS$STARTING_CONTAINER')
  143. self.runtime_container_image = build_runtime_image(
  144. self.base_container_image,
  145. self.runtime_builder,
  146. extra_deps=self.config.sandbox.runtime_extra_deps,
  147. force_rebuild=self.config.sandbox.force_rebuild_runtime,
  148. )
  149. self.container = self._init_container(
  150. sandbox_workspace_dir=self.config.workspace_mount_path_in_sandbox, # e.g. /workspace
  151. mount_dir=self.config.workspace_mount_path, # e.g. /opt/openhands/_test_workspace
  152. plugins=plugins,
  153. )
  154. # will initialize both the event stream and the env vars
  155. super().__init__(
  156. config, event_stream, sid, plugins, env_vars, status_message_callback
  157. )
  158. logger.info('Waiting for client to become ready...')
  159. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  160. self._wait_until_alive()
  161. self.setup_initial_env()
  162. logger.info(
  163. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
  164. )
  165. self.send_status_message(' ')
  166. @staticmethod
  167. def _init_docker_client() -> docker.DockerClient:
  168. try:
  169. return docker.from_env()
  170. except Exception as ex:
  171. logger.error(
  172. 'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.'
  173. )
  174. raise ex
  175. @tenacity.retry(
  176. stop=tenacity.stop_after_attempt(5) | stop_if_should_exit(),
  177. wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
  178. )
  179. def _init_container(
  180. self,
  181. sandbox_workspace_dir: str,
  182. mount_dir: str | None = None,
  183. plugins: list[PluginRequirement] | None = None,
  184. ):
  185. try:
  186. logger.info('Preparing to start container...')
  187. self.send_status_message('STATUS$PREPARING_CONTAINER')
  188. plugin_arg = ''
  189. if plugins is not None and len(plugins) > 0:
  190. plugin_arg = (
  191. f'--plugins {" ".join([plugin.name for plugin in plugins])} '
  192. )
  193. self._host_port = self._find_available_port()
  194. self._container_port = (
  195. self._host_port
  196. ) # in future this might differ from host port
  197. self.api_url = (
  198. f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  199. )
  200. use_host_network = self.config.sandbox.use_host_network
  201. network_mode: str | None = 'host' if use_host_network else None
  202. port_mapping: dict[str, list[dict[str, str]]] | None = (
  203. None
  204. if use_host_network
  205. else {
  206. f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]
  207. }
  208. )
  209. if use_host_network:
  210. logger.warn(
  211. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
  212. )
  213. # Combine environment variables
  214. environment = {
  215. 'port': str(self._container_port),
  216. 'PYTHONUNBUFFERED': 1,
  217. }
  218. if self.config.debug or DEBUG:
  219. environment['DEBUG'] = 'true'
  220. logger.debug(f'Workspace Base: {self.config.workspace_base}')
  221. if mount_dir is not None and sandbox_workspace_dir is not None:
  222. # e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
  223. volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
  224. logger.debug(f'Mount dir: {mount_dir}')
  225. else:
  226. logger.warn(
  227. 'Warning: Mount dir is not set, will not mount the workspace directory to the container!\n'
  228. )
  229. volumes = None
  230. logger.debug(f'Sandbox workspace: {sandbox_workspace_dir}')
  231. if self.config.sandbox.browsergym_eval_env is not None:
  232. browsergym_arg = (
  233. f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
  234. )
  235. else:
  236. browsergym_arg = ''
  237. container = self.docker_client.containers.run(
  238. self.runtime_container_image,
  239. command=(
  240. f'/openhands/micromamba/bin/micromamba run -n openhands '
  241. f'poetry run '
  242. f'python -u -m openhands.runtime.client.client {self._container_port} '
  243. f'--working-dir "{sandbox_workspace_dir}" '
  244. f'{plugin_arg}'
  245. f'--username {"openhands" if self.config.run_as_openhands else "root"} '
  246. f'--user-id {self.config.sandbox.user_id} '
  247. f'{browsergym_arg}'
  248. ),
  249. network_mode=network_mode,
  250. ports=port_mapping,
  251. working_dir='/openhands/code/', # do not change this!
  252. name=self.container_name,
  253. detach=True,
  254. environment=environment,
  255. volumes=volumes,
  256. )
  257. self.log_buffer = LogBuffer(container)
  258. logger.info(f'Container started. Server url: {self.api_url}')
  259. self.send_status_message('STATUS$CONTAINER_STARTED')
  260. return container
  261. except Exception as e:
  262. logger.error(
  263. f'Error: Instance {self.instance_id} FAILED to start container!\n'
  264. )
  265. logger.exception(e)
  266. self.close(close_client=False)
  267. raise e
  268. def _refresh_logs(self):
  269. logger.debug('Getting container logs...')
  270. assert (
  271. self.log_buffer is not None
  272. ), 'Log buffer is expected to be initialized when container is started'
  273. logs = self.log_buffer.get_and_clear()
  274. if logs:
  275. formatted_logs = '\n'.join([f' |{log}' for log in logs])
  276. logger.info(
  277. '\n'
  278. + '-' * 35
  279. + 'Container logs:'
  280. + '-' * 35
  281. + f'\n{formatted_logs}'
  282. + '\n'
  283. + '-' * 80
  284. )
  285. @tenacity.retry(
  286. stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
  287. wait=tenacity.wait_exponential(multiplier=2, min=1, max=20),
  288. reraise=(ConnectionRefusedError,),
  289. )
  290. def _wait_until_alive(self):
  291. self._refresh_logs()
  292. if not (self.log_buffer and self.log_buffer.client_ready):
  293. raise RuntimeError('Runtime client is not ready.')
  294. response = self.session.get(f'{self.api_url}/alive')
  295. if response.status_code == 200:
  296. return
  297. else:
  298. msg = f'Action execution API is not alive. Response: {response}'
  299. logger.error(msg)
  300. raise RuntimeError(msg)
  301. def close(self, close_client: bool = True, rm_all_containers: bool = True):
  302. """Closes the EventStreamRuntime and associated objects
  303. Parameters:
  304. - close_client (bool): Whether to close the DockerClient
  305. - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
  306. """
  307. if self.log_buffer:
  308. self.log_buffer.close()
  309. if self.session:
  310. self.session.close()
  311. try:
  312. containers = self.docker_client.containers.list(all=True)
  313. for container in containers:
  314. try:
  315. # If the app doesn't shut down properly, it can leave runtime containers on the system. This ensures
  316. # that all 'openhands-sandbox-' containers are removed as well.
  317. if rm_all_containers and container.name.startswith(
  318. self.container_name_prefix
  319. ):
  320. container.remove(force=True)
  321. elif container.name == self.container_name:
  322. if not self.skip_container_logs:
  323. logs = container.logs(tail=1000).decode('utf-8')
  324. logger.debug(
  325. f'==== Container logs on close ====\n{logs}\n==== End of container logs ===='
  326. )
  327. container.remove(force=True)
  328. except docker.errors.APIError:
  329. pass
  330. except docker.errors.NotFound:
  331. pass
  332. except docker.errors.NotFound: # yes, this can happen!
  333. pass
  334. if close_client:
  335. self.docker_client.close()
  336. def run_action(self, action: Action) -> Observation:
  337. # set timeout to default if not set
  338. if action.timeout is None:
  339. action.timeout = self.config.sandbox.timeout
  340. with self.action_semaphore:
  341. if not action.runnable:
  342. return NullObservation('')
  343. if (
  344. hasattr(action, 'is_confirmed')
  345. and action.is_confirmed
  346. == ActionConfirmationStatus.AWAITING_CONFIRMATION
  347. ):
  348. return NullObservation('')
  349. action_type = action.action # type: ignore[attr-defined]
  350. if action_type not in ACTION_TYPE_TO_CLASS:
  351. return ErrorObservation(f'Action {action_type} does not exist.')
  352. if not hasattr(self, action_type):
  353. return ErrorObservation(
  354. f'Action {action_type} is not supported in the current runtime.'
  355. )
  356. if (
  357. hasattr(action, 'is_confirmed')
  358. and action.is_confirmed == ActionConfirmationStatus.REJECTED
  359. ):
  360. return UserRejectObservation(
  361. 'Action has been rejected by the user! Waiting for further user input.'
  362. )
  363. self._refresh_logs()
  364. assert action.timeout is not None
  365. try:
  366. response = self.session.post(
  367. f'{self.api_url}/execute_action',
  368. json={'action': event_to_dict(action)},
  369. timeout=action.timeout,
  370. )
  371. if response.status_code == 200:
  372. output = response.json()
  373. obs = observation_from_dict(output)
  374. obs._cause = action.id # type: ignore[attr-defined]
  375. else:
  376. logger.debug(f'action: {action}')
  377. logger.debug(f'response: {response}')
  378. error_message = response.text
  379. logger.error(f'Error from server: {error_message}')
  380. obs = ErrorObservation(f'Action execution failed: {error_message}')
  381. except requests.Timeout:
  382. logger.error('No response received within the timeout period.')
  383. obs = ErrorObservation(
  384. f'Action execution timed out after {action.timeout} seconds.'
  385. )
  386. except Exception as e:
  387. logger.error(f'Error during action execution: {e}')
  388. obs = ErrorObservation(f'Action execution failed: {str(e)}')
  389. self._refresh_logs()
  390. return obs
  391. def run(self, action: CmdRunAction) -> Observation:
  392. return self.run_action(action)
  393. def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  394. return self.run_action(action)
  395. def read(self, action: FileReadAction) -> Observation:
  396. return self.run_action(action)
  397. def write(self, action: FileWriteAction) -> Observation:
  398. return self.run_action(action)
  399. def browse(self, action: BrowseURLAction) -> Observation:
  400. return self.run_action(action)
  401. def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  402. return self.run_action(action)
  403. # ====================================================================
  404. # Implement these methods (for file operations) in the subclass
  405. # ====================================================================
  406. def copy_to(
  407. self, host_src: str, sandbox_dest: str, recursive: bool = False
  408. ) -> None:
  409. if not os.path.exists(host_src):
  410. raise FileNotFoundError(f'Source file {host_src} does not exist')
  411. self._refresh_logs()
  412. try:
  413. if recursive:
  414. # For recursive copy, create a zip file
  415. with tempfile.NamedTemporaryFile(
  416. suffix='.zip', delete=False
  417. ) as temp_zip:
  418. temp_zip_path = temp_zip.name
  419. with ZipFile(temp_zip_path, 'w') as zipf:
  420. for root, _, files in os.walk(host_src):
  421. for file in files:
  422. file_path = os.path.join(root, file)
  423. arcname = os.path.relpath(
  424. file_path, os.path.dirname(host_src)
  425. )
  426. zipf.write(file_path, arcname)
  427. upload_data = {'file': open(temp_zip_path, 'rb')}
  428. else:
  429. # For single file copy
  430. upload_data = {'file': open(host_src, 'rb')}
  431. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  432. response = self.session.post(
  433. f'{self.api_url}/upload_file', files=upload_data, params=params
  434. )
  435. if response.status_code == 200:
  436. return
  437. else:
  438. error_message = response.text
  439. raise Exception(f'Copy operation failed: {error_message}')
  440. except requests.Timeout:
  441. raise TimeoutError('Copy operation timed out')
  442. except Exception as e:
  443. raise RuntimeError(f'Copy operation failed: {str(e)}')
  444. finally:
  445. if recursive:
  446. os.unlink(temp_zip_path)
  447. logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}')
  448. self._refresh_logs()
  449. def list_files(self, path: str | None = None) -> list[str]:
  450. """List files in the sandbox.
  451. If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
  452. """
  453. self._refresh_logs()
  454. try:
  455. data = {}
  456. if path is not None:
  457. data['path'] = path
  458. response = self.session.post(f'{self.api_url}/list_files', json=data)
  459. if response.status_code == 200:
  460. response_json = response.json()
  461. assert isinstance(response_json, list)
  462. return response_json
  463. else:
  464. error_message = response.text
  465. raise Exception(f'List files operation failed: {error_message}')
  466. except requests.Timeout:
  467. raise TimeoutError('List files operation timed out')
  468. except Exception as e:
  469. raise RuntimeError(f'List files operation failed: {str(e)}')
  470. def _is_port_in_use_docker(self, port):
  471. containers = self.docker_client.containers.list()
  472. for container in containers:
  473. container_ports = container.ports
  474. if str(port) in str(container_ports):
  475. return True
  476. return False
  477. def _find_available_port(self, max_attempts=5):
  478. port = 39999
  479. for _ in range(max_attempts):
  480. port = find_available_tcp_port(30000, 39999)
  481. if not self._is_port_in_use_docker(port):
  482. return port
  483. # If no port is found after max_attempts, return the last tried port
  484. return port
  485. def send_status_message(self, message: str):
  486. """Sends a status message if the callback function was provided."""
  487. if self.status_message_callback:
  488. self.status_message_callback(message)