eventstream_runtime.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. import os
  2. from pathlib import Path
  3. import tempfile
  4. import threading
  5. from functools import lru_cache
  6. from typing import Callable
  7. from zipfile import ZipFile
  8. import docker
  9. import requests
  10. import tenacity
  11. from openhands.core.config import AppConfig
  12. from openhands.core.logger import DEBUG
  13. from openhands.core.logger import openhands_logger as logger
  14. from openhands.events import EventStream
  15. from openhands.events.action import (
  16. ActionConfirmationStatus,
  17. BrowseInteractiveAction,
  18. BrowseURLAction,
  19. CmdRunAction,
  20. FileEditAction,
  21. FileReadAction,
  22. FileWriteAction,
  23. IPythonRunCellAction,
  24. )
  25. from openhands.events.action.action import Action
  26. from openhands.events.observation import (
  27. ErrorObservation,
  28. NullObservation,
  29. Observation,
  30. UserRejectObservation,
  31. )
  32. from openhands.events.serialization import event_to_dict, observation_from_dict
  33. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  34. from openhands.runtime.base import Runtime
  35. from openhands.runtime.builder import DockerRuntimeBuilder
  36. from openhands.runtime.plugins import PluginRequirement
  37. from openhands.runtime.utils import find_available_tcp_port
  38. from openhands.runtime.utils.request import send_request
  39. from openhands.runtime.utils.runtime_build import build_runtime_image
  40. from openhands.utils.async_utils import call_sync_from_async
  41. from openhands.utils.tenacity_stop import stop_if_should_exit
  42. class LogBuffer:
  43. """Synchronous buffer for Docker container logs.
  44. This class provides a thread-safe way to collect, store, and retrieve logs
  45. from a Docker container. It uses a list to store log lines and provides methods
  46. for appending, retrieving, and clearing logs.
  47. """
  48. def __init__(self, container: docker.models.containers.Container, logFn: Callable):
  49. self.init_msg = 'Runtime client initialized.'
  50. self.buffer: list[str] = []
  51. self.lock = threading.Lock()
  52. self._stop_event = threading.Event()
  53. self.log_generator = container.logs(stream=True, follow=True)
  54. self.log_stream_thread = threading.Thread(target=self.stream_logs)
  55. self.log_stream_thread.daemon = True
  56. self.log_stream_thread.start()
  57. self.log = logFn
  58. def append(self, log_line: str):
  59. with self.lock:
  60. self.buffer.append(log_line)
  61. def get_and_clear(self) -> list[str]:
  62. with self.lock:
  63. logs = list(self.buffer)
  64. self.buffer.clear()
  65. return logs
  66. def stream_logs(self):
  67. """Stream logs from the Docker container in a separate thread.
  68. This method runs in its own thread to handle the blocking
  69. operation of reading log lines from the Docker SDK's synchronous generator.
  70. """
  71. try:
  72. for log_line in self.log_generator:
  73. if self._stop_event.is_set():
  74. break
  75. if log_line:
  76. decoded_line = log_line.decode('utf-8').rstrip()
  77. self.append(decoded_line)
  78. except Exception as e:
  79. self.log('error', f'Error streaming docker logs: {e}')
  80. def __del__(self):
  81. if self.log_stream_thread.is_alive():
  82. self.log(
  83. 'warn',
  84. "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
  85. )
  86. self.close(timeout=5)
  87. def close(self, timeout: float = 5.0):
  88. self._stop_event.set()
  89. self.log_stream_thread.join(timeout)
