| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553 |
- import os
- import tempfile
- import threading
- from pathlib import Path
- from typing import Callable, Optional
- from urllib.parse import urlparse
- from zipfile import ZipFile
- import requests
- import tenacity
- from openhands.core.config import AppConfig
- from openhands.events import EventStream
- from openhands.events.action import (
- BrowseInteractiveAction,
- BrowseURLAction,
- CmdRunAction,
- FileEditAction,
- FileReadAction,
- FileWriteAction,
- IPythonRunCellAction,
- )
- from openhands.events.action.action import Action
- from openhands.events.observation import (
- ErrorObservation,
- NullObservation,
- Observation,
- )
- from openhands.events.serialization import event_to_dict, observation_from_dict
- from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
- from openhands.runtime.base import (
- Runtime,
- RuntimeDisconnectedError,
- RuntimeNotReadyError,
- )
- from openhands.runtime.builder.remote import RemoteRuntimeBuilder
- from openhands.runtime.plugins import PluginRequirement
- from openhands.runtime.utils.command import get_remote_startup_command
- from openhands.runtime.utils.request import (
- send_request,
- )
- from openhands.runtime.utils.runtime_build import build_runtime_image
- from openhands.utils.async_utils import call_sync_from_async
- from openhands.utils.tenacity_stop import stop_if_should_exit
class RemoteRuntime(Runtime):
    """Runtime that connects to a remote oh-runtime-client.

    The sandbox is started (or re-attached to) through the remote runtime API at
    ``config.sandbox.remote_runtime_api_url``; actions and file transfers are then
    sent over HTTP to the per-session ``runtime_url`` returned by that API.
    """

    port: int = 60000  # default port for the remote runtime client

    def __init__(
        self,
        config: AppConfig,
        event_stream: EventStream,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Optional[Callable] = None,
        attach_to_existing: bool = False,
        headless_mode: bool = True,
    ):
        # The session and action semaphore must exist before Runtime.__init__
        # runs below; otherwise the base initializer hits odd errors.
        self.session = requests.Session()
        self.action_semaphore = threading.Semaphore(1)
        super().__init__(
            config,
            event_stream,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
            headless_mode,
        )
        if self.config.sandbox.api_key is None:
            # Don't leak the session we just created when refusing to start.
            self.session.close()
            raise ValueError(
                'API key is required to use the remote runtime. '
                'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).'
            )
        self.session.headers.update({'X-API-Key': self.config.sandbox.api_key})
        if self.config.workspace_base is not None:
            self.log(
                'debug',
                'Setting workspace_base is not supported in the remote runtime.',
            )
        self.runtime_builder = RemoteRuntimeBuilder(
            self.config.sandbox.remote_runtime_api_url, self.config.sandbox.api_key
        )
        self.runtime_id: str | None = None
        self.runtime_url: str | None = None
        self._runtime_initialized: bool = False
        self._vscode_url: str | None = None  # filled in lazily by `vscode_url`

    async def connect(self):
        """Start or attach to the remote runtime, then set up its initial environment.

        Raises:
            RuntimeNotReadyError: if the runtime did not become ready in time.
        """
        try:
            await call_sync_from_async(self._start_or_attach_to_runtime)
        except RuntimeNotReadyError:
            self.log('error', 'Runtime failed to start, timed out before ready')
            raise
        await call_sync_from_async(self.setup_initial_env)
        self._runtime_initialized = True

    def _start_or_attach_to_runtime(self):
        """Reuse this session's runtime if one exists, otherwise build/start one.

        When not attaching to an existing runtime, blocks until the runtime
        reports it is alive.

        Raises:
            RuntimeError: if ``attach_to_existing`` is set but no runtime exists.
        """
        existing_runtime = self._check_existing_runtime()
        if existing_runtime:
            self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
        elif self.attach_to_existing:
            raise RuntimeError('Could not find existing runtime to attach to.')
        else:
            self.send_status_message('STATUS$STARTING_CONTAINER')
            if self.config.sandbox.runtime_container_image is None:
                self.log(
                    'info',
                    f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
                )
                self._build_runtime()
            else:
                self.log(
                    'info',
                    f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
                )
                self.container_image = self.config.sandbox.runtime_container_image
            self._start_runtime()
        assert (
            self.runtime_id is not None
        ), 'Runtime ID is not set. This should never happen.'
        assert (
            self.runtime_url is not None
        ), 'Runtime URL is not set. This should never happen.'
        self.send_status_message('STATUS$WAITING_FOR_CLIENT')
        if not self.attach_to_existing:
            self.log('info', 'Waiting for runtime to be alive...')
        self._wait_until_alive()
        if not self.attach_to_existing:
            self.log('info', 'Runtime is ready.')
        self.send_status_message(' ')

    def _check_existing_runtime(self) -> bool:
        """Query the runtime API for this session and decide whether it is reusable.

        Returns:
            True if the session's runtime is running, or was paused and has been
            resumed (``runtime_id``/``runtime_url`` are then populated from the
            response). False if no runtime exists (HTTP 404), it is stopped, or
            the API reported an unrecognized status.

        Raises:
            requests.HTTPError: for any non-404 HTTP error from the API.
        """
        try:
            with self._send_request(
                'GET',
                f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
                is_retry=False,
                timeout=5,
            ) as response:
                data = response.json()
                status = data.get('status')
                if status == 'running' or status == 'paused':
                    self._parse_runtime_response(response)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                return False
            self.log('debug', f'Error while looking for remote runtime: {e}')
            raise

        if status == 'running':
            return True
        elif status == 'stopped':
            self.log('debug', 'Found existing remote runtime, but it is stopped')
            return False
        elif status == 'paused':
            self.log('debug', 'Found existing remote runtime, but it is paused')
            self._resume_runtime()
            return True
        else:
            self.log('error', f'Invalid response from runtime API: {data}')
            return False

    def _build_runtime(self):
        """Build the runtime image via the remote builder and verify it is registered.

        Sets the process-wide ``OH_RUNTIME_RUNTIME_IMAGE_REPO`` environment variable
        from the API's registry prefix, stores the resulting image name in
        ``self.container_image``, and checks it with the ``/image_exists`` endpoint.

        Raises:
            RuntimeError: if the API reports the built image does not exist.
        """
        self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
        with self._send_request(
            'GET',
            f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix',
            is_retry=False,
            timeout=10,
        ) as response:
            response_json = response.json()
        registry_prefix = response_json['registry_prefix']
        os.environ['OH_RUNTIME_RUNTIME_IMAGE_REPO'] = (
            registry_prefix.rstrip('/') + '/runtime'
        )
        self.log(
            'debug',
            f'Runtime image repo: {os.environ["OH_RUNTIME_RUNTIME_IMAGE_REPO"]}',
        )
        if self.config.sandbox.runtime_extra_deps:
            self.log(
                'debug',
                f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
            )
        # Build the container image
        self.container_image = build_runtime_image(
            self.config.sandbox.base_container_image,
            self.runtime_builder,
            platform=self.config.sandbox.platform,
            extra_deps=self.config.sandbox.runtime_extra_deps,
            force_rebuild=self.config.sandbox.force_rebuild_runtime,
        )
        with self._send_request(
            'GET',
            f'{self.config.sandbox.remote_runtime_api_url}/image_exists',
            is_retry=False,
            params={'image': self.container_image},
            timeout=10,
        ) as response:
            if not response.json()['exists']:
                raise RuntimeError(
                    f'Container image {self.container_image} does not exist'
                )

    def _start_runtime(self):
        """Ask the runtime API's ``/start`` endpoint to launch ``self.container_image``.

        The launch command is built by ``get_remote_startup_command`` from the
        configured plugins and optional browsergym eval env; ``runtime_id`` and
        ``runtime_url`` are taken from the response.
        """
        # Prepare the request body for the /start endpoint
        plugin_args = []
        if self.plugins is not None and len(self.plugins) > 0:
            plugin_args = ['--plugins'] + [plugin.name for plugin in self.plugins]
        browsergym_args = []
        if self.config.sandbox.browsergym_eval_env is not None:
            browsergym_args = [
                '--browsergym-eval-env'
            ] + self.config.sandbox.browsergym_eval_env.split(' ')
        command = get_remote_startup_command(
            self.port,
            self.config.workspace_mount_path_in_sandbox,
            'openhands' if self.config.run_as_openhands else 'root',
            self.config.sandbox.user_id,
            plugin_args,
            browsergym_args,
        )
        start_request = {
            'image': self.container_image,
            'command': command,
            'working_dir': '/openhands/code/',
            'environment': {'DEBUG': 'true'} if self.config.debug else {},
            'session_id': self.sid,
        }

        # Start the sandbox using the /start endpoint
        with self._send_request(
            'POST',
            f'{self.config.sandbox.remote_runtime_api_url}/start',
            is_retry=False,
            json=start_request,
        ) as response:
            self._parse_runtime_response(response)
        self.log(
            'debug',
            f'Runtime started. URL: {self.runtime_url}',
        )

    def _resume_runtime(self):
        """Ask the runtime API to resume the runtime identified by ``runtime_id``."""
        with self._send_request(
            'POST',
            f'{self.config.sandbox.remote_runtime_api_url}/resume',
            is_retry=False,
            json={'runtime_id': self.runtime_id},
            timeout=30,
        ):
            pass
        self.log('debug', 'Runtime resumed.')

    def _parse_runtime_response(self, response: requests.Response):
        """Record ``runtime_id``/``url`` from an API response.

        If the response carries a ``session_api_key``, it is installed as the
        ``X-Session-API-Key`` header on the shared session.
        """
        start_response = response.json()
        self.runtime_id = start_response['runtime_id']
        self.runtime_url = start_response['url']
        if 'session_api_key' in start_response:
            self.session.headers.update(
                {'X-Session-API-Key': start_response['session_api_key']}
            )

    @property
    def vscode_url(self) -> str | None:
        """URL of the VSCode server in the runtime, or None if unavailable.

        Returns None when VSCode is disabled, the runtime has not finished
        initializing, or the runtime reports no connection token. The URL is
        built from ``runtime_url`` by prefixing its host with ``vscode-`` and is
        cached after the first successful lookup.
        """
        if not (self.vscode_enabled and self._runtime_initialized):
            return None
        if getattr(self, '_vscode_url', None) is not None:  # cached value
            return self._vscode_url
        with self._send_request(
            'GET',
            f'{self.runtime_url}/vscode/connection_token',
            timeout=10,
        ) as response:
            response_json = response.json()
        assert isinstance(response_json, dict)
        if response_json['token'] is None:
            return None
        # parse runtime_url to get vscode_url
        _parsed_url = urlparse(self.runtime_url)
        assert isinstance(_parsed_url.scheme, str) and isinstance(
            _parsed_url.netloc, str
        )
        self._vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
        self.log(
            'debug',
            f'VSCode URL: {self._vscode_url}',
        )
        return self._vscode_url

    def _wait_until_alive(self):
        """Poll ``_wait_until_alive_impl`` every 2s until it succeeds.

        Polling stops after ``sandbox.remote_runtime_init_timeout`` seconds or
        when ``stop_if_should_exit`` fires. Only ``RuntimeNotReadyError`` is
        retried, and the last error is re-raised on timeout.
        """
        retry_decorator = tenacity.retry(
            stop=tenacity.stop_after_delay(
                self.config.sandbox.remote_runtime_init_timeout
            )
            | stop_if_should_exit(),
            reraise=True,
            retry=tenacity.retry_if_exception_type(RuntimeNotReadyError),
            wait=tenacity.wait_fixed(2),
        )
        return retry_decorator(self._wait_until_alive_impl)()

    def _wait_until_alive_impl(self):
        """Check the pod status once; return only if the runtime answers ``/alive``.

        Raises:
            RuntimeNotReadyError: the pod is not Ready yet, its status is unknown
                to this client, or ``/alive`` did not return 200 (caller retries).
            RuntimeError: the pod is Failed/Unknown; the runtime is closed first.
        """
        self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
        with self._send_request(
            'GET',
            f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
        ) as runtime_info_response:
            runtime_data = runtime_info_response.json()
        assert 'runtime_id' in runtime_data
        assert runtime_data['runtime_id'] == self.runtime_id
        assert 'pod_status' in runtime_data
        pod_status = runtime_data['pod_status']
        self.log('debug', f'Pod status: {pod_status}')

        # FIXME: We should fix it at the backend of /start endpoint, make sure
        # the pod is created before returning the response.
        # Retry a period of time to give the cluster time to start the pod
        if pod_status == 'Ready':
            try:
                with self._send_request(
                    'GET',
                    f'{self.runtime_url}/alive',
                ):  # will raise exception if we don't get 200 back.
                    pass
            except requests.HTTPError as e:
                self.log(
                    'warning', f"Runtime /alive failed, but pod says it's ready: {e}"
                )
                raise RuntimeNotReadyError(
                    f'Runtime /alive failed to respond with 200: {e}'
                ) from e
            return
        elif (
            pod_status == 'Not Found'
            or pod_status == 'Pending'
            or pod_status == 'Running'
        ):  # nb: Running is not yet Ready
            raise RuntimeNotReadyError(
                f'Runtime (ID={self.runtime_id}) is not yet ready. Status: {pod_status}'
            )
        elif pod_status in ('Failed', 'Unknown'):
            # clean up the runtime
            self.close()
            raise RuntimeError(
                f'Runtime (ID={self.runtime_id}) failed to start. Current status: {pod_status}'
            )
        else:
            # Maybe this should be a hard failure, but passing through in case the API changes
            self.log('warning', f'Unknown pod status: {pod_status}')

        self.log(
            'debug',
            f'Waiting for runtime pod to be active. Current status: {pod_status}',
        )
        raise RuntimeNotReadyError()

    def close(self, timeout: int = 10):
        """Stop the remote runtime if this instance owns it, then close the HTTP session.

        The runtime is left running when ``sandbox.keep_runtime_alive`` is set or
        this instance only attached to an existing runtime. The HTTP session is
        closed in every case, including when no runtime was ever started.

        Args:
            timeout: Seconds to wait for the ``/stop`` request.
        """
        try:
            if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
                return
            if self.runtime_id:
                with self._send_request(
                    'POST',
                    f'{self.config.sandbox.remote_runtime_api_url}/stop',
                    is_retry=False,
                    json={'runtime_id': self.runtime_id},
                    timeout=timeout,
                ):
                    self.log('debug', 'Runtime stopped.')
        finally:
            self.session.close()

    def run_action(self, action: Action, is_retry: bool = False) -> Observation:
        """Execute ``action`` in the remote runtime and return the resulting observation.

        File edits are delegated to ``self.edit``. Every other action is posted to
        the runtime's ``/execute_action`` endpoint while holding
        ``action_semaphore``, so this instance runs one action at a time.
        ``is_retry`` is accepted for interface compatibility and is unused here.

        Raises:
            ValueError: if the action type is not registered.
            RuntimeError: if the runtime does not answer within the action timeout.
        """
        if action.timeout is None:
            action.timeout = self.config.sandbox.timeout
        if isinstance(action, FileEditAction):
            return self.edit(action)
        with self.action_semaphore:
            if not action.runnable:
                return NullObservation('')
            action_type = action.action  # type: ignore[attr-defined]
            if action_type not in ACTION_TYPE_TO_CLASS:
                raise ValueError(f'Action {action_type} does not exist.')
            if not hasattr(self, action_type):
                return ErrorObservation(
                    f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.',
                    error_id='AGENT_ERROR$BAD_ACTION',
                )

            assert action.timeout is not None

            try:
                request_body = {'action': event_to_dict(action)}
                self.log('debug', f'Request body: {request_body}')
                with self._send_request(
                    'POST',
                    f'{self.runtime_url}/execute_action',
                    is_retry=False,
                    json=request_body,
                    # wait a few more seconds to get the timeout error from client side
                    timeout=action.timeout + 5,
                ) as response:
                    output = response.json()
                    obs = observation_from_dict(output)
                    obs._cause = action.id  # type: ignore[attr-defined]
            except requests.Timeout as e:
                raise RuntimeError(
                    f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
                ) from e
            return obs

    def _send_request(self, method, url, is_retry=False, **kwargs):
        """Send a request on the shared session, translating runtime-side HTTP errors.

        For requests addressed to ``self.runtime_url``:
          * HTTP 404 is raised as ``RuntimeDisconnectedError``;
          * HTTP 503 is treated as a paused runtime: on the first attempt the
            runtime is resumed, we wait until it is alive, and the request is
            retried once with the same ``kwargs`` (``is_retry=True``).
        Timeouts and all other HTTP errors are re-raised unchanged.

        Because a retry reuses ``kwargs``, request bodies should be re-readable
        (bytes/dicts) rather than one-shot streams such as open file objects.
        """
        is_runtime_request = self.runtime_url and self.runtime_url in url
        try:
            return send_request(self.session, method, url, **kwargs)
        except requests.Timeout:
            self.log('error', 'No response received within the timeout period.')
            raise
        except requests.HTTPError as e:
            if not is_runtime_request:
                raise
            status_code = e.response.status_code
            if status_code == 404:
                raise RuntimeDisconnectedError(
                    f'404 error while connecting to {self.runtime_url}'
                ) from e
            if status_code == 503 and not is_retry:
                self.log('warning', 'Runtime appears to be paused. Resuming...')
                self._resume_runtime()
                self._wait_until_alive()
                return self._send_request(method, url, True, **kwargs)
            raise

    def run(self, action: CmdRunAction) -> Observation:
        return self.run_action(action)

    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        return self.run_action(action)

    def read(self, action: FileReadAction) -> Observation:
        return self.run_action(action)

    def write(self, action: FileWriteAction) -> Observation:
        return self.run_action(action)

    def browse(self, action: BrowseURLAction) -> Observation:
        return self.run_action(action)

    def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
        return self.run_action(action)

    def copy_to(
        self, host_src: str, sandbox_dest: str, recursive: bool = False
    ) -> None:
        """Upload a host file, or a directory tree when ``recursive``, into the sandbox.

        With ``recursive=True`` every file under ``host_src`` is packed into a
        temporary zip (entry names relative to the parent of ``host_src``), which
        is uploaded and then deleted. Otherwise ``host_src`` is uploaded as-is.

        Raises:
            FileNotFoundError: if ``host_src`` does not exist.
        """
        if not os.path.exists(host_src):
            raise FileNotFoundError(f'Source file {host_src} does not exist')

        temp_zip_path: str | None = None
        try:
            if recursive:
                with tempfile.NamedTemporaryFile(
                    suffix='.zip', delete=False
                ) as temp_zip:
                    temp_zip_path = temp_zip.name
                with ZipFile(temp_zip_path, 'w') as zipf:
                    for root, _, files in os.walk(host_src):
                        for file in files:
                            file_path = os.path.join(root, file)
                            arcname = os.path.relpath(
                                file_path, os.path.dirname(host_src)
                            )
                            zipf.write(file_path, arcname)
                upload_path = temp_zip_path
            else:
                upload_path = host_src

            # Read the payload up front and close the file immediately. requests
            # assembles multipart bodies in memory anyway. Unlike an open file
            # object, bytes can be re-sent if _send_request retries after a 503,
            # and no file handle is left open.
            with open(upload_path, 'rb') as upload_file:
                payload = upload_file.read()
            upload_data = {'file': (os.path.basename(upload_path), payload)}

            params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
            with self._send_request(
                'POST',
                f'{self.runtime_url}/upload_file',
                is_retry=False,
                files=upload_data,
                params=params,
                timeout=300,
            ) as response:
                # Logged only on success (previously also logged from `finally`).
                self.log(
                    'debug',
                    f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
                )
        finally:
            if temp_zip_path is not None:
                os.unlink(temp_zip_path)

    def list_files(self, path: str | None = None) -> list[str]:
        """Return the runtime's ``/list_files`` listing for ``path``.

        When ``path`` is None the request omits it, and the runtime picks the
        directory.
        """
        data = {}
        if path is not None:
            data['path'] = path

        with self._send_request(
            'POST',
            f'{self.runtime_url}/list_files',
            is_retry=False,
            json=data,
            timeout=30,
        ) as response:
            response_json = response.json()
            assert isinstance(response_json, list)
            return response_json

    def copy_from(self, path: str) -> Path:
        """Download ``path`` from the sandbox into a local temporary file.

        The ``/download_files`` response body is streamed to disk in 8 KiB chunks.
        NOTE(review): the runtime presumably returns a zip archive of ``path``;
        confirm against the ``/download_files`` handler.

        The caller owns the returned file and must delete it. If the download
        fails part-way, the partial temp file is removed before re-raising.

        Returns:
            Path to the closed temporary file holding the downloaded bytes.
        """
        params = {'path': path}
        with self._send_request(
            'GET',
            f'{self.runtime_url}/download_files',
            is_retry=False,
            params=params,
            stream=True,
            timeout=30,
        ) as response:
            temp_file = tempfile.NamedTemporaryFile(delete=False)
            try:
                # Closing the file (on leaving `with`) flushes it before its
                # path is handed to the caller.
                with temp_file:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            temp_file.write(chunk)
            except BaseException:
                os.unlink(temp_file.name)
                raise
        return Path(temp_file.name)
|