runloop_runtime.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. import logging
  2. import threading
  3. import time
  4. from typing import Callable
  5. import requests
  6. import tenacity
  7. from runloop_api_client import Runloop
  8. from runloop_api_client.types import DevboxView
  9. from runloop_api_client.types.shared_params import LaunchParameters
  10. from openhands.core.config import AppConfig
  11. from openhands.core.logger import openhands_logger as logger
  12. from openhands.events import EventStream
  13. from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
  14. from openhands.runtime.plugins import PluginRequirement
  15. from openhands.runtime.utils.command import get_remote_startup_command
  16. from openhands.runtime.utils.log_streamer import LogStreamer
  17. from openhands.runtime.utils.request import send_request
  18. from openhands.utils.tenacity_stop import stop_if_should_exit
# Prefix prepended to the session id to build the runtime's container name.
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
  20. class RunloopLogStreamer(LogStreamer):
  21. """Streams Runloop devbox logs to stdout.
  22. This class provides a way to stream logs from a Runloop devbox directly to stdout
  23. through the provided logging function.
  24. """
  25. def __init__(
  26. self,
  27. runloop_api_client: Runloop,
  28. devbox_id: str,
  29. logFn: Callable,
  30. ):
  31. self.runloop_api_client = runloop_api_client
  32. self.devbox_id = devbox_id
  33. self.log = logFn
  34. self.log_index = 0
  35. self._stop_event = threading.Event()
  36. # Start the stdout streaming thread
  37. self.stdout_thread = threading.Thread(target=self._stream_logs)
  38. self.stdout_thread.daemon = True
  39. self.stdout_thread.start()
  40. def _stream_logs(self):
  41. """Stream logs from the Runloop devbox."""
  42. try:
  43. while True:
  44. raw_logs = self.runloop_api_client.devboxes.logs.list(
  45. self.devbox_id
  46. ).logs[self.log_index :]
  47. logs = [
  48. log.message
  49. for log in raw_logs
  50. if log.message and log.cmd_id is None
  51. ]
  52. self.log_index += len(raw_logs)
  53. if self._stop_event.is_set():
  54. break
  55. if logs:
  56. for log_line in logs:
  57. self.log('debug', f'[inside devbox] {log_line}')
  58. time.sleep(1)
  59. except Exception as e:
  60. self.log('error', f'Error streaming runloop logs: {e}')
  61. class RunloopRuntime(EventStreamRuntime):
  62. """The RunloopRuntime class is an EventStreamRuntime that utilizes Runloop Devbox as a runtime environment."""
  63. _sandbox_port: int = 4444
  64. _vscode_port: int = 4445
  65. def __init__(
  66. self,
  67. config: AppConfig,
  68. event_stream: EventStream,
  69. sid: str = 'default',
  70. plugins: list[PluginRequirement] | None = None,
  71. env_vars: dict[str, str] | None = None,
  72. status_callback: Callable | None = None,
  73. attach_to_existing: bool = False,
  74. headless_mode: bool = True,
  75. ):
  76. assert config.runloop_api_key is not None, 'Runloop API key is required'
  77. self.devbox: DevboxView | None = None
  78. self.config = config
  79. self.runloop_api_client = Runloop(
  80. bearer_token=config.runloop_api_key,
  81. )
  82. self.session = requests.Session()
  83. self.container_name = CONTAINER_NAME_PREFIX + sid
  84. self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
  85. self.init_base_runtime(
  86. config,
  87. event_stream,
  88. sid,
  89. plugins,
  90. env_vars,
  91. status_callback,
  92. attach_to_existing,
  93. headless_mode,
  94. )
  95. # Buffer for container logs
  96. self.log_streamer: LogStreamer | None = None
  97. self._vscode_url: str | None = None
  98. @tenacity.retry(
  99. stop=tenacity.stop_after_attempt(120),
  100. wait=tenacity.wait_fixed(1),
  101. )
  102. def _wait_for_devbox(self, devbox: DevboxView) -> DevboxView:
  103. """Pull devbox status until it is running"""
  104. if devbox == 'running':
  105. return devbox
  106. devbox = self.runloop_api_client.devboxes.retrieve(id=devbox.id)
  107. if devbox.status != 'running':
  108. raise ConnectionRefusedError('Devbox is not running')
  109. # Devbox is connected and running
  110. logging.debug(f'devbox.id={devbox.id} is running')
  111. return devbox
  112. def _create_new_devbox(self) -> DevboxView:
  113. # Note: Runloop connect
  114. sandbox_workspace_dir = self.config.workspace_mount_path_in_sandbox
  115. plugin_args = []
  116. if self.plugins is not None and len(self.plugins) > 0:
  117. plugin_args.append('--plugins')
  118. plugin_args.extend([plugin.name for plugin in self.plugins])
  119. browsergym_args = []
  120. if self.config.sandbox.browsergym_eval_env is not None:
  121. browsergym_args = [
  122. '-browsergym-eval-env',
  123. self.config.sandbox.browsergym_eval_env,
  124. ]
  125. # Copied from EventstreamRuntime
  126. start_command = get_remote_startup_command(
  127. self._sandbox_port,
  128. sandbox_workspace_dir,
  129. 'openhands' if self.config.run_as_openhands else 'root',
  130. self.config.sandbox.user_id,
  131. plugin_args,
  132. browsergym_args,
  133. is_root=not self.config.run_as_openhands, # is_root=True when running as root
  134. )
  135. # Add some additional commands based on our image
  136. # NB: start off as root, action_execution_server will ultimately choose user but expects all context
  137. # (ie browser) to be installed as root
  138. start_command = (
  139. 'export MAMBA_ROOT_PREFIX=/openhands/micromamba && '
  140. 'cd /openhands/code && '
  141. + '/openhands/micromamba/bin/micromamba run -n openhands poetry config virtualenvs.path /openhands/poetry && '
  142. + ' '.join(start_command)
  143. )
  144. entrypoint = f"sudo bash -c '{start_command}'"
  145. devbox = self.runloop_api_client.devboxes.create(
  146. entrypoint=entrypoint,
  147. setup_commands=[f'mkdir -p {self.config.workspace_mount_path_in_sandbox}'],
  148. name=self.sid,
  149. environment_variables={'DEBUG': 'true'} if self.config.debug else {},
  150. prebuilt='openhands',
  151. launch_parameters=LaunchParameters(
  152. available_ports=[self._sandbox_port, self._vscode_port],
  153. resource_size_request='LARGE',
  154. ),
  155. metadata={'container-name': self.container_name},
  156. )
  157. return self._wait_for_devbox(devbox)
  158. async def connect(self):
  159. self.send_status_message('STATUS$STARTING_RUNTIME')
  160. if self.attach_to_existing:
  161. active_devboxes = self.runloop_api_client.devboxes.list(
  162. status='running'
  163. ).devboxes
  164. self.devbox = next(
  165. (devbox for devbox in active_devboxes if devbox.name == self.sid), None
  166. )
  167. if self.devbox is None:
  168. self.devbox = self._create_new_devbox()
  169. # Create tunnel - this will return a stable url, so is safe to call if we are attaching to existing
  170. tunnel = self.runloop_api_client.devboxes.create_tunnel(
  171. id=self.devbox.id,
  172. port=self._sandbox_port,
  173. )
  174. # Hook up logs
  175. self.log_streamer = RunloopLogStreamer(
  176. self.runloop_api_client, self.devbox.id, logger.info
  177. )
  178. self.api_url = tunnel.url
  179. logger.info(f'Container started. Server url: {self.api_url}')
  180. # End Runloop connect
  181. # NOTE: Copied from EventStreamRuntime
  182. logger.info('Waiting for client to become ready...')
  183. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  184. self._wait_until_alive()
  185. if not self.attach_to_existing:
  186. self.setup_initial_env()
  187. logger.info(
  188. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
  189. )
  190. self.send_status_message(' ')
  191. @tenacity.retry(
  192. stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
  193. wait=tenacity.wait_fixed(1),
  194. reraise=(ConnectionRefusedError,),
  195. )
  196. def _wait_until_alive(self):
  197. if not self.log_streamer:
  198. raise RuntimeError('Runtime client is not ready.')
  199. response = send_request(
  200. self.session,
  201. 'GET',
  202. f'{self.api_url}/alive',
  203. timeout=5,
  204. )
  205. if response.status_code == 200:
  206. return
  207. else:
  208. msg = f'Action execution API is not alive. Response: {response}'
  209. logger.error(msg)
  210. raise RuntimeError(msg)
  211. def close(self, rm_all_containers: bool | None = True):
  212. if self.log_streamer:
  213. self.log_streamer.close()
  214. if self.session:
  215. self.session.close()
  216. if self.attach_to_existing:
  217. return
  218. if self.devbox:
  219. self.runloop_api_client.devboxes.shutdown(self.devbox.id)
  220. @property
  221. def vscode_url(self) -> str | None:
  222. if self.vscode_enabled and self.devbox and self.devbox.status == 'running':
  223. if self._vscode_url is not None:
  224. return self._vscode_url
  225. try:
  226. with send_request(
  227. self.session,
  228. 'GET',
  229. f'{self.api_url}/vscode/connection_token',
  230. timeout=10,
  231. ) as response:
  232. response_json = response.json()
  233. assert isinstance(response_json, dict)
  234. if response_json['token'] is None:
  235. return None
  236. token = response_json['token']
  237. self._vscode_url = (
  238. self.runloop_api_client.devboxes.create_tunnel(
  239. id=self.devbox.id,
  240. port=self._vscode_port,
  241. ).url
  242. + f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
  243. )
  244. self.log(
  245. 'debug',
  246. f'VSCode URL: {self._vscode_url}',
  247. )
  248. return self._vscode_url
  249. except Exception as e:
  250. self.log(
  251. 'error',
  252. f'Failed to create vscode tunnel {e}',
  253. )
  254. return None
  255. else:
  256. return None