remote_runtime.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. import os
  2. import tempfile
  3. import threading
  4. from pathlib import Path
  5. from typing import Callable, Optional
  6. from urllib.parse import urlparse
  7. from zipfile import ZipFile
  8. import requests
  9. import tenacity
  10. from openhands.core.config import AppConfig
  11. from openhands.events import EventStream
  12. from openhands.events.action import (
  13. BrowseInteractiveAction,
  14. BrowseURLAction,
  15. CmdRunAction,
  16. FileEditAction,
  17. FileReadAction,
  18. FileWriteAction,
  19. IPythonRunCellAction,
  20. )
  21. from openhands.events.action.action import Action
  22. from openhands.events.observation import (
  23. ErrorObservation,
  24. NullObservation,
  25. Observation,
  26. )
  27. from openhands.events.serialization import event_to_dict, observation_from_dict
  28. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  29. from openhands.runtime.base import (
  30. Runtime,
  31. RuntimeDisconnectedError,
  32. RuntimeNotFoundError,
  33. RuntimeNotReadyError,
  34. )
  35. from openhands.runtime.builder.remote import RemoteRuntimeBuilder
  36. from openhands.runtime.plugins import PluginRequirement
  37. from openhands.runtime.utils.command import get_remote_startup_command
  38. from openhands.runtime.utils.request import (
  39. send_request,
  40. )
  41. from openhands.runtime.utils.runtime_build import build_runtime_image
  42. from openhands.utils.async_utils import call_sync_from_async
  43. from openhands.utils.tenacity_stop import stop_if_should_exit
  44. class RemoteRuntime(Runtime):
  45. """This runtime will connect to a remote oh-runtime-client."""
  46. port: int = 60000 # default port for the remote runtime client
  def __init__(
      self,
      config: AppConfig,
      event_stream: EventStream,
      sid: str = 'default',
      plugins: list[PluginRequirement] | None = None,
      env_vars: dict[str, str] | None = None,
      status_callback: Optional[Callable] = None,
      attach_to_existing: bool = False,
      headless_mode: bool = True,
  ):
      """Create the HTTP session, run the base initializer and check the API key.

      All arguments are forwarded positionally, in declaration order, to the
      base-class initializer.

      Raises:
          ValueError: if ``self.config.sandbox.api_key`` is ``None`` once the
              base initializer has run.
      """
      # The session and the action semaphore have to exist before the base
      # initializer is invoked, otherwise we get odd errors.
      self.action_semaphore = threading.Semaphore(1)
      self.session = requests.Session()

      super().__init__(
          config,
          event_stream,
          sid,
          plugins,
          env_vars,
          status_callback,
          attach_to_existing,
          headless_mode,
      )

      sandbox_config = self.config.sandbox
      api_key = sandbox_config.api_key
      if api_key is None:
          raise ValueError(
              'API key is required to use the remote runtime. '
              'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).'
          )
      # Every request made through this session carries the API key.
      self.session.headers.update({'X-API-Key': api_key})

      if self.config.workspace_base is not None:
          self.log(
              'debug',
              'Setting workspace_base is not supported in the remote runtime.',
          )

      self.runtime_builder = RemoteRuntimeBuilder(
          sandbox_config.remote_runtime_api_url, api_key
      )

      # Filled in by _parse_runtime_response once a runtime is known.
      self.runtime_id: str | None = None
      self.runtime_url: str | None = None
      # Flipped to True at the end of connect().
      self._runtime_initialized: bool = False
      self._vscode_url: str | None = None  # initial dummy value
  async def connect(self):
      """Start or attach to the runtime, then run the initial environment setup.

      Both steps are synchronous methods dispatched through
      ``call_sync_from_async``. ``RuntimeNotReadyError`` from the first step
      is logged and re-raised unchanged; in that case the setup step is
      skipped and ``_runtime_initialized`` stays untouched.
      """
      try:
          await call_sync_from_async(self._start_or_attach_to_runtime)
      except RuntimeNotReadyError:
          self.log('error', 'Runtime failed to start, timed out before ready')
          raise
      else:
          await call_sync_from_async(self.setup_initial_env)
          self._runtime_initialized = True
  97. def _start_or_attach_to_runtime(self):
  98. existing_runtime = self._check_existing_runtime()
  99. if existing_runtime:
  100. self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
  101. elif self.attach_to_existing:
  102. raise RuntimeNotFoundError(
  103. f'Could not find existing runtime for SID: {self.sid}'
  104. )
  105. else:
  106. self.send_status_message('STATUS$STARTING_CONTAINER')
  107. if self.config.sandbox.runtime_container_image is None:
  108. self.log(
  109. 'info',
  110. f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
  111. )
  112. self._build_runtime()
  113. else:
  114. self.log(
  115. 'info',
  116. f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
  117. )
  118. self.container_image = self.config.sandbox.runtime_container_image
  119. self._start_runtime()
  120. assert (
  121. self.runtime_id is not None
  122. ), 'Runtime ID is not set. This should never happen.'
  123. assert (
  124. self.runtime_url is not None
  125. ), 'Runtime URL is not set. This should never happen.'
  126. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  127. if not self.attach_to_existing:
  128. self.log('info', 'Waiting for runtime to be alive...')
  129. self._wait_until_alive()
  130. if not self.attach_to_existing:
  131. self.log('info', 'Runtime is ready.')
  132. self.send_status_message(' ')
  133. def _check_existing_runtime(self) -> bool:
  134. try:
  135. with self._send_request(
  136. 'GET',
  137. f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
  138. is_retry=False,
  139. timeout=5,
  140. ) as response:
  141. data = response.json()
  142. status = data.get('status')
  143. if status == 'running' or status == 'paused':
  144. self._parse_runtime_response(response)
  145. except requests.HTTPError as e:
  146. if e.response.status_code == 404:
  147. return False
  148. self.log('debug', f'Error while looking for remote runtime: {e}')
  149. raise
  150. if status == 'running':
  151. return True
  152. elif status == 'stopped':
  153. self.log('debug', 'Found existing remote runtime, but it is stopped')
  154. return False
  155. elif status == 'paused':
  156. self.log('debug', 'Found existing remote runtime, but it is paused')
  157. self._resume_runtime()
  158. return True
  159. else:
  160. self.log('error', f'Invalid response from runtime API: {data}')
  161. return False
  def _build_runtime(self):
      """Build the runtime image and confirm the runtime API can see it.

      Steps: fetch the registry prefix from the runtime API, export it via
      the ``OH_RUNTIME_RUNTIME_IMAGE_REPO`` environment variable, build the
      image with ``build_runtime_image`` and store the result in
      ``self.container_image``, then query ``/image_exists``.

      Raises:
          RuntimeError: the API reports that the built image does not exist.
      """
      self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
      sandbox = self.config.sandbox
      api_url = sandbox.remote_runtime_api_url

      with self._send_request(
          'GET',
          f'{api_url}/registry_prefix',
          is_retry=False,
          timeout=10,
      ) as response:
          prefix_payload = response.json()
      registry_prefix = prefix_payload['registry_prefix']

      # Process-wide side effect: the image repo is published through os.environ.
      image_repo = registry_prefix.rstrip('/') + '/runtime'
      os.environ['OH_RUNTIME_RUNTIME_IMAGE_REPO'] = image_repo
      self.log('debug', f'Runtime image repo: {image_repo}')

      extra_deps = sandbox.runtime_extra_deps
      if extra_deps:
          self.log(
              'debug',
              f'Installing extra user-provided dependencies in the runtime image: {extra_deps}',
          )

      # Build the container image
      self.container_image = build_runtime_image(
          sandbox.base_container_image,
          self.runtime_builder,
          platform=sandbox.platform,
          extra_deps=extra_deps,
          force_rebuild=sandbox.force_rebuild_runtime,
      )

      with self._send_request(
          'GET',
          f'{api_url}/image_exists',
          is_retry=False,
          params={'image': self.container_image},
          timeout=10,
      ) as response:
          image_found = response.json()['exists']
          if not image_found:
              raise RuntimeError(
                  f'Container image {self.container_image} does not exist'
              )
  def _start_runtime(self):
      """POST a start request for ``self.container_image`` and record the result.

      The response is handed to ``_parse_runtime_response``, which sets
      ``runtime_id`` and ``runtime_url``.
      """
      sandbox = self.config.sandbox
      run_as_openhands = self.config.run_as_openhands

      # Optional CLI argument groups; each one is empty when not configured.
      plugin_args = (
          ['--plugins', *(plugin.name for plugin in self.plugins)]
          if self.plugins
          else []
      )
      eval_env = sandbox.browsergym_eval_env
      browsergym_args = (
          []
          if eval_env is None
          else ['--browsergym-eval-env', *eval_env.split(' ')]
      )

      command = get_remote_startup_command(
          self.port,
          self.config.workspace_mount_path_in_sandbox,
          'openhands' if run_as_openhands else 'root',
          sandbox.user_id,
          plugin_args,
          browsergym_args,
          is_root=not run_as_openhands,  # is_root=True when running as root
      )

      # Request body for the /start endpoint
      environment = {'DEBUG': 'true'} if self.config.debug else {}
      start_request = {
          'image': self.container_image,
          'command': command,
          'working_dir': '/openhands/code/',
          'environment': environment,
          'session_id': self.sid,
      }

      # Start the sandbox using the /start endpoint
      with self._send_request(
          'POST',
          f'{sandbox.remote_runtime_api_url}/start',
          is_retry=False,
          json=start_request,
      ) as response:
          self._parse_runtime_response(response)
      self.log(
          'debug',
          f'Runtime started. URL: {self.runtime_url}',
      )
  241. def _resume_runtime(self):
  242. with self._send_request(
  243. 'POST',
  244. f'{self.config.sandbox.remote_runtime_api_url}/resume',
  245. is_retry=False,
  246. json={'runtime_id': self.runtime_id},
  247. timeout=30,
  248. ):
  249. pass
  250. self.log('debug', 'Runtime resumed.')
  251. def _parse_runtime_response(self, response: requests.Response):
  252. start_response = response.json()
  253. self.runtime_id = start_response['runtime_id']
  254. self.runtime_url = start_response['url']
  255. if 'session_api_key' in start_response:
  256. self.session.headers.update(
  257. {'X-Session-API-Key': start_response['session_api_key']}
  258. )
  259. @property
  260. def vscode_url(self) -> str | None:
  261. if self.vscode_enabled and self._runtime_initialized:
  262. if (
  263. hasattr(self, '_vscode_url') and self._vscode_url is not None
  264. ): # cached value
  265. return self._vscode_url
  266. with self._send_request(
  267. 'GET',
  268. f'{self.runtime_url}/vscode/connection_token',
  269. timeout=10,
  270. ) as response:
  271. response_json = response.json()
  272. assert isinstance(response_json, dict)
  273. if response_json['token'] is None:
  274. return None
  275. # parse runtime_url to get vscode_url
  276. _parsed_url = urlparse(self.runtime_url)
  277. assert isinstance(_parsed_url.scheme, str) and isinstance(
  278. _parsed_url.netloc, str
  279. )
  280. self._vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
  281. self.log(
  282. 'debug',
  283. f'VSCode URL: {self._vscode_url}',
  284. )
  285. return self._vscode_url
  286. else:
  287. return None
  def _wait_until_alive(self):
      """Call ``_wait_until_alive_impl`` repeatedly until it stops raising ``RuntimeNotReadyError``.

      Attempts are spaced two seconds apart. Retrying stops once
      ``remote_runtime_init_timeout`` has elapsed or the
      ``stop_if_should_exit()`` condition fires; the last exception is then
      re-raised. Any exception other than ``RuntimeNotReadyError``
      propagates immediately.
      """
      init_timeout = self.config.sandbox.remote_runtime_init_timeout
      stop_condition = tenacity.stop_after_delay(init_timeout) | stop_if_should_exit()
      with_retries = tenacity.retry(
          stop=stop_condition,
          reraise=True,
          retry=tenacity.retry_if_exception_type(RuntimeNotReadyError),
          wait=tenacity.wait_fixed(2),
      )
      poll_until_alive = with_retries(self._wait_until_alive_impl)
      return poll_until_alive()
  299. def _wait_until_alive_impl(self):
  300. self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
  301. with self._send_request(
  302. 'GET',
  303. f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
  304. ) as runtime_info_response:
  305. runtime_data = runtime_info_response.json()
  306. assert 'runtime_id' in runtime_data
  307. assert runtime_data['runtime_id'] == self.runtime_id
  308. assert 'pod_status' in runtime_data
  309. pod_status = runtime_data['pod_status']
  310. self.log('debug', f'Pod status: {pod_status}')
  311. # FIXME: We should fix it at the backend of /start endpoint, make sure
  312. # the pod is created before returning the response.
  313. # Retry a period of time to give the cluster time to start the pod
  314. if pod_status == 'Ready':
  315. try:
  316. with self._send_request(
  317. 'GET',
  318. f'{self.runtime_url}/alive',
  319. ): # will raise exception if we don't get 200 back.
  320. pass
  321. except requests.HTTPError as e:
  322. self.log(
  323. 'warning', f"Runtime /alive failed, but pod says it's ready: {e}"
  324. )
  325. raise RuntimeNotReadyError(
  326. f'Runtime /alive failed to respond with 200: {e}'
  327. )
  328. return
  329. elif (
  330. pod_status == 'Not Found'
  331. or pod_status == 'Pending'
  332. or pod_status == 'Running'
  333. ): # nb: Running is not yet Ready
  334. raise RuntimeNotReadyError(
  335. f'Runtime (ID={self.runtime_id}) is not yet ready. Status: {pod_status}'
  336. )
  337. elif pod_status in ('Failed', 'Unknown'):
  338. # clean up the runtime
  339. self.close()
  340. raise RuntimeError(
  341. f'Runtime (ID={self.runtime_id}) failed to start. Current status: {pod_status}'
  342. )
  343. else:
  344. # Maybe this should be a hard failure, but passing through in case the API changes
  345. self.log('warning', f'Unknown pod status: {pod_status}')
  346. self.log(
  347. 'debug',
  348. f'Waiting for runtime pod to be active. Current status: {pod_status}',
  349. )
  350. raise RuntimeNotReadyError()
  351. def close(self, timeout: int = 10):
  352. if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
  353. self.session.close()
  354. return
  355. if self.runtime_id and self.session:
  356. try:
  357. with self._send_request(
  358. 'POST',
  359. f'{self.config.sandbox.remote_runtime_api_url}/stop',
  360. is_retry=False,
  361. json={'runtime_id': self.runtime_id},
  362. timeout=timeout,
  363. ):
  364. self.log('debug', 'Runtime stopped.')
  365. except Exception as e:
  366. raise e
  367. finally:
  368. self.session.close()
  def run_action(self, action: Action, is_retry: bool = False) -> Observation:
      """Send ``action`` to the runtime's ``/execute_action`` endpoint.

      ``FileEditAction`` instances are delegated to ``self.edit``. Everything
      else runs while holding ``self.action_semaphore``.

      Args:
          action: the action to execute. A ``None`` timeout is replaced in
              place by ``self.config.sandbox.timeout``.
          is_retry: accepted but not used by this method.

      Returns:
          The observation rebuilt from the response; ``NullObservation('')``
          for a non-runnable action; an ``ErrorObservation`` when this
          runtime has no attribute named after the action type.

      Raises:
          ValueError: the action type is not in ``ACTION_TYPE_TO_CLASS``.
          RuntimeError: the HTTP request timed out.
      """
      if action.timeout is None:
          action.timeout = self.config.sandbox.timeout
      if isinstance(action, FileEditAction):
          return self.edit(action)

      with self.action_semaphore:
          if not action.runnable:
              return NullObservation('')

          action_type = action.action  # type: ignore[attr-defined]
          if action_type not in ACTION_TYPE_TO_CLASS:
              raise ValueError(f'Action {action_type} does not exist.')
          if not hasattr(self, action_type):
              return ErrorObservation(
                  f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.',
                  error_id='AGENT_ERROR$BAD_ACTION',
              )

          assert action.timeout is not None
          execute_url = f'{self.runtime_url}/execute_action'
          try:
              payload = {'action': event_to_dict(action)}
              self.log('debug', f'Request body: {payload}')
              with self._send_request(
                  'POST',
                  execute_url,
                  is_retry=False,
                  json=payload,
                  # wait a few more seconds to get the timeout error from client side
                  timeout=action.timeout + 5,
              ) as response:
                  observation = observation_from_dict(response.json())
                  observation._cause = action.id  # type: ignore[attr-defined]
          except requests.Timeout:
              raise RuntimeError(
                  f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
              )
          return observation
  def _send_request(self, method, url, is_retry=False, **kwargs):
      """Issue a request through ``send_request`` using this runtime's session.

      HTTP errors get special treatment only when ``url`` contains
      ``self.runtime_url``:
        * 404 is converted into ``RuntimeDisconnectedError``.
        * 503 on a first attempt triggers a resume, a wait until the runtime
          is alive, and exactly one repeat of the request with
          ``is_retry=True``.

      Args:
          method: HTTP method, forwarded to ``send_request``.
          url: target URL, forwarded to ``send_request``.
          is_retry: True when this call is already the repeat after a 503.
          **kwargs: forwarded unchanged to ``send_request``.

      Raises:
          requests.Timeout: logged and re-raised.
          RuntimeDisconnectedError: 404 from the runtime URL.
          requests.HTTPError: every other HTTP error, including a second 503.
      """
      is_runtime_request = self.runtime_url and self.runtime_url in url
      try:
          return send_request(self.session, method, url, **kwargs)
      except requests.Timeout:
          self.log('error', 'No response received within the timeout period.')
          raise
      except requests.HTTPError as e:
          # Errors from anything but the runtime itself are passed through.
          if not is_runtime_request:
              raise e
          status_code = e.response.status_code
          if status_code == 404:
              raise RuntimeDisconnectedError(
                  f'404 error while connecting to {self.runtime_url}'
              )
          if status_code != 503 or is_retry:
              raise e
          self.log('warning', 'Runtime appears to be paused. Resuming...')
          self._resume_runtime()
          self._wait_until_alive()
          return self._send_request(method, url, True, **kwargs)
  427. def run(self, action: CmdRunAction) -> Observation:
  428. return self.run_action(action)
  429. def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  430. return self.run_action(action)
  431. def read(self, action: FileReadAction) -> Observation:
  432. return self.run_action(action)
  433. def write(self, action: FileWriteAction) -> Observation:
  434. return self.run_action(action)
  435. def browse(self, action: BrowseURLAction) -> Observation:
  436. return self.run_action(action)
  437. def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  438. return self.run_action(action)
  439. def copy_to(
  440. self, host_src: str, sandbox_dest: str, recursive: bool = False
  441. ) -> None:
  442. if not os.path.exists(host_src):
  443. raise FileNotFoundError(f'Source file {host_src} does not exist')
  444. try:
  445. if recursive:
  446. with tempfile.NamedTemporaryFile(
  447. suffix='.zip', delete=False
  448. ) as temp_zip:
  449. temp_zip_path = temp_zip.name
  450. with ZipFile(temp_zip_path, 'w') as zipf:
  451. for root, _, files in os.walk(host_src):
  452. for file in files:
  453. file_path = os.path.join(root, file)
  454. arcname = os.path.relpath(
  455. file_path, os.path.dirname(host_src)
  456. )
  457. zipf.write(file_path, arcname)
  458. upload_data = {'file': open(temp_zip_path, 'rb')}
  459. else:
  460. upload_data = {'file': open(host_src, 'rb')}
  461. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  462. with self._send_request(
  463. 'POST',
  464. f'{self.runtime_url}/upload_file',
  465. is_retry=False,
  466. files=upload_data,
  467. params=params,
  468. timeout=300,
  469. ) as response:
  470. self.log(
  471. 'debug',
  472. f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
  473. )
  474. finally:
  475. if recursive:
  476. os.unlink(temp_zip_path)
  477. self.log(
  478. 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
  479. )
  480. def list_files(self, path: str | None = None) -> list[str]:
  481. data = {}
  482. if path is not None:
  483. data['path'] = path
  484. with self._send_request(
  485. 'POST',
  486. f'{self.runtime_url}/list_files',
  487. is_retry=False,
  488. json=data,
  489. timeout=30,
  490. ) as response:
  491. response_json = response.json()
  492. assert isinstance(response_json, list)
  493. return response_json
  494. def copy_from(self, path: str) -> Path:
  495. """Zip all files in the sandbox and return as a stream of bytes."""
  496. params = {'path': path}
  497. with self._send_request(
  498. 'GET',
  499. f'{self.runtime_url}/download_files',
  500. is_retry=False,
  501. params=params,
  502. stream=True,
  503. timeout=30,
  504. ) as response:
  505. temp_file = tempfile.NamedTemporaryFile(delete=False)
  506. for chunk in response.iter_content(chunk_size=8192):
  507. if chunk: # filter out keep-alive new chunks
  508. temp_file.write(chunk)
  509. return Path(temp_file.name)