import concurrent.futures as cf
import configparser
import multiprocessing as mp
import threading
import time
from datetime import datetime
from enum import Enum
from multiprocessing.synchronize import Event
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from loguru import logger as log
from bbstrader.api.metatrader_client import TradeOrder, TradePosition # type: ignore
from bbstrader.config import BBSTRADER_DIR
from bbstrader.metatrader.account import Account
from bbstrader.metatrader.broker import check_mt5_connection
from bbstrader.metatrader.trade import FILLING_TYPE
from bbstrader.metatrader.utils import trade_retcode_message
try:
import MetaTrader5 as Mt5
except ImportError:
import bbstrader.compat # noqa: F401
__all__ = [
"TradeCopier",
"RunCopier",
"RunMultipleCopier",
"config_copier",
"copier_worker_process",
"get_symbols_from_string",
]
log.add(
f"{BBSTRADER_DIR}/logs/copier.log",
enqueue=True,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name} | {message}",
)
global logger
logger = log
ORDER_TYPE = {
0: (Mt5.ORDER_TYPE_BUY, "BUY"),
1: (Mt5.ORDER_TYPE_SELL, "SELL"),
2: (Mt5.ORDER_TYPE_BUY_LIMIT, "BUY LIMIT"),
3: (Mt5.ORDER_TYPE_SELL_LIMIT, "SELL LIMIT"),
4: (Mt5.ORDER_TYPE_BUY_STOP, "BUY STOP"),
5: (Mt5.ORDER_TYPE_SELL_STOP, "SELL STOP"),
6: (Mt5.ORDER_TYPE_BUY_STOP_LIMIT, "BUY STOP LIMIT"),
7: (Mt5.ORDER_TYPE_SELL_STOP_LIMIT, "SELL STOP LIMIT"),
}
STOP_RETCODES = [
Mt5.TRADE_RETCODE_TRADE_DISABLED,
Mt5.TRADE_RETCODE_NO_MONEY,
Mt5.TRADE_RETCODE_SERVER_DISABLES_AT,
Mt5.TRADE_RETCODE_CLIENT_DISABLES_AT,
Mt5.TRADE_RETCODE_ONLY_REAL,
]
RETURN_RETCODE = [
Mt5.TRADE_RETCODE_MARKET_CLOSED,
Mt5.TRADE_RETCODE_CONNECTION,
Mt5.TRADE_RETCODE_LIMIT_ORDERS,
Mt5.TRADE_RETCODE_LIMIT_VOLUME,
Mt5.TRADE_RETCODE_LIMIT_POSITIONS,
Mt5.TRADE_RETCODE_LONG_ONLY,
Mt5.TRADE_RETCODE_SHORT_ONLY,
Mt5.TRADE_RETCODE_CLOSE_ONLY,
Mt5.TRADE_RETCODE_FIFO_CLOSE,
Mt5.TRADE_RETCODE_INVALID_VOLUME,
Mt5.TRADE_RETCODE_INVALID_PRICE,
Mt5.TRADE_RETCODE_INVALID_STOPS,
Mt5.TRADE_RETCODE_NO_CHANGES,
]
class OrderAction(Enum):
COPY_NEW = "COPY_NEW"
MODIFY = "MODIFY"
CLOSE = "CLOSE"
SYNC_REMOVE = "SYNC_REMOVE"
SYNC_ADD = "SYNC_ADD"
CopyMode = Literal["fix", "multiply", "percentage", "dynamic", "replicate", "specific"]
def fix_lot(fixed):
if fixed == 0 or fixed is None:
raise ValueError("Fixed lot must be a number > 0")
return fixed
def multiply_lot(lot, multiplier):
if multiplier == 0 or multiplier is None:
raise ValueError("Multiplier lot must be a number > 0")
return lot * multiplier
def percentage_lot(lot, percentage):
if percentage == 0 or percentage is None:
raise ValueError("Percentage lot must be a number > 0")
return round(lot * percentage / 100, 2)
def dynamic_lot(source_lot, source_eqty: float, dest_eqty: float):
try:
ratio = dest_eqty / source_eqty
return round(source_lot * ratio, 2)
except ZeroDivisionError:
raise ValueError("Source or destination account equity is zero")
def specific_lot(symbol, value) -> float:
if not isinstance(value, dict):
raise ValueError(
"Specific lot size must be provided as a dictionary mapping symbols to lot sizes"
)
return value.get(symbol, 0.01)
def fixed_lot(lot, symbol, destination) -> float:
def _volume_step(value):
value_str = str(value)
if "." in value_str and value_str != "1.0":
decimal_index = value_str.index(".")
num_digits = len(value_str) - decimal_index - 1
return num_digits
elif value_str == "1.0":
return 0
else:
return 0
def _check_lot(lot: float, symbol_info) -> float:
if lot > symbol_info.volume_max:
return symbol_info.volume_max / 2
elif lot < symbol_info.volume_min:
return symbol_info.volume_min
return lot
s_info = Account(**destination).get_symbol_info(symbol)
volume_step = s_info.volume_step
steps = _volume_step(volume_step)
if float(steps) >= float(1):
return _check_lot(round(lot, steps), s_info)
else:
return _check_lot(round(lot), s_info)
def calculate_copy_lot(
source_lot,
symbol: str,
destination: dict,
mode: CopyMode = "dynamic",
value=None,
source_eqty: float = None,
dest_eqty: float = None,
):
match mode:
case "replicate":
return fixed_lot(source_lot, symbol, destination)
case "fix":
return fixed_lot(fix_lot(value), symbol, destination)
case "multiply":
lot = multiply_lot(source_lot, value)
return fixed_lot(lot, symbol, destination)
case "percentage":
lot = percentage_lot(source_lot, value)
return fixed_lot(lot, symbol, destination)
case "dynamic":
lot = dynamic_lot(source_lot, source_eqty, dest_eqty)
return fixed_lot(lot, symbol, destination)
case "specific":
lot = specific_lot(symbol, value)
return fixed_lot(lot, symbol, destination)
case _:
raise ValueError("Invalid mode selected")
[docs]
def get_symbols_from_string(symbols_string: str) -> List[str] | Dict[str, str]:
if not symbols_string:
raise ValueError("Input Error", "Tickers string cannot be empty.")
string = (
symbols_string.strip().replace("\n", "").replace(" ", "").replace('"""', "")
)
if ":" in string and "," in string:
if string.endswith(","):
string = string[:-1]
return dict(item.split(":") for item in string.split(","))
elif ":" in string and "," not in string:
raise ValueError("Each key pairs value must be separeted by ','")
elif "," in string and ":" not in string:
return string.split(",")
else:
raise ValueError("""
Invalid symbols format.
You can use comma separated symbols in one line or multiple lines using triple quotes.
You can also use a dictionary to map source symbols to destination symbols as shown below.
Or if you want to copy all symbols, use "all" or "*".
symbols = EURUSD,GBPUSD,USDJPY (comma separated)
symbols = EURUSD.s:EURUSD_i, GBPUSD.s:GBPUSD_i, USDJPY.s:USDJPY_i (dictionary)
symbols = all (copy all symbols)
symbols = * (copy all symbols) """)
def get_lots_from_string(lots_string: str) -> Dict[str, float]:
if not lots_string:
raise ValueError("Input Error", "Lots string cannot be empty.")
string = lots_string.strip().replace("\n", "").replace(" ", "").replace('"""', "")
if ":" in string and "," in string:
if string.endswith(","):
string = string[:-1]
lot_dict = {}
for item in string.split(","):
key, value = item.split(":")
lot_dict[key] = float(value)
return lot_dict
else:
raise ValueError("""
Invalid lots format.
You must use a dictionary to map symbols to lot sizes as shown below.
lots = EURUSD:0.1, GBPUSD:0.2, USDJPY:0.15 (dictionary)
""")
def get_copy_symbols(destination: dict, source: dict) -> List[str] | Dict[str, str]:
symbols = destination.get("symbols", "all")
if symbols == "all" or symbols == "*":
src_account = Account(**source)
src_symbols = src_account.get_symbols()
dest_account = Account(**destination)
dest_symbols = dest_account.get_symbols()
for s in src_symbols:
if s not in dest_symbols:
err_msg = (
f"To use 'all' or '*', Source account@{src_account.number} "
f"and destination account@{dest_account.number} "
f"must be the same type and have the same symbols"
f"If not Use a dictionary to map source symbols to destination symbols "
f"(e.g., EURUSD.s:EURUSD_i, GBPUSD.s:GBPUSD_i, USDJPY.s:USDJPY_i"
f"Where EURUSD.s is the source symbols and EURUSD_i is the corresponding symbol"
)
raise ValueError(err_msg)
return dest_symbols
elif isinstance(symbols, (list, dict)):
return symbols
elif isinstance(symbols, str):
return get_symbols_from_string(symbols)
else:
raise ValueError("Invalide symbols provided")
[docs]
class TradeCopier(object):
"""
``TradeCopier`` responsible for copying trading orders and positions from a source account to multiple destination accounts.
This class facilitates the synchronization of trades between a source account and multiple destination accounts.
It handles copying new orders, modifying existing orders, updating and closing positions based on updates from the source account.
"""
__slots__ = (
"source",
"source_id",
"source_isunique",
"destinations",
"errors",
"sleeptime",
"start_time",
"end_time",
"shutdown_event",
"custom_logger",
"log_queue",
"_last_session",
"_running",
)
source: Dict
source_id: int
source_isunique: bool
destinations: List[dict]
shutdown_event: Event
log_queue: mp.Queue
def __init__(
self,
source: Dict,
destinations: List[dict],
/,
sleeptime: float = 0.1,
start_time: str = None,
end_time: str = None,
*,
custom_logger=None,
shutdown_event=None,
log_queue=None,
):
"""
Initializes the ``TradeCopier`` instance, setting up the source and destination trading accounts for trade copying.
Args:
source (dict):
A dictionary containing the connection details for the source trading account. This dictionary
**must** include all parameters required to successfully connect to the source account.
Refer to the ``bbstrader.metatrader.check_mt5_connection`` function for a comprehensive list
of required keys and their expected values. Common parameters include, but are not limited to
- `login`: The account login ID (integer).
- `password`: The account password (string).
- `server`: The server address (string), e.g., "Broker-Demo".
- `path`: The path to the MetaTrader 5 installation directory (string).
- `portable`: A boolean indicating whether to open MetaTrader 5 installation in portable mode.
- `id`: A unique identifier for all trades opened buy the source source account.
This Must be a positive number greater than 0 and less than 2^32 / 2.
- `unique`: A boolean indication whehter to allow destination accounts to copy from other sources.
If Set to True, all destination accounts won't be allow to accept trades from other accounts even
manually opened positions or orders will be removed.
destinations (List[dict]):
A list of dictionaries, where each dictionary represents a destination trading account to which
trades will be copied. Each destination dictionary **must** contain the following keys
- Authentication details (e.g., `login`, `password`, `server`)
Identical in structure and requirements to the `source` dictionary,
ensuring a connection can be established to the destination account.
Refer to ``bbstrader.metatrader.check_mt5_connection``.
- `symbols` (Union[List[str], Dict[str, str], str])
Specifies which symbols should be copied from the source
account to this destination account. Possible values include
`List[str]` A list of strings, where each string is a symbol to be copied.
The same symbol will be traded on the destination account. Example `["EURUSD", "GBPUSD"]`
`Dict[str, str]` A dictionary mapping source symbols to destination symbols.
This allows for trading a different symbol on the destination account than the one traded on the source.
Example `{"EURUSD": "EURUSD_i", "GBPUSD": "GBPUSD_i"}`.
`"all"` or `"*"` Indicates that all symbols traded on the source account should be
copied to this destination account, using the same symbol name.
- `mode` (str) The risk management mode to use. Valid options are
`"fix"` Use a fixed lot size. The `value` key must specify the fixed lot size.
`"multiply"` Multiply the source account's lot size by a factor.
The `value` key must specify the multiplier.
`"percentage"` Trade a percentage of the source account's lot size.
The `value` key must specify the percentage (as a decimal, e.g., 50 for 50%).
`"dynamic"` Calculate the lot size dynamically based on account equity and risk parameters.
The `value` key is ignored.
`"replicate"` Copy the exact lot size from the source account. The `value` key is ignored.
`"specific"` Use a specific lot size defined in the `value` key for each symbol.
- `value` (float or dict, optional) A numerical value or dict used in conjunction with the selected `mode`.
Its meaning depends on the chosen `mode` (see above). Required for "fix", "multiply", specific
and "percentage" modes; optional for "dynamic".
- `slippage` (float, optional) The maximum allowed slippage in percentage when opening trades on the destination account,
defaults to 0.1% (0.1), if the slippage exceeds this value, the trade will not be copied.
- `comment` (str, optional) An optional comment to be added to trades opened on the destination account,
defaults to an empty string.
- ``copy_what`` (str, optional)
Specifies what to copy from the source account to the destination accounts. Valid options are
`"orders"` Copy only orders from the source account to the destination accounts.
`"positions"` Copy only positions from the source account to the destination accounts.
`"all"` Copy both orders and positions from the source account to the destination accounts.
Defaults to `"all"`.
sleeptime (float, optional):
The time interval in seconds between each iteration of the trade copying process.
Defaults to 0.1 seconds. It can be useful if you know the frequency of new trades on the source account.
start_time (str, optional): The time (HH:MM) from which the copier start copying from the source.
end_time (str, optional): The time (HH:MM) from which the copier stop copying from the source.
custom_logger (Any, Optional): Used to set a cutum logger (default is ``loguru.logger``)
shutdown_event (Any, Otional): Use to terminate the copy process when runs in a custum environment like web App or GUI.
log_queue (multiprocessing.Queue, Optional): Use to send log to an external program, usefule in GUI apps
Note:
The source account and the destination accounts must be connected to different MetaTrader 5 platforms.
you can copy the initial installation of MetaTrader 5 to a different directory and rename it to create a new instance
Then you can connect destination accounts to the new instance while the source account is connected to the original instance.
"""
self.source = source
self.source_id = source.get("id", 0)
self.source_isunique = source.get("unique", True)
self.destinations = destinations
self.sleeptime = sleeptime
self.start_time = start_time
self.end_time = end_time
self.errors = set()
self.log_queue = log_queue
self._add_logger(custom_logger)
self._validate_source()
self.shutdown_event = (
shutdown_event if shutdown_event is not None else mp.Event()
)
self._last_session = datetime.now().date()
self._running = True
@property
def running(self):
"""Check if the Trade Copier is running."""
return self._running
def _add_logger(self, custom_logger):
if custom_logger:
global logger
logger = custom_logger
[docs]
def log_message(
self, message, type: Literal["info", "error", "debug", "warning"] = "info"
):
logger.trace
if self.log_queue:
try:
now = datetime.now()
formatted = (
now.strftime("%Y-%m-%d %H:%M:%S.")
+ f"{int(now.microsecond / 1000):03d}"
)
space = len("exception") # longest log name
self.log_queue.put(
f"{formatted} |{type.upper()} {' ' * (space - len(type))} | - {message}"
)
except Exception:
pass
else:
getattr(logger, type)(message)
[docs]
def log_error(self, e, symbol=None):
if datetime.now().date() > self._last_session:
self._last_session = datetime.now().date()
self.errors.clear()
error_msg = repr(e)
if error_msg not in self.errors:
self.errors.add(error_msg)
add_msg = f", SYMBOL={symbol}" if symbol else ""
message = f"Error encountered: {error_msg}{add_msg}"
self.log_message(message, type="error")
def _validate_source(self):
if not self.source_isunique:
try:
assert self.source_id >= 1
except AssertionError:
raise ValueError(
"Non Unique source account must have a valide ID , (e.g., source['id'] = 1234)"
)
def _get_magic(self, ticket: int) -> int:
return int(str(self.source_id) + str(ticket)) if self.source_id >= 1 else ticket
def _select_symbol(self, symbol: str, destination: dict):
selected = Mt5.symbol_select(symbol, True)
if not selected:
self.log_message(
f"Failed to select {destination.get('login')}::{symbol}, error code = {Mt5.last_error()}",
type="error",
)
[docs]
def source_orders(self, symbol=None):
check_mt5_connection(**self.source)
return Account(**self.source).get_orders(symbol=symbol)
[docs]
def source_positions(self, symbol=None):
check_mt5_connection(**self.source)
return Account(**self.source).get_positions(symbol=symbol)
[docs]
def destination_orders(self, destination: dict, symbol=None):
check_mt5_connection(**destination)
return Account(**destination).get_orders(symbol=symbol)
[docs]
def destination_positions(self, destination: dict, symbol=None):
check_mt5_connection(**destination)
return Account(**destination).get_positions(symbol=symbol)
[docs]
def get_copy_symbol(self, symbol, destination: dict = None, type="destination"):
symbols = get_copy_symbols(destination, self.source)
if isinstance(symbols, list):
if symbol in symbols:
return symbol
if isinstance(symbols, dict):
if type == "destination":
if symbol in symbols.keys():
return symbols[symbol]
if type == "source":
for k, v in symbols.items():
if v == symbol:
return k
raise ValueError(f"Symbol {symbol} not found in {type} account")
[docs]
def isorder_modified(self, source: TradeOrder, dest: TradeOrder):
if source.type == dest.type and self._get_magic(source.ticket) == dest.magic:
return (
source.sl != dest.sl
or source.tp != dest.tp
or source.price_open != dest.price_open
or source.price_stoplimit != dest.price_stoplimit
)
return False
[docs]
def isposition_modified(self, source: TradePosition, dest: TradePosition):
if source.type == dest.type and self._get_magic(source.ticket) == dest.magic:
return source.sl != dest.sl or source.tp != dest.tp
return False
[docs]
def slippage(self, source: TradeOrder | TradePosition, destination: dict) -> bool:
slippage = destination.get("slippage", 0.1)
if slippage is None:
return False
if hasattr(source, "profit"):
if source.type in [0, 1] and source.profit < 0:
return False
delta = ((source.price_current - source.price_open) / source.price_open) * 100
if source.type in [0, 3, 4, 6] and delta > slippage:
return True
if source.type in [1, 2, 5, 7] and delta < -slippage:
return True
return False
[docs]
def iscopy_time(self):
if self.start_time is None or self.end_time is None:
return True
else:
start_time = datetime.strptime(self.start_time, "%H:%M").time()
end_time = datetime.strptime(self.end_time, "%H:%M").time()
if start_time <= datetime.now().time() <= end_time:
return True
return False
def _update_filling_type(self, request, result):
new_result = result
if result.retcode == Mt5.TRADE_RETCODE_INVALID_FILL:
for fill in FILLING_TYPE:
request["type_filling"] = fill
new_result = Mt5.order_send(request)
if new_result.retcode == Mt5.TRADE_RETCODE_DONE:
break
return new_result
[docs]
def handle_retcode(self, retcode) -> int:
if retcode in STOP_RETCODES:
msg = trade_retcode_message(retcode)
self.log_error(f"Critical Error on @{self.source['login']}: {msg} ")
self.stop()
if retcode in RETURN_RETCODE:
return 1
[docs]
def copy_new_trade(self, trade: TradeOrder | TradePosition, destination: dict):
if not self.iscopy_time():
return
check_mt5_connection(**destination)
symbol = self.get_copy_symbol(trade.symbol, destination)
self._select_symbol(symbol, destination)
volume = trade.volume if hasattr(trade, "volume") else trade.volume_initial
lot = calculate_copy_lot(
volume,
symbol,
destination,
mode=destination.get("mode", "fix"),
value=destination.get("value", 0.01),
source_eqty=Account(**self.source).get_account_info().margin_free,
dest_eqty=Account(**destination).get_account_info().margin_free,
)
trade_action = (
Mt5.TRADE_ACTION_DEAL if trade.type in [0, 1] else Mt5.TRADE_ACTION_PENDING
)
tick = Mt5.symbol_info_tick(symbol)
price = tick.bid if trade.type == 0 else tick.ask
try:
request = dict(
symbol=symbol,
action=trade_action,
volume=lot,
price=price,
sl=trade.sl,
tp=trade.tp,
type=ORDER_TYPE[trade.type][0],
magic=self._get_magic(trade.ticket),
deviation=Mt5.symbol_info(symbol).spread,
comment=destination.get("comment", trade.comment + "#bbstrader"),
type_time=Mt5.ORDER_TIME_GTC,
type_filling=Mt5.ORDER_FILLING_FOK,
)
if trade.type not in [0, 1]:
request["price"] = trade.price_open
if trade.type in [6, 7]:
request["stoplimit"] = trade.price_stoplimit
result = Mt5.order_send(request)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
result = self._update_filling_type(request, result)
action = ORDER_TYPE[trade.type][1]
copy_action = "Position" if trade.type in [0, 1] else "Order"
if result.retcode == Mt5.TRADE_RETCODE_DONE:
self.log_message(
f"Copy {action} {copy_action} #{trade.ticket} from @{self.source.get('login')}::{trade.symbol} "
f"to @{destination.get('login')}::{symbol}",
)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
if self.handle_retcode(result.retcode) == 1:
return
self.log_message(
f"Error copying {action} {copy_action} #{trade.ticket} from @{self.source.get('login')}::{trade.symbol} "
f"to @{destination.get('login')}::{symbol}, {trade_retcode_message(result.retcode)}",
type="error",
)
except Exception as e:
self.log_error(e, symbol=symbol)
[docs]
def copy_new_order(self, order: TradeOrder, destination: dict):
self.copy_new_trade(order, destination)
[docs]
def modify_order(self, ticket, symbol, source_order: TradeOrder, destination: dict):
check_mt5_connection(**destination)
self._select_symbol(symbol, destination)
request = {
"action": Mt5.TRADE_ACTION_MODIFY,
"order": ticket,
"symbol": symbol,
"price": source_order.price_open,
"sl": source_order.sl,
"tp": source_order.tp,
"stoplimit": source_order.price_stoplimit,
}
result = Mt5.order_send(request)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
result = self._update_filling_type(request, result)
if result.retcode == Mt5.TRADE_RETCODE_DONE:
self.log_message(
f"Modify {ORDER_TYPE[source_order.type][1]} Order #{ticket} on @{destination.get('login')}::{symbol}, "
f"SOURCE=@{self.source.get('login')}::{source_order.symbol}"
)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
if self.handle_retcode(result.retcode) == 1:
return
self.log_message(
f"Error modifying {ORDER_TYPE[source_order.type][1]} Order #{ticket} on @{destination.get('login')}::{symbol},"
f"SOURCE=@{self.source.get('login')}::{source_order.symbol}, {trade_retcode_message(result.retcode)}",
type="error",
)
[docs]
def remove_order(self, src_symbol, order: TradeOrder, destination: dict):
check_mt5_connection(**destination)
self._select_symbol(order.symbol, destination)
request = {
"action": Mt5.TRADE_ACTION_REMOVE,
"order": order.ticket,
}
result = Mt5.order_send(request)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
result = self._update_filling_type(request, result)
if result.retcode == Mt5.TRADE_RETCODE_DONE:
self.log_message(
f"Close {ORDER_TYPE[order.type][1]} Order #{order.ticket} on @{destination.get('login')}::{order.symbol}, "
f"SOURCE=@{self.source.get('login')}::{src_symbol}"
)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
if self.handle_retcode(result.retcode) == 1:
return
self.log_message(
f"Error closing {ORDER_TYPE[order.type][1]} Order #{order.ticket} on @{destination.get('login')}::{order.symbol}, "
f"SOURCE=@{self.source.get('login')}::{src_symbol}, {trade_retcode_message(result.retcode)}",
type="error",
)
[docs]
def copy_new_position(self, position: TradePosition, destination: dict):
self.copy_new_trade(position, destination)
[docs]
def modify_position(
self, ticket, symbol, source_pos: TradePosition, destination: dict
):
check_mt5_connection(**destination)
self._select_symbol(symbol, destination)
request = {
"action": Mt5.TRADE_ACTION_SLTP,
"position": ticket,
"symbol": symbol,
"sl": source_pos.sl,
"tp": source_pos.tp,
}
result = Mt5.order_send(request)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
result = self._update_filling_type(request, result)
if result.retcode == Mt5.TRADE_RETCODE_DONE:
self.log_message(
f"Modify {ORDER_TYPE[source_pos.type][1]} Position #{ticket} on @{destination.get('login')}::{symbol}, "
f"SOURCE=@{self.source.get('login')}::{source_pos.symbol}"
)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
if self.handle_retcode(result.retcode) == 1:
return
self.log_message(
f"Error modifying {ORDER_TYPE[source_pos.type][1]} Position #{ticket} on @{destination.get('login')}::{symbol}, "
f"SOURCE=@{self.source.get('login')}::{source_pos.symbol}, {trade_retcode_message(result.retcode)}",
type="error",
)
[docs]
def remove_position(self, src_symbol, position: TradePosition, destination: dict):
check_mt5_connection(**destination)
self._select_symbol(position.symbol, destination)
position_type = (
Mt5.ORDER_TYPE_SELL if position.type == 0 else Mt5.ORDER_TYPE_BUY
)
request = {
"action": Mt5.TRADE_ACTION_DEAL,
"symbol": position.symbol,
"volume": position.volume,
"type": position_type,
"position": position.ticket,
"price": position.price_current,
"deviation": int(Mt5.symbol_info(position.symbol).spread),
"type_time": Mt5.ORDER_TIME_GTC,
"type_filling": Mt5.ORDER_FILLING_FOK,
}
result = Mt5.order_send(request)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
result = self._update_filling_type(request, result)
if result.retcode == Mt5.TRADE_RETCODE_DONE:
self.log_message(
f"Close {ORDER_TYPE[position.type][1]} Position #{position.ticket} "
f"on @{destination.get('login')}::{position.symbol}, "
f"SOURCE=@{self.source.get('login')}::{src_symbol}"
)
if result.retcode != Mt5.TRADE_RETCODE_DONE:
if self.handle_retcode(result.retcode) == 1:
return
self.log_message(
f"Error closing {ORDER_TYPE[position.type][1]} Position #{position.ticket} "
f"on @{destination.get('login')}::{position.symbol}, "
f"SOURCE=@{self.source.get('login')}::{src_symbol}, {trade_retcode_message(result.retcode)}",
type="error",
)
[docs]
def filter_positions_and_orders(self, pos_or_orders, symbols=None):
if symbols is None:
return pos_or_orders
elif isinstance(symbols, list):
return [pos for pos in pos_or_orders if pos.symbol in symbols]
elif isinstance(symbols, dict):
return [
pos
for pos in pos_or_orders
if pos.symbol in symbols.keys() or pos.symbol in symbols.values()
]
[docs]
def get_positions(
self, destination: dict
) -> Tuple[List[TradePosition], List[TradePosition]]:
source_positions = self.source_positions() or []
dest_symbols = get_copy_symbols(destination, self.source)
dest_positions = self.destination_positions(destination) or []
source_positions = self.filter_positions_and_orders(
source_positions, symbols=dest_symbols
)
dest_positions = self.filter_positions_and_orders(
dest_positions, symbols=dest_symbols
)
return source_positions, dest_positions
[docs]
def get_orders(
self, destination: dict
) -> Tuple[List[TradeOrder], List[TradeOrder]]:
source_orders = self.source_orders() or []
dest_symbols = get_copy_symbols(destination, self.source)
dest_orders = self.destination_orders(destination) or []
source_orders = self.filter_positions_and_orders(
source_orders, symbols=dest_symbols
)
dest_orders = self.filter_positions_and_orders(
dest_orders, symbols=dest_symbols
)
return source_orders, dest_orders
def _copy_what(self, destination):
return destination.get("copy_what", "all")
def _isvalide_magic(self, magic):
ticket = str(magic)
id = str(self.source_id)
return (
ticket != id
and ticket.startswith(id)
and ticket[: len(id)] == id
and int(ticket[: len(id)]) == self.source_id
)
def _get_new_orders(
self, source_orders, destination_orders, destination
) -> List[Tuple]:
actions = []
dest_ids = {order.magic for order in destination_orders}
for source_order in source_orders:
if self._get_magic(source_order.ticket) not in dest_ids:
if not self.slippage(source_order, destination):
actions.append((OrderAction.COPY_NEW, source_order, destination))
return actions
def _get_modified_orders(
self, source_orders, destination_orders, destination
) -> List[Tuple]:
actions = []
dest_order_map = {order.magic: order for order in destination_orders}
for source_order in source_orders:
magic_id = self._get_magic(source_order.ticket)
if magic_id in dest_order_map:
destination_order = dest_order_map[magic_id]
if self.isorder_modified(source_order, destination_order):
ticket = destination_order.ticket
symbol = destination_order.symbol
actions.append(
(OrderAction.MODIFY, ticket, symbol, source_order, destination)
)
return actions
def _get_closed_orders(
self, source_orders, destination_orders, destination
) -> List[Tuple]:
actions = []
source_ids = {self._get_magic(order.ticket) for order in source_orders}
for destination_order in destination_orders:
if destination_order.magic not in source_ids:
if self.source_isunique or self._isvalide_magic(
destination_order.magic
):
src_symbol = self.get_copy_symbol(
destination_order.symbol, destination, type="source"
)
actions.append(
(OrderAction.CLOSE, src_symbol, destination_order, destination)
)
return actions
def _get_orders_to_sync(
self, source_orders, destination_positions, destination
) -> List[Tuple]:
actions = []
source_order_map = {
self._get_magic(order.ticket): order for order in source_orders
}
for dest_pos in destination_positions:
if dest_pos.magic in source_order_map:
source_order = source_order_map[dest_pos.magic]
actions.append(
(
OrderAction.SYNC_REMOVE,
source_order.symbol,
dest_pos,
destination,
)
)
if not self.slippage(source_order, destination):
actions.append((OrderAction.SYNC_ADD, source_order, destination))
return actions
def _execute_order_action(self, action_item: Tuple):
action_type, *args = action_item
try:
if action_type == OrderAction.COPY_NEW:
self.copy_new_order(*args)
elif action_type == OrderAction.MODIFY:
self.modify_order(*args)
elif action_type == OrderAction.CLOSE:
self.remove_order(*args)
elif action_type == OrderAction.SYNC_REMOVE:
self.remove_position(*args)
elif action_type == OrderAction.SYNC_ADD:
self.copy_new_order(*args)
else:
self.log_message(f"Warning: Unknown action type '{action_type.value}'")
except Exception as e:
self.log_error(
f"Error executing action {action_type.value} with args {args}: {e}"
)
[docs]
def process_all_orders(self, destination, max_workers=10):
source_orders, destination_orders = self.get_orders(destination)
_, destination_positions = self.get_positions(destination)
orders_actions = []
orders_actions.extend(
self._get_new_orders(source_orders, destination_orders, destination)
)
orders_actions.extend(
self._get_modified_orders(source_orders, destination_orders, destination)
)
orders_actions.extend(
self._get_closed_orders(source_orders, destination_orders, destination)
)
orders_actions.extend(
self._get_orders_to_sync(source_orders, destination_positions, destination)
)
if not orders_actions:
return
with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
list(executor.map(self._execute_order_action, orders_actions))
def _get_new_positions(
self, source_positions, destination_positions, destination
) -> List[Tuple]:
actions = []
dest_ids = {pos.magic for pos in destination_positions}
for source_pos in source_positions:
if self._get_magic(source_pos.ticket) not in dest_ids:
if not self.slippage(source_pos, destination):
actions.append((OrderAction.COPY_NEW, source_pos, destination))
return actions
def _get_modified_positions(
self, source_positions, destination_positions, destination
) -> List[Tuple]:
actions = []
dest_pos_map = {pos.magic: pos for pos in destination_positions}
for source_pos in source_positions:
magic_id = self._get_magic(source_pos.ticket)
if magic_id in dest_pos_map:
dest_pos = dest_pos_map[magic_id]
if self.isposition_modified(source_pos, dest_pos):
actions.append(
(
OrderAction.MODIFY,
dest_pos.ticket,
dest_pos.symbol,
source_pos,
destination,
)
)
return actions
def _get_closed_positions(
self, source_positions, destination_positions, destination
) -> List[Tuple]:
actions = []
source_ids = {self._get_magic(pos.ticket) for pos in source_positions}
for dest_pos in destination_positions:
if dest_pos.magic not in source_ids:
if self.source_isunique or self._isvalide_magic(dest_pos.magic):
src_symbol = self.get_copy_symbol(
dest_pos.symbol, destination, type="source"
)
actions.append(
(OrderAction.CLOSE, src_symbol, dest_pos, destination)
)
return actions
def _get_positions_to_sync(
self, source_positions, destination_orders, destination
) -> List[Tuple]:
actions = []
dest_order_map = {order.magic: order for order in destination_orders}
for source_pos in source_positions:
magic_id = self._get_magic(source_pos.ticket)
if magic_id in dest_order_map:
dest_order = dest_order_map[magic_id]
# Action 1: Always remove the corresponding order
actions.append(
(
OrderAction.SYNC_REMOVE,
source_pos.symbol,
dest_order,
destination,
)
)
# Action 2: Potentially copy a new position
if self._copy_what(destination) in ["all", "positions"]:
if not self.slippage(source_pos, destination):
actions.append((OrderAction.SYNC_ADD, source_pos, destination))
return actions
def _execute_position_action(self, action_item: Tuple):
"""A single worker task that executes one action for either Orders or Positions."""
action_type, *args = action_item
try:
if action_type == OrderAction.COPY_NEW:
self.copy_new_position(*args)
elif action_type == OrderAction.MODIFY:
self.modify_position(*args)
elif action_type == OrderAction.CLOSE:
self.remove_position(*args)
elif action_type == OrderAction.SYNC_REMOVE:
self.remove_order(*args)
elif action_type == OrderAction.SYNC_ADD:
self.copy_new_position(*args)
else:
self.log_message(f"Warning: Unknown action type '{action_type.value}'")
except Exception as e:
self.log_error(
f"Error executing action {action_type.value} with args {args}: {e}"
)
[docs]
def process_all_positions(self, destination, max_workers=20):
source_positions, destination_positions = self.get_positions(destination)
_, destination_orders = self.get_orders(destination)
positions_actions = []
positions_actions.extend(
self._get_new_positions(
source_positions, destination_positions, destination
)
)
positions_actions.extend(
self._get_modified_positions(
source_positions, destination_positions, destination
)
)
positions_actions.extend(
self._get_closed_positions(
source_positions, destination_positions, destination
)
)
positions_actions.extend(
self._get_positions_to_sync(
source_positions, destination_orders, destination
)
)
if not positions_actions:
return
with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
list(executor.map(self._execute_position_action, positions_actions))
[docs]
def copy_orders(self, destination: dict):
if self._copy_what(destination) not in ["all", "orders"]:
return
check_mt5_connection(**destination)
self.process_all_orders(destination)
[docs]
def copy_positions(self, destination: dict):
if self._copy_what(destination) not in ["all", "positions"]:
return
check_mt5_connection(**destination)
self.process_all_positions(destination)
[docs]
def start_copy_process(self, destination: dict):
"""
Worker process: copies orders and positions concurrently for a single destination account.
"""
if destination.get("path") == self.source.get("path"):
self.log_message(
f"Source and destination accounts are on the same MetaTrader 5 "
f"installation ({self.source.get('path')}), which is not allowed."
)
return
self.log_message(
f"Copy process started for source @{self.source.get('login')} "
f"and destination @{destination.get('login')}"
)
while not self.shutdown_event.is_set():
try:
self.copy_positions(destination)
self.copy_orders(destination)
except KeyboardInterrupt:
self.log_message(
"KeyboardInterrupt received, stopping the Trade Copier..."
)
self.stop()
except Exception as e:
self.log_error(f"An error occurred during the sync cycle: {e}")
time.sleep(self.sleeptime)
self.log_message(
f"Process exiting for destination @{destination.get('login')} due to shutdown event."
)
[docs]
def run(self):
"""
Entry point: Starts a dedicated worker thread for EACH destination account to run concurrently.
"""
self.log_message(
f"Main Copier instance starting for source @{self.source.get('login')}."
)
self.log_message(
f"Found {len(self.destinations)} destination accounts to process in parallel."
)
if len(set([d.get("path") for d in self.destinations])) < len(
self.destinations
):
self.log_message(
"Two or more destination accounts have the same Terminal path, which is not allowed.",
type="error",
)
return
worker_threads = []
for destination in self.destinations:
self.log_message(
f"Creating worker thread for destination @{destination.get('login')}"
)
try:
thread = threading.Thread(
target=self.start_copy_process,
args=(destination,),
name=f"Worker-{destination.get('login')}",
)
worker_threads.append(thread)
thread.start()
except Exception as e:
self.log_error(
f"Error executing thread Worker-{destination.get('login')} : {e}"
)
self.log_message(f"All {len(worker_threads)} worker threads have been started.")
try:
while not self.shutdown_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
self.log_message(
"\nKeyboardInterrupt detected by main thread. Initiating shutdown..."
)
finally:
self.stop()
self.log_message("Waiting for all worker threads to complete...")
for thread in worker_threads:
thread.join()
self.log_message("All worker threads have shut down. Copier exiting.")
[docs]
def stop(self):
"""
Stop the Trade Copier gracefully by setting the shutdown event.
"""
if self._running:
self.log_message(
f"Signaling stop for Trade Copier on source account @{self.source.get('login')}..."
)
self._running = False
self.shutdown_event.set()
self.log_message("Trade Copier stopped successfully.")
[docs]
def copier_worker_process(
source_config: dict,
destination_config: dict,
sleeptime: float,
start_time: str,
end_time: str,
/,
custom_logger=None,
shutdown_event=None,
log_queue=None,
):
"""A top-level worker function for handling a single source-to-destination copy task.
This function is the cornerstone of the robust, multi-process architecture. It is
designed to be the `target` of a `multiprocessing.Process`. By being a top-level
function, it avoids pickling issues on Windows and ensures that each copy task
runs in a completely isolated process.
A controller (like a GUI or a master script) should spawn one process with this
target for each destination account it needs to manage.
Args:
source_config (dict): Configuration dictionary for the source account.
Must contain 'login', 'password', 'server', and 'path'.
destination_config (dict): Configuration dictionary for a *single*
destination account.
sleeptime (float): The time in seconds to wait between copy cycles.
start_time (str): The time of day to start copying (e.g., "08:00").
end_time (str): The time of day to stop copying (e.g., "22:00").
custom_logger: An optional custom logger instance.
shutdown_event (multiprocessing.Event): An event object that, when set,
will signal this process to terminate gracefully.
log_queue (multiprocessing.Queue): A queue for sending log messages back
to the parent process in a thread-safe manner.
"""
copier = TradeCopier(
source_config,
[destination_config],
sleeptime=sleeptime,
start_time=start_time,
end_time=end_time,
custom_logger=custom_logger,
shutdown_event=shutdown_event,
log_queue=log_queue,
)
copier.start_copy_process(destination_config)
[docs]
def RunCopier(
source: dict,
destinations: list,
sleeptime: float,
start_time: str,
end_time: str,
/,
custom_logger=None,
shutdown_event=None,
log_queue=None,
):
"""
Initialize and run a TradeCopier instance in a single process.
This function serves as a straightforward wrapper to start a copying session
that handles one source account and one or more destination accounts
sequentially within the same thread. It does not create any new processes itself.
Use Cases
---------
* Simpler, command-line based use cases.
* Scenarios where parallelism is not required.
* As the target for ``RunMultipleCopier``, where each process handles a
full source-to-destinations session.
Parameters
----------
source : dict
Configuration dictionary for the source account.
destinations : list
A list of configuration dictionaries, one for each
destination account to be processed sequentially.
sleeptime : float
The time in seconds to wait after completing a full
cycle through all destinations.
start_time : str
The time of day to start copying (e.g., ``"08:00"``).
end_time : str
The time of day to stop copying (e.g., ``"22:00"``).
custom_logger : logging.Logger, optional
An optional custom logger instance.
shutdown_event : multiprocessing.Event, optional
An event to signal shutdown.
log_queue : multiprocessing.Queue, optional
A queue for log messages.
Returns
-------
None
Runs until stopped via ``shutdown_event`` or external interruption.
"""
copier = TradeCopier(
source,
destinations,
sleeptime=sleeptime,
start_time=start_time,
end_time=end_time,
custom_logger=custom_logger,
shutdown_event=shutdown_event,
log_queue=log_queue,
)
copier.run()
[docs]
def RunMultipleCopier(
accounts: List[dict],
sleeptime: float = 0.01,
start_delay: float = 1.0,
start_time: str = None,
end_time: str = None,
shutdown_event=None,
custom_logger=None,
log_queue=None,
):
"""
Manage multiple, independent trade copying sessions in parallel.
This function acts as a high-level manager that takes a list of account
setups and creates a separate, dedicated process for each one. Each process
is responsible for copying from one source account to its associated list of
destination accounts.
The parallelism occurs at the **source account level**. Within each spawned
process, the destinations for that source are handled sequentially by
``RunCopier``.
Example
-------
An example ``accounts`` structure:
.. code-block:: python
accounts = [
{"source": {...}, "destinations": [{...}, {...}]}, # -> Process 1
{"source": {...}, "destinations": [{...}]} # -> Process 2
]
Parameters
----------
accounts : list of dict
A list of account configurations. Each item must be a dictionary with
a ``source`` key and a ``destinations`` key.
sleeptime : float, optional
The sleep time passed down to each ``RunCopier`` process.
start_delay : float, optional
A delay in seconds between starting each new process.
Helps prevent resource contention by staggering the initialization of
multiple MetaTrader 5 terminals.
start_time : str, optional
The start time passed down to each ``RunCopier`` process.
end_time : str, optional
The end time passed down to each ``RunCopier`` process.
shutdown_event : multiprocessing.Event, optional
An event to signal shutdown to all child processes.
custom_logger : logging.Logger, optional
An optional custom logger instance.
log_queue : multiprocessing.Queue, optional
A queue for aggregating log messages from all child processes.
Returns
-------
None
Runs until stopped via ``shutdown_event`` or external interruption.
"""
processes = []
for account in accounts:
source = account.get("source")
destinations = account.get("destinations")
if not source or not destinations:
logger.warning("Skipping account due to missing source or destinations.")
continue
paths = set([source.get("path")] + [dest.get("path") for dest in destinations])
if len(paths) == 1 and len(destinations) >= 1:
logger.warning(
"Skipping account: source and destination cannot share the same MetaTrader 5 terminal path."
)
continue
logger.info(f"Starting process for source account @{source.get('login')}")
process = mp.Process(
target=RunCopier,
args=(
source,
destinations,
sleeptime,
start_time,
end_time,
),
kwargs=dict(
custom_logger=custom_logger,
shutdown_event=shutdown_event,
log_queue=log_queue,
),
)
processes.append(process)
process.start()
if start_delay:
time.sleep(start_delay)
for process in processes:
process.join()
def auto_convert(value: str) -> Union[bool, None, int, float, str]:
"""Convert string values to appropriate data types"""
if value.lower() in {"true", "false"}: # Boolean
return value.lower() == "true"
elif value.lower() in {"none", "null"}: # None
return None
elif value.isdigit():
return int(value)
try:
return float(value)
except ValueError:
return value
def dict_from_ini(
file_path: str, sections: Optional[Union[str, List[str]]] = None
) -> Dict[str, Any]:
"""Reads an INI file and converts it to a dictionary with proper data types.
Args:
file_path: Path to the INI file to read.
sections: Optional list of sections to read from the INI file.
Returns:
A dictionary containing the INI file contents with proper data types.
"""
try:
config = configparser.ConfigParser(interpolation=None)
config.read(file_path)
except Exception:
raise
ini_dict: Dict[str, Any] = {}
for section in config.sections():
ini_dict[section] = {
key: auto_convert(value) for key, value in config.items(section)
}
if isinstance(sections, str):
try:
return ini_dict[sections]
except KeyError:
raise KeyError(f"{sections} not found in the {file_path} file")
if isinstance(sections, list):
sect_dict: Dict[str, Any] = {}
for section in sections:
try:
sect_dict[section] = ini_dict[section]
except KeyError:
raise KeyError(f"{section} not found in the {file_path} file")
return sect_dict
return ini_dict
def _parse_symbols(section):
symbols: str = section.get("symbols")
symbols = symbols.strip().replace("\n", " ").replace('"""', "")
if symbols in ["all", "*"]:
section["symbols"] = symbols
else:
symbols = get_symbols_from_string(symbols)
section["symbols"] = symbols
def _parse_lots(section):
lots = section.get("value")
if not lots:
raise ValueError("Lot size value must be specified for the selected mode")
lots = get_lots_from_string(lots) if isinstance(lots, str) else lots
section["value"] = lots
[docs]
def config_copier(
source_section: str = None,
dest_sections: str | List[str] = None,
inifile: str | Path = None,
) -> Tuple[dict, List[dict]]:
"""
Read the configuration file and return the source and destination account details.
Args:
inifile (str | Path): The path to the INI configuration file.
source_section (str): The section name of the source account, defaults to "SOURCE".
dest_sections (str | List[str]): The section name(s) of the destination account(s).
Returns:
Tuple[dict, List[dict]]: A tuple containing the source account and a list of destination accounts.
Example:
```python
from pathlib import Path
config_file = ~/.bbstrader/copier/copier.ini
source, destinations = config_copier(config_file, "SOURCE", ["DEST1", "DEST2"])
```
"""
if not inifile:
inifile = Path().home() / ".bbstrader" / "copier" / "copier.ini"
if not inifile.exists() or not inifile.is_file():
raise FileNotFoundError(f"{inifile} not found")
if not source_section:
source_section = "SOURCE"
config = dict_from_ini(inifile)
try:
source = config.pop(source_section)
except KeyError:
raise ValueError(f"Source section {source_section} not found in {inifile}")
dest_sections = dest_sections or config.keys()
if not dest_sections:
raise ValueError("No destination sections found in the configuration file")
destinations = []
if isinstance(dest_sections, str):
dest_sections = [dest_sections]
for dest_section in dest_sections:
try:
section = config[dest_section]
except KeyError:
raise ValueError(
f"Destination section {dest_section} not found in {inifile}"
)
_parse_symbols(section)
_parse_lots(section)
destinations.append(section)
return source, destinations