__init__.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import subprocess
  2. import time
  3. from dataclasses import dataclass
  4. from openhands.core.logger import openhands_logger as logger
  5. from openhands.events.action import Action, IPythonRunCellAction
  6. from openhands.events.observation import IPythonRunCellObservation
  7. from openhands.runtime.plugins.jupyter.execute_server import JupyterKernel
  8. from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
  9. from openhands.runtime.utils import find_available_tcp_port
  10. @dataclass
  11. class JupyterRequirement(PluginRequirement):
  12. name: str = 'jupyter'
  13. class JupyterPlugin(Plugin):
  14. name: str = 'jupyter'
  15. async def initialize(self, username: str, kernel_id: str = 'openhands-default'):
  16. self.kernel_gateway_port = find_available_tcp_port()
  17. self.kernel_id = kernel_id
  18. self.gateway_process = subprocess.Popen(
  19. (
  20. f"su - {username} -s /bin/bash << 'EOF'\n"
  21. 'cd /openhands/code\n'
  22. 'export POETRY_VIRTUALENVS_PATH=/openhands/poetry;\n'
  23. 'export PYTHONPATH=/openhands/code:$PYTHONPATH;\n'
  24. '/openhands/miniforge3/bin/mamba run -n base '
  25. 'poetry run jupyter kernelgateway '
  26. '--KernelGatewayApp.ip=0.0.0.0 '
  27. f'--KernelGatewayApp.port={self.kernel_gateway_port}\n'
  28. 'EOF'
  29. ),
  30. stderr=subprocess.STDOUT,
  31. shell=True,
  32. )
  33. # read stdout until the kernel gateway is ready
  34. output = ''
  35. while True and self.gateway_process.stdout is not None:
  36. line = self.gateway_process.stdout.readline().decode('utf-8')
  37. output += line
  38. if 'at' in line:
  39. break
  40. time.sleep(1)
  41. logger.debug('Waiting for jupyter kernel gateway to start...')
  42. logger.info(
  43. f'Jupyter kernel gateway started at port {self.kernel_gateway_port}. Output: {output}'
  44. )
  45. _obs = await self.run(
  46. IPythonRunCellAction(code='import sys; print(sys.executable)')
  47. )
  48. self.python_interpreter_path = _obs.content.strip()
  49. async def _run(self, action: Action) -> IPythonRunCellObservation:
  50. """Internal method to run a code cell in the jupyter kernel."""
  51. if not isinstance(action, IPythonRunCellAction):
  52. raise ValueError(
  53. f'Jupyter plugin only supports IPythonRunCellAction, but got {action}'
  54. )
  55. if not hasattr(self, 'kernel'):
  56. self.kernel = JupyterKernel(
  57. f'localhost:{self.kernel_gateway_port}', self.kernel_id
  58. )
  59. if not self.kernel.initialized:
  60. await self.kernel.initialize()
  61. output = await self.kernel.execute(action.code, timeout=action.timeout)
  62. return IPythonRunCellObservation(
  63. content=output,
  64. code=action.code,
  65. )
  66. async def run(self, action: Action) -> IPythonRunCellObservation:
  67. obs = await self._run(action)
  68. return obs