| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- from typing import Any
- from uuid import uuid4
- from fastapi import Request
- from openhands.core.logger import openhands_logger as logger
- from openhands.events.action.action import Action, ActionSecurityRisk
- from openhands.events.event import Event
- from openhands.events.stream import EventStream, EventStreamSubscriber
- class SecurityAnalyzer:
- """Security analyzer that receives all events and analyzes agent actions for security risks."""
- def __init__(self, event_stream: EventStream):
- """Initializes a new instance of the SecurityAnalyzer class.
- Args:
- event_stream: The event stream to listen for events.
- """
- self.event_stream = event_stream
- self.event_stream.subscribe(
- EventStreamSubscriber.SECURITY_ANALYZER, self.on_event, str(uuid4())
- )
- async def on_event(self, event: Event) -> None:
- """Handles the incoming event, and when Action is received, analyzes it for security risks."""
- logger.debug(f'SecurityAnalyzer received event: {event}')
- await self.log_event(event)
- if not isinstance(event, Action):
- return
- try:
- event.security_risk = await self.security_risk(event) # type: ignore [attr-defined]
- await self.act(event)
- except Exception as e:
- logger.error(f'Error occurred while analyzing the event: {e}')
- async def handle_api_request(self, request: Request) -> Any:
- """Handles the incoming API request."""
- raise NotImplementedError(
- 'Need to implement handle_api_request method in SecurityAnalyzer subclass'
- )
- async def log_event(self, event: Event) -> None:
- """Logs the incoming event."""
- pass
- async def act(self, event: Event) -> None:
- """Performs an action based on the analyzed event."""
- pass
- async def security_risk(self, event: Action) -> ActionSecurityRisk:
- """Evaluates the Action for security risks and returns the risk level."""
- raise NotImplementedError(
- 'Need to implement security_risk method in SecurityAnalyzer subclass'
- )
- async def close(self) -> None:
- """Cleanup resources allocated by the SecurityAnalyzer."""
- pass
|