remote_runtime.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. import os
  2. import tempfile
  3. import threading
  4. import time
  5. from typing import Callable, Optional
  6. from zipfile import ZipFile
  7. import requests
  8. from requests.exceptions import Timeout
  9. from openhands.core.config import AppConfig
  10. from openhands.events import EventStream
  11. from openhands.events.action import (
  12. BrowseInteractiveAction,
  13. BrowseURLAction,
  14. CmdRunAction,
  15. FileEditAction,
  16. FileReadAction,
  17. FileWriteAction,
  18. IPythonRunCellAction,
  19. )
  20. from openhands.events.action.action import Action
  21. from openhands.events.observation import (
  22. FatalErrorObservation,
  23. NullObservation,
  24. Observation,
  25. )
  26. from openhands.events.serialization import event_to_dict, observation_from_dict
  27. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  28. from openhands.runtime.base import Runtime
  29. from openhands.runtime.builder.remote import RemoteRuntimeBuilder
  30. from openhands.runtime.plugins import PluginRequirement
  31. from openhands.runtime.utils.command import get_remote_startup_command
  32. from openhands.runtime.utils.request import (
  33. is_404_error,
  34. is_503_error,
  35. send_request_with_retry,
  36. )
  37. from openhands.runtime.utils.runtime_build import build_runtime_image
