"""Train a PPO agent on MetaTrader 5 OHLC data enriched with news sentiment.

The script logs in to a MetaTrader 5 terminal, downloads candles for a set of
symbols, derives technical indicators with ``ta``, scores NewsAPI articles with
a Hugging Face sentiment pipeline, and trains a Stable-Baselines3 PPO model on
a small gym environment built from that data.
"""
import asyncio  # For asynchronous operations
import functools
import os
import sys
from datetime import datetime, timedelta

import aiohttp  # For asynchronous HTTP requests
import gym
import MetaTrader5 as mt5
import numpy as np
import pandas as pd
import ta  # For technical indicators
import tensorflow as tf  # noqa: F401 - unused here; kept from the original import list
from sklearn.preprocessing import MinMaxScaler
from stable_baselines3 import PPO  # Deep Reinforcement Learning Model
from transformers import pipeline  # Huggingface for news sentiment

# NOTE(security): credentials and API keys should not live in source control.
# Each value can be overridden through the environment; the literals below are
# only the fallbacks that the original script shipped with.
account = int(os.environ.get("MT5_LOGIN", "165142091"))  # Your account number
password = os.environ.get("MT5_PASSWORD", "pec100demo@xm")
server = os.environ.get("MT5_SERVER", "XMGlobal-MT5 2")

NEWS_API_URL = "https://newsapi.org/v2/everything"
NEWS_API_KEY = os.environ.get("NEWSAPI_KEY", "YOUR_API_KEY")


def connect_mt5():
    """Log in to the MetaTrader 5 terminal and print the account details.

    Exits the process with status 1 when the terminal cannot be initialised.
    A failure to read the account info is reported but is not fatal.
    """
    if not mt5.initialize(login=account, password=password, server=server):
        print("Failed to initialize MT5, error:", mt5.last_error())
        sys.exit(1)

    print("Logged in successfully!")
    account_info = mt5.account_info()
    if account_info is None:
        print("Failed to get account info, error:", mt5.last_error())
    else:
        print(account_info)


async def fetch_data(symbol, timeframe, count=1000):
    """Fetch the latest ``count`` bars for ``symbol`` and add indicator columns.

    The blocking MetaTrader 5 call runs in the default executor so that it
    does not stall the event loop.

    Args:
        symbol: Instrument name passed to ``mt5.copy_rates_from_pos``.
        timeframe: One of the ``mt5.TIMEFRAME_*`` constants.
        count: Number of bars to request, starting from the current bar.

    Returns:
        A DataFrame indexed by bar time holding the columns returned by
        MetaTrader 5 plus ``EMA_50``, ``SMA_100``, ``RSI``, ``MACD``,
        ``SuperTrend`` and ``AwesomeOscillator``. Indicator warm-up rows
        are left in place (they may contain NaN).

    Raises:
        RuntimeError: If MetaTrader 5 returns no rates for the request.
    """
    loop = asyncio.get_running_loop()
    rates = await loop.run_in_executor(
        None,
        functools.partial(mt5.copy_rates_from_pos, symbol, timeframe, 0, count),
    )
    if rates is None or len(rates) == 0:
        raise RuntimeError(
            f"No rates returned for {symbol}, error: {mt5.last_error()}"
        )

    data = pd.DataFrame(rates)
    data['time'] = pd.to_datetime(data['time'], unit='s')
    data.set_index('time', inplace=True)

    # Calculate technical indicators
    data['EMA_50'] = ta.trend.ema_indicator(data['close'], window=50)
    data['SMA_100'] = ta.trend.sma_indicator(data['close'], window=100)
    data['RSI'] = ta.momentum.rsi(data['close'], window=14)
    data['MACD'] = ta.trend.macd(data['close'])
    # NOTE: despite the column name this is the Average True Range, not a
    # SuperTrend line. The name is kept so existing consumers keep working.
    data['SuperTrend'] = ta.volatility.AverageTrueRange(
        high=data['high'], low=data['low'], close=data['close']
    ).average_true_range()
    data['AwesomeOscillator'] = ta.momentum.awesome_oscillator(
        data['high'], data['low']
    )
    return data


