import concurrent.futures
import functools
import multiprocessing as mp
import sys
import time
from datetime import date, datetime
from multiprocessing.synchronize import Event
from typing import Callable, Dict, List, Literal, Optional
import pandas as pd
from loguru import logger as log
from bbstrader.config import BBSTRADER_DIR
from bbstrader.core.strategy import Strategy, TradeAction
from bbstrader.metatrader.account import check_mt5_connection
from bbstrader.metatrader.trade import Trade
from bbstrader.trading.strategy import LiveStrategy
from bbstrader.trading.utils import send_message
try:
import MetaTrader5 as MT5
except ImportError:
import bbstrader.compat # noqa: F401
__all__ = ["Mt5ExecutionEngine", "RunMt5Engine", "RunMt5Engines"]
_TF_MAPPING = {
"1m": 1,
"3m": 3,
"5m": 5,
"10m": 10,
"15m": 15,
"30m": 30,
"1h": 60,
"2h": 120,
"4h": 240,
"6h": 360,
"8h": 480,
"12h": 720,
"D1": 1440,
}
MT5_ENGINE_TIMEFRAMES = list(_TF_MAPPING.keys())
TradingDays = ["monday", "tuesday", "wednesday", "thursday", "friday"]
WEEK_DAYS = TradingDays + ["saturday", "sunday"]
FRIDAY = "friday"
WEEK_ENDS = ["friday", "saturday", "sunday"]
BUYS = ["BMKT", "BLMT", "BSTP", "BSTPLMT"]
SELLS = ["SMKT", "SLMT", "SSTP", "SSTPLMT"]
ORDERS_TYPES = [
"orders",
"buy_stops",
"sell_stops",
"buy_limits",
"sell_limits",
"buy_stop_limits",
"sell_stop_limits",
]
POSITIONS_TYPES = ["positions", "buys", "sells", "profitables", "losings"]
ACTIONS = ["buys", "sells"]
STOPS = ["buy_stops", "sell_stops"]
LIMITS = ["buy_limits", "sell_limits"]
STOP_LIMITS = ["buy_stop_limits", "sell_stop_limits"]
EXIT_SIGNAL_ACTIONS = {
"EXIT": {a: a[:-1] for a in ACTIONS},
"EXIT_LONG": {"buys": "buy"},
"EXIT_SHORT": {"sells": "sell"},
"EXIT_STOP": {stop: stop for stop in STOPS},
"EXIT_LONG_STOP": {"buy_stops": "buy_stops"},
"EXIT_SHORT_STOP": {"sell_stops": "sell_stops"},
"EXIT_LIMIT": {limit: limit for limit in LIMITS},
"EXIT_LONG_LIMIT": {"buy_limits": "buy_limits"},
"EXIT_SHORT_LIMIT": {"sell_limits": "sell_limits"},
"EXIT_STOP_LIMIT": {sl: sl for sl in STOP_LIMITS},
"EXIT_LONG_STOP_LIMIT": {STOP_LIMITS[0]: STOP_LIMITS[0]},
"EXIT_SHORT_STOP_LIMIT": {STOP_LIMITS[1]: STOP_LIMITS[1]},
"EXIT_PROFITABLES": {"profitables": "profitable"},
"EXIT_LOSINGS": {"losings": "losing"},
"EXIT_ALL_POSITIONS": {"positions": "all"},
"EXIT_ALL_ORDERS": {"orders": "all"},
}
COMMON_RETCODES = [MT5.TRADE_RETCODE_MARKET_CLOSED, MT5.TRADE_RETCODE_CLOSE_ONLY]
NON_EXEC_RETCODES = {
"BMKT": [MT5.TRADE_RETCODE_SHORT_ONLY] + COMMON_RETCODES,
"SMKT": [MT5.TRADE_RETCODE_LONG_ONLY] + COMMON_RETCODES,
}
log.add(
f"{BBSTRADER_DIR}/logs/execution.log",
enqueue=True,
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name} | {message}",
)
[docs]
class Mt5ExecutionEngine:
"""
The `Mt5ExecutionEngine` class serves as the central hub for executing your trading strategies within the `bbstrader` framework.
It orchestrates the entire trading process, ensuring seamless interaction between your strategies, market data, and your chosen
trading platform.
Key Features
------------
- **Strategy Execution:** The `Mt5ExecutionEngine` is responsible for running your strategy, retrieving signals, and executing trades based on those signals.
- **Time Management:** You can define a specific time frame for your trades and set the frequency with which the engine checks for signals and manages trades.
- **Trade Period Control:** Define whether your strategy runs for a day, a week, or a month, allowing for flexible trading durations.
- **Money Management:** The engine supports optional money management features, allowing you to control risk and optimize your trading performance.
- **Trading Day Configuration:** You can customize the days of the week your strategy will execute, providing granular control over your trading schedule.
- **Platform Integration:** The `Mt5ExecutionEngine` is currently designed to work with MT5.
Examples
--------
>>> from bbstrader.metatrader import create_trade_instance
>>> from bbstrader.trading.execution import Mt5ExecutionEngine
>>> from examples.strategies import StockIndexSTBOTrading
>>> from bbstrader.config import config_logger
>>>
>>> if __name__ == '__main__':
>>> logger = config_logger(index_trade.log, console_log=True)
>>> # Define symbols
>>> ndx = '[NQ100]'
>>> spx = '[SP500]'
>>> dji = '[DJI30]'
>>> dax = 'GERMANY40'
>>>
>>> symbol_list = [spx, dax, dji, ndx]
>>>
>>> trade_kwargs = {
... 'expert_id': 5134,
... 'version': 2.0,
... 'time_frame': '15m',
... 'var_level': 0.99,
... 'start_time': '8:30',
... 'finishing_time': '19:30',
... 'ending_time': '21:30',
... 'max_risk': 5.0,
... 'daily_risk': 0.10,
... 'pchange_sl': 1.5,
... 'rr': 3.0,
... 'logger': logger
... }
>>> strategy_kwargs = {
... 'max_trades': {ndx: 3, spx: 3, dji: 3, dax: 3},
... 'expected_returns': {ndx: 1.5, spx: 1.5, dji: 1.0, dax: 1.0},
... 'strategy_name': 'SISTBO',
... 'logger': logger,
... 'expert_id': 5134
... }
>>> trades_instances = create_trade_instance(
... symbol_list, trade_kwargs,
... logger=logger,
... )
>>>
>>> engine = Mt5ExecutionEngine(
... symbol_list,
... trades_instances,
... StockIndexCFDTrading,
... time_frame='15m',
... iter_time=5,
... mm=True,
... period='week',
... comment='bbs_SISTBO_@2.0',
... **strategy_kwargs
... )
>>> engine.run()
"""
def __init__(
self,
symbol_list: List[str],
trades_instances: Dict[str, Trade],
strategy_cls: Strategy | LiveStrategy,
/,
mm: bool = True,
auto_trade: bool = True,
prompt_callback: Callable = None,
multithread: bool = False,
shutdown_event: Event = None,
optimizer: str = "equal",
trail: bool = True,
stop_trail: Optional[int] = None,
trail_after_points: int | str = None,
be_plus_points: Optional[int] = None,
show_positions_orders: bool = False,
iter_time: int | float = 5,
use_trade_time: bool = True,
period: Literal["24/7", "day", "week", "month"] = "month",
period_end_action: Literal["break", "sleep"] = "sleep",
closing_pnl: Optional[float] = None,
trading_days: Optional[List[str]] = None,
comment: Optional[str] = None,
**kwargs,
):
"""
Args:
symbol_list : List of symbols to trade
trades_instances : Dictionary of Trade instances
strategy_cls : Strategy class to use for trading
mm : Enable Money Management. Defaults to True.
optimizer : Risk management optimizer. Defaults to 'equal'.
See `bbstrader.models.optimization` module for more information.
auto_trade : If set to true, when signal are generated by the strategy class,
the Execution engine will automaticaly open position in other whise it will prompt
the user for confimation.
prompt_callback : Callback function to prompt the user for confirmation.
This is useful when integrating with GUI applications.
multithread : If True, use a thread pool to process signals in parallel.
If False, process them sequentially. Set this to True only if the engine
is running in a separate process. Default to False.
shutdown_event : Use to terminate the copy process when runs in a custum environment like web App or GUI.
show_positions_orders : Print open positions and orders. Defaults to False.
iter_time : Interval to check for signals and `mm`. Defaults to 5.
use_trade_time : Open trades after the time is completed. Defaults to True.
period : Period to trade ("24/7", "day", "week", "month"). Defaults to 'week'.
period_end_action : Action to take at the end of the period ("break", "sleep"). Defaults to 'break',
this only applies when period is 'day', 'week'.
closing_pnl : Minimum profit in percentage of target profit to close positions. Defaults to -0.001.
trading_days : Trading days in a week. Defaults to monday to friday.
comment: Comment for trades. Defaults to None.
**kwargs: Additional keyword arguments
_ time_frame : Time frame to trade. Defaults to '15m'.
- strategy_name (Optional[str]): Strategy name. Defaults to None.
- max_trades (Dict[str, int]): Maximum trades per symbol. Defaults to None.
- notify (bool): Enable notifications. Defaults to False.
- telegram (bool): Enable telegram notifications. Defaults to False.
- bot_token (str): Telegram bot token. Defaults to None.
- chat_id (Union[int, str, List] ): Telegram chat id. Defaults to None.
- MT5 connection arguments.
Note:
1. For `trail` , `stop_trail` , `trail_after_points` , `be_plus_points` see `bbstrader.metatrader.trade.Trade.break_even()` .
2. All Strategies must inherit from `bbstrader.btengine.strategy.MT5Strategy` class
and have a `calculate_signals` method that returns a List of ``bbstrader.metatrader.trade.TradingSignal``.
3. All strategies must have the following arguments in their `__init__` method:
- bars (DataHandler): DataHandler instance default to None
- events (Queue): Queue instance default to None
- symbol_list (List[str]): List of symbols to trade can be none for backtesting
- mode (str): Mode of the strategy. Must be either 'live' or 'backtest'
- **kwargs: Additional keyword arguments
The keyword arguments are all the additional arguments passed to the `Mt5ExecutionEngine` class,
the `Strategy` class, the `DataHandler` class, the `Portfolio` class and the `ExecutionHandler` class.
- The `bars` and `events` arguments are used for backtesting only.
4. All strategies must generate signals for backtesting and live trading.
See the `bbstrader.trading.strategies` module for more information on how to create custom strategies.
See `bbstrader.metatrader.account.check_mt5_connection()` for more details on how to connect to MT5 terminal.
"""
self.symbols = symbol_list.copy()
self.trades_instances = trades_instances
self.strategy_cls = strategy_cls
self.mm = mm
self.auto_trade = auto_trade
self.prompt_callback = prompt_callback
self.multithread = multithread
self.optimizer = optimizer
self.trail = trail
self.stop_trail = stop_trail
self.trail_after_points = trail_after_points
self.be_plus_points = be_plus_points
self.show_positions_orders = show_positions_orders
self.iter_time = iter_time
self.use_trade_time = use_trade_time
self.period = period.strip()
self.period_end_action = period_end_action
self.closing_pnl = closing_pnl
self.comment = comment
self.kwargs = kwargs
self.time_intervals = 0
self.time_frame = kwargs.get("time_frame", "15m")
self.trade_time = _TF_MAPPING[self.time_frame]
self.long_market = {symbol: False for symbol in self.symbols}
self.short_market = {symbol: False for symbol in self.symbols}
self._initialize_engine(**kwargs)
self.strategy = self._init_strategy(**kwargs)
self.shutdown_event = (
shutdown_event if shutdown_event is not None else mp.Event()
)
self._running = True
def __repr__(self):
symbols = self.trades_instances.keys()
strategy = self.strategy_cls.__name__
return (
f"{self.__class__.__name__}(Symbols={list(symbols)}, Strategy={strategy})"
)
def _initialize_engine(self, **kwargs):
global logger
logger = kwargs.get("logger", log)
try:
self.daily_risk = kwargs.get("daily_risk")
self.notify = kwargs.get("notify", False)
self.debug_mode = kwargs.get("debug_mode", False)
self.delay = kwargs.get("delay", 0)
self.STRATEGY = kwargs.get("strategy_name")
self.ACCOUNT = kwargs.get("account", "MT5 Account")
self.signal_tickers = kwargs.get("signal_tickers", self.symbols)
self.expert_ids = self._expert_ids(kwargs.get("expert_ids"))
self.max_trades = self._max_trades(kwargs.get("max_trades"))
if self.comment is None:
trade = self.trades_instances[self.symbols[0]]
self.comment = f"{trade.expert_name}@{trade.version}"
if kwargs.get("trading_days") is None:
if self.period.lower() == "24/7":
self.trading_days = WEEK_DAYS
else:
self.trading_days = TradingDays
else:
self.trading_days = kwargs.get("trading_days")
except Exception as e:
self._print_exc(
f"Initializing Execution Engine, STRATEGY={self.STRATEGY}, ACCOUNT={self.ACCOUNT}",
e,
)
return
def _print_exc(self, msg: str, e: Exception):
if isinstance(e, KeyboardInterrupt):
logger.info("Stopping the Execution Engine ...")
self.stop()
sys.exit(0)
if self.debug_mode:
raise ValueError(msg).with_traceback(e.__traceback__)
else:
logger.error(f"{msg}: {type(e).__name__}: {str(e)}")
def _max_trades(self, mtrades):
max_trades = {
symbol: mtrades[symbol]
if mtrades is not None and isinstance(mtrades, dict) and symbol in mtrades
else self.trades_instances[symbol].max_trade()
for symbol in self.symbols
}
return max_trades
def _expert_ids(self, expert_ids):
if expert_ids is None:
expert_ids = list(
set([trade.expert_id for trade in self.trades_instances.values()])
)
elif isinstance(expert_ids, int):
expert_ids = [expert_ids]
return expert_ids
def _init_strategy(self, **kwargs) -> LiveStrategy:
try:
check_mt5_connection(**kwargs)
strategy = self.strategy_cls(symbol_list=self.symbols, **kwargs)
except Exception as e:
self._print_exc(
f"Initializing strategy, STRATEGY={self.STRATEGY}, ACCOUNT={self.ACCOUNT}",
e,
)
return
logger.info(
f"Running {self.STRATEGY} Strategy in {self.time_frame} Interval ..., ACCOUNT={self.ACCOUNT}"
)
return strategy
def _get_signal_info(self, signal, symbol, price, stoplimit, sl, tp):
account = self.strategy.account
symbol_info = account.get_symbol_info(symbol)
common_data = {
"signal": signal,
"symbol": symbol,
"strategy": self.STRATEGY,
"timeframe": self.time_frame,
"account": self.ACCOUNT,
}
info = (
"SIGNAL={signal}, SYMBOL={symbol}, STRATEGY={strategy}, "
"TIMEFRAME={timeframe}, ACCOUNT={account}"
).format(**common_data)
sigmsg = (
"SIGNAL={signal}\n"
"SYMBOL={symbol}\n"
"TYPE={symbol_type}\n"
"DESCRIPTION={description}\n"
"PRICE={price}\n"
"STOPLIMIT={stoplimit}\n"
"STOP_LOSS={sl}\n"
"TAKE_PROFIT={tp}\n"
"STRATEGY={strategy}\n"
"TIMEFRAME={timeframe}\n"
"BROKER={broker}\n"
"TIMESTAMP={timestamp}"
).format(
**common_data,
symbol_type=account.get_symbol_type(symbol).value,
description=symbol_info.description if symbol_info else "N/A",
price=price if price else "MARKET",
stoplimit=stoplimit,
sl=sl if sl else "AUTO",
tp=tp if tp else "AUTO",
broker=account.broker.name,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
msg_template = "SYMBOL={symbol}, STRATEGY={strategy}, ACCOUNT={account}"
msg = f"Sending {signal} Order ... " + msg_template.format(**common_data)
tfmsg = "Time Frame Not completed !!! " + msg_template.format(**common_data)
riskmsg = "Risk not allowed !!! " + msg_template.format(**common_data)
return info, sigmsg, msg, tfmsg, riskmsg
def _check_retcode(self, trade: Trade, position):
if len(trade.retcodes) > 0:
for retcode in trade.retcodes:
if retcode in NON_EXEC_RETCODES[position]:
return True
return False
def _check_positions_orders(self):
positions_orders = {}
try:
check_mt5_connection(**self.kwargs)
for order_type in POSITIONS_TYPES + ORDERS_TYPES:
positions_orders[order_type] = {}
for symbol in self.symbols:
positions_orders[order_type][symbol] = None
for id in self.expert_ids:
func = getattr(
self.trades_instances[symbol], f"get_current_{order_type}"
)
func_value = func(id=id)
if func_value is not None:
if positions_orders[order_type][symbol] is None:
positions_orders[order_type][symbol] = func_value
else:
positions_orders[order_type][symbol] += func_value
return positions_orders
except Exception as e:
self._print_exc(
f"Checking positions and orders, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}",
e,
)
def _long_short_market(self, buys, sells):
long_market = {
symbol: buys[symbol] is not None
and len(buys[symbol]) >= self.max_trades[symbol]
for symbol in self.symbols
}
short_market = {
symbol: sells[symbol] is not None
and len(sells[symbol]) >= self.max_trades[symbol]
for symbol in self.symbols
}
return long_market, short_market
def _display_positions_orders(self, positions_orders):
for symbol in self.symbols:
for order_type in POSITIONS_TYPES + ORDERS_TYPES:
if positions_orders[order_type][symbol] is not None:
logger.info(
f"Current {order_type.upper()} SYMBOL={symbol}: \
{positions_orders[order_type][symbol]}, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
def _send_notification(self, signal, symbol):
telegram = self.kwargs.get("telegram", False)
bot_token = self.kwargs.get("bot_token")
chat_id = self.kwargs.get("chat_id")
notify = self.kwargs.get("notify", False)
if symbol in self.signal_tickers:
send_message(
message=signal,
notify_me=notify,
telegram=telegram,
token=bot_token,
chat_id=chat_id,
)
def _logmsg(self, period, symbol):
logger.info(
f"End of the {period} !!! SYMBOL={symbol}, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
def _logmsgif(self, period, symbol):
if len(self.symbols) <= 10:
self._logmsg(period, symbol)
elif len(self.symbols) > 10 and symbol == self.symbols[-1]:
logger.info(
f"End of the {period} !!! STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
def _sleepmsg(self, sleep_time):
logger.info(f"{self.ACCOUNT} Sleeping for {sleep_time} minutes ...\n")
def _sleep_over_night(self, sessionmsg):
sleep_time = self.trades_instances[self.symbols[-1]].sleep_time()
self._sleepmsg(sleep_time + self.delay)
time.sleep(60 * sleep_time + self.delay)
logger.info(sessionmsg)
def _sleep_over_weekend(self, sessionmsg):
sleep_time = self.trades_instances[self.symbols[-1]].sleep_time(weekend=True)
self._sleepmsg(sleep_time + self.delay)
time.sleep(60 * sleep_time + self.delay)
logger.info(sessionmsg)
def _check_is_day_ends(self, trade: Trade, symbol, period_type, today, closing):
if trade.days_end() or (today in WEEK_ENDS and today != FRIDAY):
self._logmsgif("Day", symbol) if today not in WEEK_ENDS else self._logmsgif(
"Week", symbol
)
if (
(period_type == "month" and self._is_month_ends() and closing)
or (period_type == "week" and today == FRIDAY and closing)
or (period_type == "day" and closing)
or (period_type == "24/7" and closing)
):
logger.info(
f"{self.ACCOUNT} Closing all positions and orders for {symbol} ..."
)
for id in self.expert_ids:
trade.close_positions(
position_type="all", id=id, comment=self.comment
)
trade.statistics(save=True)
def _is_month_ends(self):
today = pd.Timestamp(date.today())
last_business_day = today + pd.tseries.offsets.BMonthEnd(0)
return today == last_business_day
def _daily_end_checks(self, today, closing, sessionmsg):
self.strategy.perform_period_end_checks()
if self.period_end_action == "break" and closing:
sys.exit(0)
elif self.period_end_action == "sleep" and today not in WEEK_ENDS:
self._sleep_over_night(sessionmsg)
elif self.period_end_action == "sleep" and today in WEEK_ENDS:
self._sleep_over_weekend(sessionmsg)
def _weekly_end_checks(self, today, closing, sessionmsg):
if today not in WEEK_ENDS:
self._sleep_over_night(sessionmsg)
else:
self.strategy.perform_period_end_checks()
if self.period_end_action == "break" and closing:
sys.exit(0)
elif self.period_end_action == "sleep" or not closing:
self._sleep_over_weekend(sessionmsg)
def _monthly_end_cheks(self, today, closing, sessionmsg):
if today not in WEEK_ENDS and not self._is_month_ends():
self._sleep_over_night(sessionmsg)
elif self._is_month_ends() and closing:
self.strategy.perform_period_end_checks()
sys.exit(0)
else:
self._sleep_over_weekend(sessionmsg)
def _perform_period_end_actions(
self,
today,
day_end,
closing,
sessionmsg,
):
period = self.period.lower()
for symbol, trade in self.trades_instances.items():
self._check_is_day_ends(trade, symbol, period, today, closing)
if day_end:
self.time_intervals = 0
match period:
case "24/7":
self.strategy.perform_period_end_checks()
self._sleep_over_night(sessionmsg)
case "day":
self._daily_end_checks(today, closing, sessionmsg)
case "week":
self._weekly_end_checks(today, closing, sessionmsg)
case "month":
self._monthly_end_cheks(today, closing, sessionmsg)
case _:
raise ValueError(f"Invalid period {period}")
def _check(self, buys, sells, symbol):
if not self.mm:
return
if buys is not None or sells is not None:
self.trades_instances[symbol].break_even(
mm=self.mm,
trail=self.trail,
stop_trail=self.stop_trail,
trail_after_points=self.trail_after_points,
be_plus_points=self.be_plus_points,
)
def _get_signals_and_weights(self):
try:
check_mt5_connection(**self.kwargs)
signals = self.strategy.calculate_signals()
weights = (
self.strategy.apply_risk_management(self.optimizer)
if hasattr(self.strategy, "apply_risk_management")
else None
)
return signals, weights
except Exception as e:
self._print_exc(
f"Calculating Signals, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}",
e,
)
pass
def _update_risk(self, weights):
try:
check_mt5_connection(**self.kwargs)
if weights is not None and not all(v == 0 for v in weights.values()):
assert self.daily_risk is not None
for symbol in self.symbols:
if symbol not in weights:
continue
trade = self.trades_instances[symbol]
dailydd = round(weights[symbol] * self.daily_risk, 5)
trade.dailydd = dailydd
except Exception as e:
self._print_exc(
f"Updating Risk, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}",
e,
)
pass
def _auto_trade(self, sigmsg, symbol) -> bool:
if self.notify:
self._send_notification(sigmsg, symbol)
if self.auto_trade:
return True
if not self.auto_trade:
prompt = (
f"{sigmsg} \n Enter Y/Yes to accept or N/No to reject this order : "
)
if self.prompt_callback is not None:
auto_trade = self.prompt_callback(prompt)
else:
auto_trade = input(prompt)
if not auto_trade.upper().startswith("Y"):
info = f"Order Rejected !!! SYMBOL={symbol}, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
logger.info(info)
if self.notify:
self._send_notification(info, symbol)
return False
return True
def _open_buy(
self,
signal,
symbol,
id,
trade: Trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
):
if not self._auto_trade(sigmsg, symbol):
return
if not self._check_retcode(trade, "BMKT"):
logger.info(msg)
trade.open_buy_position(
action=signal,
price=price,
stoplimit=stoplimit,
sl=sl,
tp=tp,
id=id,
mm=self.mm,
trail=self.trail,
comment=comment,
)
def _open_sell(
self,
signal,
symbol,
id,
trade: Trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
):
if not self._auto_trade(sigmsg, symbol):
return
if not self._check_retcode(trade, "SMKT"):
logger.info(msg)
trade.open_sell_position(
action=signal,
price=price,
stoplimit=stoplimit,
sl=sl,
tp=tp,
id=id,
mm=self.mm,
trail=self.trail,
comment=comment,
)
def _handle_exit_signals(self, signal, symbol, id, trade: Trade, sigmsg, comment):
for exit_signal, actions in EXIT_SIGNAL_ACTIONS.items():
if signal == exit_signal:
for signal_attr, order_type in actions.items():
clos_func = getattr(
self.trades_instances[symbol], f"get_current_{signal_attr}"
)
if clos_func(id=id) is not None:
if self.notify:
self._send_notification(sigmsg, symbol)
close_method = (
trade.close_positions
if signal_attr in POSITIONS_TYPES
else trade.close_orders
)
close_method(order_type, id=id, comment=comment)
def _handle_buy_signal(
self,
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
buys,
sells,
sigmsg,
msg,
tfmsg,
riskmsg,
comment,
):
if not self.long_market[symbol]:
if self.use_trade_time:
if self.time_intervals % self.trade_time == 0 or buys[symbol] is None:
self._open_buy(
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
)
else:
logger.info(tfmsg)
self._check(buys[symbol], sells[symbol], symbol)
else:
self._open_buy(
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
)
else:
logger.info(riskmsg)
def _handle_sell_signal(
self,
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
buys,
sells,
sigmsg,
msg,
tfmsg,
riskmsg,
comment,
):
if not self.short_market[symbol]:
if self.use_trade_time:
if self.time_intervals % self.trade_time == 0 or sells[symbol] is None:
self._open_sell(
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
)
else:
logger.info(tfmsg)
self._check(buys[symbol], sells[symbol], symbol)
else:
self._open_sell(
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
sigmsg,
msg,
comment,
)
else:
logger.info(riskmsg)
def _run_trade_algorithm(
self,
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
buys,
sells,
comment,
):
signal = {"LONG": "BMKT", "BUY": "BMKT", "SHORT": "SMKT", "SELL": "SMKT"}.get(
signal, signal
)
if (
self.trades_instances[symbol].dailydd == 0
and signal not in EXIT_SIGNAL_ACTIONS
):
logger.info(
f"Daily Risk is set to 0 !!! No trades allowed for SYMBOL={symbol}, "
f"STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
return
info, sigmsg, msg, tfmsg, riskmsg = self._get_signal_info(
signal, symbol, price, stoplimit, sl, tp
)
if signal not in EXIT_SIGNAL_ACTIONS:
if signal in NON_EXEC_RETCODES and not self._check_retcode(trade, signal):
logger.info(info)
elif signal not in NON_EXEC_RETCODES:
logger.info(info)
signal_handler = None
if signal in EXIT_SIGNAL_ACTIONS:
self._handle_exit_signals(signal, symbol, id, trade, sigmsg, comment)
elif signal in BUYS:
signal_handler = self._handle_buy_signal
elif signal in SELLS:
signal_handler = self._handle_sell_signal
if signal_handler is not None:
signal_handler(
signal,
symbol,
id,
trade,
price,
stoplimit,
sl,
tp,
buys,
sells,
sigmsg,
msg,
tfmsg,
riskmsg,
comment,
)
def _is_closing(self):
closing = True
if self.closing_pnl is not None:
closing = all(
trade.positive_profit(id=trade.expert_id, th=self.closing_pnl)
for trade in self.trades_instances.values()
)
return closing
def _sleep(self):
time.sleep((60 * self.iter_time) - 1.0)
if self.iter_time == 1:
self.time_intervals += 1
elif self.trade_time % self.iter_time == 0:
self.time_intervals += self.iter_time
else:
if self.use_trade_time:
raise ValueError(
f"iter_time must be a multiple of the {self.time_frame} !!!"
f"(e.g., if time_frame is 15m, iter_time must be 1.5, 3, 5, 15 etc)"
)
def _handle_one_signal(self, signal, today, buys, sells):
try:
symbol = signal.symbol
trade: Trade = self.trades_instances[symbol]
if trade.trading_time() and today in self.trading_days:
if signal.action is not None:
action = (
signal.action.value
if isinstance(signal.action, TradeAction)
else signal.action
)
self._run_trade_algorithm(
action,
symbol,
signal.id,
trade,
signal.price,
signal.stoplimit,
signal.sl,
signal.tp,
buys,
sells,
signal.comment or self.comment,
)
else:
if len(self.symbols) >= 10:
if symbol == self.symbols[-1]:
logger.info(
f"Not trading Time !!!, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
else:
logger.info(
f"Not trading Time !!! SYMBOL={trade.symbol}, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
)
self._check(buys[symbol], sells[symbol], symbol)
except Exception as e:
msg = (
f"Error handling signal for SYMBOL={signal.symbol} (SIGNAL: {action}), "
f"STRATEGY={self.STRATEGY}, ACCOUNT={self.ACCOUNT}"
)
self._print_exc(msg, e)
def _handle_all_signals(self, today, signals, buys, sells, max_workers=50):
try:
check_mt5_connection(**self.kwargs)
except Exception as e:
msg = "Initial MT5 connection check failed. Aborting signal processing."
self._print_exc(msg, e)
return
if not signals:
return
# We want to create a temporary function that
# already has the 'today', 'buys', and 'sells' arguments filled in.
# This is necessary because executor.map only iterates over one sequence (signals).
signal_processor = functools.partial(
self._handle_one_signal, today=today, buys=buys, sells=sells
)
if self.multithread:
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# 'map' will apply our worker function to every item in the 'signals' list.
# It will automatically manage the distribution of tasks to the worker threads.
# We wrap it in list() to ensure all tasks are complete before moving on.
list(executor.map(signal_processor, signals))
else:
for signal in signals:
try:
signal_processor(signal)
except Exception as e:
self._print_exc(f"Failed to process signal {signal}: ", e)
def _handle_period_end_actions(self, today):
try:
check_mt5_connection(**self.kwargs)
day_end = (
all(trade.days_end() for trade in self.trades_instances.values())
or (today in WEEK_ENDS and today != FRIDAY)
and self.period != "24/7"
)
closing = self._is_closing()
sessionmsg = f"{self.ACCOUNT} STARTING NEW TRADING SESSION ...\n"
self._perform_period_end_actions(
today,
day_end,
closing,
sessionmsg,
)
except Exception as e:
msg = f"Handling period end actions, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
self._print_exc(msg, e)
pass
def _select_symbols(self):
for symbol in self.symbols:
if not MT5.symbol_select(symbol, True):
logger.error(
f"Failed to select symbol {symbol} error = {MT5.last_error()}"
)
[docs]
def run(self):
while self._running and not self.shutdown_event.is_set():
try:
check_mt5_connection(**self.kwargs)
self._select_symbols()
positions_orders = self._check_positions_orders()
if self.show_positions_orders:
self._display_positions_orders(positions_orders)
buys = positions_orders.get("buys")
sells = positions_orders.get("sells")
self.long_market, self.short_market = self._long_short_market(
buys, sells
)
today = datetime.now().strftime("%A").lower()
signals, weights = self._get_signals_and_weights()
if len(signals) == 0:
for symbol in self.symbols:
self._check(buys[symbol], sells[symbol], symbol)
else:
self._update_risk(weights)
self._handle_all_signals(today, signals, buys, sells)
self._sleep()
self._handle_period_end_actions(today)
except KeyboardInterrupt:
self.stop()
sys.exit(0)
except Exception as e:
msg = f"Running Execution Engine, STRATEGY={self.STRATEGY} , ACCOUNT={self.ACCOUNT}"
self._print_exc(msg, e)
self._sleep()
[docs]
def stop(self):
"""Stops the execution engine."""
if self._running:
logger.info(
f"Stopping Execution Engine for {self.STRATEGY} STRATEGY on {self.ACCOUNT} Account"
)
self._running = False
self.shutdown_event.set()
logger.info("Execution Engine stopped successfully.")
[docs]
def RunMt5Engine(account_id: str, **kwargs):
"""
Start an MT5 execution engine for a given account.
Parameters
----------
account_id : str
Account ID to run the execution engine on.
**kwargs : dict
Additional keyword arguments. Possible keys include:
* symbol_list : list
List of symbols to trade.
* trades_instances : dict
Dictionary of Trade instances.
* strategy_cls : class
Strategy class to use for trading.
Returns
-------
None
Initializes and runs the MT5 execution engine.
"""
log.info(f"Starting execution engine for {account_id}")
symbol_list = kwargs.pop("symbol_list")
trades_instances = kwargs.pop("trades_instances")
strategy_cls = kwargs.pop("strategy_cls")
if symbol_list is None or trades_instances is None or strategy_cls is None:
log.error(f"Missing required arguments for account {account_id}")
raise ValueError(f"Missing required arguments for account {account_id}")
try:
engine = Mt5ExecutionEngine(
symbol_list, trades_instances, strategy_cls, **kwargs
)
engine.run()
except KeyboardInterrupt:
log.info(f"Execution engine for {account_id} interrupted by user")
engine.stop()
sys.exit(0)
except Exception as e:
log.exception(f"Error running execution engine for {account_id}: {e}")
finally:
log.info(f"Execution for {account_id} completed")
[docs]
def RunMt5Engines(accounts: Dict[str, Dict], start_delay: float = 1.0):
"""Runs multiple MT5 execution engines in parallel using multiprocessing.
Args:
accounts: Dictionary of accounts to run the execution engines on.
Keys are the account names or IDs and values are the parameters for the execution engine.
The parameters are the same as the ones passed to the `Mt5ExecutionEngine` class.
start_delay: Delay in seconds between starting the processes. Defaults to 1.0.
"""
processes = {}
for account_id, params in accounts.items():
log.info(f"Starting process for {account_id}")
params["multithread"] = True
process = mp.Process(target=RunMt5Engine, args=(account_id,), kwargs=params)
process.start()
processes[process] = account_id
if start_delay:
time.sleep(start_delay)
for process, account_id in processes.items():
process.join()
log.info(f"Process for {account_id} joined")