__init__.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import subprocess
  2. import time
  3. from dataclasses import dataclass
  4. from openhands.core.logger import openhands_logger as logger
  5. from openhands.events.action import Action, IPythonRunCellAction
  6. from openhands.events.observation import IPythonRunCellObservation
  7. from openhands.runtime.plugins.jupyter.execute_server import JupyterKernel
  8. from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
  9. from openhands.runtime.utils import find_available_tcp_port
  10. from openhands.runtime.utils.shutdown_listener import should_continue
  11. @dataclass
  12. class JupyterRequirement(PluginRequirement):
  13. name: str = 'jupyter'
  14. class JupyterPlugin(Plugin):
  15. name: str = 'jupyter'
  16. async def initialize(self, username: str, kernel_id: str = 'openhands-default'):
  17. self.kernel_gateway_port = find_available_tcp_port(40000, 49999)
  18. self.kernel_id = kernel_id
  19. self.gateway_process = subprocess.Popen(
  20. (
  21. f"su - {username} -s /bin/bash << 'EOF'\n"
  22. 'cd /openhands/code\n'
  23. 'export POETRY_VIRTUALENVS_PATH=/openhands/poetry;\n'
  24. 'export PYTHONPATH=/openhands/code:$PYTHONPATH;\n'
  25. '/openhands/miniforge3/bin/mamba run -n base '
  26. 'poetry run jupyter kernelgateway '
  27. '--KernelGatewayApp.ip=0.0.0.0 '
  28. f'--KernelGatewayApp.port={self.kernel_gateway_port}\n'
  29. 'EOF'
  30. ),
  31. stderr=subprocess.STDOUT,
  32. shell=True,
  33. )
  34. # read stdout until the kernel gateway is ready
  35. output = ''
  36. while should_continue() and self.gateway_process.stdout is not None:
  37. line = self.gateway_process.stdout.readline().decode('utf-8')
  38. output += line
  39. if 'at' in line:
  40. break
  41. time.sleep(1)
  42. logger.debug('Waiting for jupyter kernel gateway to start...')
  43. logger.info(
  44. f'Jupyter kernel gateway started at port {self.kernel_gateway_port}. Output: {output}'
  45. )
  46. _obs = await self.run(
  47. IPythonRunCellAction(code='import sys; print(sys.executable)')
  48. )
  49. self.python_interpreter_path = _obs.content.strip()
  50. async def _run(self, action: Action) -> IPythonRunCellObservation:
  51. """Internal method to run a code cell in the jupyter kernel."""
  52. if not isinstance(action, IPythonRunCellAction):
  53. raise ValueError(
  54. f'Jupyter plugin only supports IPythonRunCellAction, but got {action}'
  55. )
  56. if not hasattr(self, 'kernel'):
  57. self.kernel = JupyterKernel(
  58. f'localhost:{self.kernel_gateway_port}', self.kernel_id
  59. )
  60. if not self.kernel.initialized:
  61. await self.kernel.initialize()
  62. output = await self.kernel.execute(action.code, timeout=action.timeout)
  63. return IPythonRunCellObservation(
  64. content=output,
  65. code=action.code,
  66. )
  67. async def run(self, action: Action) -> IPythonRunCellObservation:
  68. obs = await self._run(action)
  69. return obs