Nowadays, AI has become an indispensable tool in daily work, especially for office workers and developers. However, current AI models still face significant limitations in accessing and processing information.
Therefore, in this article, I will introduce a method to enhance AI’s information retrieval capabilities and improve the efficiency of its data access.
AI models and humans search for and process information in similar ways, so they run into similar challenges. To see why, let’s start by examining how humans retrieve and process information.
Imagine you are an intelligent person with strong analytical and information-processing skills. However, you cannot know everything. When encountering a new concept, such as “RAG”, your brain follows this process:
This process applies to AI as well. While AI does not inherently “know” everything, it can efficiently search, analyze, and synthesize information based on input data.
RAG (Retrieval-Augmented Generation) is a method that enables AI to retrieve and utilize information more effectively.
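A typical RAG system consists of two main components: a retriever, which searches an external data source for content relevant to the query, and a generator, the language model that composes an answer from the query and the retrieved content.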
With RAG, AI can extract information from external sources instead of relying solely on pre-trained knowledge. This enhances accuracy and relevance, ensuring AI responses remain up-to-date. 🚀
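To make the idea concrete, here is a minimal sketch of the retrieve-then-generate flow in plain Python. It is only an illustration: the documents are made up, the retriever ranks by simple word overlap rather than by embeddings, and the "generation" step stops at building the prompt that would be sent to a model.

```python
import re

# Toy knowledge base standing in for an external data source (made-up data).
DOCUMENTS = [
    "RAG stands for Retrieval-Augmented Generation.",
    "Redis is an in-memory data store.",
    "Llama 3.1 is a large language model released by Meta.",
]

def tokenize(text):
    """Lowercase the text and split it into a set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def retrieve(query, documents, k=1):
    """Retrieval step: rank documents by how many words they share with the query."""
    query_words = tokenize(query)
    ranked = sorted(
        documents,
        key=lambda doc: len(query_words & tokenize(doc)),
        reverse=True,
    )
    return ranked[:k]

def build_prompt(query, context_docs):
    """Generation input: the question plus the retrieved context."""
    context = "\n".join(context_docs)
    return f"Context:\n{context}\n\nQuestion: {query}"

question = "What does RAG stand for?"
prompt = build_prompt(question, retrieve(question, DOCUMENTS))
print(prompt)
```

The rest of this article follows the same shape, with RediSearch doing the retrieval and Llama 3.1 doing the generation.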
In this article, I will use RediSearch as the information retrieval system and the Llama 3.1 model via Ollama.
Below are the resources and installation guides:
In this article, the input dataset consists of my entire email history, which will be indexed in RediSearch for retrieval.
Each email, once downloaded from Gmail, will be vectorized using an embedding algorithm. I will cover this process in more detail in future articles.
For now, you only need to understand that the raw input data has a variable length and needs to be converted into a fixed-length representation. Specifically, each email will be represented as a 384-dimensional vector.
Once vectorized, the K-Nearest Neighbors (KNN) algorithm will be used to find the most relevant emails, enabling the AI model to retrieve information effectively.
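As a rough illustration of what the KNN step does, the sketch below ranks a few hand-written 3-dimensional vectors by cosine similarity to a query vector. The vectors and their names are invented for the example; real email embeddings would have 384 dimensions and the search would run inside Redis.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def knn(query_vec, vectors, k=2):
    """Return the k (name, similarity) pairs most similar to query_vec."""
    scored = [(name, cosine_similarity(query_vec, vec)) for name, vec in vectors.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]

# Made-up 3-dimensional "embeddings" for three emails.
EMAIL_VECTORS = {
    "invoice": [0.9, 0.1, 0.0],
    "meeting": [0.1, 0.9, 0.1],
    "newsletter": [0.0, 0.2, 0.9],
}

nearest = knn([1.0, 0.2, 0.0], EMAIL_VECTORS, k=2)
print(nearest)
```

This brute-force comparison against every stored vector is the same idea as the FLAT index type used later in the article.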
Tools and Libraries
I will use Python along with the following libraries:
google-api-python-client==2.122.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.0
redis>=5.2.1
sentence-transformers>=4.0.1
The following code obtains credentials from Google. The credentials.json file is an OAuth 2.0 client secrets file (an OAuth client ID, not a service account key), which you can create and download in the Google Cloud Platform Console.
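import base64
import os.path
import pickle
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
# Assumed configuration (not shown in the original code): adjust to your project.
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']  # read-only Gmail access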
def get_gmail_service():
    """Gets Gmail API service instance."""
    creds = None
    pickle_file = os.path.join(ROOT_DIR, 'etc', 'token.pickle')
    # The file token.pickle stores the user's access and refresh tokens
    if os.path.exists(pickle_file):
        with open(pickle_file, 'rb') as token:
            creds = pickle.load(token)
    
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                os.path.join(ROOT_DIR, 'etc', 'credentials.json'), SCOPES)
            # Default port that matches Google Cloud Console's default redirect URI
            creds = flow.run_local_server(port=8000)
        # Save the credentials for the next run
        with open(pickle_file, 'wb') as token:
            pickle.dump(creds, token)
    return build('gmail', 'v1', credentials=creds)
This function retrieves a list of emails from start_date to end_date using a Gmail query.
def get_list_emails(service, start_date=None, end_date=None):
    # Construct date query if dates provided
    query = ""
    if start_date:
        query += f"after:{start_date} "
    if end_date:
        query += f"before:{end_date}"
        
    # Get all message IDs first
    print("Fetching email IDs...")
    results = service.users().messages().list(userId='me', q=query).execute()
    messages = results.get('messages', [])
    
    # Keep getting messages if there are more pages
    while 'nextPageToken' in results:
        results = service.users().messages().list(
            userId='me',
            q=query,
            pageToken=results['nextPageToken']
        ).execute()
        messages.extend(results.get('messages', []))
        
    total_emails = len(messages)
    print(f"Found {total_emails} emails")
    return messages
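The pagination loop in get_list_emails can also be written as a small generator, which is easy to try out without touching the Gmail API. The sketch below is an illustration only: iter_all_items, fake_fetch, and the two fake pages are hypothetical names and data, not part of the Gmail client library.

```python
def iter_all_items(fetch_page, items_key="messages"):
    """Yield items from every page until the response has no nextPageToken."""
    page_token = None
    while True:
        page = fetch_page(page_token)
        yield from page.get(items_key, [])
        page_token = page.get("nextPageToken")
        if not page_token:
            break

# Fake two-page API response, used only for illustration.
FAKE_PAGES = {
    None: {"messages": [{"id": "a"}, {"id": "b"}], "nextPageToken": "p2"},
    "p2": {"messages": [{"id": "c"}]},
}

def fake_fetch(page_token):
    """Stand-in for a call that returns one page of results."""
    return FAKE_PAGES[page_token]

ids = [message["id"] for message in iter_all_items(fake_fetch)]
print(ids)
```

With the real service, fetch_page would wrap the service.users().messages().list(...).execute() call shown above.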
def get_detail_email(service, msg_id):
    message = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
    body = ""
    try:
        if 'parts' in message['payload']:
            for part in message['payload']['parts']:
                if part['mimeType'] == 'text/plain':  # Can also check for 'text/html'
                    body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                    break
        else:
            body = base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8')
    except Exception:
        body = ""
    # Extract headers
    headers = message['payload']['headers']
    email_data = {
        'id': msg_id,
        'subject': '',
        'from': '',
        'date': '',
        'snippet': message['snippet'],
        'body': body,
    }
    # Get relevant headers
    for header in headers:
        name = header['name'].lower()
        if name == 'subject':
            email_data['subject'] = header['value']
        elif name == 'from':
            email_data['from'] = header['value']
        elif name == 'date':
            email_data['date'] = header['value']
            
    return email_data
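One limitation of get_detail_email is that it only looks at the top-level parts, while many real emails nest the text/plain part inside a multipart/alternative part. A possible way to handle that is a recursive search, sketched below on a hand-built payload. The helper names and the sample payload are assumptions made for the example; only the mimeType, parts, and body.data keys mirror the structure used in the code above.

```python
import base64

def find_plain_text(payload):
    """Return the first decoded text/plain body in a possibly nested payload."""
    if payload.get("mimeType") == "text/plain":
        data = payload.get("body", {}).get("data")
        if data:
            return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
    # Recurse into child parts (multipart/mixed, multipart/alternative, ...)
    for part in payload.get("parts", []):
        text = find_plain_text(part)
        if text:
            return text
    return ""

def encode(text):
    """Helper for the demo: base64url-encode a string like the API does."""
    return base64.urlsafe_b64encode(text.encode("utf-8")).decode("ascii")

# Hand-built sample payload (hypothetical data) with a nested text/plain part.
SAMPLE_PAYLOAD = {
    "mimeType": "multipart/mixed",
    "parts": [
        {
            "mimeType": "multipart/alternative",
            "parts": [
                {"mimeType": "text/plain", "body": {"data": encode("Hello from a nested part")}},
                {"mimeType": "text/html", "body": {"data": encode("<p>Hello</p>")}},
            ],
        },
        {"mimeType": "application/pdf", "body": {"attachmentId": "hypothetical-id"}},
    ],
}

body_text = find_plain_text(SAMPLE_PAYLOAD)
print(body_text)
```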
Using the functions above, we can retrieve emails as follows. The generator expression fetches each email lazily, one at a time, as it is iterated.
g_service = get_gmail_service()
list_emails = get_list_emails(g_service, '2024/01/01', '2024/12/31')
emails_generator = (get_detail_email(g_service, email['id']) for email in list_emails)
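import redis
# REDIS_HOST and REDIS_PORT are your connection settings (for example 'localhost' and 6379).
def get_redis(db=0):
    return redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=db, decode_responses=True)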
We use the all-MiniLM-L6-v2 model, a simple yet high-performance model. Since email content is typically neither too long nor overly complex, this model is a suitable choice. It converts text into a 384-dimensional embedding vector.
Alternatively, you can generate the embeddings with an embedding model served by Ollama, which avoids the extra sentence-transformers dependency.
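import numpy as np
from sentence_transformers import SentenceTransformer
# Load the model once instead of on every call.
model = SentenceTransformer('all-MiniLM-L6-v2')
def embed_text(text):
    return model.encode(text).astype(np.float32).tobytes()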
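The call to tobytes() matters because Redis stores the vector as a raw byte string. The sketch below shows the same packing with the standard-library array module instead of NumPy, purely to illustrate the byte layout; the all-0.5 vector is a placeholder, and the 4-bytes-per-float size assumes a typical platform where a C float is 32 bits.

```python
from array import array

def to_float32_bytes(values):
    """Pack a list of floats into raw 32-bit floats in native byte order."""
    return array("f", values).tobytes()

def from_float32_bytes(blob):
    """Unpack raw 32-bit floats back into a Python list."""
    restored = array("f")
    restored.frombytes(blob)
    return list(restored)

vector = [0.5] * 384              # placeholder for a real 384-dimensional embedding
blob = to_float32_bytes(vector)
print(len(blob))                  # 384 floats x 4 bytes each
```

The length of the blob must match the DIM and TYPE declared in the index, otherwise Redis cannot index the document's vector.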
We store each email in Redis as a hash that contains the fields retrieved from Gmail plus an additional field holding the email's embedding.
def store_in_redis(redis_client, email, embedding_vector):
    # Rename Gmail's 'from' header to 'sender' so it matches the index schema
    email_data = {
        "id": email['id'],
        "subject": email['subject'],
        "sender": email['from'],
        "date": email['date'],
        "snippet": email['snippet'],
        "body": email['body']
    }
    
    redis_client.hset(f"{REDIS_EMAIL_PREFIX_KEY}{email['id']}", mapping={
        **email_data,
        "embedding": embedding_vector
    })
Now, we will set up the pipeline to ingest data.
The embedding_vector is the representation vector mentioned earlier. It is generated from key email information, including the sender, subject, and snippet (a content preview).
g_service = get_gmail_service()
redis_client = get_redis()
list_emails = get_list_emails(g_service, '2024/01/01', '2024/12/31')
emails_generator = (get_detail_email(g_service, email['id']) for email in list_emails)
for email in emails_generator:
    # Use a separate name so the embed_text function is not shadowed
    text_to_embed = f'sender:{email["from"]} subject:{email["subject"]} snippet:{email["snippet"]}'
    embedding_vector = embed_text(text_to_embed)
    store_in_redis(redis_client, email, embedding_vector)
After successfully ingesting the data, the next step is indexing to optimize search performance in Redis.
In the configuration, I define the embedding field as a 384-dimensional vector and use cosine similarity to measure the similarity between vectors. You can learn more about cosine similarity here:
https://en.wikipedia.org/wiki/Cosine_similarity
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
# REDIS_EMAIL_INDEX ('email_idx') and REDIS_EMAIL_PREFIX_KEY ('emails:') are module-level constants.
def create_index_emails():
    r = get_redis(0)
    vector_dim = 384  # must match the embedding model's output size
    # Drop the old index (keeping the documents); ignore the error if it does not exist yet
    try:
        r.ft(REDIS_EMAIL_INDEX).dropindex(delete_documents=False)
    except Exception:
        pass
    schema = (
        TextField("id"),
        TextField("subject"),
        TextField("sender"), 
        TextField("date"),
        TextField("snippet"),
        VectorField("embedding", 
                "FLAT", {
                    "TYPE": "FLOAT32",
                    "DIM": vector_dim,
                    "DISTANCE_METRIC": "COSINE"
                })
    )
    definition = IndexDefinition(
        prefix=[REDIS_EMAIL_PREFIX_KEY],
        index_type=IndexType.HASH,
    )
    r.ft(REDIS_EMAIL_INDEX).create_index(schema, definition=definition)
We will create a function that allows the AI model to query data from RediSearch. This function will retrieve and return the top-k most relevant emails based on the input query.
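from redis.commands.search.query import Query
def search_redis(query, k=5):
    r = get_redis(0)
    query_embedding = embed_text(query)
    # KNN search; "score" is the cosine distance, so lower means more similar
    q = (
        Query("*=>[KNN $K @embedding $BLOB AS score]")
        .sort_by("score")
        .return_fields("id", "subject", "sender", "date", "snippet", "body", "score")
        .dialect(2)
    )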
    
    query_params = {
        "K": k,
        "BLOB": query_embedding,
    }
    results = r.ft(REDIS_EMAIL_INDEX).search(q, query_params)
    
    if not results.docs:
        return []
    
    # Format results
    emails = []
    for doc in results.docs:
        email_data = {
            'id': doc.id.replace(REDIS_EMAIL_PREFIX_KEY, ''),
            'subject': doc.subject,
            'sender': doc.sender,
            'date': doc.date,
            'snippet': doc.snippet,
            'body': doc.body,
            'score': doc.score
        }
        emails.append(email_data)
        
    return emails
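Because the index uses the COSINE metric, the score returned for each email is a distance (0 means the same direction, larger means less similar), not a similarity. A possible post-processing step is to drop weak matches before handing them to the model. The sketch below assumes result dictionaries shaped like those returned by search_redis; the sample data and the 0.5 threshold are arbitrary choices for the example.

```python
def filter_by_distance(emails, max_distance=0.5):
    """Keep emails whose cosine distance is within max_distance, closest first."""
    kept = []
    for email in emails:
        distance = float(email["score"])  # the score may arrive as a string
        if distance <= max_distance:
            kept.append({**email, "similarity": 1.0 - distance})
    return sorted(kept, key=lambda e: float(e["score"]))

# Made-up search results for illustration.
sample_results = [
    {"id": "1", "subject": "Invoice March", "score": "0.12"},
    {"id": "2", "subject": "Lunch?", "score": "0.83"},
    {"id": "3", "subject": "Invoice April", "score": "0.25"},
]

relevant = filter_by_distance(sample_results)
print([email["subject"] for email in relevant])
```

A suitable threshold depends on the embedding model and the data, so it is worth tuning on a few real queries.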
We will use the AI model via the Ollama SDK for Python.
When receiving a query, the first step is to ask the AI whether it needs to search the database first:
from ollama import generate
try:
    query = input("Me: ")
    rich_query = f'''
    This is my query: {query}
    You can use the function search_redis to search emails in a Redis database efficiently.
    Do you want to use this function?
    You must respond only with yes or no.
    '''
    response = generate(
        model="llama3.1:8b",
        prompt=rich_query,
    )
    print('BOT: Hmm, Should I search the database? ', response.response)
    if response.response.strip().lower().startswith("yes"):
        results = search_redis(query, 2)
        context = '\n\n'.join([
            f'''
            Subject: {email['subject']}
            Snippet: {email['snippet']}
            From: {email['sender']}
            Date: {email['date']}
            '''
            for email in results
        ])
        query_with_context = f'''
        This is my query: {query}
        This is the context: {context} 
        Please answer the query based on this context.
        '''
        print('AI: ', end='')
        for chunk in generate(
            model="llama3.1:8b",
            prompt=query_with_context,
            stream=True
        ):
            print(chunk.response, end='', flush=True)
        print('')
    else:
        print('AI: ', end='')
        for chunk in generate(
            model="llama3.1:8b",
            prompt=query,
            stream=True
        ):
            print(chunk.response, end='', flush=True)
        print('')
        
except KeyboardInterrupt:
    exit(0)
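In practice a model rarely answers with a bare "yes": replies such as "Yes." or "Yes, I would." are common, so an exact string comparison is fragile. The helper below is one possible way to interpret the reply more tolerantly. The name wants_search is hypothetical, and treating anything that does not start with "yes" as a "no" is a design choice for this sketch, not the only option.

```python
import re

def wants_search(reply):
    """Return True only when the reply starts with the word 'yes'."""
    # Skip leading punctuation or quotes, then require a whole word yes/no
    match = re.match(r"\W*(yes|no)\b", reply.strip().lower())
    return bool(match) and match.group(1) == "yes"

for reply in ["yes", "Yes.", "  YES, I do", "no", "No, thanks", "Maybe", "yesterday"]:
    print(repr(reply), "->", wants_search(reply))
```

Falling back to "no" on unclear replies means the model answers without context rather than searching on a misread signal.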
The integration of AI with vector search described above is simple, but it is not very efficient and is hard to scale.
Therefore, in the next article, I will introduce a standardized integration approach that is gradually becoming a norm in the AI community: Model Context Protocol (MCP).
MCP has already been adopted in AI products like Claude Desktop and Cursor AI Editor. Stay tuned to explore this method in the next article! 🚀
If you’re interested in setting up a RAG system using only Python code, without RediSearch, you can check out this resource: llama2 demo