# client.py
"""
This is the main file for the runtime client.
It is responsible for executing actions received from the OpenDevin backend and producing observations.

NOTE: this file is executed inside the docker sandbox.

If you already have a pre-built docker image but have changed the code in this file OR its dependencies, you need to rebuild the docker image to update the source code.
To do so, add SANDBOX_UPDATE_SOURCE_CODE=True to any `python XXX.py` command you run.
"""
  8. import argparse
  9. import asyncio
  10. import os
  11. import re
  12. import subprocess
  13. from contextlib import asynccontextmanager
  14. from pathlib import Path
  15. import pexpect
  16. from fastapi import FastAPI, HTTPException, Request
  17. from pydantic import BaseModel
  18. from uvicorn import run
  19. from opendevin.core.logger import opendevin_logger as logger
  20. from opendevin.events.action import (
  21. Action,
  22. BrowseInteractiveAction,
  23. BrowseURLAction,
  24. CmdRunAction,
  25. FileReadAction,
  26. FileWriteAction,
  27. IPythonRunCellAction,
  28. )
  29. from opendevin.events.observation import (
  30. CmdOutputObservation,
  31. ErrorObservation,
  32. FileReadObservation,
  33. FileWriteObservation,
  34. IPythonRunCellObservation,
  35. Observation,
  36. )
  37. from opendevin.events.serialization import event_from_dict, event_to_dict
  38. from opendevin.runtime.browser import browse
  39. from opendevin.runtime.browser.browser_env import BrowserEnv
  40. from opendevin.runtime.plugins import (
  41. ALL_PLUGINS,
  42. JupyterPlugin,
  43. Plugin,
  44. )
  45. from opendevin.runtime.server.files import insert_lines, read_lines
  46. from opendevin.runtime.utils import split_bash_commands
  47. class ActionRequest(BaseModel):
  48. action: dict
  49. class RuntimeClient:
  50. """RuntimeClient is running inside docker sandbox.
  51. It is responsible for executing actions received from OpenDevin backend and producing observations.
  52. """
  53. def __init__(
  54. self, plugins_to_load: list[Plugin], work_dir: str, username: str, user_id: int
  55. ) -> None:
  56. self.plugins_to_load = plugins_to_load
  57. self.username = username
  58. self.user_id = user_id
  59. self.pwd = work_dir # current PWD
  60. self._init_user(self.username, self.user_id)
  61. self._init_bash_shell(self.pwd, self.username)
  62. self.lock = asyncio.Lock()
  63. self.plugins: dict[str, Plugin] = {}
  64. self.browser = BrowserEnv()
  65. async def ainit(self):
  66. for plugin in self.plugins_to_load:
  67. await plugin.initialize(self.username)
  68. self.plugins[plugin.name] = plugin
  69. logger.info(f'Initializing plugin: {plugin.name}')
  70. if isinstance(plugin, JupyterPlugin):
  71. await self.run_ipython(
  72. IPythonRunCellAction(code=f'import os; os.chdir("{self.pwd}")')
  73. )
  74. # This is a temporary workaround
  75. # TODO: refactor AgentSkills to be part of JupyterPlugin
  76. # AFTER ServerRuntime is deprecated
  77. if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
  78. obs = await self.run_ipython(
  79. IPythonRunCellAction(
  80. code=(
  81. 'import sys\n'
  82. 'sys.path.insert(0, "/opendevin/code/opendevin/runtime/plugins/agent_skills")\n'
  83. 'from agentskills import *'
  84. )
  85. )
  86. )
  87. logger.info(f'AgentSkills initialized: {obs}')
  88. def _init_user(self, username: str, user_id: int) -> None:
  89. """Create user if not exists."""
  90. # Skip root since it is already created
  91. if username == 'root':
  92. return
  93. # Add sudoer
  94. sudoer_line = r"echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers"
  95. output = subprocess.run(sudoer_line, shell=True, capture_output=True)
  96. if output.returncode != 0:
  97. raise RuntimeError(f'Failed to add sudoer: {output.stderr.decode()}')
  98. logger.debug(f'Added sudoer successfully. Output: [{output.stdout.decode()}]')
  99. # Add user
  100. output = subprocess.run(
  101. (
  102. f'useradd -rm -d /home/{username} -s /bin/bash '
  103. f'-g root -G sudo -g root -G sudo -u {user_id} {username}'
  104. ),
  105. shell=True,
  106. capture_output=True,
  107. )
  108. if output.returncode != 0:
  109. raise RuntimeError(
  110. f'Failed to create user {username}: {output.stderr.decode()}'
  111. )
  112. logger.debug(
  113. f'Added user {username} successfully. Output: [{output.stdout.decode()}]'
  114. )
  115. def _init_bash_shell(self, work_dir: str, username: str) -> None:
  116. self.shell = pexpect.spawn(
  117. f'su - {username}',
  118. encoding='utf-8',
  119. echo=False,
  120. )
  121. self.__bash_PS1 = r'[PEXPECT_BEGIN] \u@\h:\w [PEXPECT_END]'
  122. # This should NOT match "PS1=\u@\h:\w [PEXPECT]$" when `env` is executed
  123. self.__bash_expect_regex = (
  124. r'\[PEXPECT_BEGIN\] ([a-z0-9_-]*)@([a-zA-Z0-9.-]*):(.+) \[PEXPECT_END\]'
  125. )
  126. self.shell.sendline(f'export PS1="{self.__bash_PS1}"; export PS2=""')
  127. self.shell.expect(self.__bash_expect_regex)
  128. self.shell.sendline(f'cd {work_dir}')
  129. self.shell.expect(self.__bash_expect_regex)
  130. logger.debug(
  131. f'Bash initialized. Working directory: {work_dir}. Output: {self.shell.before}'
  132. )
  133. def _get_bash_prompt_and_update_pwd(self):
  134. ps1 = self.shell.after
  135. # begin at the last occurence of '[PEXPECT_BEGIN]'.
  136. # In multi-line bash commands, the prompt will be repeated
  137. # and the matched regex captures all of them
  138. # - we only want the last one (newest prompt)
  139. _begin_pos = ps1.rfind('[PEXPECT_BEGIN]')
  140. if _begin_pos != -1:
  141. ps1 = ps1[_begin_pos:]
  142. # parse the ps1 to get username, hostname, and working directory
  143. matched = re.match(self.__bash_expect_regex, ps1)
  144. assert (
  145. matched is not None
  146. ), f'Failed to parse bash prompt: {ps1}. This should not happen.'
  147. username, hostname, working_dir = matched.groups()
  148. self._prev_pwd = self.pwd
  149. self.pwd = working_dir
  150. # re-assemble the prompt
  151. prompt = f'{username}@{hostname}:{working_dir} '
  152. if username == 'root':
  153. prompt += '#'
  154. else:
  155. prompt += '$'
  156. return prompt + ' '
  157. def _execute_bash(
  158. self,
  159. command: str,
  160. keep_prompt: bool = True,
  161. timeout: int = 300,
  162. ) -> tuple[str, int]:
  163. logger.debug(f'Executing command: {command}')
  164. self.shell.sendline(command)
  165. self.shell.expect(self.__bash_expect_regex, timeout=timeout)
  166. output = self.shell.before
  167. if keep_prompt:
  168. output += '\r\n' + self._get_bash_prompt_and_update_pwd()
  169. logger.debug(f'Command output: {output}')
  170. # Get exit code
  171. self.shell.sendline('echo $?')
  172. logger.debug(f'Executing command for exit code: {command}')
  173. self.shell.expect(self.__bash_expect_regex, timeout=timeout)
  174. _exit_code_output = self.shell.before
  175. logger.debug(f'Exit code Output: {_exit_code_output}')
  176. exit_code = int(_exit_code_output.strip().split()[0])
  177. return output, exit_code
  178. async def run_action(self, action) -> Observation:
  179. action_type = action.action
  180. observation = await getattr(self, action_type)(action)
  181. return observation
  182. async def run(self, action: CmdRunAction) -> CmdOutputObservation:
  183. try:
  184. commands = split_bash_commands(action.command)
  185. all_output = ''
  186. for command in commands:
  187. output, exit_code = self._execute_bash(command)
  188. if all_output:
  189. # previous output already exists with prompt "user@hostname:working_dir #""
  190. # we need to add the command to the previous output,
  191. # so model knows the following is the output of another action)
  192. all_output = all_output.rstrip() + ' ' + command + '\r\n'
  193. all_output += str(output) + '\r\n'
  194. if exit_code != 0:
  195. break
  196. return CmdOutputObservation(
  197. command_id=-1,
  198. content=all_output.rstrip('\r\n'),
  199. command=action.command,
  200. exit_code=exit_code,
  201. )
  202. except UnicodeDecodeError:
  203. raise RuntimeError('Command output could not be decoded as utf-8')
  204. async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
  205. if 'jupyter' in self.plugins:
  206. _jupyter_plugin: JupyterPlugin = self.plugins['jupyter'] # type: ignore
  207. # This is used to make AgentSkills in Jupyter aware of the
  208. # current working directory in Bash
  209. if not hasattr(self, '_prev_pwd') or self.pwd != self._prev_pwd:
  210. reset_jupyter_pwd_code = (
  211. f'import os; os.environ["JUPYTER_PWD"] = "{self.pwd}"\n\n'
  212. )
  213. _aux_action = IPythonRunCellAction(code=reset_jupyter_pwd_code)
  214. _ = await _jupyter_plugin.run(_aux_action)
  215. obs: IPythonRunCellObservation = await _jupyter_plugin.run(action)
  216. return obs
  217. else:
  218. raise RuntimeError(
  219. 'JupyterRequirement not found. Unable to run IPython action.'
  220. )
  221. def get_working_directory(self):
  222. result, exit_code = self._execute_bash('pwd', keep_prompt=False)
  223. if exit_code != 0:
  224. raise RuntimeError('Failed to get working directory')
  225. return result.strip()
  226. def _resolve_path(self, path: str, working_dir: str) -> str:
  227. filepath = Path(path)
  228. if not filepath.is_absolute():
  229. return str(Path(working_dir) / filepath)
  230. return str(filepath)
  231. async def read(self, action: FileReadAction) -> Observation:
  232. # NOTE: the client code is running inside the sandbox,
  233. # so there's no need to check permission
  234. working_dir = self.get_working_directory()
  235. filepath = self._resolve_path(action.path, working_dir)
  236. try:
  237. with open(filepath, 'r', encoding='utf-8') as file:
  238. lines = read_lines(file.readlines(), action.start, action.end)
  239. except FileNotFoundError:
  240. return ErrorObservation(
  241. f'File not found: {filepath}. Your current working directory is {working_dir}.'
  242. )
  243. except UnicodeDecodeError:
  244. return ErrorObservation(f'File could not be decoded as utf-8: {filepath}.')
  245. except IsADirectoryError:
  246. return ErrorObservation(
  247. f'Path is a directory: {filepath}. You can only read files'
  248. )
  249. code_view = ''.join(lines)
  250. return FileReadObservation(path=filepath, content=code_view)
  251. async def write(self, action: FileWriteAction) -> Observation:
  252. working_dir = self.get_working_directory()
  253. filepath = self._resolve_path(action.path, working_dir)
  254. insert = action.content.split('\n')
  255. try:
  256. if not os.path.exists(os.path.dirname(filepath)):
  257. os.makedirs(os.path.dirname(filepath))
  258. mode = 'w' if not os.path.exists(filepath) else 'r+'
  259. try:
  260. with open(filepath, mode, encoding='utf-8') as file:
  261. if mode != 'w':
  262. all_lines = file.readlines()
  263. new_file = insert_lines(
  264. insert, all_lines, action.start, action.end
  265. )
  266. else:
  267. new_file = [i + '\n' for i in insert]
  268. file.seek(0)
  269. file.writelines(new_file)
  270. file.truncate()
  271. except FileNotFoundError:
  272. return ErrorObservation(f'File not found: {filepath}')
  273. except IsADirectoryError:
  274. return ErrorObservation(
  275. f'Path is a directory: {filepath}. You can only write to files'
  276. )
  277. except UnicodeDecodeError:
  278. return ErrorObservation(
  279. f'File could not be decoded as utf-8: {filepath}'
  280. )
  281. except PermissionError:
  282. return ErrorObservation(f'Malformed paths not permitted: {filepath}')
  283. return FileWriteObservation(content='', path=filepath)
  284. async def browse(self, action: BrowseURLAction) -> Observation:
  285. return await browse(action, self.browser)
  286. async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
  287. return await browse(action, self.browser)
  288. def close(self):
  289. self.shell.close()
  290. self.browser.close()