async def fetch_news(symbol):
    """Fetch NewsAPI articles that mention ``symbol``.

    Args:
        symbol: Search term sent as the ``q`` query parameter.

    Returns:
        The list stored under ``articles`` in the JSON response, or an empty
        list when that key is missing or empty.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an HTTP
            error status.
    """
    # Passing params separately lets aiohttp URL-encode the search term.
    params = {"q": symbol, "apiKey": NEWS_API_KEY}
    async with aiohttp.ClientSession() as session:
        async with session.get(NEWS_API_URL, params=params) as response:
            response.raise_for_status()
            news_data = await response.json()
    return news_data.get('articles') or []


@functools.lru_cache(maxsize=1)
def _get_sentiment_pipeline():
    """Build the sentiment pipeline once and reuse it across calls."""
    return pipeline('sentiment-analysis')


def analyze_news(articles):
    """Return the mean signed sentiment score of the article descriptions.

    A ``POSITIVE`` label contributes ``+score``; any other label contributes
    ``-score``. Articles without a description are skipped.

    Args:
        articles: Iterable of dicts that may carry a ``description`` string.

    Returns:
        The mean signed score as a float, or ``0.0`` (neutral) when no
        article has a usable description.
    """
    texts = [
        article['description']
        for article in articles
        if article.get('description')
    ]
    if not texts:
        return 0.0

    sentiment_pipeline = _get_sentiment_pipeline()
    sentiment_scores = []
    for text in texts:
        sentiment = sentiment_pipeline(text)[0]
        signed = (
            sentiment['score']
            if sentiment['label'] == 'POSITIVE'
            else -sentiment['score']
        )
        sentiment_scores.append(signed)
    return float(np.mean(sentiment_scores))


async def process_news_for_assets(symbols):
    """Fetch news for every symbol concurrently and score each batch.

    Args:
        symbols: Iterable of search terms / instrument names.

    Returns:
        Dict mapping each symbol to the value of ``analyze_news`` for it.
    """
    symbols = list(symbols)
    article_lists = await asyncio.gather(
        *(fetch_news(symbol) for symbol in symbols)
    )
    return {
        symbol: analyze_news(articles)
        for symbol, articles in zip(symbols, article_lists)
    }


async def fetch_all_data(symbols, timeframe):
    """Fetch and process data for multiple assets concurrently.

    Args:
        symbols: Iterable of instrument names.
        timeframe: One of the ``mt5.TIMEFRAME_*`` constants.

    Returns:
        Dict mapping each symbol to the DataFrame built by ``fetch_data``.
    """
    symbols = list(symbols)
    results = await asyncio.gather(
        *(fetch_data(symbol, timeframe) for symbol in symbols)
    )
    return dict(zip(symbols, results))


