"""Text embedding utilities for brand-influencer matching.

Wraps the Google Generative AI embedding API with disk-backed caching,
retry logic, and batched cosine-similarity computation.
"""
import hashlib
import json
import logging
import os
import time
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class TextEmbedder:
    """Computes, caches, and compares text embeddings for brands and influencers."""

    # models/text-embedding-004 produces 768-dimensional vectors; zero vectors
    # returned on failure must match this so downstream vstack/cosine math works.
    EMBEDDING_DIM = 768

    def __init__(self, api_key: str, cache_dir: Optional[str] = None,
                 batch_size: int = 10, max_retries: int = 3,
                 retry_delay: int = 2):
        self.api_key = api_key  # use the caller-supplied key; never hardcode credentials
        self.model = "models/text-embedding-004"
        self.cache_dir = cache_dir
        self.embedding_cache = {}
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._genai = None  # lazily imported and configured on first use
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
            self._load_cache()
    def _get_genai(self):
        """Import and configure google.generativeai on first use, then reuse it."""
        if self._genai is None:
            import google.generativeai as genai
            genai.configure(api_key=self.api_key)
            self._genai = genai
        return self._genai
    def _load_cache(self):
        """Load previously computed embeddings from disk, if a cache dir is set."""
        if not self.cache_dir:
            return
cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
self.embedding_cache = json.load(f)
logger.info(f"Loaded {len(self.embedding_cache)} cached embeddings")
except Exception as e:
logger.error(f"Error loading cache: {str(e)}")
    def _save_cache(self):
        """Persist the most recent embeddings to disk, if a cache dir is set."""
        if not self.cache_dir:
            return
        cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
        try:
            # Cap the on-disk cache at the 10,000 most recently inserted entries.
            cache_subset = dict(list(self.embedding_cache.items())[-10000:])
with open(cache_file, 'w') as f:
json.dump(cache_subset, f)
logger.info(f"Saved {len(cache_subset)} embeddings to cache")
except Exception as e:
logger.error(f"Error saving cache: {str(e)}")
    def _hash_text(self, text: str) -> str:
        """Return an MD5 digest used purely as a cache key, not for security."""
        return hashlib.md5(text.encode('utf-8')).hexdigest()
    def _combine_text_features(self, row: Union[pd.Series, Dict], text_columns: List[str]) -> str:
        """Join the non-null values of text_columns into a single 'col: value | ...' string."""
        text_values = []
if isinstance(row, pd.Series):
for col in text_columns:
if col in row.index and pd.notna(row[col]):
text_values.append(f"{col}: {str(row[col])}")
else:
for col in text_columns:
if col in row and row[col] is not None:
text_values.append(f"{col}: {str(row[col])}")
return " | ".join(text_values)
    def get_brand_text_features(self, brand: Union[pd.Series, Dict]) -> str:
        """Build the brand's combined text representation from its descriptive columns."""
        brand_dict = brand.to_dict() if isinstance(brand, pd.Series) else brand
text_columns = [
'industry',
'target_audience',
'brand_messaging',
'tone_voice',
'category_alignment',
'brand_alignment_keywords',
'content_type'
]
text = self._combine_text_features(brand_dict, text_columns)
return text
    def get_influencer_text_features(self, influencer: Union[pd.Series, Dict]) -> str:
        """Build the influencer's combined text representation from its descriptive columns."""
        influencer_dict = influencer.to_dict() if isinstance(influencer, pd.Series) else influencer
text_columns = [
'category_niche',
'audience_demographics',
'audience_interests',
'content_types'
]
text = self._combine_text_features(influencer_dict, text_columns)
return text
    def get_embedding(self, text: str) -> np.ndarray:
        """Return the embedding for text, consulting the cache and retrying API errors."""
        if not text or not text.strip():
            return np.zeros(self.EMBEDDING_DIM)
        text_hash = self._hash_text(text)
        if text_hash in self.embedding_cache:
            return np.array(self.embedding_cache[text_hash])
