bot
Thu Oct 10 2024 22:21:06 GMT+0000 (Coordinated Universal Time)
Saved by @johnsoap1
import logging
from telegram import Update
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from collections import defaultdict
import time
from threading import Thread
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Predefined admin user ID
ADMIN_ID = 123456789 # Replace with your actual Telegram user ID
# Store user data
user_media_count = defaultdict(int)
shared_content = defaultdict(list) # Store content temporarily
whitelist = set() # Whitelisted user IDs
banned_users = set() # Banned user IDs
user_deadlines = {} # Deadlines for media submissions
# Cleanup function to remove expired content
def cleanup_expired_content():
while True:
current_time = time.time()
for user_id in list(user_deadlines.keys()):
if current_time >= user_deadlines[user_id]:
banned_users.add(user_id)
del user_deadlines[user_id]
logger.info(f"User {user_id} has been banned for not submitting media.")
time.sleep(60) # Check every minute
# Start command
def start(update: Update, context: CallbackContext) -> None:
user_id = update.message.from_user.id
if user_id in banned_users:
return # No response for banned users
user_deadlines[user_id] = time.time() + 86400 # 24 hours in seconds
update.message.reply_text(
'Welcome to the Anonymous Sharing Bot! '
'You need to send 2 media files within 24 hours to remain in the bot. '
'Share media files to gain access to shared content or send a message anonymously.'
)
# Countdown command
def countdown(update: Update, context: CallbackContext) -> None:
user_id = update.message.from_user.id
if user_id in banned_users:
return # No response for banned users
if user_id in user_deadlines:
remaining_time = int(user_deadlines[user_id] - time.time())
if remaining_time > 0:
hours, remainder = divmod(remaining_time, 3600)
minutes, seconds = divmod(remainder, 60)
update.message.reply_text(f"You have {hours} hours, {minutes} minutes, and {seconds} seconds left to submit your media.")
else:
update.message.reply_text("Your time has expired. You have been banned.")
else:
update.message.reply_text("You haven't started the submission process yet.")
# Help command
def help_command(update: Update, context: CallbackContext) -> None:
commands = (
"/start - Start using the bot\n"
"/help - Show this help message\n"
"/countdown - Check remaining time to submit media\n"
"/wl_add <user_id> - Whitelist a user\n"
"/ban <user_id> - Ban a user\n"
"/delban <message_id> - Delete a message and ban the user\n"
"/forward <media_id> - Forward media to your own cloud or group"
)
update.message.reply_text(f"Available commands:\n{commands}")
# Acknowledgment for media uploads
def acknowledge_upload(update: Update, context: CallbackContext) -> None:
update.message.reply_text("Your media has been received successfully!")
# Add to whitelist command
def wl_add(update: Update, context: CallbackContext) -> None:
if update.message.from_user.id != ADMIN_ID:
update.message.reply_text("You do not have permission to whitelist users.")
return
if not context.args:
update.message.reply_text("Please provide a Telegram ID to whitelist.")
return
user_id = int(context.args[0])
whitelist.add(user_id)
update.message.reply_text(f"User {user_id} has been whitelisted.")
# Ban command
def ban_user(update: Update, context: CallbackContext) -> None:
if update.message.from_user.id != ADMIN_ID:
update.message.reply_text("You do not have permission to ban users.")
return
user_id = update.message.reply_to_message.from_user.id if update.message.reply_to_message else None
if user_id:
banned_users.add(user_id)
del user_deadlines[user_id] # Remove deadline if user is banned
update.message.reply_text(f"User {user_id} has been banned from using the bot.")
else:
update.message.reply_text("No user found to ban.")
# Delete and ban command
def delban(update: Update, context: CallbackContext) -> None:
if update.message.from_user.id != ADMIN_ID:
update.message.reply_text("You do not have permission to use this command.")
return
if not context.args:
update.message.reply_text("Please provide the message ID to delete and ban the user.")
return
message_id = int(context.args[0])
user_id = update.message.reply_to_message.from_user.id if update.message.reply_to_message else None
if user_id:
banned_users.add(user_id)
del user_deadlines[user_id] # Remove deadline if user is banned
try:
chat_id = update.effective_chat.id
context.bot.delete_message(chat_id=chat_id, message_id=message_id)
update.message.reply_text(f"Message {message_id} has been deleted and user {user_id} has been banned.")
except Exception as e:
logger.error(f"Error deleting message or banning user: {e}")
update.message.reply_text("An error occurred while trying to delete the message or ban the user.")
else:
update.message.reply_text("No user found to ban.")
# Handle media messages
def handle_media(update: Update, context: CallbackContext) -> None:
user_id = update.message.from_user.id
if user_id in banned_users:
return # No response for banned users
user_media_count[user_id] += 1
try:
if update.message.photo:
media_type = "image"
media = update.message.photo[-1].file_id
elif update.message.video:
media_type = "video"
media = update.message.video.file_id
else:
update.message.reply_text('Please share an image or video.')
return
shared_content[user_id].append({'type': media_type, 'media': media, 'timestamp': time.time()})
acknowledge_upload(update, context) # Acknowledge the upload
# Reset timer if the user has sent enough media
if user_media_count[user_id] <= 2:
user_deadlines[user_id] = time.time() + 86400 # Reset deadline to 24 hours from now
if user_media_count[user_id] == 2:
update.message.reply_text('Thank you for sharing 2 media files! You are allowed to stay in the bot.')
if user_media_count[user_id] >= 5:
update.message.reply_text('Thank you for sharing! You now have access to all shared content.')
# Notify user of their media count
update.message.reply_text(f"You have uploaded {user_media_count[user_id]} media files.")
except Exception as e:
logger.error(f"Error handling media from user {user_id}: {e}")
update.message.reply_text("An error occurred while processing your media. Please try again.")
# Forward media to user-defined destinations
def forward_media(update: Update, context: CallbackContext) -> None:
if not context.args:
update.message.reply_text("Please provide the media ID you want to forward.")
return
media_id = context.args[0]
user_id = update.message.from_user.id
# Check if media exists for the user
if user_id in shared_content:
for content in shared_content[user_id]:
if content['media'] == media_id:
if content['type'] == 'image':
context.bot.send_photo(chat_id=update.effective_chat.id, photo=media_id)
elif content['type'] == 'video':
context.bot.send_video(chat_id=update.effective_chat.id, video=media_id)
update.message.reply_text("Media has been forwarded successfully.")
return
update.message.reply_text("Media not found.")
# Handle text messages
def handle_text(update: Update, context: CallbackContext) -> None:
user_id = update.message.from_user.id
if user_id in banned_users:
return # No response for banned users
if user_media_count[user_id] < 5:
update.message.reply_text('You need to share 5 media files first to access this feature.')
return
try:
shared_content[user_id].append({'type': 'text', 'message': update.message.text, 'timestamp': time.time()})
update.message.reply_text('Your anonymous message has been shared!')
except Exception as e:
logger.error(f"Error handling text from user {user_id}: {e}")
update.message.reply_text("An error occurred while sharing your message. Please try again.")
def main() -> None:
# Create Updater and pass it your bot's token
updater = Updater("YOUR_BOT_TOKEN") # Replace with your bot token
# Get the dispatcher to register handlers
dp = updater.dispatcher
# Register handlers
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("countdown", countdown))
dp.add_handler(CommandHandler("help", help_command))
dp.add_handler(CommandHandler("wl_add", wl_add))
dp.add_handler(CommandHandler("ban", ban_user))
dp.add_handler(CommandHandler("delban", delban))
dp.add_handler(MessageHandler(Filters.photo | Filters.video, handle_media))
dp.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
# Start the Bot
updater.start_polling()
# Start the cleanup thread
Thread(target=cleanup_expired_content, daemon=True).start()
# Run the bot until you send a signal to stop
updater.idle()
if __name__ == '__main__':
main()



Comments