class TradingEnv(gym.Env):
    """Single-asset trading environment for reinforcement learning.

    Actions are ``0 = Hold``, ``1 = Buy``, ``2 = Sell``. An observation is the
    current row of ``data`` scaled to ``[-1, 1]`` per column, followed by the
    news sentiment value. An episode ends when the target profit is reached,
    the wall-clock time limit expires, or the data runs out.
    """

    def __init__(self, data, news_sentiment, target_profit, time_limit):
        """Create the environment.

        Args:
            data: DataFrame of numeric features containing a ``close`` column.
                Rows with NaN (e.g. indicator warm-up) are dropped.
            news_sentiment: Scalar sentiment appended to every observation;
                NaN is treated as 0 and the value is clipped to ``[-1, 1]``.
            target_profit: Profit (balance minus initial balance) that ends
                the episode.
            time_limit: Wall-clock time limit per episode, in minutes.

        Raises:
            ValueError: If fewer than two complete rows remain in ``data``.
        """
        super().__init__()

        # NaN rows would propagate straight into the observations.
        self.data = data.dropna()
        if len(self.data) < 2:
            raise ValueError(
                "TradingEnv needs at least two rows without NaN values"
            )

        # News sentiment incorporated into the state
        sentiment = float(np.nan_to_num(news_sentiment))
        self.news_sentiment = float(np.clip(sentiment, -1.0, 1.0))

        # Fit the scaler once over the whole frame. Fitting it on a single
        # row gives every feature min == max, which makes the scaled
        # observation constant and independent of the data.
        scaled = MinMaxScaler(feature_range=(-1, 1)).fit_transform(
            self.data.to_numpy(dtype=float)
        )
        self._features = np.clip(scaled, -1.0, 1.0).astype(np.float32)

        self.current_step = 0
        self.initial_balance = 10000
        self.balance = self.initial_balance
        self.positions = []
        self.done = False
        self.target_profit = target_profit  # Desired profit to achieve
        self.start_time = datetime.now()
        # Time frame to reach the target profit
        self.time_limit = timedelta(minutes=time_limit)

        # Define action and observation space
        self.action_space = gym.spaces.Discrete(3)  # 0 = Hold, 1 = Buy, 2 = Sell
        self.observation_space = gym.spaces.Box(
            low=-1,
            high=1,
            shape=(len(self.data.columns) + 1,),
            dtype=np.float32,
        )

    def reset(self):
        """Start a new episode and return the first observation."""
        self.balance = self.initial_balance
        self.positions = []
        self.current_step = 0
        # Without this, every episode after the first finished one would
        # terminate on its first step.
        self.done = False
        self.start_time = datetime.now()
        return self._next_observation()

    def _next_observation(self):
        """Return the scaled feature row for the current step plus sentiment."""
        row = self._features[self.current_step]
        return np.append(row, self.news_sentiment).astype(np.float32)

    def step(self, action):
        """Advance one bar and apply ``action``.

        Buying appends the current close to the open positions. Selling
        closes the oldest open position and adds ``close - entry`` to both
        the reward and the balance; selling with no open position is a no-op.

        Args:
            action: ``0`` (hold), ``1`` (buy) or ``2`` (sell).

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``info`` is an
            empty dict.
        """
        reward = 0.0
        last_index = len(self.data) - 1
        # Clamp so that reaching (or stepping past) the final bar ends the
        # episode instead of raising IndexError.
        self.current_step = min(self.current_step + 1, last_index)
        current_price = self.data['close'].iloc[self.current_step]

        if action == 1:  # Buy
            self.positions.append(current_price)
        elif action == 2 and self.positions:  # Sell
            bought_price = self.positions.pop(0)
            profit = float(current_price - bought_price)
            reward += profit
            self.balance += profit

        # Placeholder for trailing stop-loss / take-profit logic. The values
        # are computed but nothing is adjusted yet.
        trailing_distance = 50
        if self.positions:
            bought_price = self.positions[0]
            new_tp = current_price - trailing_distance * 0.0001
            if new_tp > bought_price + trailing_distance * 0.0001:
                # TODO: adjust the take-profit level here.
                pass

        # Check if target profit or time limit is reached
        if self.balance - self.initial_balance >= self.target_profit:
            self.done = True
            print(f"Target profit reached: {self.balance - self.initial_balance}")
        elif datetime.now() - self.start_time > self.time_limit:
            self.done = True
            print(f"Time limit reached: Balance = {self.balance}")
        elif self.current_step >= last_index:
            # No more bars to trade on.
            self.done = True

        return self._next_observation(), reward, self.done, {}


async def main():
    """Fetch data and news, build the environment and train the PPO model."""
    symbols = ['EURUSD', 'BTCUSD']
    timeframe = mt5.TIMEFRAME_H1
    target_profit = 500  # User-defined target profit
    time_limit = 60  # Time limit in minutes

    # Fetch data and news asynchronously
    asset_data = await fetch_all_data(symbols, timeframe)
    news_sentiment = await process_news_for_assets(symbols)

    # Initialize trading environment (only EURUSD is traded for now)
    env = TradingEnv(
        asset_data['EURUSD'], news_sentiment['EURUSD'], target_profit, time_limit
    )

    # Initialize PPO model
    model = PPO('MlpPolicy', env, verbose=1)
    model.learn(total_timesteps=10000)


if __name__ == "__main__":
    connect_mt5()
    try:
        # Run the async main function
        asyncio.run(main())
    finally:
        # Release the terminal connection even if training fails.
        mt5.shutdown()
# Preview:
# Download PNG
# Download JPEG
# Download SVG
# Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!
# Click to optimize width for Twitter