import pandas as pd
import ta # For technical indicators
import tensorflow as tf
from stable_baselines3 import PPO # Deep Reinforcement Learning Model
import asyncio # For asynchronous operations
import aiohttp # For asynchronous HTTP requests
from transformers import pipeline # Huggingface for news sentiment
from datetime import datetime, timedelta
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import gym
import MetaTrader5 as mt5 # Assuming MetaTrader5 is imported
# Replace with your login details
account = 165142091 # Your account number
password = "pec100demo@xm"
server = "XMGlobal-MT5 2"
# Initialize MT5 connection
if not mt5.initialize(login=account, password=password, server=server):
print("Failed to initialize MT5, error:", mt5.last_error())
quit()
else:
print("Logged in successfully!")
account_info = mt5.account_info()
if account_info is None:
print("Failed to get account info, error:", mt5.last_error())
else:
print(account_info)
# Async function to fetch OHLC data for a symbol
async def fetch_data(symbol, timeframe, count=1000):
loop = asyncio.get_event_loop()
rates = await loop.run_in_executor(None, lambda: mt5.copy_rates_from_pos(symbol, timeframe, 0, count))
data = pd.DataFrame(rates)
data['time'] = pd.to_datetime(data['time'], unit='s')
data.set_index('time', inplace=True)
# Calculate technical indicators
data['EMA_50'] = ta.trend.ema_indicator(data['close'], window=50)
data['SMA_100'] = ta.trend.sma_indicator(data['close'], window=100)
data['RSI'] = ta.momentum.rsi(data['close'], window=14)
data['MACD'] = ta.trend.macd(data['close'])
data['SuperTrend'] = ta.volatility.AverageTrueRange(high=data['high'], low=data['low'], close=data['close']).average_true_range()
data['AwesomeOscillator'] = ta.momentum.awesome_oscillator(data['high'], data['low'])
return data
# Async function to fetch news
async def fetch_news(symbol):
api_url = f"https://newsapi.org/v2/everything?q={symbol}&apiKey=YOUR_API_KEY"
async with aiohttp.ClientSession() as session:
async with session.get(api_url) as response:
news_data = await response.json()
return news_data['articles']
# Analyze the sentiment of news
def analyze_news(articles):
sentiment_pipeline = pipeline('sentiment-analysis')
sentiment_scores = []
for article in articles:
sentiment = sentiment_pipeline(article['description'])[0]
sentiment_scores.append(sentiment['score'] if sentiment['label'] == 'POSITIVE' else -sentiment['score'])
return np.mean(sentiment_scores)
# Asynchronously process news for multiple assets
async def process_news_for_assets(symbols):
news_sentiments = {}
for symbol in symbols:
articles = await fetch_news(symbol)
news_sentiments[symbol] = analyze_news(articles)
return news_sentiments
# Fetch and process data for multiple assets asynchronously
async def fetch_all_data(symbols, timeframe):
tasks = [fetch_data(symbol, timeframe) for symbol in symbols]
results = await asyncio.gather(*tasks)
return dict(zip(symbols, results))
# Class representing the trading environment for Reinforcement Learning
class TradingEnv(gym.Env):
def __init__(self, data, news_sentiment, target_profit, time_limit):
super(TradingEnv, self).__init__()
self.data = data
self.news_sentiment = news_sentiment # News sentiment incorporated into the state
self.current_step = 0
self.initial_balance = 10000
self.balance = self.initial_balance
self.positions = []
self.done = False
self.target_profit = target_profit # Desired profit to achieve
self.start_time = datetime.now()
self.time_limit = timedelta(minutes=time_limit) # Time frame to reach the target profit
# Define action and observation space
self.action_space = gym.spaces.Discrete(3) # 0 = Hold, 1 = Buy, 2 = Sell
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(len(self.data.columns) + 1,), dtype=np.float16)
def reset(self):
self.balance = self.initial_balance
self.positions = []
self.current_step = 0
self.start_time = datetime.now()
return self._next_observation()
def _next_observation(self):
obs = self.data.iloc[self.current_step].values
obs = np.append(obs, self.news_sentiment) # Add sentiment as part of the observation
scaler = MinMaxScaler()
obs = scaler.fit_transform([obs])[0]
return obs
def step(self, action):
reward = 0
self.current_step += 1
current_price = self.data['close'].iloc[self.current_step]
if action == 1: # Buy
self.positions.append(current_price)
elif action == 2: # Sell
if len(self.positions) > 0:
bought_price = self.positions.pop(0)
profit = current_price - bought_price
reward += profit
self.balance += profit
# Add trailing stop-loss and take-profit logic here
# Example: Adjust TP based on current price
trailing_distance = 50
if len(self.positions) > 0:
bought_price = self.positions[0]
new_tp = current_price - trailing_distance * 0.0001
if new_tp > bought_price + trailing_distance * 0.0001:
# Adjust TP logic
pass
# Check if target profit or time limit is reached
if self.balance - self.initial_balance >= self.target_profit:
self.done = True
print(f"Target profit reached: {self.balance - self.initial_balance}")
elif datetime.now() - self.start_time > self.time_limit:
self.done = True
print(f"Time limit reached: Balance = {self.balance}")
return self._next_observation(), reward, self.done, {}
# Initialize environment and PPO model
async def main():
symbols = ['EURUSD', 'BTCUSD']
timeframe = mt5.TIMEFRAME_H1
target_profit = 500 # User-defined target profit
time_limit = 60 # Time limit in minutes
# Fetch data and news asynchronously
asset_data = await fetch_all_data(symbols, timeframe)
news_sentiment = await process_news_for_assets(symbols)
# Initialize trading environment
env = TradingEnv(asset_data['EURUSD'], news_sentiment['EURUSD'], target_profit, time_limit)
# Initialize PPO model
model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)
# Run the async main function
asyncio.run(main())
Preview:
downloadDownload PNG
downloadDownload JPEG
downloadDownload SVG
Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!
Click to optimize width for Twitter