Migrating Data between Pinecone Indexes: A High-Performance Approach
Pinecone is a powerful vector database that allows you to efficiently store and retrieve high-dimensional vectors. We use Pinecone for a host of things at Archie AI.
We recently migrated our Pinecone index to the GCP Marketplace offering. In this blog post, I will share our approach and code. We'll walk through the code step by step and highlight the performance optimizations employed to ensure a smooth and efficient migration process.
Step 1: Imports and Setup
```python
!pip install pinecone numpy

import numpy as np
from pinecone import Pinecone
from concurrent.futures import ThreadPoolExecutor, as_completed

# Source Pinecone setup
source_pinecone_environment = "us-east4-gcp"
source_pinecone_index_name = "source-index"

# Target Pinecone setup
target_pinecone_environment = "us-east4-gcp"
target_pinecone_index_name = "target-index"

# Initialize source and target Pinecone clients
source_pc = Pinecone(api_key="YOUR_SOURCE_API_KEY")
target_pc = Pinecone(api_key="YOUR_TARGET_API_KEY")

# Initialize source and target Pinecone indexes
source_index = source_pc.Index(source_pinecone_index_name)
target_index = target_pc.Index(target_pinecone_index_name)

# Set the number of dimensions for your vectors
num_dimensions = 1536
```
In this step, we import the necessary libraries and set up the source and target Pinecone environments, indexes, and clients. Make sure to replace `"YOUR_SOURCE_API_KEY"` and `"YOUR_TARGET_API_KEY"` with your actual API keys.
Step 2: Define Helper Functions
```python
def get_namespace_names(index):
    """Fetches all namespace names from the given Pinecone index."""
    response = index.describe_index_stats()
    return list(response['namespaces'].keys())


def get_ids_from_namespace(index, namespace, num_dimensions, batch_size=10000):
    """Fetches all vector IDs from the given namespace in the Pinecone index."""
    stats = index.describe_index_stats()
    num_vectors = stats['namespaces'].get(namespace, {}).get('vector_count', 0)
    all_ids = set()
    while len(all_ids) < num_vectors:
        input_vector = np.random.rand(num_dimensions).tolist()
        results = index.query(
            vector=input_vector,
            top_k=batch_size,
            namespace=namespace,
            include_values=False,
            # Note: Pinecone filters match metadata fields, not record IDs,
            # so this exclusion only works if each vector's ID is also
            # stored in its metadata under the "id" key.
            filter={
                "id": {"$nin": list(all_ids)}  # Exclude already fetched IDs
            }
        )
        new_ids = {result['id'] for result in results['matches']}
        all_ids.update(new_ids)
        print(f"Collected {len(all_ids)} ids out of {num_vectors} in namespace '{namespace}'.")
        if len(new_ids) < batch_size:
            # Fewer results than requested: we've likely fetched all vectors
            break
    return all_ids
```
These helper functions fetch namespace names and vector IDs from a Pinecone index. The `get_ids_from_namespace` function retrieves all vector IDs by repeatedly querying the index with random vectors and collecting the matching IDs until the namespace's reported vector count is reached.
Note: this method doesn't work well when a namespace holds more than 10K vectors, since a single query returns at most 10K matches. In that case, I had to re-index from scratch.
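As an alternative for larger namespaces, recent versions of the Pinecone Python client expose a paginated `index.list()` generator on serverless indexes, which avoids the random-query trick entirely. This is a sketch, assuming that API is available on your index type; `get_ids_via_list` is an illustrative name, not part of the original script:

```python
def get_ids_via_list(index, namespace, page_size=99):
    """Collect all vector IDs using the client's paginated list API.

    Assumes an index object whose `list()` method yields batches of IDs
    per page, as in recent Pinecone clients on serverless indexes.
    """
    all_ids = set()
    for id_batch in index.list(namespace=namespace, limit=page_size):
        all_ids.update(id_batch)
    return all_ids
```

If your index supports it, this is both simpler and exhaustive, with no coverage ceiling from `top_k`.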
Step 3: Define Migration Function
```python
def migrate_namespace(source_index, target_index, namespace, num_dimensions, batch_size=200):
    """Migrates vectors and metadata from a single namespace in the source index to the target index."""
    print(f'Starting migration for namespace: {namespace}')
    # Convert to a list once so batch slicing is stable and cheap
    all_ids = list(get_ids_from_namespace(source_index, namespace, num_dimensions))
    total_vectors = len(all_ids)
    migrated_vectors = 0
    for i in range(0, total_vectors, batch_size):
        batch_ids = all_ids[i:i + batch_size]
        vectors_data = source_index.fetch(ids=batch_ids, namespace=namespace).get('vectors', {})
        vectors_to_upsert = [
            (vector_id, vector_info['values'], vector_info.get('metadata', {}))
            for vector_id, vector_info in vectors_data.items()
        ]
        if vectors_to_upsert:
            batch_count = len(vectors_to_upsert)
            migrated_vectors += batch_count
            percentage_complete = (migrated_vectors / total_vectors) * 100
            print(f'Namespace {namespace}: Upserting batch of {batch_count} vectors '
                  f'({migrated_vectors}/{total_vectors}, {percentage_complete:.2f}%)')
            target_index.upsert(vectors=vectors_to_upsert, namespace=namespace)
        else:
            print(f"No vectors found for current batch in namespace {namespace}.")
    print(f"Migration completed for namespace: {namespace}")
    return namespace, migrated_vectors
```
The `migrate_namespace` function handles the migration of vectors and metadata from a single namespace in the source index to the target index. It fetches the vector IDs from the namespace, retrieves the vector data and associated metadata in batches, and upserts them into the target index.
It’s important to note that along with the vector values, we also migrate the metadata associated with each vector. This ensures that any additional contextual information stored with the vectors is preserved during the migration process.
Batching is used to optimize performance by reducing the number of API calls.
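The batching pattern above can be factored into a small reusable helper. This is a generic sketch (`chunked` is not part of the original script) showing how fixed-size batches are carved out of the ID list:

```python
def chunked(items, size):
    """Yield successive fixed-size chunks of `items`; the last chunk may be shorter."""
    items = list(items)
    for i in range(0, len(items), size):
        yield items[i:i + size]
```

With a helper like this, the fetch/upsert loop collapses to `for batch_ids in chunked(all_ids, batch_size): ...`, which keeps the slicing arithmetic in one tested place.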
Step 4: Define Parallel Migration Function
```python
def parallel_migrate_namespaces(source_index, target_index, num_dimensions, max_workers=5):
    """Migrates all namespaces from source to target index in parallel."""
    namespaces = get_namespace_names(source_index)
    print(f"Found {len(namespaces)} namespaces to migrate.")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_namespace = {
            executor.submit(migrate_namespace, source_index, target_index, namespace, num_dimensions): namespace
            for namespace in namespaces
        }
        for future in as_completed(future_to_namespace):
            namespace = future_to_namespace[future]
            try:
                namespace, migrated_vectors = future.result()
                print(f"Completed migration for namespace {namespace}. Migrated {migrated_vectors} vectors.")
            except Exception as exc:
                print(f"Migration for namespace {namespace} generated an exception: {exc}")
```
To further optimize performance, the `parallel_migrate_namespaces` function migrates all namespaces from the source index to the target index in parallel. It uses a `ThreadPoolExecutor` to submit migration tasks for each namespace concurrently. This approach allows for efficient utilization of system resources and reduces the overall migration time.
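Stripped of the Pinecone specifics, the submit-then-collect pattern looks like this; `run_parallel` and the doubling task are illustrative names, not part of the script:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_parallel(task, args_list, max_workers=5):
    """Run task(arg) for each arg concurrently; return {arg: result}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every task up front, then collect results as they finish
        future_to_arg = {executor.submit(task, arg): arg for arg in args_list}
        for future in as_completed(future_to_arg):
            results[future_to_arg[future]] = future.result()
    return results
```

Because the per-namespace work here is dominated by network I/O (fetch and upsert calls), threads are a good fit despite Python's GIL.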
Step 5: Execute Migration
```python
parallel_migrate_namespaces(source_index, target_index, num_dimensions)
print("Full migration completed.")
```
To start the migration process, simply call the `parallel_migrate_namespaces` function with the appropriate arguments. This will initiate the parallel migration of all namespaces, including both vectors and metadata, from the source index to the target index.
Step 6: Verify Migration
```python
source_namespaces = get_namespace_names(source_index)
target_namespaces = get_namespace_names(target_index)
print(f"Source index has {len(source_namespaces)} namespaces")
print(f"Target index has {len(target_namespaces)} namespaces")

# Check if all source namespaces are in the target
missing_namespaces = set(source_namespaces) - set(target_namespaces)
if missing_namespaces:
    print(f"Warning: The following namespaces are missing in the target index: {missing_namespaces}")
else:
    print("All source namespaces are present in the target index.")

# Compare vector counts (fetch stats once instead of once per namespace)
source_stats = source_index.describe_index_stats()['namespaces']
target_stats = target_index.describe_index_stats()['namespaces']
for namespace in source_namespaces:
    source_count = source_stats[namespace]['vector_count']
    target_count = target_stats.get(namespace, {}).get('vector_count', 0)
    print(f"Namespace: {namespace}")
    print(f"  Source vector count: {source_count}")
    print(f"  Target vector count: {target_count}")
    if source_count != target_count:
        print(f"  Warning: Vector counts do not match for namespace {namespace}")
    print()
```
After the migration is complete, it’s crucial to verify the results. This step compares the namespaces and vector counts between the source and target indexes to ensure data integrity. It checks for missing namespaces and reports any discrepancies in vector counts.
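The count comparison can also be packaged as a small function operating on the two `describe_index_stats()` responses, which makes it easy to assert on in a script; `compare_counts` is an illustrative helper, not part of the original code:

```python
def compare_counts(source_stats, target_stats):
    """Given two describe_index_stats() responses, return the namespaces
    whose vector counts differ, mapped to (source_count, target_count)."""
    mismatches = {}
    for ns, info in source_stats['namespaces'].items():
        src = info.get('vector_count', 0)
        tgt = target_stats['namespaces'].get(ns, {}).get('vector_count', 0)
        if src != tgt:
            mismatches[ns] = (src, tgt)
    return mismatches
```

An empty result means every source namespace's count is matched in the target.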
Additional Guidance
If you find that some vectors were missed during verification, you can correct this with an additional pass: compute which IDs exist in the source namespace but not the target, then fetch and upsert just those vectors.
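A sketch of that backfill step, under the same client assumptions as the rest of the script; `find_missing_ids` and `backfill_missing` are illustrative names:

```python
def find_missing_ids(source_ids, target_ids):
    """Return IDs present in the source namespace but absent from the target."""
    return set(source_ids) - set(target_ids)

def backfill_missing(source_index, target_index, namespace, missing_ids, batch_size=200):
    """Fetch the missing vectors from the source and upsert them into the target."""
    missing = list(missing_ids)
    for i in range(0, len(missing), batch_size):
        batch = missing[i:i + batch_size]
        data = source_index.fetch(ids=batch, namespace=namespace).get('vectors', {})
        to_upsert = [
            (vid, info['values'], info.get('metadata', {}))
            for vid, info in data.items()
        ]
        if to_upsert:
            target_index.upsert(vectors=to_upsert, namespace=namespace)
```

In practice you would gather the two ID sets per namespace (for example with `get_ids_from_namespace` against both indexes), pass their difference to `backfill_missing`, and then re-run the verification step.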
Conclusion
In this blog post, we shared our approach to migrating data between Pinecone indexes using Python, specifically in the context of moving our index to the GCP Marketplace offering at Archie AI. By leveraging parallel processing and optimized data retrieval techniques, we were able to efficiently migrate large volumes of vectors and their associated metadata.
The provided code serves as a starting point and can be adapted to suit specific migration requirements.
Reminder, to use this script:
- Ensure that the API keys, environment names, and index names are correct for both source and target.
- Set the correct `num_dimensions` for your vectors (currently set to 1536).
- Adjust the `max_workers` parameter in the `parallel_migrate_namespaces` function call to control the level of parallelism. The default is set to 5, but you can increase or decrease this based on your system's capabilities and the rate limits of your Pinecone account.