analyzer.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from typing import Any
  2. from fastapi import Request
  3. from openhands.core.logger import openhands_logger as logger
  4. from openhands.events.action.action import Action, ActionSecurityRisk
  5. from openhands.events.event import Event
  6. from openhands.events.stream import EventStream, EventStreamSubscriber
  7. class SecurityAnalyzer:
  8. """Security analyzer that receives all events and analyzes agent actions for security risks."""
  9. def __init__(self, event_stream: EventStream):
  10. """Initializes a new instance of the SecurityAnalyzer class.
  11. Args:
  12. event_stream: The event stream to listen for events.
  13. """
  14. self.event_stream = event_stream
  15. self.event_stream.subscribe(
  16. EventStreamSubscriber.SECURITY_ANALYZER, self.on_event
  17. )
  18. async def on_event(self, event: Event) -> None:
  19. """Handles the incoming event, and when Action is received, analyzes it for security risks."""
  20. logger.info(f'SecurityAnalyzer received event: {event}')
  21. await self.log_event(event)
  22. if not isinstance(event, Action):
  23. return
  24. try:
  25. event.security_risk = await self.security_risk(event) # type: ignore [attr-defined]
  26. await self.act(event)
  27. except Exception as e:
  28. logger.error(f'Error occurred while analyzing the event: {e}')
  29. async def handle_api_request(self, request: Request) -> Any:
  30. """Handles the incoming API request."""
  31. raise NotImplementedError(
  32. 'Need to implement handle_api_request method in SecurityAnalyzer subclass'
  33. )
  34. async def log_event(self, event: Event) -> None:
  35. """Logs the incoming event."""
  36. pass
  37. async def act(self, event: Event) -> None:
  38. """Performs an action based on the analyzed event."""
  39. pass
  40. async def security_risk(self, event: Action) -> ActionSecurityRisk:
  41. """Evaluates the Action for security risks and returns the risk level."""
  42. raise NotImplementedError(
  43. 'Need to implement security_risk method in SecurityAnalyzer subclass'
  44. )
  45. async def close(self) -> None:
  46. """Cleanup resources allocated by the SecurityAnalyzer."""
  47. pass