import os import logging import pandas as pd from typing import List, Dict, Optional, Any, Union, Tuple from datetime import datetime, timedelta import re import traceback from langdetect import detect, LangDetectException from langdetect.lang_detect_exception import ErrorCode import pycountry import iso639 from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api._errors import NoTranscriptFound, TranscriptsDisabled, NoTranscriptAvailable from config.settings import ( RAW_DATA_DIR, PROCESSED_DATA_DIR, VIDEO_SAMPLE_SIZE, COMMENT_SAMPLE_SIZE ) from src.scraper.youtube_api import YouTubeAPI from src.analyzer.audience import AudienceAnalyzer from src.analyzer.content import ContentAnalyzer logger = logging.getLogger(__name__) class DataCollector: def __init__(self, api_key: Optional[str] = None): self.api = YouTubeAPI(api_key) self.audience_analyzer = AudienceAnalyzer() self.content_analyzer = ContentAnalyzer() logger.info("DataCollector initialized") def collect_influencers_by_keywords( self, keywords: List[str], channels_per_keyword: int = 50, videos_per_channel: int = 10, comments_per_video: int = 100, save_intermediate: bool = True ) -> pd.DataFrame: logger.info(f"Starting influencer data collection for {len(keywords)} keywords") # Search for channels by keywords all_channels = pd.DataFrame() for keyword in keywords: logger.info(f"Collecting channels for keyword: {keyword}") channels = self.api.search_channels_by_keyword( keyword=keyword, max_results=channels_per_keyword ) all_channels = pd.concat([all_channels, channels], ignore_index=True) # Remove duplicates all_channels = all_channels.drop_duplicates(subset=['channel_id']) if save_intermediate: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') channel_search_path = os.path.join( PROCESSED_DATA_DIR, f"channel_search_results_{timestamp}.csv" ) all_channels.to_csv(channel_search_path, index=False) logger.info(f"Saved channel search results to {channel_search_path}") # Get channel statistics channel_ids = all_channels['channel_id'].unique().tolist() logger.info(f"Collecting detailed statistics for {len(channel_ids)} channels") channel_stats = self.api.get_channel_statistics(channel_ids) if save_intermediate: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') channel_stats_path = os.path.join( PROCESSED_DATA_DIR, f"channel_statistics_{timestamp}.csv" ) channel_stats.to_csv(channel_stats_path, index=False) logger.info(f"Saved channel statistics to {channel_stats_path}") # Collect videos and comments all_videos = pd.DataFrame() all_video_stats = pd.DataFrame() all_comments = pd.DataFrame() for _, channel in channel_stats.iterrows(): channel_id = channel['channel_id'] playlist_id = channel.get('playlist_id') if not playlist_id: logger.warning(f"No playlist ID found for channel {channel_id}") continue logger.info(f"Collecting videos for channel: {channel['title']} ({channel_id})") # Get videos for channel try: video_ids = self.api.get_channel_videos( playlist_id=playlist_id, max_results=videos_per_channel ) if not video_ids: logger.warning(f"No videos found for channel {channel_id}") continue # Get video details video_details = self.api.get_video_details(video_ids) all_video_stats = pd.concat([all_video_stats, video_details], ignore_index=True) # Get comments for sample of videos for video_id in video_ids[:min(3, len(video_ids))]: try: comments = self.api.get_video_comments( video_id=video_id, max_results=comments_per_video ) all_comments = pd.concat([all_comments, comments], ignore_index=True) except Exception as e: logger.error(f"Error collecting comments for video {video_id}: {str(e)}") except Exception as e: logger.error(f"Error collecting videos for channel {channel_id}: {str(e)}") if save_intermediate: # Save video statistics timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') video_stats_path = os.path.join( PROCESSED_DATA_DIR, f"video_statistics_{timestamp}.csv" ) all_video_stats.to_csv(video_stats_path, index=False) logger.info(f"Saved video statistics to {video_stats_path}") # Save comment data if not all_comments.empty: comments_path = os.path.join( PROCESSED_DATA_DIR, f"video_comments_{timestamp}.csv" ) all_comments.to_csv(comments_path, index=False) logger.info(f"Saved video comments to {comments_path}") # Create comprehensive influencer dataset logger.info("Creating combined influencer dataset") try: influencer_data = self._create_influencer_dataset( channel_stats=channel_stats, video_stats=all_video_stats, comments=all_comments ) # Save final dataset timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_path = os.path.join( PROCESSED_DATA_DIR, f"influencer_data_{timestamp}.csv" ) influencer_data.to_csv(output_path, index=False) logger.info(f"Saved influencer dataset to {output_path}") return influencer_data except Exception as e: logger.error(f"Error creating influencer dataset: {str(e)}") logger.error(traceback.format_exc()) if not influencer_data: logger.warning("No influencer data was generated") return pd.DataFrame(columns=[ "influencer_id", "name", "platform", "location", "languages", "category_niche", "follower_count", "audience_demographics", "engagement_rate", "audience_interests", "content_types", "post_frequency_month", "avg_views", "collaboration_count", "sponsored_ratio", "reputation_score", "follower_quality_score", "content_originality_score", "comment_authenticity_score", "cost_per_post", "negotiation_flexibility", "historical_performance", "controversy_flag", "compliance_status" ]) return pd.DataFrame(influencer_data) def _extract_content_types(self, videos_df: pd.DataFrame) -> List[str]: """Extract content types from video titles and descriptions.""" content_type_keywords = { 'review': ['review', 'unboxing', 'first look', 'hands-on'], 'tutorial': ['tutorial', 'how to', 'guide', 'tips', 'learn'], 'gameplay': ['gameplay', 'playthrough', 'gaming', 'let\'s play'], 'vlog': ['vlog', 'day in the life', 'follow me'], 'interview': ['interview', 'qa', 'q&a', 'questions'], 'reaction': ['reaction', 'reacting to', 'react'], 'podcast': ['podcast', 'talk show', 'discussion'], 'education': ['explained', 'educational', 'learn', 'course'], 'lifestyle': ['lifestyle', 'routine', 'tour'], 'recipes': ['recipe', 'cooking', 'baking', 'food'], 'workout': ['workout', 'exercise', 'fitness', 'training'] } content_types_count = {ct: 0 for ct in content_type_keywords} # Check each video title and description for content type keywords for _, video in videos_df.iterrows(): title = video.get('title', '').lower() if isinstance(video.get('title'), str) else '' description = video.get('description', '').lower() if isinstance(video.get('description'), str) else '' for content_type, keywords in content_type_keywords.items(): for keyword in keywords: if keyword in title or keyword in description: content_types_count[content_type] += 1 break # Get top content types by count top_content_types = sorted(content_types_count.items(), key=lambda x: x[1], reverse=True) return [ct for ct, count in top_content_types if count > 0][:3] def _estimate_cost_per_post(self, followers: int, engagement_rate: float) -> float: """Estimate cost per post based on followers and engagement rate.""" try: # Ensure we have valid numbers followers = int(followers) if pd.notnull(followers) else 0 engagement_rate = float(engagement_rate) if pd.notnull(engagement_rate) else 0 # Base cost calculation by follower count if followers < 10000: # Nano influencer base_cost = 20 + (followers / 10000) * 80 elif followers < 100000: # Micro influencer base_cost = 100 + (followers - 10000) * (400 / 90000) elif followers < 500000: # Mid-tier influencer base_cost = 500 + (followers - 100000) * (4500 / 400000) elif followers < 1000000: # Macro influencer base_cost = 5000 + (followers - 500000) * (5000 / 500000) else: # Mega influencer base_cost = 10000 + (followers - 1000000) * 0.005 # Adjust by engagement rate avg_engagement = 0.02 # 2% is considered average if engagement_rate > 0: engagement_multiplier = max(0.5, min(3.0, engagement_rate / avg_engagement)) else: engagement_multiplier = 0.5 return base_cost * engagement_multiplier except Exception as e: logger.error(f"Error estimating cost per post: {str(e)}") return 100 # Default fallback cost def _clean_category_urls(self, categories: List[str]) -> List[str]: """Clean category URLs to extract readable category names.""" cleaned_categories = [] if not categories: return cleaned_categories if not isinstance(categories, list): if isinstance(categories, str): categories = [categories] else: return cleaned_categories for category in categories: if isinstance(category, str): # Try to extract category name from URL match = re.search(r'/([^/]+)$', category) if match: # Convert underscores to spaces and capitalize category_name = match.group(1).replace('_', ' ').title() cleaned_categories.append(category_name) else: # If it's not a URL, use as is if not category.startswith('http'): cleaned_categories.append(category) else: # Last resort: split by slashes and take last part parts = category.split('/') if parts: category_name = parts[-1].replace('_', ' ').title() cleaned_categories.append(category_name) return cleaned_categories def _get_transcript_for_video(self, video_id: str, max_chars: int = 10000) -> str: """ Get transcript text for a video using YouTube Transcript API. Returns empty string if transcript is not available. """ try: transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) # First try to get a manual transcript (usually more accurate) try: transcript = transcript_list.find_manually_created_transcript() transcript_data = transcript.fetch() except: # Fall back to generated transcript try: transcript = transcript_list.find_generated_transcript() transcript_data = transcript.fetch() except: # Try any available transcript transcript = transcript_list.find_transcript(['en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh-Hans']) transcript_data = transcript.fetch() # Get the text from transcript entries full_text = " ".join([entry['text'] for entry in transcript_data]) # Limit text length to prevent processing very long transcripts return full_text[:max_chars] except (NoTranscriptFound, TranscriptsDisabled, NoTranscriptAvailable) as e: logger.warning(f"No transcript available for video {video_id}: {str(e)}") return "" except Exception as e: logger.error(f"Error fetching transcript for video {video_id}: {str(e)}") return "" def _detect_language_from_transcripts(self, video_ids: List[str], max_videos: int = 3) -> Tuple[str, str]: """ Detect language from video transcripts. Returns a tuple of (language_code, language_name) """ logger.info(f"Detecting language from transcripts of {min(len(video_ids), max_videos)} videos") transcript_texts = [] # Try to get transcripts from up to max_videos videos for video_id in video_ids[:max_videos]: transcript_text = self._get_transcript_for_video(video_id) if transcript_text: transcript_texts.append(transcript_text) # If we get a good transcript, we might not need more if len(transcript_text) > 1000: break if not transcript_texts: logger.warning("No transcripts found for language detection") return "en", "English" # Default fallback # Combine transcript texts and detect language combined_text = " ".join(transcript_texts) try: lang_code = detect(combined_text) try: language = iso639.languages.get(part1=lang_code) lang_name = language.name except (KeyError, AttributeError): try: language = pycountry.languages.get(alpha_2=lang_code) lang_name = language.name if language else lang_code except (KeyError, AttributeError): lang_name = f"Unknown ({lang_code})" logger.info(f"Detected language from transcript: {lang_name} ({lang_code})") return lang_code, lang_name except LangDetectException as e: logger.warning(f"Could not detect language from transcript: {e}") return "en", "English" # Default fallback def _detect_language(self, text_samples: List[str]) -> Tuple[str, str]: """ Detect the language from a list of text samples. Returns a tuple of (language_code, language_name) """ if not text_samples: return "en", "English" # Default fallback # Combine text samples for better detection combined_text = " ".join(text_samples)[:10000] try: # Detect language from text lang_code = detect(combined_text) # Get language name try: language = iso639.languages.get(part1=lang_code) lang_name = language.name except (KeyError, AttributeError): try: language = pycountry.languages.get(alpha_2=lang_code) lang_name = language.name if language else lang_code except (KeyError, AttributeError): lang_name = f"Unknown ({lang_code})" return lang_code, lang_name except LangDetectException as e: if hasattr(e, "code") and e.code == ErrorCode.CantDetectLanguage: logger.warning(f"Could not detect language: {e}") else: logger.error(f"Language detection error: {e}") return "en", "English" # Default fallback def _create_influencer_dataset( self, channel_stats: pd.DataFrame, video_stats: pd.DataFrame, comments: pd.DataFrame ) -> pd.DataFrame: """Create a comprehensive dataset of influencer information.""" logger.info("Creating influencer dataset") influencer_data = [] for i, (_, channel) in enumerate(channel_stats.iterrows()): try: channel_id = channel['channel_id'] # Generate influencer ID influencer_id = f"I{(i+1):03d}" # Get videos for this channel channel_videos = video_stats[video_stats['channel_id'] == channel_id].copy() if channel_videos.empty: logger.warning(f"No videos found for channel {channel_id} in the collected data") continue # Calculate basic engagement metrics total_views = channel_videos['view_count'].sum() total_likes = channel_videos['like_count'].sum() total_comments = channel_videos['comment_count'].sum() avg_views = channel_videos['view_count'].mean() avg_likes = channel_videos['like_count'].mean() avg_comments = channel_videos['comment_count'].mean() # Ensure numeric values total_views = float(total_views) if pd.notnull(total_views) else 0 total_likes = float(total_likes) if pd.notnull(total_likes) else 0 total_comments = float(total_comments) if pd.notnull(total_comments) else 0 # Calculate engagement rate if total_views > 0: engagement_rate = ((total_likes + total_comments) / total_views) * 100 else: engagement_rate = 0 # Format engagement rate for later calculations engagement_rate_formatted = round(engagement_rate / 100, 3) # Calculate post frequency if len(channel_videos) >= 2: try: # Convert published_at to datetime channel_videos['published_at'] = pd.to_datetime(channel_videos['published_at'], errors='coerce') # Filter out videos with invalid dates valid_dates = channel_videos[channel_videos['published_at'].notna()] if len(valid_dates) >= 2: # Sort by date sorted_videos = valid_dates.sort_values('published_at') # Calculate date range first_video_date = sorted_videos['published_at'].iloc[0] last_video_date = sorted_videos['published_at'].iloc[-1] date_diff = (last_video_date - first_video_date).days # Calculate posts per month if date_diff > 0: post_frequency = (len(channel_videos) / (date_diff / 30)) else: post_frequency = len(channel_videos) else: post_frequency = len(channel_videos) except Exception as e: logger.error(f"Error calculating post frequency for channel {channel_id}: {str(e)}") post_frequency = len(channel_videos) else: post_frequency = len(channel_videos) # Extract categories categories = [] for _, video in channel_videos.iterrows(): category = video.get('topic_categories') if isinstance(category, list): categories.extend(self._clean_category_urls(category)) # Get country information country = channel.get('country') if country and isinstance(country, str): country_name = country else: # Try to determine from comments channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])] if not channel_comments.empty and 'author_country' in channel_comments.columns: # Get most common country from comments country_counts = channel_comments['author_country'].value_counts() country_name = country_counts.index[0] if len(country_counts) > 0 else "Unknown" else: country_name = "Global" # Language detection - with improved transcript-based detection # 1. First try from channel metadata language_code = channel.get('default_language') language_name = None # 2. If available in metadata, get language name if language_code and isinstance(language_code, str): try: # Try to get language name from ISO 639-1 code language = iso639.languages.get(part1=language_code) language_name = language.name except (KeyError, AttributeError): try: # Try pycountry as fallback language = pycountry.languages.get(alpha_2=language_code) language_name = language.name if language else None except (KeyError, AttributeError): language_name = None # 3. If language not determined from metadata, try transcript-based detection if not language_name: # Get video IDs to analyze video_ids = channel_videos['video_id'].tolist() # Try to detect language from transcripts transcript_lang_code, transcript_lang_name = self._detect_language_from_transcripts(video_ids) # If we got a valid language from transcript, use it if transcript_lang_code != "en" or (transcript_lang_code == "en" and len(video_ids) > 0): language_code, language_name = transcript_lang_code, transcript_lang_name logger.info(f"Using transcript-based language detection for channel {channel_id}: {language_name}") else: # 4. As last resort, fall back to text-based detection text_samples = [] # Collect text samples from video titles and descriptions for _, video in channel_videos.iterrows(): title = video.get('title') desc = video.get('description') if isinstance(title, str) and len(title) > 10: text_samples.append(title) if isinstance(desc, str) and len(desc) > 20: # Limit description length text_samples.append(desc[:500]) # Add channel description channel_desc = channel.get('description') if isinstance(channel_desc, str) and len(channel_desc) > 20: text_samples.append(channel_desc) # Add comments as text samples channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])] if not channel_comments.empty: for comment_text in channel_comments['text'].head(30): if isinstance(comment_text, str) and len(comment_text) > 15: text_samples.append(comment_text) # Detect language from text samples if text_samples: language_code, language_name = self._detect_language(text_samples) else: language_code, language_name = "en", "English" # Extract channel keywords and video tags channel_keywords = channel.get('keywords', '') video_tags = [] for tags in channel_videos['tags']: if isinstance(tags, list): video_tags.extend(tags) # Detect sponsored content sponsored_keywords = ['sponsored', 'ad', 'advertisement', 'partner', 'paid', '#ad', '#sponsored'] sponsored_count = 0 total_analyzed = 0 for title in channel_videos['title']: if isinstance(title, str): total_analyzed += 1 if any(kw.lower() in title.lower() for kw in sponsored_keywords): sponsored_count += 1 for desc in channel_videos['description']: if isinstance(desc, str): # Only count unique videos if total_analyzed < len(channel_videos): total_analyzed += 1 if any(kw.lower() in desc.lower() for kw in sponsored_keywords): sponsored_count += 1 # Calculate sponsored content ratio sponsored_ratio = sponsored_count / max(1, total_analyzed) # Analyze audience sentiment and authenticity comment_sentiment = 0.5 comment_authenticity = 0.5 if not comments.empty: channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])].copy() if not channel_comments.empty: try: audience_analysis = self.audience_analyzer.analyze_audience_from_comments(channel_comments) comment_sentiment = audience_analysis.get('sentiment_score', 0.5) comment_authenticity = audience_analysis.get('authenticity_score', 0.5) except Exception as e: logger.warning(f"Could not analyze audience for channel {channel_id}: {e}") # Estimate audience demographics audience_type = "Unknown" if len(categories) > 0: # Use audience analyzer if available if hasattr(self.audience_analyzer, 'estimate_demographics'): try: demographics = self.audience_analyzer.estimate_demographics( channel_data=channel.to_dict(), video_stats=channel_videos, comments=channel_comments if 'channel_comments' in locals() else pd.DataFrame() ) # Extract primary demographic info primary_age = max(demographics.get('age_groups', {}).items(), key=lambda x: x[1])[0] primary_gender = max(demographics.get('gender_split', {}).items(), key=lambda x: x[1])[0] if primary_gender == 'male' and primary_age in ['13-17', '18-24']: audience_type = "Young Male Adults" elif primary_gender == 'female' and primary_age in ['13-17', '18-24']: audience_type = "Young Female Adults" elif primary_age in ['25-34', '35-44']: audience_type = "Adults 25-44" elif primary_age in ['45-54', '55+']: audience_type = "Adults 45+" else: # Fall back to category-based audience type if any('gaming' in c.lower() for c in categories): audience_type = "Gaming Enthusiasts" elif any('beauty' in c.lower() for c in categories): audience_type = "Beauty Enthusiasts" elif any('tech' in c.lower() for c in categories): audience_type = "Tech Enthusiasts" else: audience_type = "General Audience" except Exception as e: logger.warning(f"Error estimating demographics for channel {channel_id}: {e}") else: # Use category-based audience type as fallback if any('gaming' in c.lower() for c in categories): audience_type = "Gaming Enthusiasts" elif any('beauty' in c.lower() for c in categories): audience_type = "Beauty Enthusiasts" elif any('tech' in c.lower() for c in categories): audience_type = "Tech Enthusiasts" else: audience_type = "General Audience" # Extract category and content types category_niche = "/".join(set(categories[:3])) if categories else "general" content_types = "/".join(self._extract_content_types(channel_videos)) if len(channel_videos) > 0 else "general" # Extract audience interests audience_interests = [] if hasattr(self.audience_analyzer, 'analyze_audience_interests'): try: audience_interests = self.audience_analyzer.analyze_audience_interests(channel_videos) except Exception as e: logger.warning(f"Error analyzing audience interests for channel {channel_id}: {e}") # Fallback to video tags for audience interests if not audience_interests and video_tags: tag_counts = {} for tag in video_tags: if isinstance(tag, str): tag_counts[tag] = tag_counts.get(tag, 0) + 1 sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True) audience_interests = [tag for tag, _ in sorted_tags[:5]] audience_interests_str = "/".join(audience_interests) if audience_interests else "general" # Set platform platform = "YouTube" # Detect collaborations collaboration_count = 0 collab_keywords = ['collab', 'featuring', 'feat', 'ft.', 'with', 'x ', ' x '] for title in channel_videos['title']: if isinstance(title, str) and any(kw.lower() in title.lower() for kw in collab_keywords): collaboration_count += 1 for desc in channel_videos['description']: if isinstance(desc, str) and any(kw.lower() in desc.lower() for kw in collab_keywords): # Avoid double counting if collaboration_count < len(channel_videos): collaboration_count += 1 # Calculate reputation score based on comment sentiment # Calculate reputation score based on comment sentiment reputation_score = round(comment_sentiment, 2) # Calculate follower quality score avg_platform_er = 0.015 # Average engagement rate on YouTube if engagement_rate_formatted > 0: follower_quality_score = round(min(0.99, max(0.1, engagement_rate_formatted / avg_platform_er * 0.5)), 2) else: follower_quality_score = 0.1 # Calculate content originality score if hasattr(self.content_analyzer, 'calculate_content_originality'): try: content_originality_raw = self.content_analyzer.calculate_content_originality(channel_videos) content_originality_score = round(min(0.99, max(0.1, content_originality_raw / 10)), 2) except Exception as e: logger.warning(f"Error calculating content originality for channel {channel_id}: {e}") # Fallback method for content originality title_word_set = set() title_word_count = 0 for title in channel_videos['title']: if isinstance(title, str): words = re.findall(r'\b\w+\b', title.lower()) title_word_set.update(words) title_word_count += len(words) title_uniqueness = len(title_word_set) / max(1, title_word_count) content_originality_score = round(min(0.99, max(0.1, 0.5 + title_uniqueness * 0.4)), 2) else: # Fallback if content analyzer method not available title_word_set = set() title_word_count = 0 for title in channel_videos['title']: if isinstance(title, str): words = re.findall(r'\b\w+\b', title.lower()) title_word_set.update(words) title_word_count += len(words) title_uniqueness = len(title_word_set) / max(1, title_word_count) content_originality_score = round(min(0.99, max(0.1, 0.5 + title_uniqueness * 0.4)), 2) # Calculate comment authenticity score if not comments.empty and 'channel_comments' in locals() and not channel_comments.empty: unique_commenters = len(channel_comments['author'].unique()) total_comments = len(channel_comments) if total_comments > 0: # Calculate ratio of unique commenters to total comments uniqueness_ratio = unique_commenters / total_comments comment_authenticity_score = round(min(0.99, max(0.1, 0.3 + uniqueness_ratio * 0.6)), 2) else: comment_authenticity_score = 0.5 else: comment_authenticity_score = 0.5 # Get subscriber count for cost estimation subscriber_count = channel.get('subscriber_count', 0) if not isinstance(subscriber_count, (int, float)) or pd.isna(subscriber_count): subscriber_count = 0 # Calculate cost per post cost_per_post = round(self._estimate_cost_per_post(subscriber_count, engagement_rate_formatted)) # Determine negotiation flexibility try: channel_age_days = (datetime.now() - pd.to_datetime(channel['published_at'])).days # New channels or very active ones tend to be more flexible if channel_age_days < 365 or post_frequency > 8: negotiation_flexibility = "flexible" # Well-established channels with high engagement tend to be strict elif channel_age_days > 1825 and engagement_rate > 5: negotiation_flexibility = "strict" # Moderate flexibility for channels with good engagement elif engagement_rate > 3: negotiation_flexibility = "medium" else: negotiation_flexibility = "negotiable" except: # Default if we can't calculate negotiation_flexibility = "negotiable" # Calculate historical performance if subscriber_count > 0: historical_perf = round(min(0.99, avg_views / subscriber_count), 2) else: # Fallback based on engagement rate historical_perf = round(min(0.99, max(0.01, engagement_rate_formatted * 10)), 2) # Check for controversy flags controversy_flag = "false" if 'like_count' in channel_videos.columns and 'dislike_count' in channel_videos.columns: # YouTube API doesn't expose dislikes anymore, but keeping this code for future reference total_likes = channel_videos['like_count'].sum() total_dislikes = channel_videos['dislike_count'].sum() if 'dislike_count' in channel_videos.columns else 0 if total_likes + total_dislikes > 0: dislike_ratio = total_dislikes / (total_likes + total_dislikes) if dislike_ratio > 0.25: # More than 25% dislikes indicates controversy controversy_flag = "true" # Check compliance status compliance_status = "verified" if any(channel_videos['made_for_kids'] == True) and any(title.lower().find('adult') >= 0 for title in channel_videos['title'] if isinstance(title, str)): # Potential mismatch between content marking and actual content compliance_status = "review_needed" # Create influencer entry influencer = { "influencer_id": influencer_id, "name": str(channel.get('title', f"Channel {channel_id}")), "platform": platform, "location": country_name, "languages": language_name, "category_niche": category_niche, "follower_count": int(subscriber_count), "audience_demographics": audience_type, "engagement_rate": engagement_rate_formatted, "audience_interests": audience_interests_str, "content_types": content_types, "post_frequency_month": round(post_frequency, 1), "avg_views": int(avg_views), "collaboration_count": collaboration_count, "sponsored_ratio": round(sponsored_ratio, 2), "reputation_score": reputation_score, "follower_quality_score": follower_quality_score, "content_originality_score": content_originality_score, "comment_authenticity_score": comment_authenticity_score, "cost_per_post": int(cost_per_post), "negotiation_flexibility": negotiation_flexibility, "historical_performance": historical_perf, "controversy_flag": controversy_flag, "compliance_status": compliance_status } influencer_data.append(influencer) logger.info(f"Processed influencer: {influencer['name']} ({influencer_id})") except Exception as e: logger.error(f"Error processing channel {channel.get('channel_id')}: {str(e)}") logger.error(traceback.format_exc()) if not influencer_data: logger.warning("No influencer data was generated") # Return empty DataFrame with expected columns return pd.DataFrame(columns=[ "influencer_id", "name", "platform", "location", "languages", "category_niche", "follower_count", "audience_demographics", "engagement_rate", "audience_interests", "content_types", "post_frequency_month", "avg_views", "collaboration_count", "sponsored_ratio", "reputation_score", "follower_quality_score", "content_originality_score", "comment_authenticity_score", "cost_per_post", "negotiation_flexibility", "historical_performance", "controversy_flag", "compliance_status" ]) return pd.DataFrame(influencer_data) def _extract_content_types(self, videos_df: pd.DataFrame) -> List[str]: content_type_keywords = { 'review': ['review', 'unboxing', 'first look', 'hands-on'], 'tutorial': ['tutorial', 'how to', 'guide', 'tips', 'learn'], 'gameplay': ['gameplay', 'playthrough', 'gaming', 'let\'s play'], 'vlog': ['vlog', 'day in the life', 'follow me'], 'interview': ['interview', 'qa', 'q&a', 'questions'], 'reaction': ['reaction', 'reacting to', 'react'], 'podcast': ['podcast', 'talk show', 'discussion'], 'education': ['explained', 'educational', 'learn', 'course'], 'lifestyle': ['lifestyle', 'routine', 'tour'], 'recipes': ['recipe', 'cooking', 'baking', 'food'], 'workout': ['workout', 'exercise', 'fitness', 'training'] } content_types_count = {ct: 0 for ct in content_type_keywords} for _, video in videos_df.iterrows(): title = video.get('title', '').lower() if isinstance(video.get('title'), str) else '' description = video.get('description', '').lower() if isinstance(video.get('description'), str) else '' for content_type, keywords in content_type_keywords.items(): for keyword in keywords: if keyword in title or keyword in description: content_types_count[content_type] += 1 break top_content_types = sorted(content_types_count.items(), key=lambda x: x[1], reverse=True) return [ct for ct, count in top_content_types if count > 0][:3] def _estimate_cost_per_post(self, followers: int, engagement_rate: float) -> float: try: followers = int(followers) if pd.notnull(followers) else 0 engagement_rate = float(engagement_rate) if pd.notnull(engagement_rate) else 0 if followers < 10000: base_cost = 20 + (followers / 10000) * 80 elif followers < 100000: base_cost = 100 + (followers - 10000) * (400 / 90000) elif followers < 500000: base_cost = 500 + (followers - 100000) * (4500 / 400000) elif followers < 1000000: base_cost = 5000 + (followers - 500000) * (5000 / 500000) else: base_cost = 10000 + (followers - 1000000) * 0.005 avg_engagement = 0.02 if engagement_rate > 0: engagement_multiplier = max(0.5, min(3.0, engagement_rate / avg_engagement)) else: engagement_multiplier = 0.5 return base_cost * engagement_multiplier except Exception as e: logger.error(f"Error estimating cost per post: {str(e)}") return 100