remote_runtime.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. import os
  2. import tempfile
  3. import threading
  4. from pathlib import Path
  5. from typing import Callable, Optional
  6. from zipfile import ZipFile
  7. import requests
  8. import tenacity
  9. from openhands.core.config import AppConfig
  10. from openhands.events import EventStream
  11. from openhands.events.action import (
  12. BrowseInteractiveAction,
  13. BrowseURLAction,
  14. CmdRunAction,
  15. FileEditAction,
  16. FileReadAction,
  17. FileWriteAction,
  18. IPythonRunCellAction,
  19. )
  20. from openhands.events.action.action import Action
  21. from openhands.events.observation import (
  22. ErrorObservation,
  23. NullObservation,
  24. Observation,
  25. )
  26. from openhands.events.serialization import event_to_dict, observation_from_dict
  27. from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
  28. from openhands.runtime.base import (
  29. Runtime,
  30. RuntimeDisconnectedError,
  31. RuntimeNotReadyError,
  32. )
  33. from openhands.runtime.builder.remote import RemoteRuntimeBuilder
  34. from openhands.runtime.plugins import PluginRequirement
  35. from openhands.runtime.utils.command import get_remote_startup_command
  36. from openhands.runtime.utils.request import (
  37. send_request,
  38. )
  39. from openhands.runtime.utils.runtime_build import build_runtime_image
  40. from openhands.utils.async_utils import call_sync_from_async
  41. from openhands.utils.tenacity_stop import stop_if_should_exit
  42. class RemoteRuntime(Runtime):
  43. """This runtime will connect to a remote oh-runtime-client."""
  44. port: int = 60000 # default port for the remote runtime client
  45. def __init__(
  46. self,
  47. config: AppConfig,
  48. event_stream: EventStream,
  49. sid: str = 'default',
  50. plugins: list[PluginRequirement] | None = None,
  51. env_vars: dict[str, str] | None = None,
  52. status_callback: Optional[Callable] = None,
  53. attach_to_existing: bool = False,
  54. ):
  55. # We need to set session and action_semaphore before the __init__ below, or we get odd errors
  56. self.session = requests.Session()
  57. self.action_semaphore = threading.Semaphore(1)
  58. super().__init__(
  59. config,
  60. event_stream,
  61. sid,
  62. plugins,
  63. env_vars,
  64. status_callback,
  65. attach_to_existing,
  66. )
  67. if self.config.sandbox.api_key is None:
  68. raise ValueError(
  69. 'API key is required to use the remote runtime. '
  70. 'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).'
  71. )
  72. self.session.headers.update({'X-API-Key': self.config.sandbox.api_key})
  73. if self.config.workspace_base is not None:
  74. self.log(
  75. 'debug',
  76. 'Setting workspace_base is not supported in the remote runtime.',
  77. )
  78. self.runtime_builder = RemoteRuntimeBuilder(
  79. self.config.sandbox.remote_runtime_api_url, self.config.sandbox.api_key
  80. )
  81. self.runtime_id: str | None = None
  82. self.runtime_url: str | None = None
  83. async def connect(self):
  84. try:
  85. await call_sync_from_async(self._start_or_attach_to_runtime)
  86. except RuntimeNotReadyError:
  87. self.log('error', 'Runtime failed to start, timed out before ready')
  88. raise
  89. await call_sync_from_async(self.setup_initial_env)
  90. def _start_or_attach_to_runtime(self):
  91. existing_runtime = self._check_existing_runtime()
  92. if existing_runtime:
  93. self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
  94. elif self.attach_to_existing:
  95. raise RuntimeError('Could not find existing runtime to attach to.')
  96. else:
  97. self.send_status_message('STATUS$STARTING_CONTAINER')
  98. if self.config.sandbox.runtime_container_image is None:
  99. self.log(
  100. 'info',
  101. f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
  102. )
  103. self._build_runtime()
  104. else:
  105. self.log(
  106. 'info',
  107. f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
  108. )
  109. self.container_image = self.config.sandbox.runtime_container_image
  110. self._start_runtime()
  111. assert (
  112. self.runtime_id is not None
  113. ), 'Runtime ID is not set. This should never happen.'
  114. assert (
  115. self.runtime_url is not None
  116. ), 'Runtime URL is not set. This should never happen.'
  117. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  118. if not self.attach_to_existing:
  119. self.log('info', 'Waiting for runtime to be alive...')
  120. self._wait_until_alive()
  121. if not self.attach_to_existing:
  122. self.log('info', 'Runtime is ready.')
  123. self.send_status_message(' ')
  124. def _check_existing_runtime(self) -> bool:
  125. try:
  126. response = self._send_request(
  127. 'GET',
  128. f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
  129. is_retry=False,
  130. timeout=5,
  131. )
  132. except requests.HTTPError as e:
  133. if e.response.status_code == 404:
  134. return False
  135. self.log('debug', f'Error while looking for remote runtime: {e}')
  136. raise
  137. data = response.json()
  138. status = data.get('status')
  139. if status == 'running':
  140. self._parse_runtime_response(response)
  141. return True
  142. elif status == 'stopped':
  143. self.log('debug', 'Found existing remote runtime, but it is stopped')
  144. return False
  145. elif status == 'paused':
  146. self.log('debug', 'Found existing remote runtime, but it is paused')
  147. self._parse_runtime_response(response)
  148. self._resume_runtime()
  149. return True
  150. else:
  151. self.log('error', f'Invalid response from runtime API: {data}')
  152. return False
  153. def _build_runtime(self):
  154. self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
  155. response = self._send_request(
  156. 'GET',
  157. f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix',
  158. is_retry=False,
  159. timeout=10,
  160. )
  161. response_json = response.json()
  162. registry_prefix = response_json['registry_prefix']
  163. os.environ['OH_RUNTIME_RUNTIME_IMAGE_REPO'] = (
  164. registry_prefix.rstrip('/') + '/runtime'
  165. )
  166. self.log(
  167. 'debug',
  168. f'Runtime image repo: {os.environ["OH_RUNTIME_RUNTIME_IMAGE_REPO"]}',
  169. )
  170. if self.config.sandbox.runtime_extra_deps:
  171. self.log(
  172. 'debug',
  173. f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
  174. )
  175. # Build the container image
  176. self.container_image = build_runtime_image(
  177. self.config.sandbox.base_container_image,
  178. self.runtime_builder,
  179. platform=self.config.sandbox.platform,
  180. extra_deps=self.config.sandbox.runtime_extra_deps,
  181. force_rebuild=self.config.sandbox.force_rebuild_runtime,
  182. )
  183. response = self._send_request(
  184. 'GET',
  185. f'{self.config.sandbox.remote_runtime_api_url}/image_exists',
  186. is_retry=False,
  187. params={'image': self.container_image},
  188. timeout=10,
  189. )
  190. if not response.json()['exists']:
  191. raise RuntimeError(f'Container image {self.container_image} does not exist')
  192. def _start_runtime(self):
  193. # Prepare the request body for the /start endpoint
  194. plugin_args = []
  195. if self.plugins is not None and len(self.plugins) > 0:
  196. plugin_args = ['--plugins'] + [plugin.name for plugin in self.plugins]
  197. browsergym_args = []
  198. if self.config.sandbox.browsergym_eval_env is not None:
  199. browsergym_args = [
  200. '--browsergym-eval-env'
  201. ] + self.config.sandbox.browsergym_eval_env.split(' ')
  202. command = get_remote_startup_command(
  203. self.port,
  204. self.config.workspace_mount_path_in_sandbox,
  205. 'openhands' if self.config.run_as_openhands else 'root',
  206. self.config.sandbox.user_id,
  207. plugin_args,
  208. browsergym_args,
  209. )
  210. start_request = {
  211. 'image': self.container_image,
  212. 'command': command,
  213. 'working_dir': '/openhands/code/',
  214. 'environment': {'DEBUG': 'true'} if self.config.debug else {},
  215. 'session_id': self.sid,
  216. }
  217. # Start the sandbox using the /start endpoint
  218. response = self._send_request(
  219. 'POST',
  220. f'{self.config.sandbox.remote_runtime_api_url}/start',
  221. is_retry=False,
  222. json=start_request,
  223. )
  224. self._parse_runtime_response(response)
  225. self.log(
  226. 'debug',
  227. f'Runtime started. URL: {self.runtime_url}',
  228. )
  229. def _resume_runtime(self):
  230. self._send_request(
  231. 'POST',
  232. f'{self.config.sandbox.remote_runtime_api_url}/resume',
  233. is_retry=False,
  234. json={'runtime_id': self.runtime_id},
  235. timeout=30,
  236. )
  237. self.log('debug', 'Runtime resumed.')
  238. def _parse_runtime_response(self, response: requests.Response):
  239. start_response = response.json()
  240. self.runtime_id = start_response['runtime_id']
  241. self.runtime_url = start_response['url']
  242. if 'session_api_key' in start_response:
  243. self.session.headers.update(
  244. {'X-Session-API-Key': start_response['session_api_key']}
  245. )
  246. def _wait_until_alive(self):
  247. retry_decorator = tenacity.retry(
  248. stop=tenacity.stop_after_delay(
  249. self.config.sandbox.remote_runtime_init_timeout
  250. )
  251. | stop_if_should_exit(),
  252. reraise=True,
  253. retry=tenacity.retry_if_exception_type(RuntimeNotReadyError),
  254. wait=tenacity.wait_fixed(2),
  255. )
  256. return retry_decorator(self._wait_until_alive_impl)()
  257. def _wait_until_alive_impl(self):
  258. self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
  259. runtime_info_response = self._send_request(
  260. 'GET',
  261. f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
  262. )
  263. runtime_data = runtime_info_response.json()
  264. assert 'runtime_id' in runtime_data
  265. assert runtime_data['runtime_id'] == self.runtime_id
  266. assert 'pod_status' in runtime_data
  267. pod_status = runtime_data['pod_status']
  268. self.log('debug', f'Pod status: {pod_status}')
  269. # FIXME: We should fix it at the backend of /start endpoint, make sure
  270. # the pod is created before returning the response.
  271. # Retry a period of time to give the cluster time to start the pod
  272. if pod_status == 'Ready':
  273. try:
  274. self._send_request(
  275. 'GET',
  276. f'{self.runtime_url}/alive',
  277. ) # will raise exception if we don't get 200 back.
  278. except requests.HTTPError as e:
  279. self.log(
  280. 'warning', f"Runtime /alive failed, but pod says it's ready: {e}"
  281. )
  282. raise RuntimeNotReadyError(
  283. f'Runtime /alive failed to respond with 200: {e}'
  284. )
  285. return
  286. elif (
  287. pod_status == 'Not Found'
  288. or pod_status == 'Pending'
  289. or pod_status == 'Running'
  290. ): # nb: Running is not yet Ready
  291. raise RuntimeNotReadyError(
  292. f'Runtime (ID={self.runtime_id}) is not yet ready. Status: {pod_status}'
  293. )
  294. elif pod_status in ('Failed', 'Unknown'):
  295. # clean up the runtime
  296. self.close()
  297. raise RuntimeError(
  298. f'Runtime (ID={self.runtime_id}) failed to start. Current status: {pod_status}'
  299. )
  300. else:
  301. # Maybe this should be a hard failure, but passing through in case the API changes
  302. self.log('warning', f'Unknown pod status: {pod_status}')
  303. self.log(
  304. 'debug',
  305. f'Waiting for runtime pod to be active. Current status: {pod_status}',
  306. )
  307. raise RuntimeNotReadyError()
  308. def close(self, timeout: int = 10):
  309. if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
  310. self.session.close()
  311. return
  312. if self.runtime_id and self.session:
  313. try:
  314. response = self._send_request(
  315. 'POST',
  316. f'{self.config.sandbox.remote_runtime_api_url}/stop',
  317. is_retry=False,
  318. json={'runtime_id': self.runtime_id},
  319. timeout=timeout,
  320. )
  321. if response.status_code != 200:
  322. self.log(
  323. 'error',
  324. f'Failed to stop runtime: {response.text}',
  325. )
  326. else:
  327. self.log('debug', 'Runtime stopped.')
  328. except Exception as e:
  329. raise e
  330. finally:
  331. self.session.close()
  332. def run_action(self, action: Action, is_retry: bool = False) -> Observation:
  333. if action.timeout is None:
  334. action.timeout = self.config.sandbox.timeout
  335. if isinstance(action, FileEditAction):
  336. return self.edit(action)
  337. with self.action_semaphore:
  338. if not action.runnable:
  339. return NullObservation('')
  340. action_type = action.action # type: ignore[attr-defined]
  341. if action_type not in ACTION_TYPE_TO_CLASS:
  342. raise ValueError(f'Action {action_type} does not exist.')
  343. if not hasattr(self, action_type):
  344. return ErrorObservation(
  345. f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.',
  346. error_id='AGENT_ERROR$BAD_ACTION',
  347. )
  348. assert action.timeout is not None
  349. try:
  350. request_body = {'action': event_to_dict(action)}
  351. self.log('debug', f'Request body: {request_body}')
  352. response = self._send_request(
  353. 'POST',
  354. f'{self.runtime_url}/execute_action',
  355. is_retry=False,
  356. json=request_body,
  357. # wait a few more seconds to get the timeout error from client side
  358. timeout=action.timeout + 5,
  359. )
  360. output = response.json()
  361. obs = observation_from_dict(output)
  362. obs._cause = action.id # type: ignore[attr-defined]
  363. except requests.Timeout:
  364. raise RuntimeError(
  365. f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
  366. )
  367. return obs
  368. def _send_request(self, method, url, is_retry=False, **kwargs):
  369. is_runtime_request = self.runtime_url and self.runtime_url in url
  370. try:
  371. return send_request(self.session, method, url, **kwargs)
  372. except requests.Timeout:
  373. self.log('error', 'No response received within the timeout period.')
  374. raise
  375. except requests.HTTPError as e:
  376. if is_runtime_request and e.response.status_code == 404:
  377. raise RuntimeDisconnectedError(
  378. f'404 error while connecting to {self.runtime_url}'
  379. )
  380. elif is_runtime_request and e.response.status_code == 503:
  381. if not is_retry:
  382. self.log('warning', 'Runtime appears to be paused. Resuming...')
  383. self._resume_runtime()
  384. self._wait_until_alive()
  385. return self._send_request(method, url, True, **kwargs)
  386. else:
  387. raise e
  388. else:
  389. raise e
  390. def run(self, action: CmdRunAction) -> Observation:
  391. return self.run_action(action)
  392. def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  393. return self.run_action(action)
  394. def read(self, action: FileReadAction) -> Observation:
  395. return self.run_action(action)
  396. def write(self, action: FileWriteAction) -> Observation:
  397. return self.run_action(action)
  398. def browse(self, action: BrowseURLAction) -> Observation:
  399. return self.run_action(action)
  400. def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  401. return self.run_action(action)
  402. def copy_to(
  403. self, host_src: str, sandbox_dest: str, recursive: bool = False
  404. ) -> None:
  405. if not os.path.exists(host_src):
  406. raise FileNotFoundError(f'Source file {host_src} does not exist')
  407. try:
  408. if recursive:
  409. with tempfile.NamedTemporaryFile(
  410. suffix='.zip', delete=False
  411. ) as temp_zip:
  412. temp_zip_path = temp_zip.name
  413. with ZipFile(temp_zip_path, 'w') as zipf:
  414. for root, _, files in os.walk(host_src):
  415. for file in files:
  416. file_path = os.path.join(root, file)
  417. arcname = os.path.relpath(
  418. file_path, os.path.dirname(host_src)
  419. )
  420. zipf.write(file_path, arcname)
  421. upload_data = {'file': open(temp_zip_path, 'rb')}
  422. else:
  423. upload_data = {'file': open(host_src, 'rb')}
  424. params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
  425. response = self._send_request(
  426. 'POST',
  427. f'{self.runtime_url}/upload_file',
  428. is_retry=False,
  429. files=upload_data,
  430. params=params,
  431. timeout=300,
  432. )
  433. self.log(
  434. 'debug',
  435. f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
  436. )
  437. finally:
  438. if recursive:
  439. os.unlink(temp_zip_path)
  440. self.log(
  441. 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
  442. )
  443. def list_files(self, path: str | None = None) -> list[str]:
  444. data = {}
  445. if path is not None:
  446. data['path'] = path
  447. response = self._send_request(
  448. 'POST',
  449. f'{self.runtime_url}/list_files',
  450. is_retry=False,
  451. json=data,
  452. timeout=30,
  453. )
  454. response_json = response.json()
  455. assert isinstance(response_json, list)
  456. return response_json
  457. def copy_from(self, path: str) -> Path:
  458. """Zip all files in the sandbox and return as a stream of bytes."""
  459. params = {'path': path}
  460. response = self._send_request(
  461. 'GET',
  462. f'{self.runtime_url}/download_files',
  463. is_retry=False,
  464. params=params,
  465. stream=True,
  466. timeout=30,
  467. )
  468. temp_file = tempfile.NamedTemporaryFile(delete=False)
  469. for chunk in response.iter_content(chunk_size=8192):
  470. if chunk: # filter out keep-alive new chunks
  471. temp_file.write(chunk)
  472. return Path(temp_file.name)