runloop_runtime.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import logging
  2. import threading
  3. import time
  4. from typing import Callable
  5. import requests
  6. import tenacity
  7. from runloop_api_client import Runloop
  8. from runloop_api_client.types import DevboxView
  9. from runloop_api_client.types.shared_params import LaunchParameters
  10. from openhands.core.config import AppConfig
  11. from openhands.core.logger import openhands_logger as logger
  12. from openhands.events import EventStream
  13. from openhands.runtime.impl.eventstream.eventstream_runtime import (
  14. EventStreamRuntime,
  15. LogBuffer,
  16. )
  17. from openhands.runtime.plugins import PluginRequirement
  18. from openhands.runtime.utils.command import get_remote_startup_command
  19. from openhands.runtime.utils.request import send_request
  20. from openhands.utils.tenacity_stop import stop_if_should_exit
  21. CONTAINER_NAME_PREFIX = 'openhands-runtime-'
  22. class RunloopLogBuffer(LogBuffer):
  23. """Synchronous buffer for Runloop devbox logs.
  24. This class provides a thread-safe way to collect, store, and retrieve logs
  25. from a Docker container. It uses a list to store log lines and provides methods
  26. for appending, retrieving, and clearing logs.
  27. """
  28. def __init__(self, runloop_api_client: Runloop, devbox_id: str):
  29. self.client_ready = False
  30. self.init_msg = 'Runtime client initialized.'
  31. self.buffer: list[str] = []
  32. self.lock = threading.Lock()
  33. self._stop_event = threading.Event()
  34. self.runloop_api_client = runloop_api_client
  35. self.devbox_id = devbox_id
  36. self.log_index = 0
  37. self.log_stream_thread = threading.Thread(target=self.stream_logs)
  38. self.log_stream_thread.daemon = True
  39. self.log_stream_thread.start()
  40. def stream_logs(self):
  41. """Stream logs from the Docker container in a separate thread.
  42. This method runs in its own thread to handle the blocking
  43. operation of reading log lines from the Docker SDK's synchronous generator.
  44. """
  45. try:
  46. # TODO(Runloop) Replace with stream
  47. while True:
  48. raw_logs = self.runloop_api_client.devboxes.logs.list(
  49. self.devbox_id
  50. ).logs[self.log_index :]
  51. logs = [
  52. log.message
  53. for log in raw_logs
  54. if log.message and log.cmd_id is None
  55. ]
  56. self.log_index += len(raw_logs)
  57. if self._stop_event.is_set():
  58. break
  59. if logs:
  60. for log_line in logs:
  61. self.append(log_line)
  62. if self.init_msg in log_line:
  63. self.client_ready = True
  64. time.sleep(1)
  65. except Exception as e:
  66. logger.error(f'Error streaming runloop logs: {e}')
  67. # NB: Match LogBuffer behavior on below methods
  68. def get_and_clear(self) -> list[str]:
  69. with self.lock:
  70. logs = list(self.buffer)
  71. self.buffer.clear()
  72. return logs
  73. def append(self, log_line: str):
  74. with self.lock:
  75. self.buffer.append(log_line)
  76. def close(self, timeout: float = 5.0):
  77. self._stop_event.set()
  78. self.log_stream_thread.join(timeout)
  79. class RunloopRuntime(EventStreamRuntime):
  80. """The RunloopRuntime class is an EventStreamRuntime that utilizes Runloop Devbox as a runtime environment."""
  81. _sandbox_port: int = 4444
  82. _vscode_port: int = 4445
  83. def __init__(
  84. self,
  85. config: AppConfig,
  86. event_stream: EventStream,
  87. sid: str = 'default',
  88. plugins: list[PluginRequirement] | None = None,
  89. env_vars: dict[str, str] | None = None,
  90. status_callback: Callable | None = None,
  91. attach_to_existing: bool = False,
  92. headless_mode: bool = True,
  93. ):
  94. assert config.runloop_api_key is not None, 'Runloop API key is required'
  95. self.devbox: DevboxView | None = None
  96. self.config = config
  97. self.runloop_api_client = Runloop(
  98. bearer_token=config.runloop_api_key,
  99. )
  100. self.session = requests.Session()
  101. self.container_name = CONTAINER_NAME_PREFIX + sid
  102. self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
  103. self.init_base_runtime(
  104. config,
  105. event_stream,
  106. sid,
  107. plugins,
  108. env_vars,
  109. status_callback,
  110. attach_to_existing,
  111. headless_mode,
  112. )
  113. # Buffer for container logs
  114. self.log_buffer: LogBuffer | None = None
  115. self._vscode_url: str | None = None
  116. @tenacity.retry(
  117. stop=tenacity.stop_after_attempt(120),
  118. wait=tenacity.wait_fixed(1),
  119. )
  120. def _wait_for_devbox(self, devbox: DevboxView) -> DevboxView:
  121. """Pull devbox status until it is running"""
  122. if devbox == 'running':
  123. return devbox
  124. devbox = self.runloop_api_client.devboxes.retrieve(id=devbox.id)
  125. if devbox.status != 'running':
  126. raise ConnectionRefusedError('Devbox is not running')
  127. # Devbox is connected and running
  128. logging.debug(f'devbox.id={devbox.id} is running')
  129. return devbox
  130. def _create_new_devbox(self) -> DevboxView:
  131. # Note: Runloop connect
  132. sandbox_workspace_dir = self.config.workspace_mount_path_in_sandbox
  133. plugin_args = []
  134. if self.plugins is not None and len(self.plugins) > 0:
  135. plugin_args.append('--plugins')
  136. plugin_args.extend([plugin.name for plugin in self.plugins])
  137. browsergym_args = []
  138. if self.config.sandbox.browsergym_eval_env is not None:
  139. browsergym_args = [
  140. '-browsergym-eval-env',
  141. self.config.sandbox.browsergym_eval_env,
  142. ]
  143. # Copied from EventstreamRuntime
  144. start_command = get_remote_startup_command(
  145. self._sandbox_port,
  146. sandbox_workspace_dir,
  147. 'openhands' if self.config.run_as_openhands else 'root',
  148. self.config.sandbox.user_id,
  149. plugin_args,
  150. browsergym_args,
  151. is_root=not self.config.run_as_openhands, # is_root=True when running as root
  152. )
  153. # Add some additional commands based on our image
  154. # NB: start off as root, action_execution_server will ultimately choose user but expects all context
  155. # (ie browser) to be installed as root
  156. start_command = (
  157. 'export MAMBA_ROOT_PREFIX=/openhands/micromamba && '
  158. 'cd /openhands/code && '
  159. + '/openhands/micromamba/bin/micromamba run -n openhands poetry config virtualenvs.path /openhands/poetry && '
  160. + ' '.join(start_command)
  161. )
  162. entrypoint = f"sudo bash -c '{start_command}'"
  163. devbox = self.runloop_api_client.devboxes.create(
  164. entrypoint=entrypoint,
  165. setup_commands=[f'mkdir -p {self.config.workspace_mount_path_in_sandbox}'],
  166. name=self.sid,
  167. environment_variables={'DEBUG': 'true'} if self.config.debug else {},
  168. prebuilt='openhands',
  169. launch_parameters=LaunchParameters(
  170. available_ports=[self._sandbox_port, self._vscode_port],
  171. resource_size_request='LARGE',
  172. ),
  173. metadata={'container-name': self.container_name},
  174. )
  175. return self._wait_for_devbox(devbox)
  176. async def connect(self):
  177. self.send_status_message('STATUS$STARTING_RUNTIME')
  178. if self.attach_to_existing:
  179. active_devboxes = self.runloop_api_client.devboxes.list(
  180. status='running'
  181. ).devboxes
  182. self.devbox = next(
  183. (devbox for devbox in active_devboxes if devbox.name == self.sid), None
  184. )
  185. if self.devbox is None:
  186. self.devbox = self._create_new_devbox()
  187. # Create tunnel - this will return a stable url, so is safe to call if we are attaching to existing
  188. tunnel = self.runloop_api_client.devboxes.create_tunnel(
  189. id=self.devbox.id,
  190. port=self._sandbox_port,
  191. )
  192. # Hook up logs
  193. self.log_buffer = RunloopLogBuffer(self.runloop_api_client, self.devbox.id)
  194. self.api_url = tunnel.url
  195. logger.info(f'Container started. Server url: {self.api_url}')
  196. # End Runloop connect
  197. # NOTE: Copied from EventStreamRuntime
  198. logger.info('Waiting for client to become ready...')
  199. self.send_status_message('STATUS$WAITING_FOR_CLIENT')
  200. self._wait_until_alive()
  201. if not self.attach_to_existing:
  202. self.setup_initial_env()
  203. logger.info(
  204. f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
  205. )
  206. self.send_status_message(' ')
  207. @tenacity.retry(
  208. stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
  209. wait=tenacity.wait_fixed(1),
  210. reraise=(ConnectionRefusedError,),
  211. )
  212. def _wait_until_alive(self):
  213. # NB(Runloop): Remote logs are not guaranteed realtime, removing client_ready check from logs
  214. self._refresh_logs()
  215. if not self.log_buffer:
  216. raise RuntimeError('Runtime client is not ready.')
  217. response = send_request(
  218. self.session,
  219. 'GET',
  220. f'{self.api_url}/alive',
  221. timeout=5,
  222. )
  223. if response.status_code == 200:
  224. return
  225. else:
  226. msg = f'Action execution API is not alive. Response: {response}'
  227. logger.error(msg)
  228. raise RuntimeError(msg)
  229. def close(self, rm_all_containers: bool | None = True):
  230. if self.log_buffer:
  231. self.log_buffer.close()
  232. if self.session:
  233. self.session.close()
  234. if self.attach_to_existing:
  235. return
  236. if self.devbox:
  237. self.runloop_api_client.devboxes.shutdown(self.devbox.id)
  238. @property
  239. def vscode_url(self) -> str | None:
  240. if self.vscode_enabled and self.devbox and self.devbox.status == 'running':
  241. if self._vscode_url is not None:
  242. return self._vscode_url
  243. try:
  244. with send_request(
  245. self.session,
  246. 'GET',
  247. f'{self.api_url}/vscode/connection_token',
  248. timeout=10,
  249. ) as response:
  250. response_json = response.json()
  251. assert isinstance(response_json, dict)
  252. if response_json['token'] is None:
  253. return None
  254. token = response_json['token']
  255. self._vscode_url = (
  256. self.runloop_api_client.devboxes.create_tunnel(
  257. id=self.devbox.id,
  258. port=self._vscode_port,
  259. ).url
  260. + f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
  261. )
  262. self.log(
  263. 'debug',
  264. f'VSCode URL: {self._vscode_url}',
  265. )
  266. return self._vscode_url
  267. except Exception as e:
  268. self.log(
  269. 'error',
  270. f'Failed to create vscode tunnel {e}',
  271. )
  272. return None
  273. else:
  274. return None