class EventStreamRuntime(Runtime):
    """This runtime subscribes to the event stream.

    When it receives an event, it sends the event to the runtime client
    running inside the Docker container for execution.

    Args:
        config (AppConfig): The application configuration.
        event_stream (EventStream): The event stream to subscribe to.
        sid (str, optional): The session ID. Defaults to 'default'.
        plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
        env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
    """

    # Prefix used to name the sandbox container for each session; also used
    # by _close_containers() to find and remove leftover containers.
    container_name_prefix = 'openhands-runtime-'
  101. # Need to provide this method to allow inheritors to init the Runtime
  102. # without initting the EventStreamRuntime.
  103. def init_base_runtime(
  104. self,
  105. config: AppConfig,
  106. event_stream: EventStream,
  107. sid: str = 'default',
  108. plugins: list[PluginRequirement] | None = None,
  109. env_vars: dict[str, str] | None = None,
  110. status_callback: Callable | None = None,
  111. attach_to_existing: bool = False,
  112. ):
  113. super().__init__(
  114. config,
  115. event_stream,
  116. sid,
  117. plugins,
  118. env_vars,
  119. status_callback,
  120. attach_to_existing,
  121. )
    def __init__(
        self,
        config: AppConfig,
        event_stream: EventStream,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Callable | None = None,
        attach_to_existing: bool = False,
    ):
        self.config = config
        # Placeholder ports so api_url can be composed immediately; real
        # ports are assigned in _init_container() / _attach_to_container().
        self._host_port = 30000  # initial dummy value
        self._container_port = 30001  # initial dummy value
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
        self.session = requests.Session()
        self.status_callback = status_callback
        self.docker_client: docker.DockerClient = self._init_docker_client()
        self.base_container_image = self.config.sandbox.base_container_image
        self.runtime_container_image = self.config.sandbox.runtime_container_image
        self.container_name = self.container_name_prefix + sid
        self.container = None
        self.action_semaphore = threading.Semaphore(1)  # Ensure one action at a time
        self.runtime_builder = DockerRuntimeBuilder(self.docker_client)

        # Buffer for container logs; created when the container starts.
        self.log_buffer: LogBuffer | None = None

        if self.config.sandbox.runtime_extra_deps:
            self.log(
                'debug',
                f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
            )

        # Opt-out switch for dumping container logs on close (they can be large).
        self.skip_container_logs = (
            os.environ.get('SKIP_CONTAINER_LOGS', 'false').lower() == 'true'
        )

        # Base-class init last: it may subscribe to the event stream, so all
        # container-related state must already be in place.
        self.init_base_runtime(
            config,
            event_stream,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
        )
  164. async def connect(self):
  165. self.send_status_message('STATUS$STARTING_RUNTIME')
  166. if not self.attach_to_existing:
  167. if self.runtime_container_image is None:
  168. if self.base_container_image is None:
  169. raise ValueError(
  170. 'Neither runtime container image nor base container image is set'
  171. )
  172. self.send_status_message('STATUS$STARTING_CONTAINER')
  173. self.runtime_container_image = build_runtime_image(
  174. self.base_container_image,
  175. self.runtime_builder,
  176. platform=self.config.sandbox.platform,
  177. extra_deps=self.config.sandbox.runtime_extra_deps,
  178. force_rebuild=self.config.sandbox.force_rebuild_runtime,
  179. )
  180. self.log(
  181. 'info', f'Starting runtime with image: {self.runtime_container_image}'
  182. )
  183. await call_sync_from_async(self._init_container)
  184. self.log('info', f'Container started: {self.container_name}')
  185. else:
  186. await call_sync_from_async(self._attach_to_container)
  187. if not self.attach_to_existing:
  188. self.log('info', f'Waiting for client to become ready at {self.api_url}...')
  189. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  190. await call_sync_from_async(self._wait_until_alive)
  191. if not self.attach_to_existing:
  192. self.log('info', 'Runtime is ready.')
  193. if not self.attach_to_existing:
  194. await call_sync_from_async(self.setup_initial_env)
  195. self.log(
  196. 'debug',
  197. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}',
  198. )
  199. self.send_status_message(' ')
  200. @staticmethod
  201. @lru_cache(maxsize=1)
  202. def _init_docker_client() -> docker.DockerClient:
  203. try:
  204. return docker.from_env()
  205. except Exception as ex:
  206. logger.error(
  207. 'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
  208. )
  209. raise ex
  210. def _init_container(self):
  211. self.log('debug', 'Preparing to start container...')
  212. self.send_status_message('STATUS$PREPARING_CONTAINER')
  213. plugin_arg = ''
  214. if self.plugins is not None and len(self.plugins) > 0:
  215. plugin_arg = (
  216. f'--plugins {" ".join([plugin.name for plugin in self.plugins])} '
  217. )
  218. self._host_port = self._find_available_port()
  219. self._container_port = (
  220. self._host_port
  221. ) # in future this might differ from host port
  222. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  223. use_host_network = self.config.sandbox.use_host_network
  224. network_mode: str | None = 'host' if use_host_network else None
  225. port_mapping: dict[str, list[dict[str, str]]] | None = (
  226. None
  227. if use_host_network
  228. else {f'{self._container_port}/tcp': [{'HostPort': str(self._host_port)}]}
  229. )
  230. if use_host_network:
  231. self.log(
  232. 'warn',
  233. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop',
  234. )
  235. # Combine environment variables
  236. environment = {
  237. 'port': str(self._container_port),
  238. 'PYTHONUNBUFFERED': 1,
  239. }
  240. if self.config.debug or DEBUG:
  241. environment['DEBUG'] = 'true'
  242. self.log('debug', f'Workspace Base: {self.config.workspace_base}')
  243. if (
  244. self.config.workspace_mount_path is not None
  245. and self.config.workspace_mount_path_in_sandbox is not None
  246. ):
  247. # e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
  248. volumes = {
  249. self.config.workspace_mount_path: {
  250. 'bind': self.config.workspace_mount_path_in_sandbox,
  251. 'mode': 'rw',
  252. }
  253. }
  254. logger.debug(f'Mount dir: {self.config.workspace_mount_path}')
  255. else:
  256. logger.debug(
  257. 'Mount dir is not set, will not mount the workspace directory to the container'
  258. )
  259. volumes = None
  260. self.log(
  261. 'debug',
  262. f'Sandbox workspace: {self.config.workspace_mount_path_in_sandbox}',
  263. )
  264. if self.config.sandbox.browsergym_eval_env is not None:
  265. browsergym_arg = (
  266. f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
  267. )
  268. else:
  269. browsergym_arg = ''
  270. try:
  271. self.container = self.docker_client.containers.run(
  272. self.runtime_container_image,
  273. command=(
  274. f'/openhands/micromamba/bin/micromamba run -n openhands '
  275. f'poetry run '
  276. f'python -u -m openhands.runtime.action_execution_server {self._container_port} '
  277. f'--working-dir "{self.config.workspace_mount_path_in_sandbox}" '
  278. f'{plugin_arg}'
  279. f'--username {"openhands" if self.config.run_as_openhands else "root"} '
  280. f'--user-id {self.config.sandbox.user_id} '
  281. f'{browsergym_arg}'
  282. ),
  283. network_mode=network_mode,
  284. ports=port_mapping,
  285. working_dir='/openhands/code/', # do not change this!
  286. name=self.container_name,
  287. detach=True,
  288. environment=environment,
  289. volumes=volumes,
  290. )
  291. self.log_buffer = LogBuffer(self.container, self.log)
  292. self.log('debug', f'Container started. Server url: {self.api_url}')
  293. self.send_status_message('STATUS$CONTAINER_STARTED')
  294. except docker.errors.APIError as e:
  295. # check 409 error
  296. if '409' in str(e):
  297. self.log(
  298. 'warning',
  299. f'Container {self.container_name} already exists. Removing...',
  300. )
  301. self._close_containers(rm_all_containers=True)
  302. return self._init_container()
  303. else:
  304. self.log(
  305. 'error',
  306. f'Error: Instance {self.container_name} FAILED to start container!\n',
  307. )
  308. except Exception as e:
  309. self.log(
  310. 'error',
  311. f'Error: Instance {self.container_name} FAILED to start container!\n',
  312. )
  313. self.log('error', str(e))
  314. self.close()
  315. raise e
  316. def _attach_to_container(self):
  317. container = self.docker_client.containers.get(self.container_name)
  318. self.log_buffer = LogBuffer(container, self.log)
  319. self.container = container
  320. self._container_port = 0
  321. for port in container.attrs['NetworkSettings']['Ports']:
  322. self._container_port = int(port.split('/')[0])
  323. break
  324. self._host_port = self._container_port
  325. self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  326. self.log(
  327. 'debug',
  328. f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
  329. )
  330. def _refresh_logs(self):
  331. self.log('debug', 'Getting container logs...')
  332. assert (
  333. self.log_buffer is not None
  334. ), 'Log buffer is expected to be initialized when container is started'
  335. logs = self.log_buffer.get_and_clear()
  336. if logs:
  337. formatted_logs = '\n'.join([f' |{log}' for log in logs])
  338. self.log(
  339. 'debug',
  340. '\n'
  341. + '-' * 35
  342. + 'Container logs:'
  343. + '-' * 35
  344. + f'\n{formatted_logs}'
  345. + '\n'
  346. + '-' * 80,
  347. )
  348. @tenacity.retry(
  349. stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
  350. reraise=(ConnectionRefusedError,),
  351. wait=tenacity.wait_fixed(2),
  352. )
  353. def _wait_until_alive(self):
  354. self._refresh_logs()
  355. if not self.log_buffer:
  356. raise RuntimeError('Runtime client is not ready.')
  357. send_request(
  358. self.session,
  359. 'GET',
  360. f'{self.api_url}/alive',
  361. timeout=5,
  362. )
  363. def close(self, rm_all_containers: bool = True):
  364. """Closes the EventStreamRuntime and associated objects
  365. Parameters:
  366. - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
  367. """
  368. if self.log_buffer:
  369. self.log_buffer.close()
  370. if self.session:
  371. self.session.close()
  372. if self.attach_to_existing:
  373. return
  374. self._close_containers(rm_all_containers)
  375. def _close_containers(self, rm_all_containers: bool = True):
  376. try:
  377. containers = self.docker_client.containers.list(all=True)
  378. for container in containers:
  379. try:
  380. # If the app doesn't shut down properly, it can leave runtime containers on the system. This ensures
  381. # that all 'openhands-sandbox-' containers are removed as well.
  382. if rm_all_containers and container.name.startswith(
  383. self.container_name_prefix
  384. ):
  385. container.remove(force=True)
  386. elif container.name == self.container_name:
  387. if not self.skip_container_logs:
  388. logs = container.logs(tail=1000).decode('utf-8')
  389. self.log(
  390. 'debug',
  391. f'==== Container logs on close ====\n{logs}\n==== End of container logs ====',
  392. )
  393. container.remove(force=True)
  394. except docker.errors.APIError:
  395. pass
  396. except docker.errors.NotFound:
  397. pass
  398. except docker.errors.NotFound: # yes, this can happen!
  399. pass
    def run_action(self, action: Action) -> Observation:
        """Execute an action in the sandbox via the runtime client.

        File edits are delegated to self.edit(); everything else is validated
        (runnable, known type, supported, confirmed) and POSTed to the
        container's /execute_action endpoint. Only one action runs at a time.

        Args:
            action: The action to execute; its timeout is defaulted from
                config when unset.

        Returns:
            The observation produced by the runtime client, or a
            Null/Error/UserReject observation for filtered actions.

        Raises:
            ValueError: the action type is unknown.
            RuntimeError: the client did not answer within the timeout.
        """
        if isinstance(action, FileEditAction):
            return self.edit(action)

        # set timeout to default if not set
        if action.timeout is None:
            action.timeout = self.config.sandbox.timeout

        # Serialize action execution: one in-flight action at a time.
        with self.action_semaphore:
            if not action.runnable:
                return NullObservation('')
            # Actions still awaiting user confirmation produce no observation.
            if (
                hasattr(action, 'confirmation_state')
                and action.confirmation_state
                == ActionConfirmationStatus.AWAITING_CONFIRMATION
            ):
                return NullObservation('')
            action_type = action.action  # type: ignore[attr-defined]
            if action_type not in ACTION_TYPE_TO_CLASS:
                raise ValueError(f'Action {action_type} does not exist.')
            if not hasattr(self, action_type):
                return ErrorObservation(
                    f'Action {action_type} is not supported in the current runtime.',
                    error_id='AGENT_ERROR$BAD_ACTION',
                )
            if (
                getattr(action, 'confirmation_state', None)
                == ActionConfirmationStatus.REJECTED
            ):
                return UserRejectObservation(
                    'Action has been rejected by the user! Waiting for further user input.'
                )

            self._refresh_logs()

            assert action.timeout is not None

            try:
                response = send_request(
                    self.session,
                    'POST',
                    f'{self.api_url}/execute_action',
                    json={'action': event_to_dict(action)},
                    # wait a few more seconds to get the timeout error from client side
                    timeout=action.timeout + 5,
                )
                output = response.json()
                obs = observation_from_dict(output)
                obs._cause = action.id  # type: ignore[attr-defined]
            except requests.Timeout:
                raise RuntimeError(
                    f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
                )

            self._refresh_logs()
            return obs
    # Typed convenience wrappers: each dispatches through run_action(),
    # which handles validation, serialization and the HTTP round trip.
    def run(self, action: CmdRunAction) -> Observation:
        return self.run_action(action)

    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        return self.run_action(action)

    def read(self, action: FileReadAction) -> Observation:
        return self.run_action(action)

    def write(self, action: FileWriteAction) -> Observation:
        return self.run_action(action)

    def browse(self, action: BrowseURLAction) -> Observation:
        return self.run_action(action)

    def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
        return self.run_action(action)
  462. # ====================================================================
  463. # Implement these methods (for file operations) in the subclass
  464. # ====================================================================
  465. def copy_to(
  466. self, host_src: str, sandbox_dest: str, recursive: bool = False
  467. ) -> None:
  468. if not os.path.exists(host_src):
  469. raise FileNotFoundError(f'Source file {host_src} does not exist')
  470. self._refresh_logs()
  471. try:
  472. if recursive:
  473. # For recursive copy, create a zip file
  474. with tempfile.NamedTemporaryFile(
  475. suffix='.zip', delete=False
  476. ) as temp_zip:
  477. temp_zip_path = temp_zip.name
  478. with ZipFile(temp_zip_path, 'w') as zipf:
  479. for root, _, files in os.walk(host_src):
  480. for file in files:
  481. file_path = os.path.join(root, file)
  482. arcname = os.path.relpath(
  483. file_path, os.path.dirname(host_src)
  484. )
  485. zipf.write(file_path, arcname)
  486. upload_data = {'file': open(temp_zip_path, 'rb')}
  487. else:
  488. # For single file copy
  489. upload_data = {'file': open(host_src, 'rb')}
  490. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  491. send_request(
  492. self.session,
  493. 'POST',
  494. f'{self.api_url}/upload_file',
  495. files=upload_data,
  496. params=params,
  497. timeout=300,
  498. )
  499. except requests.Timeout:
  500. raise TimeoutError('Copy operation timed out')
  501. except Exception as e:
  502. raise RuntimeError(f'Copy operation failed: {str(e)}')
  503. finally:
  504. if recursive:
  505. os.unlink(temp_zip_path)
  506. self.log(
  507. 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
  508. )
  509. self._refresh_logs()
  510. def list_files(self, path: str | None = None) -> list[str]:
  511. """List files in the sandbox.
  512. If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
  513. """
  514. self._refresh_logs()
  515. try:
  516. data = {}
  517. if path is not None:
  518. data['path'] = path
  519. response = send_request(
  520. self.session,
  521. 'POST',
  522. f'{self.api_url}/list_files',
  523. json=data,
  524. timeout=10,
  525. )
  526. response_json = response.json()
  527. assert isinstance(response_json, list)
  528. return response_json
  529. except requests.Timeout:
  530. raise TimeoutError('List files operation timed out')
  531. def copy_from(self, path: str) -> Path:
  532. """Zip all files in the sandbox and return as a stream of bytes."""
  533. self._refresh_logs()
  534. try:
  535. params = {'path': path}
  536. response = send_request(
  537. self.session,
  538. 'GET',
  539. f'{self.api_url}/download_files',
  540. params=params,
  541. stream=True,
  542. timeout=30,
  543. )
  544. temp_file = tempfile.NamedTemporaryFile(delete=False)
  545. for chunk in response.iter_content(chunk_size=8192):
  546. if chunk: # filter out keep-alive new chunks
  547. temp_file.write(chunk)
  548. return Path(temp_file.name)
  549. except requests.Timeout:
  550. raise TimeoutError('Copy operation timed out')
  551. def _is_port_in_use_docker(self, port):
  552. containers = self.docker_client.containers.list()
  553. for container in containers:
  554. container_ports = container.ports
  555. if str(port) in str(container_ports):
  556. return True
  557. return False
  558. def _find_available_port(self, max_attempts=5):
  559. port = 39999
  560. for _ in range(max_attempts):
  561. port = find_available_tcp_port(30000, 39999)
  562. if not self._is_port_in_use_docker(port):
  563. return port
  564. # If no port is found after max_attempts, return the last tried port
  565. return port