import threading

import chromadb
import llama_index.embeddings.openai.base as llama_openai
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.vector_stores.chroma import ChromaVectorStore
from openai._exceptions import APIConnectionError, InternalServerError, RateLimitError
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

from opendevin.core.config import config
from opendevin.core.logger import opendevin_logger as logger
from opendevin.core.utils import json

num_retries = config.llm.num_retries
retry_min_wait = config.llm.retry_min_wait
retry_max_wait = config.llm.retry_max_wait

# llama-index includes a retry decorator around the openai.get_embeddings() function.
# It is initialized with hard-coded retry values and exception types, and this
# non-customizable behavior causes problems when it retries faster than the
# provider's rate limits allow. This block replaces that decorator with our own,
# so users can set their own limits.
if hasattr(llama_openai.get_embeddings, '__wrapped__'):
    original_get_embeddings = llama_openai.get_embeddings.__wrapped__
else:
    # No tenacity wrapper found to unwrap; use the function as-is and limit our
    # own decorator to a single attempt.
    logger.warning('Cannot set custom retry limits.')
    num_retries = 1
    original_get_embeddings = llama_openai.get_embeddings


def attempt_on_error(retry_state):
    # Called by tenacity after each failed attempt: log the error, then let the
    # retry loop continue.
    logger.error(
        f'{retry_state.outcome.exception()}. Attempt #{retry_state.attempt_number} | You can customize these settings in the configuration.',
        exc_info=False,
    )
    return True


@retry(
    reraise=True,
    stop=stop_after_attempt(num_retries),
    wait=wait_random_exponential(min=retry_min_wait, max=retry_max_wait),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, InternalServerError)
    ),
    after=attempt_on_error,
)
def wrapper_get_embeddings(*args, **kwargs):
    return original_get_embeddings(*args, **kwargs)


llama_openai.get_embeddings = wrapper_get_embeddings
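
# Illustrative sanity check (a sketch, not executed by this module): after the
# patch, llama-index's module-level reference points at our wrapper, while the
# original function remains reachable through it.
#
#     assert llama_openai.get_embeddings is wrapper_get_embeddings
#     vectors = llama_openai.get_embeddings(client, ['some text'], engine)
#
# `client`, `engine` and `vectors` are placeholders for whatever llama-index
# passes and returns internally; they are not names defined in this module.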


class EmbeddingsLoader:
    """Loader for embedding model initialization."""

    @staticmethod
    def get_embedding_model(strategy: str):
        supported_ollama_embed_models = [
            'llama2',
            'mxbai-embed-large',
            'nomic-embed-text',
            'all-minilm',
            'stable-code',
        ]
        if strategy in supported_ollama_embed_models:
            from llama_index.embeddings.ollama import OllamaEmbedding

            return OllamaEmbedding(
                model_name=strategy,
                base_url=config.llm.embedding_base_url,
                ollama_additional_kwargs={'mirostat': 0},
            )
        elif strategy == 'openai':
            from llama_index.embeddings.openai import OpenAIEmbedding

            return OpenAIEmbedding(
                model='text-embedding-ada-002',
                api_key=config.llm.api_key,
            )
        elif strategy == 'azureopenai':
            from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

            return AzureOpenAIEmbedding(
                model='text-embedding-ada-002',
                deployment_name=config.llm.embedding_deployment_name,
                api_key=config.llm.api_key,
                azure_endpoint=config.llm.base_url,
                api_version=config.llm.api_version,
            )
        elif (strategy is not None) and (strategy.lower() == 'none'):
            # TODO: this works but is not elegant. When the monologue agent is
            # not used, there is no reason to initialize an embedding model at all.
            return None
        else:
            from llama_index.embeddings.huggingface import HuggingFaceEmbedding

            return HuggingFaceEmbedding(model_name='BAAI/bge-small-en-v1.5')
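

# Usage sketch (illustrative): the strategy string normally comes from
# config.llm.embedding_model, but any supported value selects a model the same way.
#
#     embed_model = EmbeddingsLoader.get_embedding_model('openai')
#     if embed_model is not None:
#         vector = embed_model.get_text_embedding('hello world')
#
# get_text_embedding() is part of llama-index's BaseEmbedding interface, which
# every model returned above implements.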


# Cap the number of concurrent background inserts into the index.
sema = threading.Semaphore(value=config.agent.memory_max_threads)


class LongTermMemory:
    """Handles storing information for the agent to access later, using chromadb."""

    def __init__(self):
        """Initialize chromadb and set up a ChromaVectorStore for later use."""
        db = chromadb.Client(chromadb.Settings(anonymized_telemetry=False))
        self.collection = db.get_or_create_collection(name='memories')
        vector_store = ChromaVectorStore(chroma_collection=self.collection)
        embedding_strategy = config.llm.embedding_model
        embed_model = EmbeddingsLoader.get_embedding_model(embedding_strategy)
        self.index = VectorStoreIndex.from_vector_store(vector_store, embed_model)
        self.thought_idx = 0
        self._add_threads = []
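
    # Note (illustrative, not what this module does): chromadb.Client() with
    # these settings keeps the collection in memory, so stored memories do not
    # survive a process restart. A persistent variant would be a one-line
    # change, assuming chromadb's PersistentClient and a local path of your
    # choosing:
    #
    #     db = chromadb.PersistentClient(path='./chroma_db')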

    def add_event(self, event: dict):
        """Adds a new event to the long term memory with a unique id.

        Parameters:
        - event (dict): The new event to be added to memory
        """
        event_id = ''
        event_type = ''
        if 'action' in event:
            event_type = 'action'
            event_id = event['action']
        elif 'observation' in event:
            event_type = 'observation'
            event_id = event['observation']
        doc = Document(
            text=json.dumps(event),
            doc_id=str(self.thought_idx),
            extra_info={
                'type': event_type,
                'id': event_id,
                'idx': self.thought_idx,
            },
        )
        self.thought_idx += 1
        logger.debug('Adding %s event to memory: %d', event_type, self.thought_idx)
        thread = threading.Thread(target=self._add_doc, args=(doc,))
        self._add_threads.append(thread)
        # Insert concurrently so callers don't have to wait ~500ms per insert.
        thread.start()

    def _add_doc(self, doc):
        with sema:
            self.index.insert(doc)
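
    # Note: inserts run on background threads, so a search issued immediately
    # after add_event() may not see the newest documents. A caller that needs
    # read-after-write consistency could first wait for the pending inserts
    # (illustrative sketch; `memory` is a hypothetical instance):
    #
    #     for thread in memory._add_threads:
    #         thread.join()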

    def search(self, query: str, k: int = 10):
        """Searches through the current memory using VectorIndexRetriever.

        Parameters:
        - query (str): A query to match search results to
        - k (int): Number of top results to return

        Returns:
        - list[str]: list of top k results found in current memory
        """
        retriever = VectorIndexRetriever(
            index=self.index,
            similarity_top_k=k,
        )
        results = retriever.retrieve(query)
        return [r.get_text() for r in results]
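

# End-to-end usage sketch (illustrative): event shapes follow the
# 'action'/'observation' keys that add_event() inspects above.
#
#     memory = LongTermMemory()
#     memory.add_event({'action': 'run', 'args': {'command': 'ls'}})
#     memory.add_event({'observation': 'run', 'content': 'file.txt'})
#     for thread in memory._add_threads:
#         thread.join()  # let the background inserts land before searching
#     hits = memory.search('list files', k=5)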
|