# NOTE: the two helpers below are disabled manual smoke tests kept for reference.
# `test_run_command` predates the current API (the RuntimeClient constructor now
# takes arguments and `run_action` is a coroutine); update it before re-enabling.
# def test_run_command():
#     client = RuntimeClient()
#     command = CmdRunAction(command='ls -l')
#     obs = client.run_action(command)
#     print(obs)
# def test_shell(message):
#     shell = pexpect.spawn('/bin/bash', encoding='utf-8')
#     shell.expect(r'[$#] ')
#     print(f'Received command: {message}')
#     shell.sendline(message)
#     shell.expect(r'[$#] ')
#     output = shell.before.strip().split('\r\n', 1)[1].strip()
#     print(f'Output: {output}')
#     shell.close()
  305. if __name__ == '__main__':
  306. parser = argparse.ArgumentParser()
  307. parser.add_argument('port', type=int, help='Port to listen on')
  308. parser.add_argument('--working-dir', type=str, help='Working directory')
  309. parser.add_argument('--plugins', type=str, help='Plugins to initialize', nargs='+')
  310. parser.add_argument(
  311. '--username', type=str, help='User to run as', default='opendevin'
  312. )
  313. parser.add_argument('--user-id', type=int, help='User ID to run as', default=1000)
  314. # example: python client.py 8000 --working-dir /workspace --plugins JupyterRequirement
  315. args = parser.parse_args()
  316. plugins_to_load: list[Plugin] = []
  317. if args.plugins:
  318. for plugin in args.plugins:
  319. if plugin not in ALL_PLUGINS:
  320. raise ValueError(f'Plugin {plugin} not found')
  321. plugins_to_load.append(ALL_PLUGINS[plugin]()) # type: ignore
  322. client: RuntimeClient | None = None
  323. @asynccontextmanager
  324. async def lifespan(app: FastAPI):
  325. global client
  326. client = RuntimeClient(
  327. plugins_to_load,
  328. work_dir=args.working_dir,
  329. username=args.username,
  330. user_id=args.user_id,
  331. )
  332. await client.ainit()
  333. yield
  334. # Clean up & release the resources
  335. client.close()
  336. app = FastAPI(lifespan=lifespan)
  337. @app.middleware('http')
  338. async def one_request_at_a_time(request: Request, call_next):
  339. assert client is not None
  340. async with client.lock:
  341. response = await call_next(request)
  342. return response
  343. @app.post('/execute_action')
  344. async def execute_action(action_request: ActionRequest):
  345. assert client is not None
  346. try:
  347. action = event_from_dict(action_request.action)
  348. if not isinstance(action, Action):
  349. raise HTTPException(status_code=400, detail='Invalid action type')
  350. observation = await client.run_action(action)
  351. return event_to_dict(observation)
  352. except Exception as e:
  353. logger.error(f'Error processing command: {str(e)}')
  354. raise HTTPException(status_code=500, detail=str(e))
  355. @app.get('/alive')
  356. async def alive():
  357. return {'status': 'ok'}
  358. logger.info(f'Starting action execution API on port {args.port}')
  359. print(f'Starting action execution API on port {args.port}')
  360. run(app, host='0.0.0.0', port=args.port)