| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- """
- This is the main file for the runtime client.
- It is responsible for executing actions received from OpenHands backend and producing observations.
- NOTE: this will be executed inside the docker sandbox.
- """
- import argparse
- import asyncio
- import base64
- import io
- import mimetypes
- import os
- import shutil
- import tempfile
- import time
- from contextlib import asynccontextmanager
- from pathlib import Path
- from zipfile import ZipFile
- from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile
- from fastapi.exceptions import RequestValidationError
- from fastapi.responses import JSONResponse, StreamingResponse
- from fastapi.security import APIKeyHeader
- from pydantic import BaseModel
- from starlette.exceptions import HTTPException as StarletteHTTPException
- from uvicorn import run
- from openhands.core.logger import openhands_logger as logger
- from openhands.events.action import (
- Action,
- BrowseInteractiveAction,
- BrowseURLAction,
- CmdRunAction,
- FileReadAction,
- FileWriteAction,
- IPythonRunCellAction,
- )
- from openhands.events.observation import (
- CmdOutputObservation,
- ErrorObservation,
- FileReadObservation,
- FileWriteObservation,
- IPythonRunCellObservation,
- Observation,
- )
- from openhands.events.serialization import event_from_dict, event_to_dict
- from openhands.runtime.browser import browse
- from openhands.runtime.browser.browser_env import BrowserEnv
- from openhands.runtime.plugins import ALL_PLUGINS, JupyterPlugin, Plugin, VSCodePlugin
- from openhands.runtime.utils.bash import BashSession
- from openhands.runtime.utils.files import insert_lines, read_lines
- from openhands.runtime.utils.runtime_init import init_user_and_working_directory
- from openhands.runtime.utils.system import check_port_available
- from openhands.utils.async_utils import wait_all
- class ActionRequest(BaseModel):
- action: dict
- ROOT_GID = 0
- INIT_COMMANDS = [
- 'git config --global user.name "openhands" && git config --global user.email "openhands@all-hands.dev" && alias git="git --no-pager"',
- ]
- SESSION_API_KEY = os.environ.get('SESSION_API_KEY')
- api_key_header = APIKeyHeader(name='X-Session-API-Key', auto_error=False)
- def verify_api_key(api_key: str = Depends(api_key_header)):
- if SESSION_API_KEY and api_key != SESSION_API_KEY:
- raise HTTPException(status_code=403, detail='Invalid API Key')
- return api_key
- class ActionExecutor:
- """ActionExecutor is running inside docker sandbox.
- It is responsible for executing actions received from OpenHands backend and producing observations.
- """
- def __init__(
- self,
- plugins_to_load: list[Plugin],
- work_dir: str,
- username: str,
- user_id: int,
- browsergym_eval_env: str | None,
- ) -> None:
- self.plugins_to_load = plugins_to_load
- self._initial_pwd = work_dir
- self.username = username
- self.user_id = user_id
- _updated_user_id = init_user_and_working_directory(
- username=username, user_id=self.user_id, initial_pwd=work_dir
- )
- if _updated_user_id is not None:
- self.user_id = _updated_user_id
- self.bash_session = BashSession(
- work_dir=work_dir,
- username=username,
- )
- self.lock = asyncio.Lock()
- self.plugins: dict[str, Plugin] = {}
- self.browser = BrowserEnv(browsergym_eval_env)
- self.start_time = time.time()
- self.last_execution_time = self.start_time
- @property
- def initial_pwd(self):
- return self._initial_pwd
- async def ainit(self):
- await wait_all(
- (self._init_plugin(plugin) for plugin in self.plugins_to_load),
- timeout=30,
- )
- # This is a temporary workaround
- # TODO: refactor AgentSkills to be part of JupyterPlugin
- # AFTER ServerRuntime is deprecated
- if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
- obs = await self.run_ipython(
- IPythonRunCellAction(
- code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
- )
- )
- logger.debug(f'AgentSkills initialized: {obs}')
- await self._init_bash_commands()
- logger.debug('Runtime client initialized.')
- async def _init_plugin(self, plugin: Plugin):
- await plugin.initialize(self.username)
- self.plugins[plugin.name] = plugin
- logger.debug(f'Initializing plugin: {plugin.name}')
- if isinstance(plugin, JupyterPlugin):
- await self.run_ipython(
- IPythonRunCellAction(
- code=f'import os; os.chdir("{self.bash_session.pwd}")'
- )
- )
- async def _init_bash_commands(self):
- logger.debug(f'Initializing by running {len(INIT_COMMANDS)} bash commands...')
- for command in INIT_COMMANDS:
- action = CmdRunAction(command=command)
- action.timeout = 300
- logger.debug(f'Executing init command: {command}')
- obs = await self.run(action)
- assert isinstance(obs, CmdOutputObservation)
- logger.debug(
- f'Init command outputs (exit code: {obs.exit_code}): {obs.content}'
- )
- assert obs.exit_code == 0
- logger.debug('Bash init commands completed')
- async def run_action(self, action) -> Observation:
- async with self.lock:
- action_type = action.action
- logger.debug(f'Running action:\n{action}')
- observation = await getattr(self, action_type)(action)
- logger.debug(f'Action output:\n{observation}')
- return observation
- async def run(
- self, action: CmdRunAction
- ) -> CmdOutputObservation | ErrorObservation:
- return self.bash_session.run(action)
- async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
- if 'jupyter' in self.plugins:
- _jupyter_plugin: JupyterPlugin = self.plugins['jupyter'] # type: ignore
- # This is used to make AgentSkills in Jupyter aware of the
- # current working directory in Bash
- jupyter_pwd = getattr(self, '_jupyter_pwd', None)
- if self.bash_session.pwd != jupyter_pwd:
- logger.debug(
- f'{self.bash_session.pwd} != {jupyter_pwd} -> reset Jupyter PWD'
- )
- reset_jupyter_pwd_code = (
- f'import os; os.chdir("{self.bash_session.pwd}")'
- )
- _aux_action = IPythonRunCellAction(code=reset_jupyter_pwd_code)
- _reset_obs: IPythonRunCellObservation = await _jupyter_plugin.run(
- _aux_action
- )
- logger.debug(
- f'Changed working directory in IPython to: {self.bash_session.pwd}. Output: {_reset_obs}'
- )
- self._jupyter_pwd = self.bash_session.pwd
- obs: IPythonRunCellObservation = await _jupyter_plugin.run(action)
- obs.content = obs.content.rstrip()
- if action.include_extra:
- obs.content += (
- f'\n[Jupyter current working directory: {self.bash_session.pwd}]'
- )
- obs.content += f'\n[Jupyter Python interpreter: {_jupyter_plugin.python_interpreter_path}]'
- return obs
- else:
- raise RuntimeError(
- 'JupyterRequirement not found. Unable to run IPython action.'
- )
- def _resolve_path(self, path: str, working_dir: str) -> str:
- filepath = Path(path)
- if not filepath.is_absolute():
- return str(Path(working_dir) / filepath)
- return str(filepath)
- async def read(self, action: FileReadAction) -> Observation:
- # NOTE: the client code is running inside the sandbox,
- # so there's no need to check permission
- working_dir = self.bash_session.workdir
- filepath = self._resolve_path(action.path, working_dir)
- try:
- if filepath.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
- with open(filepath, 'rb') as file:
- image_data = file.read()
- encoded_image = base64.b64encode(image_data).decode('utf-8')
- mime_type, _ = mimetypes.guess_type(filepath)
- if mime_type is None:
- mime_type = 'image/png' # default to PNG if mime type cannot be determined
- encoded_image = f'data:{mime_type};base64,{encoded_image}'
- return FileReadObservation(path=filepath, content=encoded_image)
- elif filepath.lower().endswith('.pdf'):
- with open(filepath, 'rb') as file:
- pdf_data = file.read()
- encoded_pdf = base64.b64encode(pdf_data).decode('utf-8')
- encoded_pdf = f'data:application/pdf;base64,{encoded_pdf}'
- return FileReadObservation(path=filepath, content=encoded_pdf)
- elif filepath.lower().endswith(('.mp4', '.webm', '.ogg')):
- with open(filepath, 'rb') as file:
- video_data = file.read()
- encoded_video = base64.b64encode(video_data).decode('utf-8')
- mime_type, _ = mimetypes.guess_type(filepath)
- if mime_type is None:
- mime_type = 'video/mp4' # default to MP4 if MIME type cannot be determined
- encoded_video = f'data:{mime_type};base64,{encoded_video}'
- return FileReadObservation(path=filepath, content=encoded_video)
- with open(filepath, 'r', encoding='utf-8') as file:
- lines = read_lines(file.readlines(), action.start, action.end)
- except FileNotFoundError:
- return ErrorObservation(
- f'File not found: {filepath}. Your current working directory is {working_dir}.'
- )
- except UnicodeDecodeError:
- return ErrorObservation(f'File could not be decoded as utf-8: {filepath}.')
- except IsADirectoryError:
- return ErrorObservation(
- f'Path is a directory: {filepath}. You can only read files'
- )
- code_view = ''.join(lines)
- return FileReadObservation(path=filepath, content=code_view)
- async def write(self, action: FileWriteAction) -> Observation:
- working_dir = self.bash_session.workdir
- filepath = self._resolve_path(action.path, working_dir)
- insert = action.content.split('\n')
- try:
- if not os.path.exists(os.path.dirname(filepath)):
- os.makedirs(os.path.dirname(filepath))
- file_exists = os.path.exists(filepath)
- if file_exists:
- file_stat = os.stat(filepath)
- else:
- file_stat = None
- mode = 'w' if not file_exists else 'r+'
- try:
- with open(filepath, mode, encoding='utf-8') as file:
- if mode != 'w':
- all_lines = file.readlines()
- new_file = insert_lines(
- insert, all_lines, action.start, action.end
- )
- else:
- new_file = [i + '\n' for i in insert]
- file.seek(0)
- file.writelines(new_file)
- file.truncate()
- # Handle file permissions
- if file_exists:
- assert file_stat is not None
- # restore the original file permissions if the file already exists
- os.chmod(filepath, file_stat.st_mode)
- os.chown(filepath, file_stat.st_uid, file_stat.st_gid)
- else:
- # set the new file permissions if the file is new
- os.chmod(filepath, 0o664)
- os.chown(filepath, self.user_id, self.user_id)
- except FileNotFoundError:
- return ErrorObservation(f'File not found: {filepath}')
- except IsADirectoryError:
- return ErrorObservation(
- f'Path is a directory: {filepath}. You can only write to files'
- )
- except UnicodeDecodeError:
- return ErrorObservation(
- f'File could not be decoded as utf-8: {filepath}'
- )
- except PermissionError:
- return ErrorObservation(f'Malformed paths not permitted: {filepath}')
- return FileWriteObservation(content='', path=filepath)
- async def browse(self, action: BrowseURLAction) -> Observation:
- return await browse(action, self.browser)
- async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
- return await browse(action, self.browser)
- def close(self):
- self.bash_session.close()
- self.browser.close()
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('port', type=int, help='Port to listen on')
- parser.add_argument('--working-dir', type=str, help='Working directory')
- parser.add_argument('--plugins', type=str, help='Plugins to initialize', nargs='+')
- parser.add_argument(
- '--username', type=str, help='User to run as', default='openhands'
- )
- parser.add_argument('--user-id', type=int, help='User ID to run as', default=1000)
- parser.add_argument(
- '--browsergym-eval-env',
- type=str,
- help='BrowserGym environment used for browser evaluation',
- default=None,
- )
- # example: python client.py 8000 --working-dir /workspace --plugins JupyterRequirement
- args = parser.parse_args()
- os.environ['VSCODE_PORT'] = str(int(args.port) + 1)
- assert check_port_available(int(os.environ['VSCODE_PORT']))
- plugins_to_load: list[Plugin] = []
- if args.plugins:
- for plugin in args.plugins:
- if plugin not in ALL_PLUGINS:
- raise ValueError(f'Plugin {plugin} not found')
- plugins_to_load.append(ALL_PLUGINS[plugin]()) # type: ignore
- client: ActionExecutor | None = None
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- global client
- client = ActionExecutor(
- plugins_to_load,
- work_dir=args.working_dir,
- username=args.username,
- user_id=args.user_id,
- browsergym_eval_env=args.browsergym_eval_env,
- )
- await client.ainit()
- yield
- # Clean up & release the resources
- client.close()
- app = FastAPI(lifespan=lifespan)
- # TODO below 3 exception handlers were recommended by Sonnet.
- # Are these something we should keep?
- @app.exception_handler(Exception)
- async def global_exception_handler(request: Request, exc: Exception):
- logger.exception('Unhandled exception occurred:')
- return JSONResponse(
- status_code=500,
- content={
- 'message': 'An unexpected error occurred. Please try again later.'
- },
- )
- @app.exception_handler(StarletteHTTPException)
- async def http_exception_handler(request: Request, exc: StarletteHTTPException):
- logger.error(f'HTTP exception occurred: {exc.detail}')
- return JSONResponse(
- status_code=exc.status_code, content={'message': exc.detail}
- )
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(
- request: Request, exc: RequestValidationError
- ):
- logger.error(f'Validation error occurred: {exc}')
- return JSONResponse(
- status_code=422,
- content={'message': 'Invalid request parameters', 'details': exc.errors()},
- )
- @app.middleware('http')
- async def authenticate_requests(request: Request, call_next):
- if request.url.path != '/alive' and request.url.path != '/server_info':
- try:
- verify_api_key(request.headers.get('X-Session-API-Key'))
- except HTTPException as e:
- return e
- response = await call_next(request)
- return response
- @app.get('/server_info')
- async def get_server_info():
- assert client is not None
- current_time = time.time()
- uptime = current_time - client.start_time
- idle_time = current_time - client.last_execution_time
- return {'uptime': uptime, 'idle_time': idle_time}
- @app.post('/execute_action')
- async def execute_action(action_request: ActionRequest):
- assert client is not None
- try:
- action = event_from_dict(action_request.action)
- if not isinstance(action, Action):
- raise HTTPException(status_code=400, detail='Invalid action type')
- client.last_execution_time = time.time()
- observation = await client.run_action(action)
- return event_to_dict(observation)
- except Exception as e:
- logger.error(
- f'Error processing command: {str(e)}', exc_info=True, stack_info=True
- )
- raise HTTPException(status_code=500, detail=str(e))
- @app.post('/upload_file')
- async def upload_file(
- file: UploadFile, destination: str = '/', recursive: bool = False
- ):
- assert client is not None
- try:
- # Ensure the destination directory exists
- if not os.path.isabs(destination):
- raise HTTPException(
- status_code=400, detail='Destination must be an absolute path'
- )
- full_dest_path = destination
- if not os.path.exists(full_dest_path):
- os.makedirs(full_dest_path, exist_ok=True)
- if recursive or file.filename.endswith('.zip'):
- # For recursive uploads, we expect a zip file
- if not file.filename.endswith('.zip'):
- raise HTTPException(
- status_code=400, detail='Recursive uploads must be zip files'
- )
- zip_path = os.path.join(full_dest_path, file.filename)
- with open(zip_path, 'wb') as buffer:
- shutil.copyfileobj(file.file, buffer)
- # Extract the zip file
- shutil.unpack_archive(zip_path, full_dest_path)
- os.remove(zip_path) # Remove the zip file after extraction
- logger.debug(
- f'Uploaded file {file.filename} and extracted to {destination}'
- )
- else:
- # For single file uploads
- file_path = os.path.join(full_dest_path, file.filename)
- with open(file_path, 'wb') as buffer:
- shutil.copyfileobj(file.file, buffer)
- logger.debug(f'Uploaded file {file.filename} to {destination}')
- return JSONResponse(
- content={
- 'filename': file.filename,
- 'destination': destination,
- 'recursive': recursive,
- },
- status_code=200,
- )
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get('/download_files')
- async def download_file(path: str):
- logger.debug('Downloading files')
- try:
- if not os.path.isabs(path):
- raise HTTPException(
- status_code=400, detail='Path must be an absolute path'
- )
- if not os.path.exists(path):
- raise HTTPException(status_code=404, detail='File not found')
- with tempfile.TemporaryFile() as temp_zip:
- with ZipFile(temp_zip, 'w') as zipf:
- for root, _, files in os.walk(path):
- for file in files:
- file_path = os.path.join(root, file)
- zipf.write(
- file_path, arcname=os.path.relpath(file_path, path)
- )
- temp_zip.seek(0) # Rewind the file to the beginning after writing
- content = temp_zip.read()
- # Good for small to medium-sized files. For very large files, streaming directly from the
- # file chunks may be more memory-efficient.
- zip_stream = io.BytesIO(content)
- return StreamingResponse(
- content=zip_stream,
- media_type='application/zip',
- headers={'Content-Disposition': f'attachment; filename={path}.zip'},
- )
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get('/alive')
- async def alive():
- return {'status': 'ok'}
- # ================================
- # VSCode-specific operations
- # ================================
- @app.get('/vscode/connection_token')
- async def get_vscode_connection_token():
- assert client is not None
- if 'vscode' in client.plugins:
- plugin: VSCodePlugin = client.plugins['vscode'] # type: ignore
- return {'token': plugin.vscode_connection_token}
- else:
- return {'token': None}
- # ================================
- # File-specific operations for UI
- # ================================
- @app.post('/list_files')
- async def list_files(request: Request):
- """List files in the specified path.
- This function retrieves a list of files from the agent's runtime file store,
- excluding certain system and hidden files/directories.
- To list files:
- ```sh
- curl http://localhost:3000/api/list-files
- ```
- Args:
- request (Request): The incoming request object.
- path (str, optional): The path to list files from. Defaults to '/'.
- Returns:
- list: A list of file names in the specified path.
- Raises:
- HTTPException: If there's an error listing the files.
- """
- assert client is not None
- # get request as dict
- request_dict = await request.json()
- path = request_dict.get('path', None)
- # Get the full path of the requested directory
- if path is None:
- full_path = client.initial_pwd
- elif os.path.isabs(path):
- full_path = path
- else:
- full_path = os.path.join(client.initial_pwd, path)
- if not os.path.exists(full_path):
- # if user just removed a folder, prevent server error 500 in UI
- return []
- try:
- # Check if the directory exists
- if not os.path.exists(full_path) or not os.path.isdir(full_path):
- return []
- entries = os.listdir(full_path)
- # Separate directories and files
- directories = []
- files = []
- for entry in entries:
- # Remove leading slash and any parent directory components
- entry_relative = entry.lstrip('/').split('/')[-1]
- # Construct the full path by joining the base path with the relative entry path
- full_entry_path = os.path.join(full_path, entry_relative)
- if os.path.exists(full_entry_path):
- is_dir = os.path.isdir(full_entry_path)
- if is_dir:
- # add trailing slash to directories
- # required by FE to differentiate directories and files
- entry = entry.rstrip('/') + '/'
- directories.append(entry)
- else:
- files.append(entry)
- # Sort directories and files separately
- directories.sort(key=lambda s: s.lower())
- files.sort(key=lambda s: s.lower())
- # Combine sorted directories and files
- sorted_entries = directories + files
- return sorted_entries
- except Exception as e:
- logger.error(f'Error listing files: {e}', exc_info=True)
- return []
- logger.debug(f'Starting action execution API on port {args.port}')
- run(app, host='0.0.0.0', port=args.port)
|