LangChain Framework

Use YugabyteDB as a vector store with LangChain

Get started using YugabyteDB as a vector store with LangChain for developing Retrieval-Augmented Generation (RAG) apps.

LangChain is a powerful framework for developing applications powered by large language models (LLMs). It provides a comprehensive toolkit for building context-aware LLM applications by managing the communication between LLMs and various data sources, including databases and vector stores.

YugabyteDB supports the pgvector extension in a distributed SQL architecture, providing resilience and seamless scalability for building generative AI (GAI) applications.

The langchain-yugabytedb Python package enables GAI applications to use YugabyteDB as a vector store, using the LangChain framework's vector store interface for storing and retrieving vector data.

langchain-yugabytedb is available as a PyPI module.

For detailed information about all YugabyteDBVectorStore features and configurations, see the langchain-yugabytedb GitHub repo.

Quick Start - Complete Working Example

Prerequisites

  • Python 3.9 or later
  • Docker
  • Create an OpenAI API Key. Export it as an environment variable with the name OPENAI_API_KEY (you can verify it from Python as shown after this list).
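
To confirm the key is visible to Python before running the examples, you can use a quick check like the following (an optional sketch, not part of the original setup):

import os

# Fail fast if the OPENAI_API_KEY environment variable is missing.
assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY is not set"
print("OPENAI_API_KEY found")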

Step 1: Set Up the Environment

Install the required dependencies:

pip install --upgrade --quiet langchain langchain-openai langchain-community langchain-postgres tiktoken "psycopg[binary]" beautifulsoup4 langchain-yugabytedb

Start a single-node YugabyteDB cluster in Docker:

docker run -d --name yugabyte_node01 --hostname yugabyte01 \
  -p 7000:7000 -p 9000:9000 -p 15433:15433 -p 5433:5433 -p 9042:9042 \
  yugabytedb/yugabyte:2.25.2.0-b359 bin/yugabyted start --background=false \
  --master_flags="allowed_preview_flags_csv=ysql_yb_enable_advisory_locks,ysql_yb_enable_advisory_locks=true" \
  --tserver_flags="allowed_preview_flags_csv=ysql_yb_enable_advisory_locks,ysql_yb_enable_advisory_locks=true"

Enable the vector extension and verify it's working:

# Enable vector extension
docker exec -it yugabyte_node01 bin/ysqlsh -h yugabyte01 -c "CREATE EXTENSION IF NOT EXISTS vector;"

# Verify vector extension is installed
docker exec -it yugabyte_node01 bin/ysqlsh -h yugabyte01 -c "SELECT * FROM pg_extension WHERE extname = 'vector';"

# Test vector functionality
docker exec -it yugabyte_node01 bin/ysqlsh -h yugabyte01 -c "SELECT '[1,2,3]'::vector;"
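
Optionally, you can run the same sanity check from Python using psycopg. This sketch assumes the default connection settings used throughout this guide (user yugabyte, empty password, port 5433):

import psycopg

# Connect with the same defaults the example application uses below.
with psycopg.connect("dbname=yugabyte user=yugabyte host=localhost port=5433") as conn:
    row = conn.execute("SELECT '[1,2,3]'::vector").fetchone()
    print(row)  # a 3-dimensional vector comes back, for example ('[1,2,3]',)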

Step 2: Create a sample langchain-yugabytedb application

Create a file called langchain_example.py with the following complete code:

#!/usr/bin/env python3
"""
Complete LangChain + YugabyteDB Integration Example
This script demonstrates a full end-to-end RAG application.
"""

import os
import getpass
from langchain_yugabytedb import YBEngine, YugabyteDBVectorStore
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.documents import Document
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

def setup_connection():
    """Set up connection parameters and initialize the vector store."""
    print("Setting up connection to YugabyteDB...")

    # Connection parameters
    YUGABYTEDB_USER = "yugabyte"
    YUGABYTEDB_PASSWORD = ""
    YUGABYTEDB_HOST = "localhost"
    YUGABYTEDB_PORT = "5433"
    YUGABYTEDB_DB = "yugabyte"

    # Table and vector parameters
    TABLE_NAME = "yugabyte_docs_collection"
    VECTOR_SIZE = 1536

    # Create connection string - using psycopg instead of asyncpg for better vector support
    CONNECTION_STRING = (
        f"postgresql+psycopg://{YUGABYTEDB_USER}:{YUGABYTEDB_PASSWORD}@{YUGABYTEDB_HOST}"
        f":{YUGABYTEDB_PORT}/{YUGABYTEDB_DB}"
    )

    # Initialize engine
    engine = YBEngine.from_connection_string(url=CONNECTION_STRING)

    # Initialize embeddings
    print("Initializing OpenAI embeddings...")
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        api_key = getpass.getpass("Enter your OpenAI API Key: ")

    embeddings = OpenAIEmbeddings(api_key=api_key)

    # Initialize vector store table
    print("Creating vector store table...")
    engine.init_vectorstore_table(
        table_name=TABLE_NAME,
        vector_size=VECTOR_SIZE,
    )

    # Create vector store
    vectorstore = YugabyteDBVectorStore.create_sync(
        engine=engine,
        table_name=TABLE_NAME,
        embedding_service=embeddings,
    )

    print("Connection setup complete!")
    return vectorstore, embeddings, api_key

def test_basic_operations(vectorstore):
    """Test basic vector store operations."""
    print("\nTesting basic vector store operations...")

    # Add test documents
    print("Adding test documents...")
    docs = [
        Document(page_content="Apples and oranges are delicious fruits"),
        Document(page_content="Cars and airplanes are modes of transportation"),
        Document(page_content="Trains are efficient for long-distance travel"),
        Document(page_content="YugabyteDB is a distributed SQL database"),
        Document(page_content="Vector databases store embeddings for similarity search"),
    ]

    vectorstore.add_documents(docs)
    print(f"Added {len(docs)} documents to vector store")

    # Test similarity search
    print("\nTesting similarity search...")
    queries = [
        "I'd like to eat some fruit",
        "What can I use to travel long distances?",
        "Tell me about database technology"
    ]

    for query in queries:
        print(f"\nQuery: '{query}'")
        results = vectorstore.similarity_search(query, k=2)
        for i, doc in enumerate(results, 1):
            print(f"  Result {i}: {doc.page_content}")

    return vectorstore

def create_rag_chain(vectorstore, api_key):
    """Create a RAG chain for advanced question answering."""
    print("\nSetting up RAG chain...")

    # Create retriever
    retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

    # Initialize LLM
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=api_key)

    # Define RAG prompt template
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant named 'YugaAI'. Answer questions based on the provided context. If you don't know the answer based on the context, say so."),
        ("human", "Context: {context}\n\nQuestion: {question}")
    ])

    # Build RAG chain
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    print("RAG chain created successfully!")
    return rag_chain

def test_rag_chain(rag_chain):
    """Test the RAG chain with various questions."""
    print("\nTesting RAG chain...")

    test_questions = [
        "What are some examples of fruits?",
        "What transportation methods were mentioned?",
        "What is YugabyteDB?",
        "How do vector databases work?"
    ]

    for question in test_questions:
        print(f"\nQuestion: {question}")
        try:
            response = rag_chain.invoke(question)
            print(f"YugaAI: {response}")
        except Exception as e:
            print(f"Error: {e}")

def load_yugabyte_docs(vectorstore, embeddings, api_key):
    """Load YugabyteDB documentation and create a more comprehensive vector store."""
    print("\nLoading YugabyteDB documentation...")

    # Load documentation from web
    url = "https://docs.yugabyte.com/preview/releases/ybdb-releases/v2.25/"
    loader = WebBaseLoader(url)
    documents = loader.load()

    print(f"Loaded {len(documents)} documents from web")

    # Split documents into chunks
    text_splitter = CharacterTextSplitter(
        separator="\n\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
        is_separator_regex=False,
    )

    chunks = text_splitter.split_documents(documents)
    print(f"Split into {len(chunks)} chunks")

    # Add chunks to vector store
    print("Adding documentation chunks to vector store...")
    vectorstore.add_documents(chunks)
    print(f"Added {len(chunks)} documentation chunks")

    return vectorstore

def main():
    """Main function to run the complete example."""
    print("Starting LangChain + YugabyteDB Complete Example")
    print("=" * 60)

    try:
        # Step 1: Setup
        vectorstore, embeddings, api_key = setup_connection()

        # Step 2: Test basic operations
        vectorstore = test_basic_operations(vectorstore)

        # Step 3: Create RAG chain
        rag_chain = create_rag_chain(vectorstore, api_key)

        # Step 4: Test RAG chain
        test_rag_chain(rag_chain)

        # Step 5: Load additional documentation (optional)
        print("\n" + "=" * 60)
        print("OPTIONAL: Loading YugabyteDB documentation...")
        try:
            vectorstore = load_yugabyte_docs(vectorstore, embeddings, api_key)
            print("Documentation loaded successfully!")
            print("\nTesting with documentation...")
            test_rag_chain(rag_chain)
        except Exception as e:
            print(f"Could not load documentation: {e}")
            print("Continuing with basic example...")

        print("\n" + "=" * 60)
        print("Complete example finished successfully!")
        print("You now have a working RAG application with YugabyteDB!")

    except Exception as e:
        print(f"Error: {e}")
        print("Please check your setup and try again.")

if __name__ == "__main__":
    main()

Step 3: Run the Example

Execute the script:

python langchain_example.py

Expected Output:


Starting LangChain + YugabyteDB Complete Example
============================================================
Setting up connection to YugabyteDB...
Initializing OpenAI embeddings...
Creating vector store table...
Connection setup complete!

Testing basic vector store operations...
Adding test documents...
Added 5 documents to vector store

Testing similarity search...

Query: 'I'd like to eat some fruit'
  Result 1: Apples and oranges are delicious fruits
  Result 2: Vector databases store embeddings for similarity search

Query: 'What can I use to travel long distances?'
  Result 1: Trains are efficient for long-distance travel
  Result 2: Cars and airplanes are modes of transportation

Query: 'Tell me about database technology'
  Result 1: YugabyteDB is a distributed SQL database
  Result 2: Vector databases store embeddings for similarity search

Setting up RAG chain...
RAG chain created successfully!

Testing RAG chain...

Question: What are some examples of fruits?
YugaAI: Based on the context, apples and oranges are mentioned as examples of delicious fruits.

Question: What transportation methods were mentioned?
YugaAI: The context mentions cars, airplanes, and trains as modes of transportation.

Question: What is YugabyteDB?
YugaAI: YugabyteDB is a distributed SQL database.

Question: How do vector databases work?
YugaAI: Vector databases store embeddings for similarity search, which allows for finding similar content based on vector representations.

============================================================
Complete example finished successfully!
You now have a working RAG application with YugabyteDB!

Understanding the Example

Now let's break down how this complete example works:

Connection Setup

The script starts by setting up the connection to YugabyteDB:

# Connection parameters
YUGABYTEDB_USER = "yugabyte"
YUGABYTEDB_PASSWORD = ""
YUGABYTEDB_HOST = "localhost"
YUGABYTEDB_PORT = "5433"
YUGABYTEDB_DB = "yugabyte"

# Create connection string - using psycopg instead of asyncpg for better vector support
CONNECTION_STRING = (
    f"postgresql+psycopg://{YUGABYTEDB_USER}:{YUGABYTEDB_PASSWORD}@{YUGABYTEDB_HOST}"
    f":{YUGABYTEDB_PORT}/{YUGABYTEDB_DB}"
)

Vector Store Initialization

The script initializes the vector store with OpenAI embeddings:

engine = YBEngine.from_connection_string(url=CONNECTION_STRING)
embeddings = OpenAIEmbeddings(api_key=api_key)
engine.init_vectorstore_table(table_name=TABLE_NAME, vector_size=VECTOR_SIZE)
vectorstore = YugabyteDBVectorStore.create_sync(
    engine=engine,
    table_name=TABLE_NAME,
    embedding_service=embeddings,
)

Basic Operations

The script demonstrates adding documents and performing similarity searches:

# Add documents
docs = [
    Document(page_content="Apples and oranges are delicious fruits"),
    Document(page_content="Cars and airplanes are modes of transportation"),
    # ... more documents
]
vectorstore.add_documents(docs)

# Perform similarity search
query = "I'd like to eat some fruit"
results = vectorstore.similarity_search(query)

RAG Chain Creation

The script creates a complete RAG (Retrieval-Augmented Generation) chain:

retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=api_key)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant..."),
    ("human", "Context: {context}\nQuestion: {question}")
])

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
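
Note that the retriever feeds a list of Document objects into the {context} slot, where the prompt template renders them with their default string representation. If you want cleaner context, one optional refinement (a sketch, not part of the example script) is to join the page contents before they reach the prompt:

# Sketch: join retrieved Documents into one plain-text context block.
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)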

Troubleshooting

Common Issues

  1. Vector extension not working: Make sure you ran all the verification commands
  2. Connection issues: Verify YugabyteDB is running and accessible on localhost:5433
  3. API key issues: Ensure your OpenAI API key is set correctly
  4. Driver compatibility: Use psycopg instead of asyncpg for better vector support

Verification Steps

Check if everything is working:

# Check if YugabyteDB is running
docker ps | grep yugabyte

# Check if vector extension is installed
docker exec -it yugabyte_node01 bin/ysqlsh -h yugabyte01 -c "SELECT * FROM pg_extension WHERE extname = 'vector';"

# Test vector functionality
docker exec -it yugabyte_node01 bin/ysqlsh -h yugabyte01 -c "SELECT '[1,2,3]'::vector;"

# Test Python imports
python -c "from langchain_yugabytedb import YBEngine, YugabyteDBVectorStore; print('Imports successful')"

Detailed API Reference

Now let's explore the individual components in detail:

Manage vector store

Add items to the vector store

from langchain_core.documents import Document

docs = [
    Document(page_content="Apples and oranges"),
    Document(page_content="Cars and airplanes"),
    Document(page_content="Train"),
]

yugabyteDBVectorStore.add_documents(docs)

Delete items from the vector store

yugabyteDBVectorStore.delete(ids=["275823d2-1a47-440d-904b-c07b132fd72b"])
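
In LangChain's vector store interface, add_documents returns the generated IDs, so you can capture them at insert time for later deletion. A minimal sketch (the UUID shown above is illustrative):

# Capture the IDs assigned at insert time, then delete the first document.
ids = yugabyteDBVectorStore.add_documents(docs)
yugabyteDBVectorStore.delete(ids=[ids[0]])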

Update items in the vector store

Note that the Update operation is not currently supported by YugabyteDBVectorStore.
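
As a workaround, you can emulate an update by deleting the old entry and re-adding the revised content. A sketch, assuming you kept the ID returned by add_documents (old_id below is a placeholder):

# "Update" = delete the stale document, then insert the revised version.
yugabyteDBVectorStore.delete(ids=[old_id])  # old_id: placeholder for a stored ID
yugabyteDBVectorStore.add_documents(
    [Document(page_content="Apples, oranges, and pears")]
)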

Query the vector store

Once your vector store has been created and the relevant documents have been added, you will likely want to query it while your chain or agent runs.

Query directly

Perform a basic similarity search as follows:

query = "I'd like a fruit."
docs = yugabyteDBVectorStore.similarity_search(query)
print(docs)

To execute a similarity search and receive the corresponding relevance scores, use similarity_search_with_score:

query = "I'd like a fruit."
docs_with_scores = yugabyteDBVectorStore.similarity_search_with_score(query, k=1)
print(docs_with_scores)

Query by turning into retriever

You can also transform the vector store into a retriever for easier use in your chains.

retriever = yugabyteDBVectorStore.as_retriever(search_kwargs={"k": 1})
retriever.invoke("I'd like a fruit.")
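
If the integration supports LangChain's standard retriever options (an assumption; check the langchain-yugabytedb GitHub repo for supported search types), you can also request maximal marginal relevance (MMR) search for more diverse results:

# Sketch: MMR search trades some relevance for diversity in the results.
retriever = yugabyteDBVectorStore.as_retriever(
    search_type="mmr", search_kwargs={"k": 1}
)
retriever.invoke("I'd like a fruit.")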

ChatMessageHistory

The chat message history abstraction helps to persist chat message history in a YugabyteDB table. YugabyteDBChatMessageHistory is parameterized using a table_name and a session_id:

  • table_name is the name of the table in the database where the chat messages will be stored.
  • session_id is a unique identifier for the chat session. It can be assigned by the caller using uuid.uuid4().

import uuid

from langchain_core.messages import SystemMessage, AIMessage, HumanMessage
from langchain_yugabytedb import YugabyteDBChatMessageHistory
import psycopg

# Establish a synchronous connection to the database
# (or use psycopg.AsyncConnection for async)
conn_info = "dbname=yugabyte user=yugabyte host=localhost port=5433"
sync_connection = psycopg.connect(conn_info)

# Create the table schema (only needs to be done once)
table_name = "chat_history"
YugabyteDBChatMessageHistory.create_tables(sync_connection, table_name)

session_id = str(uuid.uuid4())

# Initialize the chat history manager
chat_history = YugabyteDBChatMessageHistory(
    table_name, session_id, sync_connection=sync_connection
)

# Add messages to the chat history
chat_history.add_messages(
    [
        SystemMessage(content="Meow"),
        AIMessage(content="woof"),
        HumanMessage(content="bark"),
    ]
)

print(chat_history.messages)
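
You can then plug the history into a chain so that each session's messages are loaded from and saved to YugabyteDB automatically. The following is a minimal sketch using LangChain's RunnableWithMessageHistory (the chat model here mirrors the one used earlier on this page):

from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)  # reads OPENAI_API_KEY

# Wrap the model so every invocation loads and saves this session's messages.
chain_with_history = RunnableWithMessageHistory(
    llm,
    lambda session_id: YugabyteDBChatMessageHistory(
        table_name, session_id, sync_connection=sync_connection
    ),
)

response = chain_with_history.invoke(
    [HumanMessage(content="Hi! Do you remember my last message?")],
    config={"configurable": {"session_id": session_id}},
)
print(response.content)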

RAG example

One of the primary advantages of vector stores is that they provide contextual data to LLMs. LLMs are often trained on stale data and might lack relevant domain-specific knowledge, resulting in hallucinations in LLM responses.

Take the following example:

import getpass
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage

my_api_key = getpass.getpass("Enter your API Key: ")

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, api_key=my_api_key)
# Start with a system message to set the persona/behavior of the AI
messages = [
    SystemMessage(
        content="You are a helpful and friendly assistant named 'YugaAI'. You love to answer questions about YugabyteDB and distributed sql."
    ),
    # First human turn
    HumanMessage(content="Hi YugaAI! Where's the headquarters of YugabyteDB?"),
]

print("--- First Interaction ---")
print(f"Human: {messages[1].content}")  # Print the human message
response1 = llm.invoke(messages)
print(f"YugaAI: {response1.content}")

print("\n--- Second Interaction ---")
print(f"Human: {messages[2].content}")  # Print the new human message
response2 = llm.invoke(messages)  # Send the *entire* message history
print(f"YugaAI: {response2.content}")

# Add the second AI response to the history
messages.append(AIMessage(content=response2.content))

# --- Another turn with a different topic ---
messages.append(
    HumanMessage(
        content="Can you tell me the current preview release version of YugabyteDB?"
    )
)

print("\n--- Third Interaction ---")
print(f"Human: {messages[4].content}")  # Print the new human message
response3 = llm.invoke(messages)  # Send the *entire* message history
print(f"YugaAI: {response3.content}")

The current preview release of YugabyteDB is v2.25.2.0; however, the LLM provides stale information that is 2-3 years old. This is where vector stores complement LLMs by providing a way to store and retrieve relevant information.

Construct a RAG to provide contextual information

You can provide the relevant information to the LLM by supplying the YugabyteDB documentation. First, read the YugabyteDB docs and add the data to the YugabyteDB vector store by loading, splitting, and chunking data from an HTML source. Then store the vector embeddings generated by OpenAI embeddings in the YugabyteDB vector store.

Step 1: Generate Embeddings

import getpass
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter
from langchain_yugabytedb import YBEngine, YugabyteDBVectorStore
from langchain_openai import OpenAIEmbeddings

my_api_key = getpass.getpass("Enter your API Key: ")
url = "https://docs.yugabyte.com/preview/releases/ybdb-releases/v2.25/"

loader = WebBaseLoader(url)

documents = loader.load()

print(f"Number of documents loaded: {len(documents)}")

# For very large HTML files, you'll want to split the text into smaller
# chunks before sending them to an LLM, as LLMs have token limits.
text_splitter = CharacterTextSplitter(
    separator="\n\n",  # Split by double newline (common paragraph separator)
    chunk_size=1000,  # Each chunk will aim for 1000 characters
    chunk_overlap=200,  # Allow 200 characters overlap between chunks
    length_function=len,
    is_separator_regex=False,
)

# Apply the splitter to the loaded documents
chunks = text_splitter.split_documents(documents)

print(f"\n--- After Splitting ({len(chunks)} chunks) ---")

CONNECTION_STRING = "postgresql+psycopg://yugabyte:@localhost:5433/yugabyte"
TABLE_NAME = "yb_relnotes_chunks"
VECTOR_SIZE = 1536
engine = YBEngine.from_connection_string(url=CONNECTION_STRING)
engine.init_vectorstore_table(
    table_name=TABLE_NAME,
    vector_size=VECTOR_SIZE,
)
embeddings = OpenAIEmbeddings(api_key=my_api_key)

# The YugabyteDBVectorStore.from_documents method handles:
# 1. Creating the table if it doesn't exist (with 'embedding' column).
# 2. Generating embeddings for each chunk using the provided embeddings model.
# 3. Inserting the chunk text, metadata, and embeddings into the table.
vectorstore = YugabyteDBVectorStore.from_documents(
    engine=engine, table_name=TABLE_NAME, documents=chunks, embedding=embeddings
)

print(f"Successfully stored {len(chunks)} chunks in YugabyteDB table: {TABLE_NAME}")

Step 2: Configure the YugabyteDB retriever

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
print(
    f"Retriever created, set to retrieve top {retriever.search_kwargs['k']} documents."
)

# Initialize the Chat Model (e.g., OpenAI's GPT-3.5 Turbo)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=my_api_key)

# Define the RAG prompt template
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful and friendly assistant named 'YugaAI'. You love to answer questions about YugabyteDB and distributed sql.",
        ),
        ("human", "Context: {context}\nQuestion: {question}"),
    ]
)
# Build the RAG chain
# 1. Take the input question.
# 2. Pass it to the retriever to get relevant documents.
# 3. Format the documents into a string for the context.
# 4. Pass the context and question to the prompt template.
# 5. Send the prompt to the LLM.
# 6. Parse the LLM's string output.
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

Now try asking the same question, "Can you tell me the current preview release version of YugabyteDB?", again.

# Invoke the RAG chain with a question
rag_query = "Can you tell me the current preview release version of YugabyteDB?"
print(f"\nQuerying RAG chain: '{rag_query}'")
rag_response = rag_chain.invoke(rag_query)
print("\n--- RAG Chain Response ---")
print(rag_response)

Querying RAG chain: 'Can you tell me the current preview release version of YugabyteDB?'

--- RAG Chain Response ---
The current preview release version of YugabyteDB is v2.25.2.0.

Learn more