runtime.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. import os
  2. import tempfile
  3. import threading
  4. import time
  5. import uuid
  6. from zipfile import ZipFile
  7. import docker
  8. import requests
  9. import tenacity
  10. from openhands.core.config import AppConfig
  11. from openhands.core.logger import openhands_logger as logger
  12. from openhands.events import EventStream
  13. from openhands.events.action import (
  14. ActionConfirmationStatus,
  15. BrowseInteractiveAction,
  16. BrowseURLAction,
  17. CmdRunAction,
  18. FileReadAction,
  19. FileWriteAction,
  20. IPythonRunCellAction,
  21. )
  22. from openhands.events.action.action import Action
  23. from openhands.events.observation import (
  24. ErrorObservation,
  25. NullObservation,
  26. Observation,
  27. UserRejectObservation,
  28. )
  29. from openhands.events.serialization import event_to_dict, observation_from_dict
  30. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  31. from openhands.runtime.builder import DockerRuntimeBuilder
  32. from openhands.runtime.plugins import PluginRequirement
  33. from openhands.runtime.runtime import Runtime
  34. from openhands.runtime.utils import find_available_tcp_port
  35. from openhands.runtime.utils.runtime_build import build_runtime_image
  36. class LogBuffer:
  37. """
  38. Synchronous buffer for Docker container logs.
  39. This class provides a thread-safe way to collect, store, and retrieve logs
  40. from a Docker container. It uses a list to store log lines and provides methods
  41. for appending, retrieving, and clearing logs.
  42. """
  43. def __init__(self, container: docker.models.containers.Container):
  44. self.buffer: list[str] = []
  45. self.lock = threading.Lock()
  46. self.log_generator = container.logs(stream=True, follow=True)
  47. self.log_stream_thread = threading.Thread(target=self.stream_logs)
  48. self.log_stream_thread.daemon = True
  49. self.log_stream_thread.start()
  50. self._stop_event = threading.Event()
  51. def append(self, log_line: str):
  52. with self.lock:
  53. self.buffer.append(log_line)
  54. def get_and_clear(self) -> list[str]:
  55. with self.lock:
  56. logs = list(self.buffer)
  57. self.buffer.clear()
  58. return logs
  59. def stream_logs(self):
  60. """
  61. Stream logs from the Docker container in a separate thread.
  62. This method runs in its own thread to handle the blocking
  63. operation of reading log lines from the Docker SDK's synchronous generator.
  64. """
  65. try:
  66. for log_line in self.log_generator:
  67. if self._stop_event.is_set():
  68. break
  69. if log_line:
  70. self.append(log_line.decode('utf-8').rstrip())
  71. except Exception as e:
  72. logger.error(f'Error in stream_logs: {e}')
  73. def __del__(self):
  74. if self.log_stream_thread.is_alive():
  75. logger.warn(
  76. "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown."
  77. )
  78. self.close(timeout=5)
  79. def close(self, timeout: float = 10.0):
  80. self._stop_event.set()
  81. self.log_stream_thread.join(timeout)
  82. class EventStreamRuntime(Runtime):
  83. """This runtime will subscribe the event stream.
  84. When receive an event, it will send the event to runtime-client which run inside the docker environment.
  85. """
  86. container_name_prefix = 'openhands-sandbox-'
  87. def __init__(
  88. self,
  89. config: AppConfig,
  90. event_stream: EventStream,
  91. sid: str = 'default',
  92. plugins: list[PluginRequirement] | None = None,
  93. env_vars: dict[str, str] | None = None,
  94. ):
  95. self.config = config
  96. self._port = find_available_tcp_port()
  97. self.api_url = f'http://{self.config.sandbox.api_hostname}:{self._port}'
  98. self.session = requests.Session()
  99. self.instance_id = (
  100. sid + '_' + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
  101. )
  102. self.docker_client: docker.DockerClient = self._init_docker_client()
  103. self.base_container_image = self.config.sandbox.base_container_image
  104. self.runtime_container_image = self.config.sandbox.runtime_container_image
  105. self.container_name = self.container_name_prefix + self.instance_id
  106. self.container = None
  107. self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
  108. self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
  109. logger.debug(f'EventStreamRuntime `{sid}`')
  110. # Buffer for container logs
  111. self.log_buffer: LogBuffer | None = None
  112. self.startup_done = False
  113. if self.config.sandbox.runtime_extra_deps:
  114. logger.info(
  115. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}'
  116. )
  117. if self.runtime_container_image is None:
  118. if self.base_container_image is None:
  119. raise ValueError(
  120. 'Neither runtime container image nor base container image is set'
  121. )
  122. self.runtime_container_image = build_runtime_image(
  123. self.base_container_image,
  124. self.runtime_builder,
  125. extra_deps=self.config.sandbox.runtime_extra_deps,
  126. )
  127. self.container = self._init_container(
  128. self.sandbox_workspace_dir,
  129. mount_dir=self.config.workspace_mount_path,
  130. plugins=plugins,
  131. )
  132. # will initialize both the event stream and the env vars
  133. super().__init__(config, event_stream, sid, plugins, env_vars)
  134. logger.info(
  135. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
  136. )
  137. logger.info(f'Container initialized with env vars: {env_vars}')
  138. @staticmethod
  139. def _init_docker_client() -> docker.DockerClient:
  140. try:
  141. return docker.from_env()
  142. except Exception as ex:
  143. logger.error(
  144. 'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
  145. )
  146. raise ex
  147. @tenacity.retry(
  148. stop=tenacity.stop_after_attempt(5),
  149. wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
  150. )
  151. def _init_container(
  152. self,
  153. sandbox_workspace_dir: str,
  154. mount_dir: str | None = None,
  155. plugins: list[PluginRequirement] | None = None,
  156. ):
  157. try:
  158. logger.info(
  159. f'Starting container with image: {self.runtime_container_image} and name: {self.container_name}'
  160. )
  161. plugin_arg = ''
  162. if plugins is not None and len(plugins) > 0:
  163. plugin_arg = (
  164. f'--plugins {" ".join([plugin.name for plugin in plugins])} '
  165. )
  166. network_mode: str | None = None
  167. port_mapping: dict[str, int] | None = None
  168. if self.config.sandbox.use_host_network:
  169. network_mode = 'host'
  170. logger.warn(
  171. 'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
  172. )
  173. else:
  174. port_mapping = {f'{self._port}/tcp': self._port}
  175. if mount_dir is not None:
  176. volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
  177. logger.info(f'Mount dir: {sandbox_workspace_dir}')
  178. else:
  179. logger.warn(
  180. 'Mount dir is not set, will not mount the workspace directory to the container.'
  181. )
  182. volumes = None
  183. if self.config.sandbox.browsergym_eval_env is not None:
  184. browsergym_arg = (
  185. f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
  186. )
  187. else:
  188. browsergym_arg = ''
  189. container = self.docker_client.containers.run(
  190. self.runtime_container_image,
  191. command=(
  192. f'/openhands/miniforge3/bin/mamba run --no-capture-output -n base '
  193. 'PYTHONUNBUFFERED=1 poetry run '
  194. f'python -u -m openhands.runtime.client.client {self._port} '
  195. f'--working-dir {sandbox_workspace_dir} '
  196. f'{plugin_arg}'
  197. f'--username {"openhands" if self.config.run_as_openhands else "root"} '
  198. f'--user-id {self.config.sandbox.user_id} '
  199. f'{browsergym_arg}'
  200. ),
  201. network_mode=network_mode,
  202. ports=port_mapping,
  203. working_dir='/openhands/code/',
  204. name=self.container_name,
  205. detach=True,
  206. environment={'DEBUG': 'true'} if self.config.debug else None,
  207. volumes=volumes,
  208. )
  209. self.log_buffer = LogBuffer(container)
  210. logger.info(f'Container started. Server url: {self.api_url}')
  211. return container
  212. except Exception as e:
  213. logger.error('Failed to start container')
  214. logger.exception(e)
  215. self.close(close_client=False)
  216. raise e
  217. @tenacity.retry(
  218. stop=tenacity.stop_after_attempt(10),
  219. wait=tenacity.wait_exponential(multiplier=2, min=10, max=60),
  220. reraise=(ConnectionRefusedError,),
  221. )
  222. def _wait_until_alive(self):
  223. init_msg = 'Runtime client initialized.'
  224. logger.debug('Getting container logs...')
  225. # Print and clear the log buffer
  226. assert (
  227. self.log_buffer is not None
  228. ), 'Log buffer is expected to be initialized when container is started'
  229. # Always process logs, regardless of startup_done status
  230. logs = self.log_buffer.get_and_clear()
  231. if logs:
  232. formatted_logs = '\n'.join([f' |{log}' for log in logs])
  233. logger.info(
  234. '\n'
  235. + '-' * 30
  236. + 'Container logs:'
  237. + '-' * 30
  238. + f'\n{formatted_logs}'
  239. + '\n'
  240. + '-' * 90
  241. )
  242. # Check for initialization message even if startup_done is True
  243. if any(init_msg in log for log in logs):
  244. self.startup_done = True
  245. if not self.startup_done:
  246. attempts = 0
  247. while not self.startup_done and attempts < 10:
  248. attempts += 1
  249. time.sleep(1)
  250. logs = self.log_buffer.get_and_clear()
  251. if logs:
  252. formatted_logs = '\n'.join([f' |{log}' for log in logs])
  253. logger.info(
  254. '\n'
  255. + '-' * 30
  256. + 'Container logs:'
  257. + '-' * 30
  258. + f'\n{formatted_logs}'
  259. + '\n'
  260. + '-' * 90
  261. )
  262. if any(init_msg in log for log in logs):
  263. self.startup_done = True
  264. break
  265. response = self.session.get(f'{self.api_url}/alive')
  266. if response.status_code == 200:
  267. return
  268. else:
  269. msg = f'Action execution API is not alive. Response: {response}'
  270. logger.error(msg)
  271. raise RuntimeError(msg)
  272. @property
  273. def sandbox_workspace_dir(self):
  274. return self.config.workspace_mount_path_in_sandbox
  275. def close(self, close_client: bool = True):
  276. if self.log_buffer:
  277. self.log_buffer.close()
  278. if self.session:
  279. self.session.close()
  280. containers = self.docker_client.containers.list(all=True)
  281. for container in containers:
  282. try:
  283. if container.name.startswith(self.container_name_prefix):
  284. logs = container.logs(tail=1000).decode('utf-8')
  285. logger.debug(
  286. f'==== Container logs ====\n{logs}\n==== End of container logs ===='
  287. )
  288. container.remove(force=True)
  289. except docker.errors.NotFound:
  290. pass
  291. if close_client:
  292. self.docker_client.close()
  293. def run_action(self, action: Action) -> Observation:
  294. # set timeout to default if not set
  295. if action.timeout is None:
  296. action.timeout = self.config.sandbox.timeout
  297. with self.action_semaphore:
  298. if not action.runnable:
  299. return NullObservation('')
  300. if (
  301. hasattr(action, 'is_confirmed')
  302. and action.is_confirmed
  303. == ActionConfirmationStatus.AWAITING_CONFIRMATION
  304. ):
  305. return NullObservation('')
  306. action_type = action.action # type: ignore[attr-defined]
  307. if action_type not in ACTION_TYPE_TO_CLASS:
  308. return ErrorObservation(f'Action {action_type} does not exist.')
  309. if not hasattr(self, action_type):
  310. return ErrorObservation(
  311. f'Action {action_type} is not supported in the current runtime.'
  312. )
  313. if (
  314. hasattr(action, 'is_confirmed')
  315. and action.is_confirmed == ActionConfirmationStatus.REJECTED
  316. ):
  317. return UserRejectObservation(
  318. 'Action has been rejected by the user! Waiting for further user input.'
  319. )
  320. logger.info('Awaiting session')
  321. self._wait_until_alive()
  322. assert action.timeout is not None
  323. try:
  324. response = self.session.post(
  325. f'{self.api_url}/execute_action',
  326. json={'action': event_to_dict(action)},
  327. timeout=action.timeout,
  328. )
  329. if response.status_code == 200:
  330. output = response.json()
  331. obs = observation_from_dict(output)
  332. obs._cause = action.id # type: ignore[attr-defined]
  333. return obs
  334. else:
  335. error_message = response.text
  336. logger.error(f'Error from server: {error_message}')
  337. obs = ErrorObservation(f'Command execution failed: {error_message}')
  338. except requests.Timeout:
  339. logger.error('No response received within the timeout period.')
  340. obs = ErrorObservation('Command execution timed out')
  341. except Exception as e:
  342. logger.error(f'Error during command execution: {e}')
  343. obs = ErrorObservation(f'Command execution failed: {str(e)}')
  344. return obs
  345. def run(self, action: CmdRunAction) -> Observation:
  346. return self.run_action(action)
  347. def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  348. return self.run_action(action)
  349. def read(self, action: FileReadAction) -> Observation:
  350. return self.run_action(action)
  351. def write(self, action: FileWriteAction) -> Observation:
  352. return self.run_action(action)
  353. def browse(self, action: BrowseURLAction) -> Observation:
  354. return self.run_action(action)
  355. def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  356. return self.run_action(action)
  357. # ====================================================================
  358. # Implement these methods (for file operations) in the subclass
  359. # ====================================================================
  360. def copy_to(
  361. self, host_src: str, sandbox_dest: str, recursive: bool = False
  362. ) -> None:
  363. if not os.path.exists(host_src):
  364. raise FileNotFoundError(f'Source file {host_src} does not exist')
  365. self._wait_until_alive()
  366. try:
  367. if recursive:
  368. # For recursive copy, create a zip file
  369. with tempfile.NamedTemporaryFile(
  370. suffix='.zip', delete=False
  371. ) as temp_zip:
  372. temp_zip_path = temp_zip.name
  373. with ZipFile(temp_zip_path, 'w') as zipf:
  374. for root, _, files in os.walk(host_src):
  375. for file in files:
  376. file_path = os.path.join(root, file)
  377. arcname = os.path.relpath(
  378. file_path, os.path.dirname(host_src)
  379. )
  380. zipf.write(file_path, arcname)
  381. upload_data = {'file': open(temp_zip_path, 'rb')}
  382. else:
  383. # For single file copy
  384. upload_data = {'file': open(host_src, 'rb')}
  385. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  386. response = self.session.post(
  387. f'{self.api_url}/upload_file', files=upload_data, params=params
  388. )
  389. if response.status_code == 200:
  390. return
  391. else:
  392. error_message = response.text
  393. raise Exception(f'Copy operation failed: {error_message}')
  394. except requests.Timeout:
  395. raise TimeoutError('Copy operation timed out')
  396. except Exception as e:
  397. raise RuntimeError(f'Copy operation failed: {str(e)}')
  398. finally:
  399. if recursive:
  400. os.unlink(temp_zip_path)
  401. logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}')
  402. def list_files(self, path: str | None = None) -> list[str]:
  403. """List files in the sandbox.
  404. If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
  405. """
  406. self._wait_until_alive()
  407. try:
  408. data = {}
  409. if path is not None:
  410. data['path'] = path
  411. response = self.session.post(f'{self.api_url}/list_files', json=data)
  412. if response.status_code == 200:
  413. response_json = response.json()
  414. assert isinstance(response_json, list)
  415. return response_json
  416. else:
  417. error_message = response.text
  418. raise Exception(f'List files operation failed: {error_message}')
  419. except requests.Timeout:
  420. raise TimeoutError('List files operation timed out')
  421. except Exception as e:
  422. raise RuntimeError(f'List files operation failed: {str(e)}')