remote_runtime.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. import os
  2. import tempfile
  3. import threading
  4. from pathlib import Path
  5. from typing import Callable, Optional
  6. from urllib.parse import urlparse
  7. from zipfile import ZipFile
  8. import requests
  9. import tenacity
  10. from openhands.core.config import AppConfig
  11. from openhands.events import EventStream
  12. from openhands.events.action import (
  13. BrowseInteractiveAction,
  14. BrowseURLAction,
  15. CmdRunAction,
  16. FileEditAction,
  17. FileReadAction,
  18. FileWriteAction,
  19. IPythonRunCellAction,
  20. )
  21. from openhands.events.action.action import Action
  22. from openhands.events.observation import (
  23. ErrorObservation,
  24. NullObservation,
  25. Observation,
  26. )
  27. from openhands.events.serialization import event_to_dict, observation_from_dict
  28. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  29. from openhands.runtime.base import (
  30. Runtime,
  31. RuntimeDisconnectedError,
  32. RuntimeNotFoundError,
  33. RuntimeNotReadyError,
  34. RuntimeUnavailableError,
  35. )
  36. from openhands.runtime.builder.remote import RemoteRuntimeBuilder
  37. from openhands.runtime.plugins import PluginRequirement
  38. from openhands.runtime.utils.command import get_remote_startup_command
  39. from openhands.runtime.utils.request import (
  40. send_request,
  41. )
  42. from openhands.runtime.utils.runtime_build import build_runtime_image
  43. from openhands.utils.async_utils import call_sync_from_async
  44. from openhands.utils.tenacity_stop import stop_if_should_exit
  class RemoteRuntime(Runtime):
      """This runtime will connect to a remote oh-runtime-client.

      Lifecycle calls (session lookup, image checks, start, resume, stop) go to
      ``config.sandbox.remote_runtime_api_url``; actions and file transfers go
      to the per-runtime ``runtime_url`` returned by that API.
      """

      port: int = 60000  # default port for the remote runtime client

      def __init__(
          self,
          config: AppConfig,
          event_stream: EventStream,
          sid: str = 'default',
          plugins: list[PluginRequirement] | None = None,
          env_vars: dict[str, str] | None = None,
          status_callback: Optional[Callable] = None,
          attach_to_existing: bool = False,
          headless_mode: bool = True,
      ):
          """Set up the HTTP session and image builder; does not start a runtime.

          The runtime itself is started or attached to in ``connect``.

          Raises:
              ValueError: if ``config.sandbox.api_key`` is not set.
          """
          # We need to set session and action_semaphore before the __init__ below, or we get odd errors
          self.session = requests.Session()
          self.action_semaphore = threading.Semaphore(1)
          super().__init__(
              config,
              event_stream,
              sid,
              plugins,
              env_vars,
              status_callback,
              attach_to_existing,
              headless_mode,
          )
          if self.config.sandbox.api_key is None:
              # Don't leak the session created above when construction fails.
              self.session.close()
              raise ValueError(
                  'API key is required to use the remote runtime. '
                  'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).'
              )
          # Every request on this session authenticates to the runtime API.
          self.session.headers.update({'X-API-Key': self.config.sandbox.api_key})
          if self.config.workspace_base is not None:
              self.log(
                  'debug',
                  'Setting workspace_base is not supported in the remote runtime.',
              )
          self.runtime_builder = RemoteRuntimeBuilder(
              self.config.sandbox.remote_runtime_api_url, self.config.sandbox.api_key
          )
          self.runtime_id: str | None = None
          self.runtime_url: str | None = None
          self._runtime_initialized: bool = False
          self._vscode_url: str | None = None  # initial dummy value

      async def connect(self):
          """Start or attach to the remote runtime, then run initial env setup.

          Raises:
              RuntimeNotReadyError: if the runtime did not become ready in time.
          """
          try:
              await call_sync_from_async(self._start_or_attach_to_runtime)
          except RuntimeNotReadyError:
              self.log('error', 'Runtime failed to start, timed out before ready')
              raise
          await call_sync_from_async(self.setup_initial_env)
          self._runtime_initialized = True

      def _start_or_attach_to_runtime(self):
          """Reuse a live runtime for this SID, or build/start a new one, then wait.

          Raises:
              RuntimeNotFoundError: if ``attach_to_existing`` is set but no usable
                  runtime exists for ``self.sid``.
          """
          existing_runtime = self._check_existing_runtime()
          if existing_runtime:
              self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
          elif self.attach_to_existing:
              raise RuntimeNotFoundError(
                  f'Could not find existing runtime for SID: {self.sid}'
              )
          else:
              self.send_status_message('STATUS$STARTING_CONTAINER')
              if self.config.sandbox.runtime_container_image is None:
                  self.log(
                      'info',
                      f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
                  )
                  self._build_runtime()
              else:
                  self.log(
                      'info',
                      f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
                  )
                  self.container_image = self.config.sandbox.runtime_container_image
              self._start_runtime()
          assert (
              self.runtime_id is not None
          ), 'Runtime ID is not set. This should never happen.'
          assert (
              self.runtime_url is not None
          ), 'Runtime URL is not set. This should never happen.'
          self.send_status_message('STATUS$WAITING_FOR_CLIENT')
          if not self.attach_to_existing:
              self.log('info', 'Waiting for runtime to be alive...')
          self._wait_until_alive()
          if not self.attach_to_existing:
              self.log('info', 'Runtime is ready.')
          self.send_status_message(' ')

      def _check_existing_runtime(self) -> bool:
          """Look up this SID's runtime; return True if it is (now) usable.

          A 'running' runtime is reused as-is; a 'paused' one is resumed. On
          either, ``runtime_id``/``runtime_url`` are populated from the response.
          A 404, 'stopped', or unrecognised status returns False.

          Raises:
              requests.HTTPError: for HTTP errors other than 404.
          """
          try:
              with self._send_request(
                  'GET',
                  f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
                  is_retry=False,
                  timeout=60,
              ) as response:
                  data = response.json()
                  status = data.get('status')
                  if status == 'running' or status == 'paused':
                      self._parse_runtime_response(response)
          except requests.HTTPError as e:
              if e.response.status_code == 404:
                  return False
              self.log('debug', f'Error while looking for remote runtime: {e}')
              raise

          if status == 'running':
              return True
          elif status == 'stopped':
              self.log('debug', 'Found existing remote runtime, but it is stopped')
              return False
          elif status == 'paused':
              self.log('debug', 'Found existing remote runtime, but it is paused')
              self._resume_runtime()
              return True
          else:
              self.log('error', f'Invalid response from runtime API: {data}')
              return False

      def _build_runtime(self):
          """Build the runtime image via the remote builder and verify it exists.

          Side effect: sets the ``OH_RUNTIME_RUNTIME_IMAGE_REPO`` environment
          variable from the API's ``/registry_prefix`` before building.

          Raises:
              RuntimeError: if the API reports the built image does not exist.
          """
          self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
          with self._send_request(
              'GET',
              f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix',
              is_retry=False,
              timeout=60,
          ) as response:
              response_json = response.json()
          registry_prefix = response_json['registry_prefix']
          os.environ['OH_RUNTIME_RUNTIME_IMAGE_REPO'] = (
              registry_prefix.rstrip('/') + '/runtime'
          )
          self.log(
              'debug',
              f'Runtime image repo: {os.environ["OH_RUNTIME_RUNTIME_IMAGE_REPO"]}',
          )

          if self.config.sandbox.runtime_extra_deps:
              self.log(
                  'debug',
                  f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
              )

          # Build the container image
          self.container_image = build_runtime_image(
              self.config.sandbox.base_container_image,
              self.runtime_builder,
              platform=self.config.sandbox.platform,
              extra_deps=self.config.sandbox.runtime_extra_deps,
              force_rebuild=self.config.sandbox.force_rebuild_runtime,
          )

          with self._send_request(
              'GET',
              f'{self.config.sandbox.remote_runtime_api_url}/image_exists',
              is_retry=False,
              params={'image': self.container_image},
              timeout=60,
          ) as response:
              if not response.json()['exists']:
                  raise RuntimeError(
                      f'Container image {self.container_image} does not exist'
                  )

      def _start_runtime(self):
          """Ask the runtime API to start a sandbox running ``self.container_image``.

          Raises:
              RuntimeUnavailableError: if the ``/start`` request fails with an
                  HTTP error.
          """
          # Prepare the request body for the /start endpoint
          plugin_args = []
          if self.plugins is not None and len(self.plugins) > 0:
              plugin_args = ['--plugins'] + [plugin.name for plugin in self.plugins]
          browsergym_args = []
          if self.config.sandbox.browsergym_eval_env is not None:
              browsergym_args = [
                  '--browsergym-eval-env'
              ] + self.config.sandbox.browsergym_eval_env.split(' ')
          command = get_remote_startup_command(
              self.port,
              self.config.workspace_mount_path_in_sandbox,
              'openhands' if self.config.run_as_openhands else 'root',
              self.config.sandbox.user_id,
              plugin_args,
              browsergym_args,
              is_root=not self.config.run_as_openhands,  # is_root=True when running as root
          )
          start_request = {
              'image': self.container_image,
              'command': command,
              'working_dir': '/openhands/code/',
              'environment': {'DEBUG': 'true'} if self.config.debug else {},
              'session_id': self.sid,
          }

          # Start the sandbox using the /start endpoint
          try:
              with self._send_request(
                  'POST',
                  f'{self.config.sandbox.remote_runtime_api_url}/start',
                  is_retry=False,
                  json=start_request,
                  timeout=60,
              ) as response:
                  self._parse_runtime_response(response)
              self.log(
                  'debug',
                  f'Runtime started. URL: {self.runtime_url}',
              )
          except requests.HTTPError as e:
              self.log('error', f'Unable to start runtime: {e}')
              raise RuntimeUnavailableError() from e

      def _resume_runtime(self):
          """Ask the runtime API to resume the (paused) runtime ``self.runtime_id``."""
          with self._send_request(
              'POST',
              f'{self.config.sandbox.remote_runtime_api_url}/resume',
              is_retry=False,
              json={'runtime_id': self.runtime_id},
              timeout=60,
          ):
              pass
          self.log('debug', 'Runtime resumed.')

      def _parse_runtime_response(self, response: requests.Response):
          """Store ``runtime_id``/``url`` from an API response on this instance.

          If the response carries a ``session_api_key``, it is added to the
          session headers as ``X-Session-API-Key`` for subsequent requests.
          """
          start_response = response.json()
          self.runtime_id = start_response['runtime_id']
          self.runtime_url = start_response['url']
          if 'session_api_key' in start_response:
              self.session.headers.update(
                  {'X-Session-API-Key': start_response['session_api_key']}
              )

      @property
      def vscode_url(self) -> str | None:
          """Return the VSCode URL for this runtime, or None if unavailable.

          Only looked up when VSCode is enabled and ``connect`` has completed;
          the URL is cached after the first successful lookup. It is built from
          ``runtime_url`` by prefixing the host with ``vscode-`` and appending
          the connection token and the sandbox workspace folder.
          """
          if not (self.vscode_enabled and self._runtime_initialized):
              return None
          if getattr(self, '_vscode_url', None) is not None:  # cached value
              return self._vscode_url
          with self._send_request(
              'GET',
              f'{self.runtime_url}/vscode/connection_token',
              timeout=60,
          ) as response:
              response_json = response.json()
          assert isinstance(response_json, dict)
          if response_json['token'] is None:
              return None
          # parse runtime_url to get vscode_url
          _parsed_url = urlparse(self.runtime_url)
          assert isinstance(_parsed_url.scheme, str) and isinstance(
              _parsed_url.netloc, str
          )
          self._vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
          self.log(
              'debug',
              f'VSCode URL: {self._vscode_url}',
          )
          return self._vscode_url

      def _wait_until_alive(self):
          """Poll ``_wait_until_alive_impl`` every 2s until the runtime is ready.

          Retries only on RuntimeNotReadyError; stops after
          ``config.sandbox.remote_runtime_init_timeout`` seconds or when
          ``stop_if_should_exit`` fires, re-raising the last error.
          """
          retry_decorator = tenacity.retry(
              stop=tenacity.stop_after_delay(
                  self.config.sandbox.remote_runtime_init_timeout
              )
              | stop_if_should_exit(),
              reraise=True,
              retry=tenacity.retry_if_exception_type(RuntimeNotReadyError),
              wait=tenacity.wait_fixed(2),
          )
          return retry_decorator(self._wait_until_alive_impl)()

      def _wait_until_alive_impl(self):
          """Single readiness probe: check pod status, then the runtime's /alive.

          Returns normally only when the pod is 'ready' and ``/alive`` succeeds.

          Raises:
              RuntimeNotReadyError: pod still starting, /alive failing, or an
                  unrecognised pod status (so the caller keeps polling).
              RuntimeError: pod is 'failed', 'unknown' or 'crashloopbackoff'; the
                  runtime is stopped via ``close()`` first.
          """
          self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
          with self._send_request(
              'GET',
              f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
              timeout=60,
          ) as runtime_info_response:
              runtime_data = runtime_info_response.json()
          assert 'runtime_id' in runtime_data
          assert runtime_data['runtime_id'] == self.runtime_id
          assert 'pod_status' in runtime_data
          pod_status = runtime_data['pod_status'].lower()
          self.log('debug', f'Pod status: {pod_status}')

          # FIXME: We should fix it at the backend of /start endpoint, make sure
          # the pod is created before returning the response.
          # Retry a period of time to give the cluster time to start the pod
          if pod_status == 'ready':
              try:
                  with self._send_request(
                      'GET',
                      f'{self.runtime_url}/alive',
                      timeout=60,
                  ):  # will raise exception if we don't get 200 back.
                      pass
              except requests.HTTPError as e:
                  self.log(
                      'warning', f"Runtime /alive failed, but pod says it's ready: {e}"
                  )
                  raise RuntimeNotReadyError(
                      f'Runtime /alive failed to respond with 200: {e}'
                  ) from e
              return
          elif (
              pod_status == 'not found'
              or pod_status == 'pending'
              or pod_status == 'running'
          ):  # nb: Running is not yet Ready
              raise RuntimeNotReadyError(
                  f'Runtime (ID={self.runtime_id}) is not yet ready. Status: {pod_status}'
              )
          elif pod_status in ('failed', 'unknown', 'crashloopbackoff'):
              # clean up the runtime
              self.close()
              raise RuntimeError(
                  f'Runtime (ID={self.runtime_id}) failed to start. Current status: {pod_status}'
              )
          else:
              # Maybe this should be a hard failure, but passing through in case the API changes
              self.log('warning', f'Unknown pod status: {pod_status}')

          self.log(
              'debug',
              f'Waiting for runtime pod to be active. Current status: {pod_status}',
          )
          raise RuntimeNotReadyError()

      def close(self, timeout: int = 10):
          """Stop the remote runtime (unless it should outlive us) and close the session.

          When ``keep_runtime_alive`` or ``attach_to_existing`` is set, only the
          local HTTP session is closed. Otherwise a ``/stop`` request is sent for
          ``runtime_id`` (if one was assigned); the session is closed in every
          case, and any error from ``/stop`` propagates to the caller.

          Args:
              timeout: request timeout in seconds for the ``/stop`` call.
          """
          if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
              self.session.close()
              return
          try:
              if self.runtime_id:
                  with self._send_request(
                      'POST',
                      f'{self.config.sandbox.remote_runtime_api_url}/stop',
                      is_retry=False,
                      json={'runtime_id': self.runtime_id},
                      timeout=timeout,
                  ):
                      self.log('debug', 'Runtime stopped.')
          finally:
              # Always release the session, even if no runtime was ever started.
              self.session.close()

      def run_action(self, action: Action, is_retry: bool = False) -> Observation:
          """Execute ``action`` in the remote runtime and return its observation.

          ``FileEditAction`` is delegated to ``self.edit``. Other actions are sent
          to ``/execute_action`` one at a time (guarded by ``action_semaphore``).
          A missing ``action.timeout`` defaults to ``config.sandbox.timeout``.

          Raises:
              ValueError: if the action type is unknown.
              RuntimeError: if the runtime does not answer within the timeout.
          """
          if action.timeout is None:
              action.timeout = self.config.sandbox.timeout
          if isinstance(action, FileEditAction):
              return self.edit(action)
          with self.action_semaphore:
              if not action.runnable:
                  return NullObservation('')
              action_type = action.action  # type: ignore[attr-defined]
              if action_type not in ACTION_TYPE_TO_CLASS:
                  raise ValueError(f'Action {action_type} does not exist.')
              if not hasattr(self, action_type):
                  return ErrorObservation(
                      f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.',
                      error_id='AGENT_ERROR$BAD_ACTION',
                  )

              assert action.timeout is not None

              try:
                  request_body = {'action': event_to_dict(action)}
                  self.log('debug', f'Request body: {request_body}')
                  with self._send_request(
                      'POST',
                      f'{self.runtime_url}/execute_action',
                      is_retry=False,
                      json=request_body,
                      # wait a few more seconds to get the timeout error from client side
                      timeout=action.timeout + 5,
                  ) as response:
                      output = response.json()
                      obs = observation_from_dict(output)
                      obs._cause = action.id  # type: ignore[attr-defined]
              except requests.Timeout as e:
                  raise RuntimeError(
                      f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
                  ) from e
              return obs

      def _send_request(self, method, url, is_retry=False, **kwargs):
          """Send an HTTP request on the shared session via ``send_request``.

          Returns the response object (callers use it as a context manager).
          For requests to ``runtime_url``: a 404 becomes RuntimeDisconnectedError;
          a 503 is treated as a paused runtime, which is resumed and awaited
          before the request is retried once (``is_retry=True``). All other
          HTTP errors and timeouts are re-raised.
          """
          is_runtime_request = self.runtime_url and self.runtime_url in url
          try:
              return send_request(self.session, method, url, **kwargs)
          except requests.Timeout:
              self.log('error', 'No response received within the timeout period.')
              raise
          except requests.HTTPError as e:
              if is_runtime_request and e.response.status_code == 404:
                  raise RuntimeDisconnectedError(
                      f'404 error while connecting to {self.runtime_url}'
                  ) from e
              if is_runtime_request and e.response.status_code == 503 and not is_retry:
                  self.log('warning', 'Runtime appears to be paused. Resuming...')
                  self._resume_runtime()
                  self._wait_until_alive()
                  return self._send_request(method, url, True, **kwargs)
              raise

      def run(self, action: CmdRunAction) -> Observation:
          """Run a shell command action via ``run_action``."""
          return self.run_action(action)

      def run_ipython(self, action: IPythonRunCellAction) -> Observation:
          """Run an IPython cell action via ``run_action``."""
          return self.run_action(action)

      def read(self, action: FileReadAction) -> Observation:
          """Run a file-read action via ``run_action``."""
          return self.run_action(action)

      def write(self, action: FileWriteAction) -> Observation:
          """Run a file-write action via ``run_action``."""
          return self.run_action(action)

      def browse(self, action: BrowseURLAction) -> Observation:
          """Run a browse-URL action via ``run_action``."""
          return self.run_action(action)

      def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
          """Run an interactive browse action via ``run_action``."""
          return self.run_action(action)

      def copy_to(
          self, host_src: str, sandbox_dest: str, recursive: bool = False
      ) -> None:
          """Upload a host file (or, with ``recursive``, a directory) to the sandbox.

          For ``recursive=True`` the directory is zipped into a temporary archive
          (entry names relative to ``host_src``'s parent directory), uploaded, and
          the archive is deleted afterwards whether or not the upload succeeded.

          Raises:
              FileNotFoundError: if ``host_src`` does not exist on the host.
          """
          if not os.path.exists(host_src):
              raise FileNotFoundError(f'Source file {host_src} does not exist')

          # Initialised up front so the cleanup below never sees an unbound name.
          temp_zip_path: str | None = None
          try:
              if recursive:
                  # Reserve a named file, close it, then let ZipFile reopen it by
                  # path; delete=False keeps it until the finally clause.
                  with tempfile.NamedTemporaryFile(
                      suffix='.zip', delete=False
                  ) as temp_zip:
                      temp_zip_path = temp_zip.name
                  with ZipFile(temp_zip_path, 'w') as zipf:
                      for root, _, files in os.walk(host_src):
                          for file in files:
                              file_path = os.path.join(root, file)
                              arcname = os.path.relpath(
                                  file_path, os.path.dirname(host_src)
                              )
                              zipf.write(file_path, arcname)
                  upload_path = temp_zip_path
              else:
                  upload_path = host_src

              params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
              # The handle is closed when this block exits, success or failure.
              # NOTE(review): if _send_request retries after a 503, the same
              # (already-read) handle is passed again — confirm the retry path
              # does not upload an empty file.
              with open(upload_path, 'rb') as upload_file:
                  with self._send_request(
                      'POST',
                      f'{self.runtime_url}/upload_file',
                      is_retry=False,
                      files={'file': upload_file},
                      params=params,
                      timeout=300,
                  ) as response:
                      self.log(
                          'debug',
                          f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
                      )
          finally:
              if temp_zip_path is not None:
                  os.unlink(temp_zip_path)

      def list_files(self, path: str | None = None) -> list[str]:
          """Return the runtime's ``/list_files`` result for ``path``.

          When ``path`` is None, no path is sent and the server's default is used.
          """
          data = {}
          if path is not None:
              data['path'] = path

          with self._send_request(
              'POST',
              f'{self.runtime_url}/list_files',
              is_retry=False,
              json=data,
              timeout=30,
          ) as response:
              response_json = response.json()
              assert isinstance(response_json, list)
              return response_json

      def copy_from(self, path: str) -> Path:
          """Download ``path`` from the sandbox into a local temporary file.

          The body streamed by ``/download_files`` is written verbatim to a new
          temp file (presumably a zip archive — confirm against the server) and
          its path is returned; the caller owns and must delete that file.
          A partially written file is removed if the download fails.
          """
          params = {'path': path}
          with self._send_request(
              'GET',
              f'{self.runtime_url}/download_files',
              is_retry=False,
              params=params,
              stream=True,
              timeout=30,
          ) as response:
              temp_file = tempfile.NamedTemporaryFile(delete=False)
              try:
                  # Closing the file (end of this `with`) flushes every chunk to
                  # disk before the path is handed to the caller.
                  with temp_file:
                      for chunk in response.iter_content(chunk_size=8192):
                          if chunk:  # filter out keep-alive new chunks
                              temp_file.write(chunk)
              except BaseException:
                  os.unlink(temp_file.name)
                  raise
              return Path(temp_file.name)