data collector with transcript code
Tue Mar 18 2025 03:36:29 GMT+0000 (Coordinated Universal Time)
Saved by @piyushkumar121 #python
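"""Data collector for YouTube influencer analysis.

Searches channels by keyword via the YouTube Data API, gathers video statistics,
comments, and transcripts, detects each channel's language, and assembles the
results into a single influencer dataset saved under PROCESSED_DATA_DIR.
"""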
import os
import logging
import pandas as pd
from typing import List, Dict, Optional, Any, Union, Tuple
from datetime import datetime, timedelta
import re
import traceback
from langdetect import detect, LangDetectException
from langdetect.lang_detect_exception import ErrorCode
import pycountry
import iso639
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound, TranscriptsDisabled, NoTranscriptAvailable
from config.settings import (
RAW_DATA_DIR,
PROCESSED_DATA_DIR,
VIDEO_SAMPLE_SIZE,
COMMENT_SAMPLE_SIZE
)
from src.scraper.youtube_api import YouTubeAPI
from src.analyzer.audience import AudienceAnalyzer
from src.analyzer.content import ContentAnalyzer
logger = logging.getLogger(__name__)
class DataCollector:
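    """Collects YouTube channel, video, comment, and transcript data and turns it
    into an influencer dataset using the audience and content analyzers."""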
def __init__(self, api_key: Optional[str] = None):
self.api = YouTubeAPI(api_key)
self.audience_analyzer = AudienceAnalyzer()
self.content_analyzer = ContentAnalyzer()
logger.info("DataCollector initialized")
def collect_influencers_by_keywords(
self,
keywords: List[str],
channels_per_keyword: int = 50,
videos_per_channel: int = 10,
comments_per_video: int = 100,
save_intermediate: bool = True
) -> pd.DataFrame:
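        """Search channels for each keyword, collect their videos and comments,
        and build a combined influencer dataset.

        Intermediate search results, channel statistics, video statistics, and
        comments are written to PROCESSED_DATA_DIR as CSVs when save_intermediate
        is True; the final influencer dataset is always saved there.
        """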
logger.info(f"Starting influencer data collection for {len(keywords)} keywords")
# Search for channels by keywords
all_channels = pd.DataFrame()
for keyword in keywords:
logger.info(f"Collecting channels for keyword: {keyword}")
channels = self.api.search_channels_by_keyword(
keyword=keyword,
max_results=channels_per_keyword
)
all_channels = pd.concat([all_channels, channels], ignore_index=True)
# Remove duplicates
all_channels = all_channels.drop_duplicates(subset=['channel_id'])
if save_intermediate:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
channel_search_path = os.path.join(
PROCESSED_DATA_DIR,
f"channel_search_results_{timestamp}.csv"
)
all_channels.to_csv(channel_search_path, index=False)
logger.info(f"Saved channel search results to {channel_search_path}")
# Get channel statistics
channel_ids = all_channels['channel_id'].unique().tolist()
logger.info(f"Collecting detailed statistics for {len(channel_ids)} channels")
channel_stats = self.api.get_channel_statistics(channel_ids)
if save_intermediate:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
channel_stats_path = os.path.join(
PROCESSED_DATA_DIR,
f"channel_statistics_{timestamp}.csv"
)
channel_stats.to_csv(channel_stats_path, index=False)
logger.info(f"Saved channel statistics to {channel_stats_path}")
# Collect videos and comments
all_videos = pd.DataFrame()
all_video_stats = pd.DataFrame()
all_comments = pd.DataFrame()
for _, channel in channel_stats.iterrows():
channel_id = channel['channel_id']
playlist_id = channel.get('playlist_id')
if not playlist_id:
logger.warning(f"No playlist ID found for channel {channel_id}")
continue
logger.info(f"Collecting videos for channel: {channel['title']} ({channel_id})")
# Get videos for channel
try:
video_ids = self.api.get_channel_videos(
playlist_id=playlist_id,
max_results=videos_per_channel
)
if not video_ids:
logger.warning(f"No videos found for channel {channel_id}")
continue
# Get video details
video_details = self.api.get_video_details(video_ids)
all_video_stats = pd.concat([all_video_stats, video_details], ignore_index=True)
# Get comments for sample of videos
for video_id in video_ids[:min(3, len(video_ids))]:
try:
comments = self.api.get_video_comments(
video_id=video_id,
max_results=comments_per_video
)
all_comments = pd.concat([all_comments, comments], ignore_index=True)
except Exception as e:
logger.error(f"Error collecting comments for video {video_id}: {str(e)}")
except Exception as e:
logger.error(f"Error collecting videos for channel {channel_id}: {str(e)}")
if save_intermediate:
# Save video statistics
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
video_stats_path = os.path.join(
PROCESSED_DATA_DIR,
f"video_statistics_{timestamp}.csv"
)
all_video_stats.to_csv(video_stats_path, index=False)
logger.info(f"Saved video statistics to {video_stats_path}")
# Save comment data
if not all_comments.empty:
comments_path = os.path.join(
PROCESSED_DATA_DIR,
f"video_comments_{timestamp}.csv"
)
all_comments.to_csv(comments_path, index=False)
logger.info(f"Saved video comments to {comments_path}")
# Create comprehensive influencer dataset
logger.info("Creating combined influencer dataset")
        influencer_data = None
        try:
influencer_data = self._create_influencer_dataset(
channel_stats=channel_stats,
video_stats=all_video_stats,
comments=all_comments
)
# Save final dataset
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = os.path.join(
PROCESSED_DATA_DIR,
f"influencer_data_{timestamp}.csv"
)
influencer_data.to_csv(output_path, index=False)
logger.info(f"Saved influencer dataset to {output_path}")
return influencer_data
        except Exception as e:
            logger.error(f"Error creating influencer dataset: {str(e)}")
            logger.error(traceback.format_exc())
            # influencer_data is only populated if _create_influencer_dataset
            # succeeded (e.g. the failure happened while saving the CSV); otherwise
            # fall back to an empty DataFrame with the expected schema.
            if isinstance(influencer_data, pd.DataFrame) and not influencer_data.empty:
                return influencer_data
            logger.warning("No influencer data was generated")
            return pd.DataFrame(columns=[
                "influencer_id", "name", "platform", "location", "languages",
                "category_niche", "follower_count", "audience_demographics",
                "engagement_rate", "audience_interests", "content_types",
                "post_frequency_month", "avg_views", "collaboration_count",
                "sponsored_ratio", "reputation_score", "follower_quality_score",
                "content_originality_score", "comment_authenticity_score",
                "cost_per_post", "negotiation_flexibility", "historical_performance",
                "controversy_flag", "compliance_status"
            ])
def _extract_content_types(self, videos_df: pd.DataFrame) -> List[str]:
"""Extract content types from video titles and descriptions."""
content_type_keywords = {
'review': ['review', 'unboxing', 'first look', 'hands-on'],
'tutorial': ['tutorial', 'how to', 'guide', 'tips', 'learn'],
'gameplay': ['gameplay', 'playthrough', 'gaming', 'let\'s play'],
'vlog': ['vlog', 'day in the life', 'follow me'],
'interview': ['interview', 'qa', 'q&a', 'questions'],
'reaction': ['reaction', 'reacting to', 'react'],
'podcast': ['podcast', 'talk show', 'discussion'],
'education': ['explained', 'educational', 'learn', 'course'],
'lifestyle': ['lifestyle', 'routine', 'tour'],
'recipes': ['recipe', 'cooking', 'baking', 'food'],
'workout': ['workout', 'exercise', 'fitness', 'training']
}
content_types_count = {ct: 0 for ct in content_type_keywords}
# Check each video title and description for content type keywords
for _, video in videos_df.iterrows():
title = video.get('title', '').lower() if isinstance(video.get('title'), str) else ''
description = video.get('description', '').lower() if isinstance(video.get('description'), str) else ''
for content_type, keywords in content_type_keywords.items():
for keyword in keywords:
if keyword in title or keyword in description:
content_types_count[content_type] += 1
break
# Get top content types by count
top_content_types = sorted(content_types_count.items(), key=lambda x: x[1], reverse=True)
return [ct for ct, count in top_content_types if count > 0][:3]
def _estimate_cost_per_post(self, followers: int, engagement_rate: float) -> float:
"""Estimate cost per post based on followers and engagement rate."""
try:
# Ensure we have valid numbers
followers = int(followers) if pd.notnull(followers) else 0
engagement_rate = float(engagement_rate) if pd.notnull(engagement_rate) else 0
# Base cost calculation by follower count
if followers < 10000: # Nano influencer
base_cost = 20 + (followers / 10000) * 80
elif followers < 100000: # Micro influencer
base_cost = 100 + (followers - 10000) * (400 / 90000)
elif followers < 500000: # Mid-tier influencer
base_cost = 500 + (followers - 100000) * (4500 / 400000)
elif followers < 1000000: # Macro influencer
base_cost = 5000 + (followers - 500000) * (5000 / 500000)
else: # Mega influencer
base_cost = 10000 + (followers - 1000000) * 0.005
# Adjust by engagement rate
avg_engagement = 0.02 # 2% is considered average
if engagement_rate > 0:
engagement_multiplier = max(0.5, min(3.0, engagement_rate / avg_engagement))
else:
engagement_multiplier = 0.5
return base_cost * engagement_multiplier
except Exception as e:
logger.error(f"Error estimating cost per post: {str(e)}")
return 100 # Default fallback cost
def _clean_category_urls(self, categories: List[str]) -> List[str]:
"""Clean category URLs to extract readable category names."""
cleaned_categories = []
if not categories:
return cleaned_categories
if not isinstance(categories, list):
if isinstance(categories, str):
categories = [categories]
else:
return cleaned_categories
for category in categories:
if isinstance(category, str):
# Try to extract category name from URL
match = re.search(r'/([^/]+)$', category)
if match:
# Convert underscores to spaces and capitalize
category_name = match.group(1).replace('_', ' ').title()
cleaned_categories.append(category_name)
else:
# If it's not a URL, use as is
if not category.startswith('http'):
cleaned_categories.append(category)
else:
# Last resort: split by slashes and take last part
parts = category.split('/')
if parts:
category_name = parts[-1].replace('_', ' ').title()
cleaned_categories.append(category_name)
return cleaned_categories
def _get_transcript_for_video(self, video_id: str, max_chars: int = 10000) -> str:
"""
Get transcript text for a video using YouTube Transcript API.
Returns empty string if transcript is not available.
"""
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            preferred_languages = ['en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh-Hans']
            # First try to get a manually created transcript (usually more accurate).
            # Note: find_manually_created_transcript/find_generated_transcript expect
            # a list of language codes.
            try:
                transcript = transcript_list.find_manually_created_transcript(preferred_languages)
                transcript_data = transcript.fetch()
            except Exception:
                # Fall back to an auto-generated transcript
                try:
                    transcript = transcript_list.find_generated_transcript(preferred_languages)
                    transcript_data = transcript.fetch()
                except Exception:
                    # Try any available transcript in the preferred languages
                    transcript = transcript_list.find_transcript(preferred_languages)
                    transcript_data = transcript.fetch()
# Get the text from transcript entries
full_text = " ".join([entry['text'] for entry in transcript_data])
# Limit text length to prevent processing very long transcripts
return full_text[:max_chars]
except (NoTranscriptFound, TranscriptsDisabled, NoTranscriptAvailable) as e:
logger.warning(f"No transcript available for video {video_id}: {str(e)}")
return ""
except Exception as e:
logger.error(f"Error fetching transcript for video {video_id}: {str(e)}")
return ""
def _detect_language_from_transcripts(self, video_ids: List[str], max_videos: int = 3) -> Tuple[str, str]:
"""
Detect language from video transcripts.
Returns a tuple of (language_code, language_name)
"""
logger.info(f"Detecting language from transcripts of {min(len(video_ids), max_videos)} videos")
transcript_texts = []
# Try to get transcripts from up to max_videos videos
for video_id in video_ids[:max_videos]:
transcript_text = self._get_transcript_for_video(video_id)
if transcript_text:
transcript_texts.append(transcript_text)
# If we get a good transcript, we might not need more
if len(transcript_text) > 1000:
break
if not transcript_texts:
logger.warning("No transcripts found for language detection")
return "en", "English" # Default fallback
# Combine transcript texts and detect language
combined_text = " ".join(transcript_texts)
try:
lang_code = detect(combined_text)
try:
language = iso639.languages.get(part1=lang_code)
lang_name = language.name
except (KeyError, AttributeError):
try:
language = pycountry.languages.get(alpha_2=lang_code)
lang_name = language.name if language else lang_code
except (KeyError, AttributeError):
lang_name = f"Unknown ({lang_code})"
logger.info(f"Detected language from transcript: {lang_name} ({lang_code})")
return lang_code, lang_name
except LangDetectException as e:
logger.warning(f"Could not detect language from transcript: {e}")
return "en", "English" # Default fallback
def _detect_language(self, text_samples: List[str]) -> Tuple[str, str]:
"""
Detect the language from a list of text samples.
Returns a tuple of (language_code, language_name)
"""
if not text_samples:
return "en", "English" # Default fallback
# Combine text samples for better detection
combined_text = " ".join(text_samples)[:10000]
try:
# Detect language from text
lang_code = detect(combined_text)
# Get language name
try:
language = iso639.languages.get(part1=lang_code)
lang_name = language.name
except (KeyError, AttributeError):
try:
language = pycountry.languages.get(alpha_2=lang_code)
lang_name = language.name if language else lang_code
except (KeyError, AttributeError):
lang_name = f"Unknown ({lang_code})"
return lang_code, lang_name
except LangDetectException as e:
            if hasattr(e, "code") and e.code == ErrorCode.CantDetectError:
logger.warning(f"Could not detect language: {e}")
else:
logger.error(f"Language detection error: {e}")
return "en", "English" # Default fallback
def _create_influencer_dataset(
self,
channel_stats: pd.DataFrame,
video_stats: pd.DataFrame,
comments: pd.DataFrame
) -> pd.DataFrame:
"""Create a comprehensive dataset of influencer information."""
logger.info("Creating influencer dataset")
influencer_data = []
for i, (_, channel) in enumerate(channel_stats.iterrows()):
try:
channel_id = channel['channel_id']
# Generate influencer ID
influencer_id = f"I{(i+1):03d}"
# Get videos for this channel
channel_videos = video_stats[video_stats['channel_id'] == channel_id].copy()
if channel_videos.empty:
logger.warning(f"No videos found for channel {channel_id} in the collected data")
continue
# Calculate basic engagement metrics
total_views = channel_videos['view_count'].sum()
total_likes = channel_videos['like_count'].sum()
total_comments = channel_videos['comment_count'].sum()
avg_views = channel_videos['view_count'].mean()
avg_likes = channel_videos['like_count'].mean()
avg_comments = channel_videos['comment_count'].mean()
# Ensure numeric values
total_views = float(total_views) if pd.notnull(total_views) else 0
total_likes = float(total_likes) if pd.notnull(total_likes) else 0
total_comments = float(total_comments) if pd.notnull(total_comments) else 0
# Calculate engagement rate
if total_views > 0:
engagement_rate = ((total_likes + total_comments) / total_views) * 100
else:
engagement_rate = 0
# Format engagement rate for later calculations
engagement_rate_formatted = round(engagement_rate / 100, 3)
# Calculate post frequency
if len(channel_videos) >= 2:
try:
# Convert published_at to datetime
channel_videos['published_at'] = pd.to_datetime(channel_videos['published_at'], errors='coerce')
# Filter out videos with invalid dates
valid_dates = channel_videos[channel_videos['published_at'].notna()]
if len(valid_dates) >= 2:
# Sort by date
sorted_videos = valid_dates.sort_values('published_at')
# Calculate date range
first_video_date = sorted_videos['published_at'].iloc[0]
last_video_date = sorted_videos['published_at'].iloc[-1]
date_diff = (last_video_date - first_video_date).days
# Calculate posts per month
if date_diff > 0:
post_frequency = (len(channel_videos) / (date_diff / 30))
else:
post_frequency = len(channel_videos)
else:
post_frequency = len(channel_videos)
except Exception as e:
logger.error(f"Error calculating post frequency for channel {channel_id}: {str(e)}")
post_frequency = len(channel_videos)
else:
post_frequency = len(channel_videos)
# Extract categories
categories = []
for _, video in channel_videos.iterrows():
category = video.get('topic_categories')
if isinstance(category, list):
categories.extend(self._clean_category_urls(category))
# Get country information
country = channel.get('country')
if country and isinstance(country, str):
country_name = country
else:
# Try to determine from comments
channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])]
if not channel_comments.empty and 'author_country' in channel_comments.columns:
# Get most common country from comments
country_counts = channel_comments['author_country'].value_counts()
country_name = country_counts.index[0] if len(country_counts) > 0 else "Unknown"
else:
country_name = "Global"
# Language detection - with improved transcript-based detection
# 1. First try from channel metadata
language_code = channel.get('default_language')
language_name = None
# 2. If available in metadata, get language name
if language_code and isinstance(language_code, str):
try:
# Try to get language name from ISO 639-1 code
language = iso639.languages.get(part1=language_code)
language_name = language.name
except (KeyError, AttributeError):
try:
# Try pycountry as fallback
language = pycountry.languages.get(alpha_2=language_code)
language_name = language.name if language else None
except (KeyError, AttributeError):
language_name = None
# 3. If language not determined from metadata, try transcript-based detection
if not language_name:
# Get video IDs to analyze
video_ids = channel_videos['video_id'].tolist()
# Try to detect language from transcripts
transcript_lang_code, transcript_lang_name = self._detect_language_from_transcripts(video_ids)
# If we got a valid language from transcript, use it
if transcript_lang_code != "en" or (transcript_lang_code == "en" and len(video_ids) > 0):
language_code, language_name = transcript_lang_code, transcript_lang_name
logger.info(f"Using transcript-based language detection for channel {channel_id}: {language_name}")
else:
# 4. As last resort, fall back to text-based detection
text_samples = []
# Collect text samples from video titles and descriptions
for _, video in channel_videos.iterrows():
title = video.get('title')
desc = video.get('description')
if isinstance(title, str) and len(title) > 10:
text_samples.append(title)
if isinstance(desc, str) and len(desc) > 20:
# Limit description length
text_samples.append(desc[:500])
# Add channel description
channel_desc = channel.get('description')
if isinstance(channel_desc, str) and len(channel_desc) > 20:
text_samples.append(channel_desc)
# Add comments as text samples
channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])]
if not channel_comments.empty:
for comment_text in channel_comments['text'].head(30):
if isinstance(comment_text, str) and len(comment_text) > 15:
text_samples.append(comment_text)
# Detect language from text samples
if text_samples:
language_code, language_name = self._detect_language(text_samples)
else:
language_code, language_name = "en", "English"
# Extract channel keywords and video tags
channel_keywords = channel.get('keywords', '')
video_tags = []
for tags in channel_videos['tags']:
if isinstance(tags, list):
video_tags.extend(tags)
# Detect sponsored content
sponsored_keywords = ['sponsored', 'ad', 'advertisement', 'partner', 'paid', '#ad', '#sponsored']
sponsored_count = 0
total_analyzed = 0
for title in channel_videos['title']:
if isinstance(title, str):
total_analyzed += 1
if any(kw.lower() in title.lower() for kw in sponsored_keywords):
sponsored_count += 1
for desc in channel_videos['description']:
if isinstance(desc, str):
# Only count unique videos
if total_analyzed < len(channel_videos):
total_analyzed += 1
if any(kw.lower() in desc.lower() for kw in sponsored_keywords):
sponsored_count += 1
# Calculate sponsored content ratio
sponsored_ratio = sponsored_count / max(1, total_analyzed)
# Analyze audience sentiment and authenticity
comment_sentiment = 0.5
comment_authenticity = 0.5
if not comments.empty:
channel_comments = comments[comments['video_id'].isin(channel_videos['video_id'])].copy()
if not channel_comments.empty:
try:
audience_analysis = self.audience_analyzer.analyze_audience_from_comments(channel_comments)
comment_sentiment = audience_analysis.get('sentiment_score', 0.5)
comment_authenticity = audience_analysis.get('authenticity_score', 0.5)
except Exception as e:
logger.warning(f"Could not analyze audience for channel {channel_id}: {e}")
# Estimate audience demographics
audience_type = "Unknown"
if len(categories) > 0:
# Use audience analyzer if available
if hasattr(self.audience_analyzer, 'estimate_demographics'):
try:
demographics = self.audience_analyzer.estimate_demographics(
channel_data=channel.to_dict(),
video_stats=channel_videos,
comments=channel_comments if 'channel_comments' in locals() else pd.DataFrame()
)
# Extract primary demographic info
primary_age = max(demographics.get('age_groups', {}).items(), key=lambda x: x[1])[0]
primary_gender = max(demographics.get('gender_split', {}).items(), key=lambda x: x[1])[0]
if primary_gender == 'male' and primary_age in ['13-17', '18-24']:
audience_type = "Young Male Adults"
elif primary_gender == 'female' and primary_age in ['13-17', '18-24']:
audience_type = "Young Female Adults"
elif primary_age in ['25-34', '35-44']:
audience_type = "Adults 25-44"
elif primary_age in ['45-54', '55+']:
audience_type = "Adults 45+"
else:
# Fall back to category-based audience type
if any('gaming' in c.lower() for c in categories):
audience_type = "Gaming Enthusiasts"
elif any('beauty' in c.lower() for c in categories):
audience_type = "Beauty Enthusiasts"
elif any('tech' in c.lower() for c in categories):
audience_type = "Tech Enthusiasts"
else:
audience_type = "General Audience"
except Exception as e:
logger.warning(f"Error estimating demographics for channel {channel_id}: {e}")
else:
# Use category-based audience type as fallback
if any('gaming' in c.lower() for c in categories):
audience_type = "Gaming Enthusiasts"
elif any('beauty' in c.lower() for c in categories):
audience_type = "Beauty Enthusiasts"
elif any('tech' in c.lower() for c in categories):
audience_type = "Tech Enthusiasts"
else:
audience_type = "General Audience"
# Extract category and content types
category_niche = "/".join(set(categories[:3])) if categories else "general"
content_types = "/".join(self._extract_content_types(channel_videos)) if len(channel_videos) > 0 else "general"
# Extract audience interests
audience_interests = []
if hasattr(self.audience_analyzer, 'analyze_audience_interests'):
try:
audience_interests = self.audience_analyzer.analyze_audience_interests(channel_videos)
except Exception as e:
logger.warning(f"Error analyzing audience interests for channel {channel_id}: {e}")
# Fallback to video tags for audience interests
if not audience_interests and video_tags:
tag_counts = {}
for tag in video_tags:
if isinstance(tag, str):
tag_counts[tag] = tag_counts.get(tag, 0) + 1
sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
audience_interests = [tag for tag, _ in sorted_tags[:5]]
audience_interests_str = "/".join(audience_interests) if audience_interests else "general"
# Set platform
platform = "YouTube"
# Detect collaborations
collaboration_count = 0
collab_keywords = ['collab', 'featuring', 'feat', 'ft.', 'with', 'x ', ' x ']
for title in channel_videos['title']:
if isinstance(title, str) and any(kw.lower() in title.lower() for kw in collab_keywords):
collaboration_count += 1
for desc in channel_videos['description']:
if isinstance(desc, str) and any(kw.lower() in desc.lower() for kw in collab_keywords):
# Avoid double counting
if collaboration_count < len(channel_videos):
collaboration_count += 1
                # Calculate reputation score based on comment sentiment
reputation_score = round(comment_sentiment, 2)
# Calculate follower quality score
avg_platform_er = 0.015 # Average engagement rate on YouTube
if engagement_rate_formatted > 0:
follower_quality_score = round(min(0.99, max(0.1, engagement_rate_formatted / avg_platform_er * 0.5)), 2)
else:
follower_quality_score = 0.1
# Calculate content originality score
if hasattr(self.content_analyzer, 'calculate_content_originality'):
try:
content_originality_raw = self.content_analyzer.calculate_content_originality(channel_videos)
content_originality_score = round(min(0.99, max(0.1, content_originality_raw / 10)), 2)
except Exception as e:
logger.warning(f"Error calculating content originality for channel {channel_id}: {e}")
# Fallback method for content originality
title_word_set = set()
title_word_count = 0
for title in channel_videos['title']:
if isinstance(title, str):
words = re.findall(r'\b\w+\b', title.lower())
title_word_set.update(words)
title_word_count += len(words)
title_uniqueness = len(title_word_set) / max(1, title_word_count)
content_originality_score = round(min(0.99, max(0.1, 0.5 + title_uniqueness * 0.4)), 2)
else:
# Fallback if content analyzer method not available
title_word_set = set()
title_word_count = 0
for title in channel_videos['title']:
if isinstance(title, str):
words = re.findall(r'\b\w+\b', title.lower())
title_word_set.update(words)
title_word_count += len(words)
title_uniqueness = len(title_word_set) / max(1, title_word_count)
content_originality_score = round(min(0.99, max(0.1, 0.5 + title_uniqueness * 0.4)), 2)
# Calculate comment authenticity score
if not comments.empty and 'channel_comments' in locals() and not channel_comments.empty:
unique_commenters = len(channel_comments['author'].unique())
total_comments = len(channel_comments)
if total_comments > 0:
# Calculate ratio of unique commenters to total comments
uniqueness_ratio = unique_commenters / total_comments
comment_authenticity_score = round(min(0.99, max(0.1, 0.3 + uniqueness_ratio * 0.6)), 2)
else:
comment_authenticity_score = 0.5
else:
comment_authenticity_score = 0.5
# Get subscriber count for cost estimation
subscriber_count = channel.get('subscriber_count', 0)
if not isinstance(subscriber_count, (int, float)) or pd.isna(subscriber_count):
subscriber_count = 0
# Calculate cost per post
cost_per_post = round(self._estimate_cost_per_post(subscriber_count, engagement_rate_formatted))
# Determine negotiation flexibility
try:
channel_age_days = (datetime.now() - pd.to_datetime(channel['published_at'])).days
# New channels or very active ones tend to be more flexible
if channel_age_days < 365 or post_frequency > 8:
negotiation_flexibility = "flexible"
# Well-established channels with high engagement tend to be strict
elif channel_age_days > 1825 and engagement_rate > 5:
negotiation_flexibility = "strict"
# Moderate flexibility for channels with good engagement
elif engagement_rate > 3:
negotiation_flexibility = "medium"
else:
negotiation_flexibility = "negotiable"
                except Exception:
                    # Default if we can't calculate channel age
                    negotiation_flexibility = "negotiable"
# Calculate historical performance
if subscriber_count > 0:
historical_perf = round(min(0.99, avg_views / subscriber_count), 2)
else:
# Fallback based on engagement rate
historical_perf = round(min(0.99, max(0.01, engagement_rate_formatted * 10)), 2)
# Check for controversy flags
controversy_flag = "false"
if 'like_count' in channel_videos.columns and 'dislike_count' in channel_videos.columns:
# YouTube API doesn't expose dislikes anymore, but keeping this code for future reference
total_likes = channel_videos['like_count'].sum()
total_dislikes = channel_videos['dislike_count'].sum() if 'dislike_count' in channel_videos.columns else 0
if total_likes + total_dislikes > 0:
dislike_ratio = total_dislikes / (total_likes + total_dislikes)
if dislike_ratio > 0.25: # More than 25% dislikes indicates controversy
controversy_flag = "true"
                # Check compliance status (guard against a missing made_for_kids column)
                compliance_status = "verified"
                if ('made_for_kids' in channel_videos.columns
                        and (channel_videos['made_for_kids'] == True).any()
                        and any('adult' in title.lower() for title in channel_videos['title'] if isinstance(title, str))):
                    # Potential mismatch between content marking and actual content
                    compliance_status = "review_needed"
# Create influencer entry
influencer = {
"influencer_id": influencer_id,
"name": str(channel.get('title', f"Channel {channel_id}")),
"platform": platform,
"location": country_name,
"languages": language_name,
"category_niche": category_niche,
"follower_count": int(subscriber_count),
"audience_demographics": audience_type,
"engagement_rate": engagement_rate_formatted,
"audience_interests": audience_interests_str,
"content_types": content_types,
"post_frequency_month": round(post_frequency, 1),
"avg_views": int(avg_views),
"collaboration_count": collaboration_count,
"sponsored_ratio": round(sponsored_ratio, 2),
"reputation_score": reputation_score,
"follower_quality_score": follower_quality_score,
"content_originality_score": content_originality_score,
"comment_authenticity_score": comment_authenticity_score,
"cost_per_post": int(cost_per_post),
"negotiation_flexibility": negotiation_flexibility,
"historical_performance": historical_perf,
"controversy_flag": controversy_flag,
"compliance_status": compliance_status
}
influencer_data.append(influencer)
logger.info(f"Processed influencer: {influencer['name']} ({influencer_id})")
except Exception as e:
logger.error(f"Error processing channel {channel.get('channel_id')}: {str(e)}")
logger.error(traceback.format_exc())
if not influencer_data:
logger.warning("No influencer data was generated")
# Return empty DataFrame with expected columns
return pd.DataFrame(columns=[
"influencer_id", "name", "platform", "location", "languages",
"category_niche", "follower_count", "audience_demographics",
"engagement_rate", "audience_interests", "content_types",
"post_frequency_month", "avg_views", "collaboration_count",
"sponsored_ratio", "reputation_score", "follower_quality_score",
"content_originality_score", "comment_authenticity_score",
"cost_per_post", "negotiation_flexibility", "historical_performance",
"controversy_flag", "compliance_status"
])
return pd.DataFrame(influencer_data)
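
# Usage sketch (illustrative, not part of the original module): assumes the
# config.settings and src.* packages imported above are importable and that a
# YouTube Data API key is available; the YOUTUBE_API_KEY variable name, keywords,
# and limits below are placeholder assumptions.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    collector = DataCollector(api_key=os.environ.get("YOUTUBE_API_KEY"))
    influencers = collector.collect_influencers_by_keywords(
        keywords=["tech reviews", "home workouts"],
        channels_per_keyword=10,
        videos_per_channel=5,
        comments_per_video=50,
        save_intermediate=True,
    )
    print(influencers.head())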