for attempt in range(self.max_retries):
try:
genai = self._get_genai()
result = genai.embed_content(
model=self.model,
content=text
)
embedding = np.array(result['embedding'])
                self.embedding_cache[text_hash] = embedding.tolist()
                # Persist periodically so a crash loses at most ~100 new embeddings.
                if len(self.embedding_cache) % 100 == 0:
                    self._save_cache()
return embedding
except Exception as e:
logger.error(f"Error getting embedding (attempt {attempt+1}/{self.max_retries}): {str(e)}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
logger.error(f"All embedding attempts failed for text: {text[:100]}...")
        return np.zeros(self.EMBEDDING_DIM)
    def batch_get_embeddings(self, texts: List[str]) -> List[np.ndarray]:
        """Embed texts in batches, pausing between batches to respect API rate limits."""
        results = []
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i+self.batch_size]
batch_results = []
for text in batch:
embedding = self.get_embedding(text)
batch_results.append(embedding)
results.extend(batch_results)
if i + self.batch_size < len(texts):
time.sleep(0.5)
return results
    def calculate_text_similarity(self, brand_text: str, influencer_text: str) -> float:
        """Return the cosine similarity between the two texts' embeddings."""
        brand_embedding = self.get_embedding(brand_text)
influencer_embedding = self.get_embedding(influencer_text)
similarity = cosine_similarity(
brand_embedding.reshape(1, -1),
influencer_embedding.reshape(1, -1)
)[0][0]
return float(similarity)
    def calculate_batch_similarities(self, brand_texts: List[str],
                                     influencer_texts: List[str]) -> np.ndarray:
        """Return a (len(brand_texts), len(influencer_texts)) cosine-similarity matrix."""
        brand_embeddings = self.batch_get_embeddings(brand_texts)
influencer_embeddings = self.batch_get_embeddings(influencer_texts)
brand_matrix = np.vstack(brand_embeddings)
influencer_matrix = np.vstack(influencer_embeddings)
similarity_matrix = cosine_similarity(brand_matrix, influencer_matrix)
return similarity_matrix
    def print_detailed_match_analysis(self, brand: Union[pd.Series, Dict],
                                      influencer: Union[pd.Series, Dict],
                                      similarity_score: float):
        """Log a human-readable breakdown of a single brand-influencer match."""
        logger.info("=" * 80)
        # A pd.Series exposes its row label as .name; plain dicts carry explicit id keys.
        brand_id = brand.name if isinstance(brand, pd.Series) else brand.get('brand_id', 'Unknown')
        brand_name = brand.get('name', 'Unknown Brand')
        influencer_id = influencer.name if isinstance(influencer, pd.Series) else influencer.get('influencer_id', 'Unknown')
        influencer_name = influencer.get('name', 'Unknown Influencer')
logger.info("Brand Details:")
logger.info(f" ID: {brand_id}")
logger.info(f" Name: {brand_name}")
logger.info("\nInfluencer Details:")
logger.info(f" ID: {influencer_id}")
logger.info(f" Name: {influencer_name}")
logger.info("-" * 80)
logger.info("\nBrand Text Features:")
brand_text = self.get_brand_text_features(brand)
for feature in brand_text.split(" | "):
logger.info(f" - {feature}")
logger.info("\nInfluencer Text Features:")
influencer_text = self.get_influencer_text_features(influencer)
for feature in influencer_text.split(" | "):
logger.info(f" - {feature}")
logger.info("\nText Similarity Analysis:")
logger.info(f" Score: {similarity_score:.4f}")
logger.info("\nScore Interpretation:")
if similarity_score >= 0.8:
logger.info(" Excellent Match (≥0.8):")
logger.info(" - Very strong text similarity")
logger.info(" - High potential for successful collaboration")
logger.info(" - Strong alignment in multiple areas")
elif similarity_score >= 0.6:
logger.info(" Good Match (≥0.6):")
logger.info(" - Significant text similarity")
logger.info(" - Good potential for collaboration")
logger.info(" - Notable alignment in key areas")
elif similarity_score >= 0.4:
logger.info(" Moderate Match (≥0.4):")
logger.info(" - Some text similarity")
logger.info(" - Potential for collaboration with careful consideration")
logger.info(" - Partial alignment in some areas")
else:
logger.info(" Weak Match (<0.4):")
logger.info(" - Limited text similarity")
logger.info(" - May need to reconsider match")
logger.info(" - Limited alignment in key areas")
logger.info("=" * 80)
    def get_text_similarity_matrix(self, brands_df: pd.DataFrame,
                                   influencers_df: pd.DataFrame,
                                   batch_size: int = 10) -> np.ndarray:
        """Compute (and disk-cache) the full brand x influencer similarity matrix."""
        start_time = time.time()
        logger.info(f"Calculating text similarity matrix for {len(brands_df)} brands and {len(influencers_df)} influencers")
