Cached embedding code
Tue Feb 25 2025 05:43:41 GMT+0000 (Coordinated Universal Time)
Saved by @piyushkumar121 #python
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Union
from sklearn.metrics.pairwise import cosine_similarity
import logging
import os
import time
import json
import hashlib

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class TextEmbedder:
    def __init__(self, api_key: str, cache_dir: Optional[str] = None,
                 batch_size: int = 10, max_retries: int = 3, retry_delay: int = 2):
        # Use the caller-supplied key; never hard-code credentials in source.
        self.api_key = api_key
        self.model = "models/text-embedding-004"
        self.cache_dir = cache_dir
        self.embedding_cache = {}
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._genai = None
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
            self._load_cache()

    def _get_genai(self):
        # Lazily import and configure the Gemini client on first use.
        if self._genai is None:
            import google.generativeai as genai
            genai.configure(api_key=self.api_key)
            self._genai = genai
        return self._genai

    def _load_cache(self):
        if not self.cache_dir:
            return
        cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'r') as f:
                    self.embedding_cache = json.load(f)
                logger.info(f"Loaded {len(self.embedding_cache)} cached embeddings")
            except Exception as e:
                logger.error(f"Error loading cache: {str(e)}")

    def _save_cache(self):
        if not self.cache_dir:
            return
        cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
        try:
            # Keep only the most recent 10,000 entries to bound file size.
            cache_subset = dict(list(self.embedding_cache.items())[-10000:])
            with open(cache_file, 'w') as f:
                json.dump(cache_subset, f)
            logger.info(f"Saved {len(cache_subset)} embeddings to cache")
        except Exception as e:
            logger.error(f"Error saving cache: {str(e)}")

    def _hash_text(self, text: str) -> str:
        return hashlib.md5(text.encode('utf-8')).hexdigest()

    def _combine_text_features(self, row: Union[pd.Series, Dict], text_columns: List[str]) -> str:
        # Build a "col: value | col: value" string from whichever columns are present.
        text_values = []
        if isinstance(row, pd.Series):
            for col in text_columns:
                if col in row.index and pd.notna(row[col]):
                    text_values.append(f"{col}: {str(row[col])}")
        else:
            for col in text_columns:
                if col in row and row[col] is not None:
                    text_values.append(f"{col}: {str(row[col])}")
        return " | ".join(text_values)

    def get_brand_text_features(self, brand: Union[pd.Series, Dict]) -> str:
        brand_dict = brand.to_dict() if isinstance(brand, pd.Series) else brand
        text_columns = [
            'industry', 'target_audience', 'brand_messaging', 'tone_voice',
            'category_alignment', 'brand_alignment_keywords', 'content_type'
        ]
        return self._combine_text_features(brand_dict, text_columns)

    def get_influencer_text_features(self, influencer: Union[pd.Series, Dict]) -> str:
        influencer_dict = influencer.to_dict() if isinstance(influencer, pd.Series) else influencer
        text_columns = [
            'category_niche', 'audience_demographics',
            'audience_interests', 'content_types'
        ]
        return self._combine_text_features(influencer_dict, text_columns)

    def get_embedding(self, text: str) -> np.ndarray:
        # text-embedding-004 returns 768-dimensional vectors, so the zero-vector
        # fallback must match that dimension or downstream cosine math breaks.
        if not text or text.strip() == "":
            return np.zeros(768)
        text_hash = self._hash_text(text)
        if text_hash in self.embedding_cache:
            return np.array(self.embedding_cache[text_hash])
        for attempt in range(self.max_retries):
            try:
                genai = self._get_genai()
                result = genai.embed_content(model=self.model, content=text)
                embedding = np.array(result['embedding'])
                self.embedding_cache[text_hash] = embedding.tolist()
                # Persist the cache periodically rather than on every call.
                if len(self.embedding_cache) % 100 == 0:
                    self._save_cache()
                return embedding
            except Exception as e:
                logger.error(f"Error getting embedding (attempt {attempt+1}/{self.max_retries}): {str(e)}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        logger.error(f"All embedding attempts failed for text: {text[:100]}...")
        return np.zeros(768)

    def batch_get_embeddings(self, texts: List[str]) -> List[np.ndarray]:
        results = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i+self.batch_size]
            batch_results = [self.get_embedding(text) for text in batch]
            results.extend(batch_results)
            # Brief pause between batches to stay under API rate limits.
            if i + self.batch_size < len(texts):
                time.sleep(0.5)
        return results

    def calculate_text_similarity(self, brand_text: str, influencer_text: str) -> float:
        brand_embedding = self.get_embedding(brand_text)
        influencer_embedding = self.get_embedding(influencer_text)
        similarity = cosine_similarity(
            brand_embedding.reshape(1, -1),
            influencer_embedding.reshape(1, -1)
        )[0][0]
        return float(similarity)

    def calculate_batch_similarities(self, brand_texts: List[str],
                                     influencer_texts: List[str]) -> np.ndarray:
        brand_embeddings = self.batch_get_embeddings(brand_texts)
        influencer_embeddings = self.batch_get_embeddings(influencer_texts)
        brand_matrix = np.vstack(brand_embeddings)
        influencer_matrix = np.vstack(influencer_embeddings)
        return cosine_similarity(brand_matrix, influencer_matrix)

    def print_detailed_match_analysis(self, brand: Union[pd.Series, Dict],
                                      influencer: Union[pd.Series, Dict],
                                      similarity_score: float):
        logger.info("=" * 80)
        brand_id = brand.name if isinstance(brand, pd.Series) else brand.get('brand_id', 'Unknown')
        # .get() works on both pd.Series and dict, so no branching is needed for names.
        brand_name = brand.get('name', 'Unknown Brand')
        influencer_id = influencer.name if isinstance(influencer, pd.Series) else influencer.get('influencer_id', 'Unknown')
        influencer_name = influencer.get('name', 'Unknown Influencer')
        logger.info("Brand Details:")
        logger.info(f"  ID: {brand_id}")
        logger.info(f"  Name: {brand_name}")
        logger.info("\nInfluencer Details:")
        logger.info(f"  ID: {influencer_id}")
        logger.info(f"  Name: {influencer_name}")
        logger.info("-" * 80)
        logger.info("\nBrand Text Features:")
        brand_text = self.get_brand_text_features(brand)
        for feature in brand_text.split(" | "):
            logger.info(f"  - {feature}")
        logger.info("\nInfluencer Text Features:")
        influencer_text = self.get_influencer_text_features(influencer)
        for feature in influencer_text.split(" | "):
            logger.info(f"  - {feature}")
        logger.info("\nText Similarity Analysis:")
        logger.info(f"  Score: {similarity_score:.4f}")
        logger.info("\nScore Interpretation:")
        if similarity_score >= 0.8:
            logger.info("  Excellent Match (≥0.8):")
            logger.info("  - Very strong text similarity")
            logger.info("  - High potential for successful collaboration")
            logger.info("  - Strong alignment in multiple areas")
        elif similarity_score >= 0.6:
            logger.info("  Good Match (≥0.6):")
            logger.info("  - Significant text similarity")
            logger.info("  - Good potential for collaboration")
            logger.info("  - Notable alignment in key areas")
        elif similarity_score >= 0.4:
            logger.info("  Moderate Match (≥0.4):")
            logger.info("  - Some text similarity")
            logger.info("  - Potential for collaboration with careful consideration")
            logger.info("  - Partial alignment in some areas")
        else:
            logger.info("  Weak Match (<0.4):")
            logger.info("  - Limited text similarity")
            logger.info("  - May need to reconsider match")
            logger.info("  - Limited alignment in key areas")
        logger.info("=" * 80)

    def get_text_similarity_matrix(self, brands_df: pd.DataFrame,
                                   influencers_df: pd.DataFrame,
                                   batch_size: int = 10) -> np.ndarray:
        start_time = time.time()
        logger.info(f"Calculating text similarity matrix for {len(brands_df)} brands and {len(influencers_df)} influencers")
        # Reuse a previously computed matrix for the same dataset sizes, if cached.
        if self.cache_dir:
            cache_path = os.path.join(self.cache_dir, f"text_similarity_{len(brands_df)}_{len(influencers_df)}.npz")
            if os.path.exists(cache_path):
                logger.info(f"Loading text similarity matrix from cache: {cache_path}")
                data = np.load(cache_path)
                matrix = data['matrix']
                logger.info(f"Loaded text similarity matrix in {time.time() - start_time:.2f} seconds")
                return matrix
        similarity_matrix = np.zeros((len(brands_df), len(influencers_df)))
        for i in range(0, len(brands_df), batch_size):
            brand_chunk = brands_df.iloc[i:i+batch_size]
            brand_texts = [self.get_brand_text_features(brand) for _, brand in brand_chunk.iterrows()]
            for j in range(0, len(influencers_df), batch_size):
                influencer_chunk = influencers_df.iloc[j:j+batch_size]
                influencer_texts = [self.get_influencer_text_features(influencer)
                                    for _, influencer in influencer_chunk.iterrows()]
                batch_similarities = self.calculate_batch_similarities(brand_texts, influencer_texts)
                # Map chunk-local positions back to positions in the full DataFrames.
                for bi, (brand_idx, _) in enumerate(brand_chunk.iterrows()):
                    for ii, (influencer_idx, _) in enumerate(influencer_chunk.iterrows()):
                        global_brand_idx = brands_df.index.get_loc(brand_idx)
                        global_influencer_idx = influencers_df.index.get_loc(influencer_idx)
                        similarity_matrix[global_brand_idx, global_influencer_idx] = batch_similarities[bi, ii]
                logger.info(f"Processed batch: brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
        if self.cache_dir:
            logger.info(f"Saving text similarity matrix to cache: {cache_path}")
            np.savez_compressed(cache_path, matrix=similarity_matrix)
        logger.info(f"Text similarity matrix calculation completed in {time.time() - start_time:.2f} seconds")
        return similarity_matrix

    def save_similarity_scores(self, brands_df: pd.DataFrame,
                               influencers_df: pd.DataFrame, output_path: str):
        logger.info(f"Calculating and saving similarity scores to {output_path}")
        start_time = time.time()
        all_scores = []
        batch_size = 5
        for i in range(0, len(brands_df), batch_size):
            brand_chunk = brands_df.iloc[i:i+batch_size]
            for j in range(0, len(influencers_df), batch_size):
                influencer_chunk = influencers_df.iloc[j:j+batch_size]
                # Calculate batch scores for every brand/influencer pair in this chunk
                for _, brand in brand_chunk.iterrows():
                    brand_text = self.get_brand_text_features(brand)
                    for _, influencer in influencer_chunk.iterrows():
                        influencer_text = self.get_influencer_text_features(influencer)
                        similarity = self.calculate_text_similarity(brand_text, influencer_text)
                        all_scores.append({
                            'brand_id': brand.name,
                            'brand_name': brand.get('name', 'Unknown Brand'),
                            'influencer_id': influencer.name,
                            'influencer_name': influencer.get('name', 'Unknown Influencer'),
                            'similarity_score': similarity,
                            'brand_text': brand_text,
                            'influencer_text': influencer_text
                        })
                logger.info(f"Processed scores for brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
        scores_df = pd.DataFrame(all_scores)
        scores_df = scores_df.sort_values('similarity_score', ascending=False)
        # Guard against a bare filename, where dirname() returns "" and makedirs fails.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        scores_df.to_csv(output_path, index=False)
        logger.info(f"Saved {len(scores_df)} similarity scores to {output_path} in {time.time() - start_time:.2f} seconds")
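Below is a minimal usage sketch, not part of the saved snippet: it assumes a valid Gemini API key in a GOOGLE_API_KEY environment variable, network access to the embedding API, and small made-up DataFrames whose column names follow the feature lists used by get_brand_text_features and get_influencer_text_features.

# --- Example usage (illustrative sketch; data and env var name are assumptions) ---
if __name__ == "__main__":
    embedder = TextEmbedder(api_key=os.environ["GOOGLE_API_KEY"],
                            cache_dir="embedding_cache")

    # Hypothetical single-row datasets; missing feature columns are simply skipped.
    brands_df = pd.DataFrame([{
        'name': 'GlowFit',
        'industry': 'fitness apparel',
        'target_audience': 'women 18-34 interested in athleisure',
        'brand_messaging': 'performance meets style',
    }])
    influencers_df = pd.DataFrame([{
        'name': 'fit_with_maya',
        'category_niche': 'fitness and wellness',
        'audience_demographics': 'women 18-34, US and UK',
        'audience_interests': 'workouts, athleisure, nutrition',
    }])

    # Pairwise similarity matrix (brands x influencers), cached on disk as .npz.
    matrix = embedder.get_text_similarity_matrix(brands_df, influencers_df)
    print(matrix)

    # Detailed log breakdown for the first pair.
    embedder.print_detailed_match_analysis(
        brands_df.iloc[0], influencers_df.iloc[0], float(matrix[0, 0])
    )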