Cached embedding code


Saved by @piyushkumar121 #python
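
A TextEmbedder class that wraps the Gemini text-embedding-004 API with a disk-backed cache, then scores brand/influencer text matches with cosine similarity.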

import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Union
from sklearn.metrics.pairwise import cosine_similarity
import logging
import os
import time
import json
import hashlib

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

class TextEmbedder:
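    """Embed brand/influencer text with the Gemini embedding API, caching results on disk."""
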
    def __init__(self, api_key: str, cache_dir: Optional[str] = None,
                 batch_size: int = 10, max_retries: int = 3,
                 retry_delay: int = 2):

        self.api_key = api_key
        self.model = "models/text-embedding-004"
        self.cache_dir = cache_dir
        self.embedding_cache = {}
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        
      
        # Lazily imported google.generativeai module handle
        self._genai = None

        # Create the cache directory and load any previously saved embeddings
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
            self._load_cache()
    
    def _get_genai(self):
        """Import and configure google.generativeai on first use."""
        if self._genai is None:
            import google.generativeai as genai
            genai.configure(api_key=self.api_key)
            self._genai = genai
        return self._genai
    
    def _load_cache(self):
        """Load previously saved embeddings from disk, if any."""
        if not self.cache_dir:
            return
            
        cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'r') as f:
                    self.embedding_cache = json.load(f)
                logger.info(f"Loaded {len(self.embedding_cache)} cached embeddings")
            except Exception as e:
                logger.error(f"Error loading cache: {str(e)}")
    
    def _save_cache(self):
        """Persist the in-memory embedding cache to disk."""
        if not self.cache_dir:
            return
            
        cache_file = os.path.join(self.cache_dir, "embedding_cache.json")
        try:
            # Keep only the most recent 10,000 entries to bound the file size
            cache_subset = dict(list(self.embedding_cache.items())[-10000:])
            with open(cache_file, 'w') as f:
                json.dump(cache_subset, f)
            logger.info(f"Saved {len(cache_subset)} embeddings to cache")
        except Exception as e:
            logger.error(f"Error saving cache: {str(e)}")
    
    def _hash_text(self, text: str) -> str:
        """Return the MD5 hex digest of the text, used as the cache key."""
        return hashlib.md5(text.encode('utf-8')).hexdigest()
    
    def _combine_text_features(self, row: Union[pd.Series, Dict], text_columns: List[str]) -> str:
        """Join the named columns into a single "col: value" string, skipping missing values."""
        text_values = []
        
      
        # pandas Series and plain dicts need different membership/NA checks
        if isinstance(row, pd.Series):
            for col in text_columns:
                if col in row.index and pd.notna(row[col]):
                    text_values.append(f"{col}: {str(row[col])}")
        else:
            for col in text_columns:
                if col in row and row[col] is not None:
                    text_values.append(f"{col}: {str(row[col])}")
                    
        return " | ".join(text_values)
    
  
    def get_brand_text_features(self, brand: Union[pd.Series, Dict]) -> str:
        """Build the brand's text description for embedding."""
        if isinstance(brand, pd.Series):
            brand_dict = brand.to_dict()
        else:
            brand_dict = brand
            
        text_columns = [
            'industry',
            'target_audience',
            'brand_messaging',
            'tone_voice',
            'category_alignment',
            'brand_alignment_keywords',
            'content_type'
        ]
        
        return self._combine_text_features(brand_dict, text_columns)
    
  
    def get_influencer_text_features(self, influencer: Union[pd.Series, Dict]) -> str:
        """Build the influencer's text description for embedding."""
        if isinstance(influencer, pd.Series):
            influencer_dict = influencer.to_dict()
        else:
            influencer_dict = influencer
            
        text_columns = [
            'category_niche',
            'audience_demographics',
            'audience_interests',
            'content_types'
        ]
        
        return self._combine_text_features(influencer_dict, text_columns)
    
    def get_embedding(self, text: str) -> np.ndarray:
        """Return the embedding for text, using the cache when possible."""
        if not text or text.strip() == "":
            # text-embedding-004 returns 768-dimensional vectors, so empty
            # input gets a zero vector of matching size
            return np.zeros(768)
            
      
        # Return a cached embedding if this exact text was embedded before
        text_hash = self._hash_text(text)
        if text_hash in self.embedding_cache:
            return np.array(self.embedding_cache[text_hash])
        
       
        # Call the embedding API, retrying on transient failures
        for attempt in range(self.max_retries):
            try:
                genai = self._get_genai()
                result = genai.embed_content(
                    model=self.model,
                    content=text
                )
                
                embedding = np.array(result['embedding'])
                
           
                # Cache the result, flushing to disk every 100 new entries
                self.embedding_cache[text_hash] = embedding.tolist()
                if len(self.embedding_cache) % 100 == 0:
                    self._save_cache()
                    
                return embedding
            except Exception as e:
                logger.error(f"Error getting embedding (attempt {attempt+1}/{self.max_retries}): {str(e)}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        
       
        logger.error(f"All embedding attempts failed for text: {text[:100]}...")
        return np.zeros(1024)
    
    def batch_get_embeddings(self, texts: List[str]) -> List[np.ndarray]:
        """Embed a list of texts in chunks of self.batch_size."""
        results = []
        
       
        # Process texts in fixed-size chunks
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i+self.batch_size]
            
         
            # Embed each text in the batch (cache hits skip the API call)
            batch_results = []
            for text in batch:
                embedding = self.get_embedding(text)
                batch_results.append(embedding)
            
            results.extend(batch_results)
            
           
            # Pause briefly between batches to stay under API rate limits
            if i + self.batch_size < len(texts):
                time.sleep(0.5)
        
        return results
    
    def calculate_text_similarity(self, brand_text: str, influencer_text: str) -> float:
        """Cosine similarity between the embeddings of two texts."""
        brand_embedding = self.get_embedding(brand_text)
        influencer_embedding = self.get_embedding(influencer_text)
        
      
        # Reshape 1-D vectors to single-row matrices for sklearn
        similarity = cosine_similarity(
            brand_embedding.reshape(1, -1),
            influencer_embedding.reshape(1, -1)
        )[0][0]
        
        return float(similarity)
    
    def calculate_batch_similarities(self, brand_texts: List[str],
                                     influencer_texts: List[str]) -> np.ndarray:
        """Pairwise cosine similarities between two lists of texts."""
        brand_embeddings = self.batch_get_embeddings(brand_texts)
        influencer_embeddings = self.batch_get_embeddings(influencer_texts)
        
     
        # Stack embeddings into matrices and compute all pairwise similarities;
        # the result has shape (len(brand_texts), len(influencer_texts))
        brand_matrix = np.vstack(brand_embeddings)
        influencer_matrix = np.vstack(influencer_embeddings)

        similarity_matrix = cosine_similarity(brand_matrix, influencer_matrix)
        
        return similarity_matrix
    
    def print_detailed_match_analysis(self, brand: Union[pd.Series, Dict],
                                      influencer: Union[pd.Series, Dict],
                                      similarity_score: float):
        """Log a human-readable breakdown of one brand/influencer match."""
        logger.info("=" * 80)
        
       
        # Resolve IDs and names; .get works on both Series and dicts
        brand_id = brand.name if isinstance(brand, pd.Series) else brand.get('brand_id', 'Unknown')
        brand_name = brand.get('name', 'Unknown Brand')

        influencer_id = influencer.name if isinstance(influencer, pd.Series) else influencer.get('influencer_id', 'Unknown')
        influencer_name = influencer.get('name', 'Unknown Influencer')
        
      
        logger.info("Brand Details:")
        logger.info(f"  ID: {brand_id}")
        logger.info(f"  Name: {brand_name}")
        
      
        logger.info("\nInfluencer Details:")
        logger.info(f"  ID: {influencer_id}")
        logger.info(f"  Name: {influencer_name}")
        logger.info("-" * 80)
        
      
        logger.info("\nBrand Text Features:")
        brand_text = self.get_brand_text_features(brand)
        for feature in brand_text.split(" | "):
            logger.info(f"  - {feature}")
            
        logger.info("\nInfluencer Text Features:")
        influencer_text = self.get_influencer_text_features(influencer)
        for feature in influencer_text.split(" | "):
            logger.info(f"  - {feature}")
        
       
        logger.info("\nText Similarity Analysis:")
        logger.info(f"  Score: {similarity_score:.4f}")
        
       
        logger.info("\nScore Interpretation:")
        if similarity_score >= 0.8:
            logger.info("  Excellent Match (≥0.8):")
            logger.info("  - Very strong text similarity")
            logger.info("  - High potential for successful collaboration")
            logger.info("  - Strong alignment in multiple areas")
        elif similarity_score >= 0.6:
            logger.info("  Good Match (≥0.6):")
            logger.info("  - Significant text similarity")
            logger.info("  - Good potential for collaboration")
            logger.info("  - Notable alignment in key areas")
        elif similarity_score >= 0.4:
            logger.info("  Moderate Match (≥0.4):")
            logger.info("  - Some text similarity")
            logger.info("  - Potential for collaboration with careful consideration")
            logger.info("  - Partial alignment in some areas")
        else:
            logger.info("  Weak Match (<0.4):")
            logger.info("  - Limited text similarity")
            logger.info("  - May need to reconsider match")
            logger.info("  - Limited alignment in key areas")
        
        logger.info("=" * 80)
    
    def get_text_similarity_matrix(self, brands_df: pd.DataFrame,
                                   influencers_df: pd.DataFrame,
                                   batch_size: int = 10) -> np.ndarray:
        """Compute (and disk-cache) the full brand-by-influencer similarity matrix."""
        start_time = time.time()
        logger.info(f"Calculating text similarity matrix for {len(brands_df)} brands and {len(influencers_df)} influencers")
        
       
        # Return a previously computed matrix if one is cached on disk
        if self.cache_dir:
            cache_path = os.path.join(self.cache_dir, f"text_similarity_{len(brands_df)}_{len(influencers_df)}.npz")
            if os.path.exists(cache_path):
                logger.info(f"Loading text similarity matrix from cache: {cache_path}")
                data = np.load(cache_path)
                matrix = data['matrix']
                logger.info(f"Loaded text similarity matrix in {time.time() - start_time:.2f} seconds")
                return matrix
        
      
        # Fill the matrix batch by batch to limit memory and API pressure
        similarity_matrix = np.zeros((len(brands_df), len(influencers_df)))
        
   
        for i in range(0, len(brands_df), batch_size):
            brand_chunk = brands_df.iloc[i:i+batch_size]
            brand_texts = [self.get_brand_text_features(brand) for _, brand in brand_chunk.iterrows()]
            
            for j in range(0, len(influencers_df), batch_size):
                influencer_chunk = influencers_df.iloc[j:j+batch_size]
                influencer_texts = [self.get_influencer_text_features(influencer) for _, influencer in influencer_chunk.iterrows()]
                
             
                batch_similarities = self.calculate_batch_similarities(brand_texts, influencer_texts)
                
              
                # Copy batch scores into their global matrix positions
                for bi, (brand_idx, _) in enumerate(brand_chunk.iterrows()):
                    for ii, (influencer_idx, _) in enumerate(influencer_chunk.iterrows()):
                        global_brand_idx = brands_df.index.get_loc(brand_idx)
                        global_influencer_idx = influencers_df.index.get_loc(influencer_idx)
                        similarity_matrix[global_brand_idx, global_influencer_idx] = batch_similarities[bi, ii]
                
                logger.info(f"Processed batch: brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
        
      
        if self.cache_dir:
            logger.info(f"Saving text similarity matrix to cache: {cache_path}")
            np.savez_compressed(cache_path, matrix=similarity_matrix)
        
        logger.info(f"Text similarity matrix calculation completed in {time.time() - start_time:.2f} seconds")
        return similarity_matrix
    
    def save_similarity_scores(self, brands_df: pd.DataFrame,
                               influencers_df: pd.DataFrame,
                               output_path: str):
        """Score every brand/influencer pair and write the results to CSV."""
        logger.info(f"Calculating and saving similarity scores to {output_path}")
        start_time = time.time()
        
        all_scores = []
        batch_size = 5  # process pairs in small chunks
        
     
        for i in range(0, len(brands_df), batch_size):
            brand_chunk = brands_df.iloc[i:i+batch_size]
            
            for j in range(0, len(influencers_df), batch_size):
                influencer_chunk = influencers_df.iloc[j:j+batch_size]
                
                # Calculate batch scores
                for _, brand in brand_chunk.iterrows():
                    brand_text = self.get_brand_text_features(brand)
                    
                    for _, influencer in influencer_chunk.iterrows():
                        influencer_text = self.get_influencer_text_features(influencer)
                        similarity = self.calculate_text_similarity(brand_text, influencer_text)
                        
                        all_scores.append({
                            'brand_id': brand.name,
                            'brand_name': brand.get('name', 'Unknown Brand'),
                            'influencer_id': influencer.name,
                            'influencer_name': influencer.get('name', 'Unknown Influencer'),
                            'similarity_score': similarity,
                            'brand_text': brand_text,
                            'influencer_text': influencer_text
                        })
                
                logger.info(f"Processed scores for brands {i}-{i+len(brand_chunk)-1}, influencers {j}-{j+len(influencer_chunk)-1}")
        
      
        scores_df = pd.DataFrame(all_scores)
        scores_df = scores_df.sort_values('similarity_score', ascending=False)
        
      
        # Guard against empty dirname when output_path has no directory part
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        scores_df.to_csv(output_path, index=False)
        
        logger.info(f"Saved {len(scores_df)} similarity scores to {output_path} in {time.time() - start_time:.2f} seconds")