test_agent_controller.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. import asyncio
  2. from unittest.mock import AsyncMock, MagicMock, Mock
  3. from uuid import uuid4
  4. import pytest
  5. from openhands.controller.agent import Agent
  6. from openhands.controller.agent_controller import AgentController
  7. from openhands.controller.state.state import TrafficControlState
  8. from openhands.core.config import AppConfig
  9. from openhands.core.main import run_controller
  10. from openhands.core.schema import AgentState
  11. from openhands.events import Event, EventSource, EventStream, EventStreamSubscriber
  12. from openhands.events.action import ChangeAgentStateAction, CmdRunAction, MessageAction
  13. from openhands.events.observation import (
  14. ErrorObservation,
  15. )
  16. from openhands.events.serialization import event_to_dict
  17. from openhands.llm import LLM
  18. from openhands.llm.metrics import Metrics
  19. from openhands.runtime.base import Runtime
  20. from openhands.storage import get_file_store
  21. @pytest.fixture
  22. def temp_dir(tmp_path_factory: pytest.TempPathFactory) -> str:
  23. return str(tmp_path_factory.mktemp('test_event_stream'))
  24. @pytest.fixture(scope='function')
  25. def event_loop():
  26. loop = asyncio.get_event_loop_policy().new_event_loop()
  27. yield loop
  28. loop.close()
  29. @pytest.fixture
  30. def mock_agent():
  31. return MagicMock(spec=Agent)
  32. @pytest.fixture
  33. def mock_event_stream():
  34. return MagicMock(spec=EventStream)
  35. @pytest.fixture
  36. def mock_status_callback():
  37. return AsyncMock()
  38. @pytest.mark.asyncio
  39. async def test_set_agent_state(mock_agent, mock_event_stream):
  40. controller = AgentController(
  41. agent=mock_agent,
  42. event_stream=mock_event_stream,
  43. max_iterations=10,
  44. sid='test',
  45. confirmation_mode=False,
  46. headless_mode=True,
  47. )
  48. await controller.set_agent_state_to(AgentState.RUNNING)
  49. assert controller.get_agent_state() == AgentState.RUNNING
  50. await controller.set_agent_state_to(AgentState.PAUSED)
  51. assert controller.get_agent_state() == AgentState.PAUSED
  52. await controller.close()
  53. @pytest.mark.asyncio
  54. async def test_on_event_message_action(mock_agent, mock_event_stream):
  55. controller = AgentController(
  56. agent=mock_agent,
  57. event_stream=mock_event_stream,
  58. max_iterations=10,
  59. sid='test',
  60. confirmation_mode=False,
  61. headless_mode=True,
  62. )
  63. controller.state.agent_state = AgentState.RUNNING
  64. message_action = MessageAction(content='Test message')
  65. await controller.on_event(message_action)
  66. assert controller.get_agent_state() == AgentState.RUNNING
  67. await controller.close()
  68. @pytest.mark.asyncio
  69. async def test_on_event_change_agent_state_action(mock_agent, mock_event_stream):
  70. controller = AgentController(
  71. agent=mock_agent,
  72. event_stream=mock_event_stream,
  73. max_iterations=10,
  74. sid='test',
  75. confirmation_mode=False,
  76. headless_mode=True,
  77. )
  78. controller.state.agent_state = AgentState.RUNNING
  79. change_state_action = ChangeAgentStateAction(agent_state=AgentState.PAUSED)
  80. await controller.on_event(change_state_action)
  81. assert controller.get_agent_state() == AgentState.PAUSED
  82. await controller.close()
  83. @pytest.mark.asyncio
  84. async def test_react_to_exception(mock_agent, mock_event_stream, mock_status_callback):
  85. controller = AgentController(
  86. agent=mock_agent,
  87. event_stream=mock_event_stream,
  88. status_callback=mock_status_callback,
  89. max_iterations=10,
  90. sid='test',
  91. confirmation_mode=False,
  92. headless_mode=True,
  93. )
  94. error_message = 'Test error'
  95. await controller._react_to_exception(RuntimeError(error_message))
  96. controller.status_callback.assert_called_once()
  97. await controller.close()
  98. @pytest.mark.asyncio
  99. async def test_run_controller_with_fatal_error(mock_agent, mock_event_stream):
  100. config = AppConfig()
  101. file_store = get_file_store(config.file_store, config.file_store_path)
  102. event_stream = EventStream(sid='test', file_store=file_store)
  103. agent = MagicMock(spec=Agent)
  104. agent = MagicMock(spec=Agent)
  105. def agent_step_fn(state):
  106. print(f'agent_step_fn received state: {state}')
  107. return CmdRunAction(command='ls')
  108. agent.step = agent_step_fn
  109. agent.llm = MagicMock(spec=LLM)
  110. agent.llm.metrics = Metrics()
  111. agent.llm.config = config.get_llm_config()
  112. runtime = MagicMock(spec=Runtime)
  113. async def on_event(event: Event):
  114. if isinstance(event, CmdRunAction):
  115. error_obs = ErrorObservation('You messed around with Jim')
  116. error_obs._cause = event.id
  117. event_stream.add_event(error_obs, EventSource.USER)
  118. event_stream.subscribe(EventStreamSubscriber.RUNTIME, on_event, str(uuid4()))
  119. runtime.event_stream = event_stream
  120. state = await run_controller(
  121. config=config,
  122. initial_user_action=MessageAction(content='Test message'),
  123. runtime=runtime,
  124. sid='test',
  125. agent=agent,
  126. fake_user_response_fn=lambda _: 'repeat',
  127. )
  128. print(f'state: {state}')
  129. print(f'event_stream: {list(event_stream.get_events())}')
  130. assert state.iteration == 4
  131. assert state.agent_state == AgentState.ERROR
  132. assert state.last_error == 'Agent got stuck in a loop'
  133. assert len(list(event_stream.get_events())) == 11
  134. @pytest.mark.asyncio
  135. async def test_run_controller_stop_with_stuck():
  136. config = AppConfig()
  137. file_store = get_file_store(config.file_store, config.file_store_path)
  138. event_stream = EventStream(sid='test', file_store=file_store)
  139. agent = MagicMock(spec=Agent)
  140. def agent_step_fn(state):
  141. print(f'agent_step_fn received state: {state}')
  142. return CmdRunAction(command='ls')
  143. agent.step = agent_step_fn
  144. agent.llm = MagicMock(spec=LLM)
  145. agent.llm.metrics = Metrics()
  146. agent.llm.config = config.get_llm_config()
  147. runtime = MagicMock(spec=Runtime)
  148. async def on_event(event: Event):
  149. if isinstance(event, CmdRunAction):
  150. non_fatal_error_obs = ErrorObservation(
  151. 'Non fatal error here to trigger loop'
  152. )
  153. non_fatal_error_obs._cause = event.id
  154. event_stream.add_event(non_fatal_error_obs, EventSource.ENVIRONMENT)
  155. event_stream.subscribe(EventStreamSubscriber.RUNTIME, on_event, str(uuid4()))
  156. runtime.event_stream = event_stream
  157. state = await run_controller(
  158. config=config,
  159. initial_user_action=MessageAction(content='Test message'),
  160. runtime=runtime,
  161. sid='test',
  162. agent=agent,
  163. fake_user_response_fn=lambda _: 'repeat',
  164. )
  165. events = list(event_stream.get_events())
  166. print(f'state: {state}')
  167. for i, event in enumerate(events):
  168. print(f'event {i}: {event_to_dict(event)}')
  169. assert state.iteration == 4
  170. assert len(events) == 11
  171. # check the eventstream have 4 pairs of repeated actions and observations
  172. repeating_actions_and_observations = events[2:10]
  173. for action, observation in zip(
  174. repeating_actions_and_observations[0::2],
  175. repeating_actions_and_observations[1::2],
  176. ):
  177. action_dict = event_to_dict(action)
  178. observation_dict = event_to_dict(observation)
  179. assert action_dict['action'] == 'run' and action_dict['args']['command'] == 'ls'
  180. assert (
  181. observation_dict['observation'] == 'error'
  182. and observation_dict['content'] == 'Non fatal error here to trigger loop'
  183. )
  184. last_event = event_to_dict(events[-1])
  185. assert last_event['extras']['agent_state'] == 'error'
  186. assert last_event['observation'] == 'agent_state_changed'
  187. assert state.agent_state == AgentState.ERROR
  188. assert state.last_error == 'Agent got stuck in a loop'
  189. @pytest.mark.asyncio
  190. @pytest.mark.parametrize(
  191. 'delegate_state',
  192. [
  193. AgentState.RUNNING,
  194. AgentState.FINISHED,
  195. AgentState.ERROR,
  196. AgentState.REJECTED,
  197. ],
  198. )
  199. async def test_delegate_step_different_states(
  200. mock_agent, mock_event_stream, delegate_state
  201. ):
  202. controller = AgentController(
  203. agent=mock_agent,
  204. event_stream=mock_event_stream,
  205. max_iterations=10,
  206. sid='test',
  207. confirmation_mode=False,
  208. headless_mode=True,
  209. )
  210. mock_delegate = AsyncMock()
  211. controller.delegate = mock_delegate
  212. mock_delegate.state.iteration = 5
  213. mock_delegate.state.outputs = {'result': 'test'}
  214. mock_delegate.agent.name = 'TestDelegate'
  215. mock_delegate.get_agent_state = Mock(return_value=delegate_state)
  216. mock_delegate._step = AsyncMock()
  217. mock_delegate.close = AsyncMock()
  218. await controller._delegate_step()
  219. mock_delegate._step.assert_called_once()
  220. if delegate_state == AgentState.RUNNING:
  221. assert controller.delegate is not None
  222. assert controller.state.iteration == 0
  223. mock_delegate.close.assert_not_called()
  224. else:
  225. assert controller.delegate is None
  226. assert controller.state.iteration == 5
  227. mock_delegate.close.assert_called_once()
  228. await controller.close()
  229. @pytest.mark.asyncio
  230. async def test_step_max_iterations(mock_agent, mock_event_stream):
  231. controller = AgentController(
  232. agent=mock_agent,
  233. event_stream=mock_event_stream,
  234. max_iterations=10,
  235. sid='test',
  236. confirmation_mode=False,
  237. headless_mode=False,
  238. )
  239. controller.state.agent_state = AgentState.RUNNING
  240. controller.state.iteration = 10
  241. assert controller.state.traffic_control_state == TrafficControlState.NORMAL
  242. await controller._step()
  243. assert controller.state.traffic_control_state == TrafficControlState.THROTTLING
  244. assert controller.state.agent_state == AgentState.ERROR
  245. await controller.close()
  246. @pytest.mark.asyncio
  247. async def test_step_max_iterations_headless(mock_agent, mock_event_stream):
  248. controller = AgentController(
  249. agent=mock_agent,
  250. event_stream=mock_event_stream,
  251. max_iterations=10,
  252. sid='test',
  253. confirmation_mode=False,
  254. headless_mode=True,
  255. )
  256. controller.state.agent_state = AgentState.RUNNING
  257. controller.state.iteration = 10
  258. assert controller.state.traffic_control_state == TrafficControlState.NORMAL
  259. await controller._step()
  260. assert controller.state.traffic_control_state == TrafficControlState.THROTTLING
  261. # In headless mode, throttling results in an error
  262. assert controller.state.agent_state == AgentState.ERROR
  263. await controller.close()
  264. @pytest.mark.asyncio
  265. async def test_step_max_budget(mock_agent, mock_event_stream):
  266. controller = AgentController(
  267. agent=mock_agent,
  268. event_stream=mock_event_stream,
  269. max_iterations=10,
  270. max_budget_per_task=10,
  271. sid='test',
  272. confirmation_mode=False,
  273. headless_mode=False,
  274. )
  275. controller.state.agent_state = AgentState.RUNNING
  276. controller.state.metrics.accumulated_cost = 10.1
  277. assert controller.state.traffic_control_state == TrafficControlState.NORMAL
  278. await controller._step()
  279. assert controller.state.traffic_control_state == TrafficControlState.THROTTLING
  280. assert controller.state.agent_state == AgentState.ERROR
  281. await controller.close()
  282. @pytest.mark.asyncio
  283. async def test_step_max_budget_headless(mock_agent, mock_event_stream):
  284. controller = AgentController(
  285. agent=mock_agent,
  286. event_stream=mock_event_stream,
  287. max_iterations=10,
  288. max_budget_per_task=10,
  289. sid='test',
  290. confirmation_mode=False,
  291. headless_mode=True,
  292. )
  293. controller.state.agent_state = AgentState.RUNNING
  294. controller.state.metrics.accumulated_cost = 10.1
  295. assert controller.state.traffic_control_state == TrafficControlState.NORMAL
  296. await controller._step()
  297. assert controller.state.traffic_control_state == TrafficControlState.THROTTLING
  298. # In headless mode, throttling results in an error
  299. assert controller.state.agent_state == AgentState.ERROR
  300. await controller.close()