| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import time
- from typing import Any, Union
- import requests
- from requests.exceptions import ConnectionError, HTTPError, Timeout
- class InvariantClient:
- timeout: int = 120
- def __init__(self, server_url: str, session_id: str | None = None):
- self.server = server_url
- self.session_id, err = self._create_session(session_id)
- if err:
- raise RuntimeError(f'Failed to create session: {err}')
- self.Policy = self._Policy(self)
- self.Monitor = self._Monitor(self)
- def _create_session(
- self, session_id: str | None = None
- ) -> tuple[str | None, Exception | None]:
- elapsed = 0
- while elapsed < self.timeout:
- try:
- if session_id:
- response = requests.get(
- f'{self.server}/session/new?session_id={session_id}', timeout=60
- )
- else:
- response = requests.get(f'{self.server}/session/new', timeout=60)
- response.raise_for_status()
- return response.json().get('id'), None
- except (ConnectionError, Timeout):
- elapsed += 1
- time.sleep(1)
- except HTTPError as http_err:
- return None, http_err
- except Exception as err:
- return None, err
- return None, ConnectionError('Connection timed out')
- def close_session(self) -> Union[None, Exception]:
- try:
- response = requests.delete(
- f'{self.server}/session/?session_id={self.session_id}', timeout=60
- )
- response.raise_for_status()
- except (ConnectionError, Timeout, HTTPError) as err:
- return err
- return None
- class _Policy:
- def __init__(self, invariant):
- self.server = invariant.server
- self.session_id = invariant.session_id
- def _create_policy(self, rule: str) -> tuple[str | None, Exception | None]:
- try:
- response = requests.post(
- f'{self.server}/policy/new?session_id={self.session_id}',
- json={'rule': rule},
- timeout=60,
- )
- response.raise_for_status()
- return response.json().get('policy_id'), None
- except (ConnectionError, Timeout, HTTPError) as err:
- return None, err
- def get_template(self) -> tuple[str | None, Exception | None]:
- try:
- response = requests.get(
- f'{self.server}/policy/template',
- timeout=60,
- )
- response.raise_for_status()
- return response.json(), None
- except (ConnectionError, Timeout, HTTPError) as err:
- return None, err
- def from_string(self, rule: str):
- policy_id, err = self._create_policy(rule)
- if err:
- raise err
- self.policy_id = policy_id
- return self
- def analyze(self, trace: list[dict]) -> Union[Any, Exception]:
- try:
- response = requests.post(
- f'{self.server}/policy/{self.policy_id}/analyze?session_id={self.session_id}',
- json={'trace': trace},
- timeout=60,
- )
- response.raise_for_status()
- return response.json(), None
- except (ConnectionError, Timeout, HTTPError) as err:
- return None, err
- class _Monitor:
- def __init__(self, invariant):
- self.server = invariant.server
- self.session_id = invariant.session_id
- self.policy = ''
- def _create_monitor(self, rule: str) -> tuple[str | None, Exception | None]:
- try:
- response = requests.post(
- f'{self.server}/monitor/new?session_id={self.session_id}',
- json={'rule': rule},
- timeout=60,
- )
- response.raise_for_status()
- return response.json().get('monitor_id'), None
- except (ConnectionError, Timeout, HTTPError) as err:
- return None, err
- def from_string(self, rule: str):
- monitor_id, err = self._create_monitor(rule)
- if err:
- raise err
- self.monitor_id = monitor_id
- self.policy = rule
- return self
- def check(
- self, past_events: list[dict], pending_events: list[dict]
- ) -> Union[Any, Exception]:
- try:
- response = requests.post(
- f'{self.server}/monitor/{self.monitor_id}/check?session_id={self.session_id}',
- json={'past_events': past_events, 'pending_events': pending_events},
- timeout=60,
- )
- response.raise_for_status()
- return response.json(), None
- except (ConnectionError, Timeout, HTTPError) as err:
- return None, err
|