analyzer.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from typing import Any
  2. from uuid import uuid4
  3. from fastapi import Request
  4. from openhands.core.logger import openhands_logger as logger
  5. from openhands.events.action.action import Action, ActionSecurityRisk
  6. from openhands.events.event import Event
  7. from openhands.events.stream import EventStream, EventStreamSubscriber
  8. class SecurityAnalyzer:
  9. """Security analyzer that receives all events and analyzes agent actions for security risks."""
  10. def __init__(self, event_stream: EventStream):
  11. """Initializes a new instance of the SecurityAnalyzer class.
  12. Args:
  13. event_stream: The event stream to listen for events.
  14. """
  15. self.event_stream = event_stream
  16. self.event_stream.subscribe(
  17. EventStreamSubscriber.SECURITY_ANALYZER, self.on_event, str(uuid4())
  18. )
  19. async def on_event(self, event: Event) -> None:
  20. """Handles the incoming event, and when Action is received, analyzes it for security risks."""
  21. logger.debug(f'SecurityAnalyzer received event: {event}')
  22. await self.log_event(event)
  23. if not isinstance(event, Action):
  24. return
  25. try:
  26. event.security_risk = await self.security_risk(event) # type: ignore [attr-defined]
  27. await self.act(event)
  28. except Exception as e:
  29. logger.error(f'Error occurred while analyzing the event: {e}')
  30. async def handle_api_request(self, request: Request) -> Any:
  31. """Handles the incoming API request."""
  32. raise NotImplementedError(
  33. 'Need to implement handle_api_request method in SecurityAnalyzer subclass'
  34. )
  35. async def log_event(self, event: Event) -> None:
  36. """Logs the incoming event."""
  37. pass
  38. async def act(self, event: Event) -> None:
  39. """Performs an action based on the analyzed event."""
  40. pass
  41. async def security_risk(self, event: Action) -> ActionSecurityRisk:
  42. """Evaluates the Action for security risks and returns the risk level."""
  43. raise NotImplementedError(
  44. 'Need to implement security_risk method in SecurityAnalyzer subclass'
  45. )
  46. async def close(self) -> None:
  47. """Cleanup resources allocated by the SecurityAnalyzer."""
  48. pass