"""Bash-related tests for the EventStreamRuntime, which connects to the ActionExecutor running in the sandbox.""" import os from pathlib import Path import pytest from conftest import ( TEST_IN_CI, _close_test_runtime, _get_sandbox_folder, _load_runtime, ) from openhands.core.logger import openhands_logger as logger from openhands.events.action import CmdRunAction from openhands.events.observation import CmdOutputObservation from openhands.runtime.base import Runtime # ============================================================================================================================ # Bash-specific tests # ============================================================================================================================ def _run_cmd_action(runtime, custom_command: str, keep_prompt=True): action = CmdRunAction(command=custom_command, keep_prompt=keep_prompt) logger.info(action, extra={'msg_type': 'ACTION'}) obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) return obs def test_bash_command_pexcept(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: # We set env var PS1="\u@\h:\w $" # and construct the PEXCEPT prompt base on it. # When run `env`, bad implementation of CmdRunAction will be pexcepted by this # and failed to pexcept the right content, causing it fail to get error code. obs = runtime.run_action(CmdRunAction(command='env')) # For example: # 02:16:13 - openhands:DEBUG: client.py:78 - Executing command: env # 02:16:13 - openhands:DEBUG: client.py:82 - Command output: PYTHONUNBUFFERED=1 # CONDA_EXE=/openhands/miniforge3/bin/conda # [...] # LC_CTYPE=C.UTF-8 # PS1=\u@\h:\w $ # 02:16:13 - openhands:DEBUG: client.py:89 - Executing command for exit code: env # 02:16:13 - openhands:DEBUG: client.py:92 - Exit code Output: # CONDA_DEFAULT_ENV=base # As long as the exit code is 0, the test will pass. assert isinstance( obs, CmdOutputObservation ), 'The observation should be a CmdOutputObservation.' assert obs.exit_code == 0, 'The exit code should be 0.' finally: _close_test_runtime(runtime) def test_bash_timeout_and_keyboard_interrupt(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: action = CmdRunAction(command='python -c "import time; time.sleep(10)"') action.timeout = 1 obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert ( '[Command timed out after 1 seconds. SIGINT was sent to interrupt the command.]' in obs.content ) assert 'KeyboardInterrupt' in obs.content # follow up command should not be affected action = CmdRunAction(command='ls') action.timeout = 1 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 logger.info(obs, extra={'msg_type': 'OBSERVATION'}) # run it again! action = CmdRunAction(command='python -c "import time; time.sleep(10)"') action.timeout = 1 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert ( '[Command timed out after 1 seconds. SIGINT was sent to interrupt the command.]' in obs.content ) assert 'KeyboardInterrupt' in obs.content # things should still work action = CmdRunAction(command='ls') action.timeout = 1 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert '/workspace' in obs.interpreter_details finally: _close_test_runtime(runtime) def test_bash_pexcept_eof(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: action = CmdRunAction(command='python3 -m http.server 8080') action.timeout = 1 obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 130 # script was killed by SIGINT assert 'Serving HTTP on 0.0.0.0 port 8080' in obs.content assert 'Keyboard interrupt received, exiting.' in obs.content action = CmdRunAction(command='ls') action.timeout = 1 obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert '/workspace' in obs.interpreter_details # run it again! action = CmdRunAction(command='python3 -m http.server 8080') action.timeout = 1 obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 130 # script was killed by SIGINT assert 'Serving HTTP on 0.0.0.0 port 8080' in obs.content assert 'Keyboard interrupt received, exiting.' in obs.content # things should still work action = CmdRunAction(command='ls') action.timeout = 1 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert '/workspace' in obs.interpreter_details finally: _close_test_runtime(runtime) def test_process_resistant_to_one_sigint(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: # Create a bash script that ignores SIGINT up to 1 times script_content = """ #!/bin/bash trap_count=0 trap 'echo "Caught SIGINT ($((++trap_count))/1), ignoring..."; [ $trap_count -ge 1 ] && trap - INT && exit' INT while true; do echo "Still running..." sleep 1 done """.strip() with open(f'{temp_dir}/resistant_script.sh', 'w') as f: f.write(script_content) os.chmod(f'{temp_dir}/resistant_script.sh', 0o777) runtime.copy_to( os.path.join(temp_dir, 'resistant_script.sh'), runtime.config.workspace_mount_path_in_sandbox, ) # Run the resistant script action = CmdRunAction(command='sudo bash ./resistant_script.sh') action.timeout = 5 action.blocking = True logger.info(action, extra={'msg_type': 'ACTION'}) obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 130 # script was killed by SIGINT assert 'Still running...' in obs.content assert 'Caught SIGINT (1/1), ignoring...' in obs.content assert 'Stopped' not in obs.content assert ( '[Command timed out after 5 seconds. SIGINT was sent to interrupt the command.]' in obs.content ) # Normal command should still work action = CmdRunAction(command='ls') action.timeout = 10 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert '/workspace' in obs.interpreter_details assert 'resistant_script.sh' in obs.content finally: _close_test_runtime(runtime) def test_process_resistant_to_multiple_sigint(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: # Create a bash script that ignores SIGINT up to 2 times script_content = """ #!/bin/bash trap_count=0 trap 'echo "Caught SIGINT ($((++trap_count))/3), ignoring..."; [ $trap_count -ge 3 ] && trap - INT && exit' INT while true; do echo "Still running..." sleep 1 done """.strip() with open(f'{temp_dir}/resistant_script.sh', 'w') as f: f.write(script_content) os.chmod(f'{temp_dir}/resistant_script.sh', 0o777) runtime.copy_to( os.path.join(temp_dir, 'resistant_script.sh'), runtime.config.workspace_mount_path_in_sandbox, ) # Run the resistant script action = CmdRunAction(command='sudo bash ./resistant_script.sh') action.timeout = 2 action.blocking = True logger.info(action, extra={'msg_type': 'ACTION'}) obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert 'Still running...' in obs.content assert 'Caught SIGINT (1/3), ignoring...' in obs.content assert '[1]+' and 'Stopped' in obs.content assert ( '[Command timed out after 2 seconds. SIGINT was sent to interrupt the command, but failed. The command was killed.]' in obs.content ) # Normal command should still work action = CmdRunAction(command='ls') action.timeout = 10 obs = runtime.run_action(action) assert isinstance(obs, CmdOutputObservation) assert obs.exit_code == 0 assert '/workspace' in obs.interpreter_details assert 'resistant_script.sh' in obs.content finally: _close_test_runtime(runtime) def test_multiline_commands(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: # single multiline command obs = _run_cmd_action(runtime, 'echo \\\n -e "foo"') assert obs.exit_code == 0, 'The exit code should be 0.' assert 'foo' in obs.content # test multiline echo obs = _run_cmd_action(runtime, 'echo -e "hello\nworld"') assert obs.exit_code == 0, 'The exit code should be 0.' assert 'hello\r\nworld' in obs.content # test whitespace obs = _run_cmd_action(runtime, 'echo -e "a\\n\\n\\nz"') assert obs.exit_code == 0, 'The exit code should be 0.' assert '\r\n\r\n\r\n' in obs.content finally: _close_test_runtime(runtime) def test_multiple_multiline_commands(temp_dir, runtime_cls, run_as_openhands): cmds = [ 'ls -l', 'echo -e "hello\nworld"', """ echo -e "hello it\\'s me" """.strip(), """ echo \\ -e 'hello' \\ -v """.strip(), """ echo -e 'hello\\nworld\\nare\\nyou\\nthere?' """.strip(), """ echo -e 'hello world are you\\n there?' """.strip(), """ echo -e 'hello world " ' """.strip(), ] joined_cmds = '\n'.join(cmds) runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: obs = _run_cmd_action(runtime, joined_cmds) assert obs.exit_code == 0, 'The exit code should be 0.' assert 'total 0' in obs.content assert 'hello\r\nworld' in obs.content assert "hello it\\'s me" in obs.content assert 'hello -v' in obs.content assert 'hello\r\nworld\r\nare\r\nyou\r\nthere?' in obs.content assert 'hello\r\nworld\r\nare\r\nyou\r\n\r\nthere?' in obs.content finally: _close_test_runtime(runtime) def test_no_ps2_in_output(temp_dir, runtime_cls, run_as_openhands): """Test that the PS2 sign is not added to the output of a multiline command.""" runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: obs = _run_cmd_action(runtime, 'echo -e "hello\nworld"') assert obs.exit_code == 0, 'The exit code should be 0.' assert 'hello\r\nworld' in obs.content assert '>' not in obs.content finally: _close_test_runtime(runtime) def test_multiline_command_loop(temp_dir, runtime_cls): # https://github.com/All-Hands-AI/OpenHands/issues/3143 init_cmd = """ mkdir -p _modules && \ for month in {01..04}; do for day in {01..05}; do touch "_modules/2024-${month}-${day}-sample.md" done done echo "created files" """ follow_up_cmd = """ for file in _modules/*.md; do new_date=$(echo $file | sed -E 's/2024-(01|02|03|04)-/2024-/;s/2024-01/2024-08/;s/2024-02/2024-09/;s/2024-03/2024-10/;s/2024-04/2024-11/') mv "$file" "$new_date" done echo "success" """ runtime = _load_runtime(temp_dir, runtime_cls) try: obs = _run_cmd_action(runtime, init_cmd) assert obs.exit_code == 0, 'The exit code should be 0.' assert 'created files' in obs.content obs = _run_cmd_action(runtime, follow_up_cmd) assert obs.exit_code == 0, 'The exit code should be 0.' assert 'success' in obs.content finally: _close_test_runtime(runtime) def test_cmd_run(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: obs = _run_cmd_action(runtime, 'ls -l /openhands/workspace') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, 'ls -l') assert obs.exit_code == 0 assert 'total 0' in obs.content obs = _run_cmd_action(runtime, 'mkdir test') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, 'ls -l') assert obs.exit_code == 0 if run_as_openhands: assert 'openhands' in obs.content else: assert 'root' in obs.content assert 'test' in obs.content obs = _run_cmd_action(runtime, 'touch test/foo.txt') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, 'ls -l test') assert obs.exit_code == 0 assert 'foo.txt' in obs.content # clean up: this is needed, since CI will not be # run as root, and this test may leave a file # owned by root _run_cmd_action(runtime, 'rm -rf test') assert obs.exit_code == 0 finally: _close_test_runtime(runtime) def test_run_as_user_correct_home_dir(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: obs = _run_cmd_action(runtime, 'cd ~ && pwd') assert obs.exit_code == 0 if run_as_openhands: assert '/home/openhands' in obs.content else: assert '/root' in obs.content finally: _close_test_runtime(runtime) def test_multi_cmd_run_in_single_line(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: obs = _run_cmd_action(runtime, 'pwd && ls -l') assert obs.exit_code == 0 assert '/workspace' in obs.content assert 'total 0' in obs.content finally: _close_test_runtime(runtime) def test_stateful_cmd(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) sandbox_dir = _get_sandbox_folder(runtime) try: obs = _run_cmd_action(runtime, 'mkdir -p test') assert obs.exit_code == 0, 'The exit code should be 0.' obs = _run_cmd_action(runtime, 'cd test') assert obs.exit_code == 0, 'The exit code should be 0.' obs = _run_cmd_action(runtime, 'pwd') assert obs.exit_code == 0, 'The exit code should be 0.' assert f'{sandbox_dir}/test' in obs.content finally: _close_test_runtime(runtime) def test_failed_cmd(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: obs = _run_cmd_action(runtime, 'non_existing_command') assert obs.exit_code != 0, 'The exit code should not be 0 for a failed command.' finally: _close_test_runtime(runtime) def _create_test_file(host_temp_dir): # Single file with open(os.path.join(host_temp_dir, 'test_file.txt'), 'w') as f: f.write('Hello, World!') def test_copy_single_file(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: sandbox_dir = _get_sandbox_folder(runtime) sandbox_file = os.path.join(sandbox_dir, 'test_file.txt') _create_test_file(temp_dir) runtime.copy_to(os.path.join(temp_dir, 'test_file.txt'), sandbox_dir) obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}') assert obs.exit_code == 0 assert 'test_file.txt' in obs.content obs = _run_cmd_action(runtime, f'cat {sandbox_file}') assert obs.exit_code == 0 assert 'Hello, World!' in obs.content finally: _close_test_runtime(runtime) def _create_host_test_dir_with_files(test_dir): logger.debug(f'creating `{test_dir}`') if not os.path.isdir(test_dir): os.makedirs(test_dir, exist_ok=True) logger.debug('creating test files in `test_dir`') with open(os.path.join(test_dir, 'file1.txt'), 'w') as f: f.write('File 1 content') with open(os.path.join(test_dir, 'file2.txt'), 'w') as f: f.write('File 2 content') def test_copy_directory_recursively(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) sandbox_dir = _get_sandbox_folder(runtime) try: temp_dir_copy = os.path.join(temp_dir, 'test_dir') # We need a separate directory, since temp_dir is mounted to /workspace _create_host_test_dir_with_files(temp_dir_copy) runtime.copy_to(temp_dir_copy, sandbox_dir, recursive=True) obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}') assert obs.exit_code == 0 assert 'test_dir' in obs.content assert 'file1.txt' not in obs.content assert 'file2.txt' not in obs.content obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}/test_dir') assert obs.exit_code == 0 assert 'file1.txt' in obs.content assert 'file2.txt' in obs.content obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/test_dir/file1.txt') assert obs.exit_code == 0 assert 'File 1 content' in obs.content finally: _close_test_runtime(runtime) def test_copy_to_non_existent_directory(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: sandbox_dir = _get_sandbox_folder(runtime) _create_test_file(temp_dir) runtime.copy_to( os.path.join(temp_dir, 'test_file.txt'), f'{sandbox_dir}/new_dir' ) obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/new_dir/test_file.txt') assert obs.exit_code == 0 assert 'Hello, World!' in obs.content finally: _close_test_runtime(runtime) def test_overwrite_existing_file(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: sandbox_dir = _get_sandbox_folder(runtime) obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, f'touch {sandbox_dir}/test_file.txt') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}') assert obs.exit_code == 0 obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/test_file.txt') assert obs.exit_code == 0 assert 'Hello, World!' not in obs.content _create_test_file(temp_dir) runtime.copy_to(os.path.join(temp_dir, 'test_file.txt'), sandbox_dir) obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/test_file.txt') assert obs.exit_code == 0 assert 'Hello, World!' in obs.content finally: _close_test_runtime(runtime) def test_copy_non_existent_file(temp_dir, runtime_cls): runtime = _load_runtime(temp_dir, runtime_cls) try: sandbox_dir = _get_sandbox_folder(runtime) with pytest.raises(FileNotFoundError): runtime.copy_to( os.path.join(sandbox_dir, 'non_existent_file.txt'), f'{sandbox_dir}/should_not_exist.txt', ) obs = _run_cmd_action(runtime, f'ls {sandbox_dir}/should_not_exist.txt') assert obs.exit_code != 0 # File should not exist finally: _close_test_runtime(runtime) def test_copy_from_directory(temp_dir, runtime_cls): runtime: Runtime = _load_runtime(temp_dir, runtime_cls) sandbox_dir = _get_sandbox_folder(runtime) try: temp_dir_copy = os.path.join(temp_dir, 'test_dir') # We need a separate directory, since temp_dir is mounted to /workspace _create_host_test_dir_with_files(temp_dir_copy) # Initial state runtime.copy_to(temp_dir_copy, sandbox_dir, recursive=True) path_to_copy_from = f'{sandbox_dir}/test_dir' result = runtime.copy_from(path=path_to_copy_from) # Result is returned as a path assert isinstance(result, Path) result.unlink() finally: _close_test_runtime(runtime) def test_keep_prompt(runtime_cls, temp_dir): runtime = _load_runtime( temp_dir, runtime_cls=runtime_cls, run_as_openhands=False, ) try: sandbox_dir = _get_sandbox_folder(runtime) obs = _run_cmd_action(runtime, f'touch {sandbox_dir}/test_file.txt') assert obs.exit_code == 0 assert 'root@' in obs.interpreter_details obs = _run_cmd_action( runtime, f'cat {sandbox_dir}/test_file.txt', keep_prompt=False ) assert obs.exit_code == 0 assert 'root@' not in obs.interpreter_details finally: _close_test_runtime(runtime) @pytest.mark.skipif( TEST_IN_CI != 'True', reason='This test is not working in WSL (file ownership)', ) def test_git_operation(runtime_cls): # do not mount workspace, since workspace mount by tests will be owned by root # while the user_id we get via os.getuid() is different from root # which causes permission issues runtime = _load_runtime( temp_dir=None, runtime_cls=runtime_cls, # Need to use non-root user to expose issues run_as_openhands=True, ) # this will happen if permission of runtime is not properly configured # fatal: detected dubious ownership in repository at '/workspace' try: # check the ownership of the current directory obs = _run_cmd_action(runtime, 'ls -alh .') assert obs.exit_code == 0 # drwx--S--- 2 openhands root 64 Aug 7 23:32 . # drwxr-xr-x 1 root root 4.0K Aug 7 23:33 .. for line in obs.content.split('\r\n'): if ' ..' in line: # parent directory should be owned by root assert 'root' in line assert 'openhands' not in line elif ' .' in line: # current directory should be owned by openhands # and its group should be root assert 'openhands' in line assert 'root' in line # make sure all git operations are allowed obs = _run_cmd_action(runtime, 'git init') assert obs.exit_code == 0 # create a file obs = _run_cmd_action(runtime, 'echo "hello" > test_file.txt') assert obs.exit_code == 0 # git add obs = _run_cmd_action(runtime, 'git add test_file.txt') assert obs.exit_code == 0 # git diff obs = _run_cmd_action(runtime, 'git diff') assert obs.exit_code == 0 # git commit obs = _run_cmd_action(runtime, 'git commit -m "test commit"') assert obs.exit_code == 0 finally: _close_test_runtime(runtime) def test_python_version(temp_dir, runtime_cls, run_as_openhands): runtime = _load_runtime(temp_dir, runtime_cls, run_as_openhands) try: obs = runtime.run_action(CmdRunAction(command='python --version')) assert isinstance( obs, CmdOutputObservation ), 'The observation should be a CmdOutputObservation.' assert obs.exit_code == 0, 'The exit code should be 0.' assert 'Python 3' in obs.content, 'The output should contain "Python 3".' finally: _close_test_runtime(runtime)