if self.cache_dir:
cache_path = os.path.join(self.cache_dir, f"text_similarity_{len(brands_df)}_{len(influencers_df)}.npz")
if os.path.exists(cache_path):
logger.info(f"Loading text similarity matrix from cache: {cache_path}")
data = np.load(cache_path)
matrix = data['matrix']
logger.info(f"Loaded text similarity matrix in {time.time() - start_time:.2f} seconds")
return matrix
        similarity_matrix = np.zeros((len(brands_df), len(influencers_df)))
        for i in range(0, len(brands_df), batch_size):
            brand_chunk = brands_df.iloc[i:i+batch_size]
            brand_texts = [self.get_brand_text_features(brand) for _, brand in brand_chunk.iterrows()]
            for j in range(0, len(influencers_df), batch_size):
                influencer_chunk = influencers_df.iloc[j:j+batch_size]
                influencer_texts = [self.get_influencer_text_features(influencer) for _, influencer in influencer_chunk.iterrows()]
                # Brand embeddings are re-requested on each inner iteration but are
                # served from the embedding cache after the first pass.
                batch_similarities = self.calculate_batch_similarities(brand_texts, influencer_texts)
                # iloc chunks map directly onto matrix positions, so assign the whole
                # block in one slice; index.get_loc would break on duplicate labels.
                similarity_matrix[i:i+len(brand_chunk), j:j+len(influencer_chunk)] = batch_similarities
                logger.info(f"Processed batch: brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
if self.cache_dir:
logger.info(f"Saving text similarity matrix to cache: {cache_path}")
np.savez_compressed(cache_path, matrix=similarity_matrix)
logger.info(f"Text similarity matrix calculation completed in {time.time() - start_time:.2f} seconds")
return similarity_matrix
    def save_similarity_scores(self, brands_df: pd.DataFrame,
                               influencers_df: pd.DataFrame,
                               output_path: str):
        """Score every brand-influencer pair and write the results to a CSV."""
        logger.info(f"Calculating and saving similarity scores to {output_path}")
        start_time = time.time()
        all_scores = []
        batch_size = 5
for i in range(0, len(brands_df), batch_size):
brand_chunk = brands_df.iloc[i:i+batch_size]
for j in range(0, len(influencers_df), batch_size):
influencer_chunk = influencers_df.iloc[j:j+batch_size]
                # Score every pair in the chunk; repeated texts hit the embedding cache.
for _, brand in brand_chunk.iterrows():
brand_text = self.get_brand_text_features(brand)
for _, influencer in influencer_chunk.iterrows():
influencer_text = self.get_influencer_text_features(influencer)
similarity = self.calculate_text_similarity(brand_text, influencer_text)
all_scores.append({
'brand_id': brand.name,
'brand_name': brand.get('name', 'Unknown Brand'),
'influencer_id': influencer.name,
'influencer_name': influencer.get('name', 'Unknown Influencer'),
'similarity_score': similarity,
'brand_text': brand_text,
'influencer_text': influencer_text
})
logger.info(f"Processed scores for brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
scores_df = pd.DataFrame(all_scores)
scores_df = scores_df.sort_values('similarity_score', ascending=False)
        # Guard against a bare filename: os.makedirs('') raises FileNotFoundError.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
scores_df.to_csv(output_path, index=False)
logger.info(f"Saved {len(scores_df)} similarity scores to {output_path} in {time.time() - start_time:.2f} seconds")