class RemoteRuntime(Runtime):
    """This runtime will connect to a remote oh-runtime-client.

    The runtime is looked up, started, resumed, polled and stopped through the
    HTTP API at ``config.sandbox.remote_runtime_api_url``. Actions and file
    transfers are sent to the runtime's own URL, which that API returns.
    """

    port: int = 60000  # default port for the remote runtime client
  41. def __init__(
  42. self,
  43. config: AppConfig,
  44. event_stream: EventStream,
  45. sid: str = 'default',
  46. plugins: list[PluginRequirement] | None = None,
  47. env_vars: dict[str, str] | None = None,
  48. status_message_callback: Optional[Callable] = None,
  49. attach_to_existing: bool = False,
  50. ):
  51. super().__init__(
  52. config,
  53. event_stream,
  54. sid,
  55. plugins,
  56. env_vars,
  57. status_message_callback,
  58. attach_to_existing,
  59. )
  60. if self.config.sandbox.api_key is None:
  61. raise ValueError(
  62. 'API key is required to use the remote runtime. '
  63. 'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).'
  64. )
  65. self.session = requests.Session()
  66. self.session.headers.update({'X-API-Key': self.config.sandbox.api_key})
  67. self.action_semaphore = threading.Semaphore(1)
  68. if self.config.workspace_base is not None:
  69. self.log(
  70. 'warning',
  71. 'Setting workspace_base is not supported in the remote runtime.',
  72. )
  73. self.runtime_builder = RemoteRuntimeBuilder(
  74. self.config.sandbox.remote_runtime_api_url, self.config.sandbox.api_key
  75. )
  76. self.runtime_id: str | None = None
  77. self.runtime_url: str | None = None
  78. async def connect(self):
  79. self._start_or_attach_to_runtime()
  80. self._wait_until_alive()
  81. self.setup_initial_env()
  82. def _start_or_attach_to_runtime(self):
  83. existing_runtime = self._check_existing_runtime()
  84. if existing_runtime:
  85. self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
  86. elif self.attach_to_existing:
  87. raise RuntimeError('Could not find existing runtime to attach to.')
  88. else:
  89. self.send_status_message('STATUS$STARTING_CONTAINER')
  90. if self.config.sandbox.runtime_container_image is None:
  91. self.log(
  92. 'info',
  93. f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
  94. )
  95. self._build_runtime()
  96. else:
  97. self.log(
  98. 'info',
  99. f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
  100. )
  101. self.container_image = self.config.sandbox.runtime_container_image
  102. self._start_runtime()
  103. assert (
  104. self.runtime_id is not None
  105. ), 'Runtime ID is not set. This should never happen.'
  106. assert (
  107. self.runtime_url is not None
  108. ), 'Runtime URL is not set. This should never happen.'
  109. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  110. if not self.attach_to_existing:
  111. self.log('info', 'Waiting for runtime to be alive...')
  112. self._wait_until_alive()
  113. if not self.attach_to_existing:
  114. self.log('info', 'Runtime is ready.')
  115. self.send_status_message(' ')
  116. def _check_existing_runtime(self) -> bool:
  117. try:
  118. response = send_request_with_retry(
  119. self.session,
  120. 'GET',
  121. f'{self.config.sandbox.remote_runtime_api_url}/runtime/{self.sid}',
  122. timeout=5,
  123. )
  124. except Exception as e:
  125. self.log('debug', f'Error while looking for remote runtime: {e}')
  126. return False
  127. if response.status_code == 200:
  128. data = response.json()
  129. status = data.get('status')
  130. if status == 'running':
  131. self._parse_runtime_response(response)
  132. return True
  133. elif status == 'stopped':
  134. self.log('debug', 'Found existing remote runtime, but it is stopped')
  135. return False
  136. elif status == 'paused':
  137. self.log('debug', 'Found existing remote runtime, but it is paused')
  138. self._parse_runtime_response(response)
  139. self._resume_runtime()
  140. return True
  141. else:
  142. self.log('error', f'Invalid response from runtime API: {data}')
  143. return False
  144. else:
  145. self.log('debug', 'Could not find existing remote runtime')
  146. return False
  147. def _build_runtime(self):
  148. self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
  149. response = send_request_with_retry(
  150. self.session,
  151. 'GET',
  152. f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix',
  153. timeout=30,
  154. )
  155. response_json = response.json()
  156. registry_prefix = response_json['registry_prefix']
  157. os.environ['OH_RUNTIME_RUNTIME_IMAGE_REPO'] = (
  158. registry_prefix.rstrip('/') + '/runtime'
  159. )
  160. self.log(
  161. 'debug',
  162. f'Runtime image repo: {os.environ["OH_RUNTIME_RUNTIME_IMAGE_REPO"]}',
  163. )
  164. if self.config.sandbox.runtime_extra_deps:
  165. self.log(
  166. 'debug',
  167. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
  168. )
  169. # Build the container image
  170. self.container_image = build_runtime_image(
  171. self.config.sandbox.base_container_image,
  172. self.runtime_builder,
  173. platform=self.config.sandbox.platform,
  174. extra_deps=self.config.sandbox.runtime_extra_deps,
  175. force_rebuild=self.config.sandbox.force_rebuild_runtime,
  176. )
  177. response = send_request_with_retry(
  178. self.session,
  179. 'GET',
  180. f'{self.config.sandbox.remote_runtime_api_url}/image_exists',
  181. params={'image': self.container_image},
  182. timeout=30,
  183. )
  184. if response.status_code != 200 or not response.json()['exists']:
  185. raise RuntimeError(f'Container image {self.container_image} does not exist')
  186. def _start_runtime(self):
  187. # Prepare the request body for the /start endpoint
  188. plugin_args = []
  189. if self.plugins is not None and len(self.plugins) > 0:
  190. plugin_args = ['--plugins'] + [plugin.name for plugin in self.plugins]
  191. browsergym_args = []
  192. if self.config.sandbox.browsergym_eval_env is not None:
  193. browsergym_args = [
  194. '--browsergym-eval-env'
  195. ] + self.config.sandbox.browsergym_eval_env.split(' ')
  196. command = get_remote_startup_command(
  197. self.port,
  198. self.config.workspace_mount_path_in_sandbox,
  199. 'openhands' if self.config.run_as_openhands else 'root',
  200. self.config.sandbox.user_id,
  201. plugin_args,
  202. browsergym_args,
  203. )
  204. start_request = {
  205. 'image': self.container_image,
  206. 'command': command,
  207. 'working_dir': '/openhands/code/',
  208. 'environment': {'DEBUG': 'true'} if self.config.debug else {},
  209. 'runtime_id': self.sid,
  210. }
  211. # Start the sandbox using the /start endpoint
  212. response = send_request_with_retry(
  213. self.session,
  214. 'POST',
  215. f'{self.config.sandbox.remote_runtime_api_url}/start',
  216. json=start_request,
  217. timeout=300,
  218. )
  219. if response.status_code != 201:
  220. raise RuntimeError(
  221. f'[Runtime (ID={self.runtime_id})] Failed to start runtime: {response.text}'
  222. )
  223. self._parse_runtime_response(response)
  224. self.log(
  225. 'debug',
  226. f'Runtime started. URL: {self.runtime_url}',
  227. )
  228. def _resume_runtime(self):
  229. response = send_request_with_retry(
  230. self.session,
  231. 'POST',
  232. f'{self.config.sandbox.remote_runtime_api_url}/resume',
  233. json={'runtime_id': self.runtime_id},
  234. timeout=30,
  235. )
  236. if response.status_code != 200:
  237. raise RuntimeError(
  238. f'[Runtime (ID={self.runtime_id})] Failed to resume runtime: {response.text}'
  239. )
  240. self.log('debug', 'Runtime resumed.')
  241. def _parse_runtime_response(self, response: requests.Response):
  242. start_response = response.json()
  243. self.runtime_id = start_response['runtime_id']
  244. self.runtime_url = start_response['url']
  245. if 'session_api_key' in start_response:
  246. self.session.headers.update(
  247. {'X-Session-API-Key': start_response['session_api_key']}
  248. )
  249. def _wait_until_alive(self):
  250. self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
  251. # send GET request to /runtime/<id>
  252. pod_running = False
  253. max_not_found_count = 12 # 2 minutes
  254. not_found_count = 0
  255. while not pod_running:
  256. runtime_info_response = send_request_with_retry(
  257. self.session,
  258. 'GET',
  259. f'{self.config.sandbox.remote_runtime_api_url}/runtime/{self.runtime_id}',
  260. timeout=5,
  261. )
  262. if runtime_info_response.status_code != 200:
  263. raise RuntimeError(
  264. f'Failed to get runtime status: {runtime_info_response.status_code}. Response: {runtime_info_response.text}'
  265. )
  266. runtime_data = runtime_info_response.json()
  267. assert runtime_data['runtime_id'] == self.runtime_id
  268. pod_status = runtime_data['pod_status']
  269. self.log(
  270. 'debug',
  271. f'Waiting for runtime pod to be active. Current status: {pod_status}',
  272. )
  273. if pod_status == 'Ready':
  274. pod_running = True
  275. break
  276. elif pod_status == 'Not Found' and not_found_count < max_not_found_count:
  277. not_found_count += 1
  278. self.log(
  279. 'debug',
  280. f'Runtime pod not found. Count: {not_found_count} / {max_not_found_count}',
  281. )
  282. elif pod_status in ('Failed', 'Unknown', 'Not Found'):
  283. # clean up the runtime
  284. self.close()
  285. raise RuntimeError(
  286. f'Runtime (ID={self.runtime_id}) failed to start. Current status: {pod_status}'
  287. )
  288. # Pending otherwise - add proper sleep
  289. time.sleep(10)
  290. response = send_request_with_retry(
  291. self.session,
  292. 'GET',
  293. f'{self.runtime_url}/alive',
  294. # Retry 404 & 503 errors for the /alive endpoint
  295. # because the runtime might just be starting up
  296. # and have not registered the endpoint yet
  297. retry_fns=[is_404_error, is_503_error],
  298. # leave enough time for the runtime to start up
  299. timeout=600,
  300. )
  301. if response.status_code != 200:
  302. msg = f'Runtime (ID={self.runtime_id}) is not alive yet. Status: {response.status_code}.'
  303. self.log('warning', msg)
  304. raise RuntimeError(msg)
  305. def close(self, timeout: int = 10):
  306. if self.config.sandbox.keep_remote_runtime_alive or self.attach_to_existing:
  307. self.session.close()
  308. return
  309. if self.runtime_id:
  310. try:
  311. response = send_request_with_retry(
  312. self.session,
  313. 'POST',
  314. f'{self.config.sandbox.remote_runtime_api_url}/stop',
  315. json={'runtime_id': self.runtime_id},
  316. timeout=timeout,
  317. )
  318. if response.status_code != 200:
  319. self.log(
  320. 'error',
  321. f'Failed to stop runtime: {response.text}',
  322. )
  323. else:
  324. self.log('debug', 'Runtime stopped.')
  325. except Exception as e:
  326. raise e
  327. finally:
  328. self.session.close()
  329. def run_action(self, action: Action) -> Observation:
  330. if action.timeout is None:
  331. action.timeout = self.config.sandbox.timeout
  332. if isinstance(action, FileEditAction):
  333. return self.edit(action)
  334. with self.action_semaphore:
  335. if not action.runnable:
  336. return NullObservation('')
  337. action_type = action.action # type: ignore[attr-defined]
  338. if action_type not in ACTION_TYPE_TO_CLASS:
  339. return FatalErrorObservation(
  340. f'[Runtime (ID={self.runtime_id})] Action {action_type} does not exist.'
  341. )
  342. if not hasattr(self, action_type):
  343. return FatalErrorObservation(
  344. f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.'
  345. )
  346. assert action.timeout is not None
  347. try:
  348. request_body = {'action': event_to_dict(action)}
  349. self.log('debug', f'Request body: {request_body}')
  350. response = send_request_with_retry(
  351. self.session,
  352. 'POST',
  353. f'{self.runtime_url}/execute_action',
  354. json=request_body,
  355. timeout=action.timeout,
  356. )
  357. if response.status_code == 200:
  358. output = response.json()
  359. obs = observation_from_dict(output)
  360. obs._cause = action.id # type: ignore[attr-defined]
  361. return obs
  362. else:
  363. error_message = response.text
  364. self.log('error', f'Error from server: {error_message}')
  365. obs = FatalErrorObservation(
  366. f'Action execution failed: {error_message}'
  367. )
  368. except Timeout:
  369. self.log('error', 'No response received within the timeout period.')
  370. obs = FatalErrorObservation(
  371. f'[Runtime (ID={self.runtime_id})] Action execution timed out'
  372. )
  373. except Exception as e:
  374. self.log('error', f'Error during action execution: {e}')
  375. obs = FatalErrorObservation(
  376. f'[Runtime (ID={self.runtime_id})] Action execution failed: {str(e)}'
  377. )
  378. return obs
  379. def run(self, action: CmdRunAction) -> Observation:
  380. return self.run_action(action)
  381. def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  382. return self.run_action(action)
  383. def read(self, action: FileReadAction) -> Observation:
  384. return self.run_action(action)
  385. def write(self, action: FileWriteAction) -> Observation:
  386. return self.run_action(action)
  387. def browse(self, action: BrowseURLAction) -> Observation:
  388. return self.run_action(action)
  389. def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  390. return self.run_action(action)
  391. def copy_to(
  392. self, host_src: str, sandbox_dest: str, recursive: bool = False
  393. ) -> None:
  394. if not os.path.exists(host_src):
  395. raise FileNotFoundError(f'Source file {host_src} does not exist')
  396. try:
  397. if recursive:
  398. with tempfile.NamedTemporaryFile(
  399. suffix='.zip', delete=False
  400. ) as temp_zip:
  401. temp_zip_path = temp_zip.name
  402. with ZipFile(temp_zip_path, 'w') as zipf:
  403. for root, _, files in os.walk(host_src):
  404. for file in files:
  405. file_path = os.path.join(root, file)
  406. arcname = os.path.relpath(
  407. file_path, os.path.dirname(host_src)
  408. )
  409. zipf.write(file_path, arcname)
  410. upload_data = {'file': open(temp_zip_path, 'rb')}
  411. else:
  412. upload_data = {'file': open(host_src, 'rb')}
  413. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  414. response = send_request_with_retry(
  415. self.session,
  416. 'POST',
  417. f'{self.runtime_url}/upload_file',
  418. files=upload_data,
  419. params=params,
  420. timeout=300,
  421. )
  422. if response.status_code == 200:
  423. self.log(
  424. 'debug',
  425. f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
  426. )
  427. return
  428. else:
  429. error_message = response.text
  430. raise Exception(
  431. f'[Runtime (ID={self.runtime_id})] Copy operation failed: {error_message}'
  432. )
  433. except TimeoutError:
  434. raise TimeoutError(
  435. f'[Runtime (ID={self.runtime_id})] Copy operation timed out'
  436. )
  437. except Exception as e:
  438. raise RuntimeError(
  439. f'[Runtime (ID={self.runtime_id})] Copy operation failed: {str(e)}'
  440. )
  441. finally:
  442. if recursive:
  443. os.unlink(temp_zip_path)
  444. self.log(
  445. 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
  446. )
  447. def list_files(self, path: str | None = None) -> list[str]:
  448. try:
  449. data = {}
  450. if path is not None:
  451. data['path'] = path
  452. response = send_request_with_retry(
  453. self.session,
  454. 'POST',
  455. f'{self.runtime_url}/list_files',
  456. json=data,
  457. timeout=30,
  458. )
  459. if response.status_code == 200:
  460. response_json = response.json()
  461. assert isinstance(response_json, list)
  462. return response_json
  463. else:
  464. error_message = response.text
  465. raise Exception(
  466. f'[Runtime (ID={self.runtime_id})] List files operation failed: {error_message}'
  467. )
  468. except TimeoutError:
  469. raise TimeoutError(
  470. f'[Runtime (ID={self.runtime_id})] List files operation timed out'
  471. )
  472. except Exception as e:
  473. raise RuntimeError(
  474. f'[Runtime (ID={self.runtime_id})] List files operation failed: {str(e)}'
  475. )
  476. def copy_from(self, path: str) -> bytes:
  477. """Zip all files in the sandbox and return as a stream of bytes."""
  478. try:
  479. params = {'path': path}
  480. response = send_request_with_retry(
  481. self.session,
  482. 'GET',
  483. f'{self.runtime_url}/download_files',
  484. params=params,
  485. timeout=30,
  486. )
  487. if response.status_code == 200:
  488. return response.content
  489. else:
  490. error_message = response.text
  491. raise Exception(
  492. f'[Runtime (ID={self.runtime_id})] Copy operation failed: {error_message}'
  493. )
  494. except requests.Timeout:
  495. raise TimeoutError(
  496. f'[Runtime (ID={self.runtime_id})] Copy operation timed out'
  497. )
  498. except Exception as e:
  499. raise RuntimeError(
  500. f'[Runtime (ID={self.runtime_id})] Copy operation failed: {str(e)}'
  501. )
  502. def send_status_message(self, message: str):
  503. """Sends a status message if the callback function was provided."""
  504. if self.status_message_callback:
  505. self.status_message_callback(message)