Source code for bbstrader.trading.strategy

from datetime import datetime
from enum import IntEnum
from queue import Queue
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from loguru import logger

from bbstrader.api.metatrader_client import TradeOrder  # type: ignore
from bbstrader.btengine.event import FillEvent, SignalEvent
from bbstrader.config import BBSTRADER_DIR
from bbstrader.core.strategy import (
    BaseStrategy,
    TradeAction,
    TradeSignal,
    TradingMode,
    generate_signal,
)
from bbstrader.metatrader import Account, Rates

logger.add(
    f"{BBSTRADER_DIR}/logs/strategy.log",
    enqueue=True,
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name} | {message}",
)

__all__ = [
    "LiveStrategy",
]


class SignalType(IntEnum):
    BUY = 0
    SELL = 1
    EXIT_LONG = 2
    EXIT_SHORT = 3
    EXIT_ALL_POSITIONS = 4
    EXIT_ALL_ORDERS = 5
    EXIT_STOP = 6
    EXIT_LIMIT = 7


[docs] class LiveStrategy(BaseStrategy): """ Strategy implementation for Live Trading. Relies on the `Account` class for state (orders, positions, cash) and `Rates` for data. """ events: "Queue[Union[SignalEvent, FillEvent]]" def __init__( self, symbol_list: List[str], **kwargs: Any, ) -> None: """ Initialize the `LiveStrategy` object. Args: symbol_list : The list of symbols for the strategy. **kwargs : Additional keyword arguments for other classes (e.g, Portfolio, ExecutionHandler). - max_trades : The maximum number of trades allowed per symbol. - time_frame : The time frame for the strategy. - logger : The logger object for the strategy. """ super().__init__(symbol_list, **kwargs) self.mode = TradingMode.LIVE @property def account(self) -> Account: """Create or access the MT5 Account.""" return Account(**self.kwargs) @property def cash(self) -> float: return self.account.equity @property def orders(self) -> List[TradeOrder]: """Returns active orders from the Broker.""" return self.account.get_orders() or [] @property def positions(self) -> List[Any]: """Returns open positions from the Broker.""" return self.account.get_positions() or []
[docs] def get_asset_values( self, symbol_list: List[str], window: int, value_type: str = "returns", array: bool = True, **kwargs, ) -> Optional[Dict[str, Union[np.ndarray, pd.Series]]]: asset_values: Dict[str, Union[np.ndarray, pd.Series]] = {} for asset in symbol_list: rates = Rates(asset, timeframe=self.tf, count=window + 1, **self.kwargs) if array: values = getattr(rates, value_type).to_numpy() asset_values[asset] = values[~np.isnan(values)] else: values = getattr(rates, value_type) asset_values[asset] = values if all(len(values) >= window for values in asset_values.values()): return {a: v[-window:] for a, v in asset_values.items()} if kwargs.get("error") == "raise": raise ValueError("Not enough data to calculate the values.") elif kwargs.get("error") == "ignore": return asset_values return None
[docs] def signal( self, signal: int, symbol: str, sl: float = None, tp: float = None ) -> TradeSignal: """ Generate a ``TradeSignal`` object based on the signal value. Parameters ---------- signal : int An integer value representing the signal type: * 0: BUY * 1: SELL * 2: EXIT_LONG * 3: EXIT_SHORT * 4: EXIT_ALL_POSITIONS * 5: EXIT_ALL_ORDERS * 6: EXIT_STOP * 7: EXIT_LIMIT symbol : str The symbol for the trade. Returns ------- TradeSignal A ``TradeSignal`` object representing the trade signal. Raises ------ ValueError If the signal value is not between 0 and 7. Notes ----- This generates only common signals. For more complex signals, use ``generate_signal`` directly. """ signal_id = getattr(self, "id", getattr(self, "ID", None)) if signal_id is None: raise ValueError("Strategy ID not set") action_map = { SignalType.BUY: TradeAction.BUY, SignalType.SELL: TradeAction.SELL, SignalType.EXIT_LONG: TradeAction.EXIT_LONG, SignalType.EXIT_SHORT: TradeAction.EXIT_SHORT, SignalType.EXIT_ALL_POSITIONS: TradeAction.EXIT_ALL_POSITIONS, SignalType.EXIT_ALL_ORDERS: TradeAction.EXIT_ALL_ORDERS, SignalType.EXIT_STOP: TradeAction.EXIT_STOP, SignalType.EXIT_LIMIT: TradeAction.EXIT_LIMIT, } try: action = action_map[SignalType(signal)] except (ValueError, KeyError): raise ValueError(f"Invalid signal value: {signal}") kwargs = ( {"sl": sl, "tp": tp} if action in (TradeAction.BUY, TradeAction.SELL) else {} ) return generate_signal(signal_id, symbol, action, **kwargs)
[docs] def ispositions( self, symbol: str, strategy_id: int, position: int, max_trades: int, one_true: bool = False, ) -> bool: """ This function is use for live trading to check if there are open positions for a given symbol and strategy. It is used to prevent opening more trades than the maximum allowed trades per symbol. Args: symbol : The symbol for the trade. strategy_id : The unique identifier for the strategy. position : The position type (1: short, 0: long). max_trades : The maximum number of trades allowed per symbol. one_true : If True, return True if there is at least one open position. account : The `bbstrader.metatrader.Account` object for the strategy. Returns: bool : True if there are open positions, False otherwise """ positions = self.account.get_positions(symbol=symbol) if positions is not None: open_positions = [ pos.ticket for pos in positions if pos.type == position and pos.magic == strategy_id ] if one_true: return len(open_positions) in range(1, max_trades + 1) return len(open_positions) >= max_trades return False
[docs] def get_positions_prices( self, symbol: str, strategy_id: int, position: int, ) -> np.ndarray: """ Get the buy or sell prices for open positions of a given symbol and strategy. Args: symbol : The symbol for the trade. strategy_id : The unique identifier for the strategy. position : The position type (1: short, 0: long). account : The `bbstrader.metatrader.Account` object for the strategy. Returns: prices : numpy array of buy or sell prices for open positions if any or an empty array. """ positions = self.account.get_positions(symbol=symbol) if positions is not None: prices = np.array( [ pos.price_open for pos in positions if pos.type == position and pos.magic == strategy_id ] ) return prices return np.array([])
[docs] def get_active_orders( self, symbol: str, strategy_id: int, order_type: Optional[int] = None ) -> List[TradeOrder]: """ Get the active orders for a given symbol and strategy. Args: symbol : The symbol for the trade. strategy_id : The unique identifier for the strategy. order_type : The type of order to filter by (optional): "BUY_LIMIT": 2 "SELL_LIMIT": 3 "BUY_STOP": 4 "SELL_STOP": 5 "BUY_STOP_LIMIT": 6 "SELL_STOP_LIMIT": 7 Returns: List[TradeOrder] : A list of active orders for the given symbol and strategy. """ all_orders = self.orders orders = [ o for o in all_orders if isinstance(o, TradeOrder) and o.symbol == symbol and o.magic == strategy_id ] if order_type is not None and len(orders) > 0: orders = [o for o in orders if o.type == order_type] return orders
[docs] def exit_positions( self, position: int, prices: np.ndarray, asset: str, th: float = 0.01 ) -> bool: """Logic to determine if positions should be exited based on threshold.""" if len(prices) == 0: return False tick_info = self.account.get_tick_info(asset) if tick_info is None: return False bid, ask = tick_info.bid, tick_info.ask price = None if len(prices) == 1: price = prices[0] elif len(prices) in range(2, self.max_trades[asset] + 1): price = np.mean(prices) if price is not None: if position == 0: # Long exit check return self.calculate_pct_change(ask, price) >= th elif position == 1: # Short exit check return self.calculate_pct_change(bid, price) <= -th return False
[docs] def send_trade_report(self, perf_analyzer: Callable, **kwargs: Any) -> None: """ Generates and sends a trade report message containing performance metrics for the current strategy. This method retrieves the trade history for the current account, filters it by the strategy's ID, computes performance metrics using the provided `perf_analyzer` callable, and formats the results into a message. The message includes account information, strategy details, a timestamp, and performance metrics. The message is then sent via Telegram using the specified bot token and chat ID. Args: perf_analyzer (Callable): A function or callable object that takes the filtered trade history (as a DataFrame) and additional keyword arguments, and returns a DataFrame of performance metrics. **kwargs: Additional keyword arguments, which may include - Any other param requires by ``perf_analyzer`` """ from bbstrader.trading.utils import send_message history = self.account.get_trades_history() if history is None or history.empty: self.logger.warning("No trades found on this account.") return ID = getattr(self, "id", None) or getattr(self, "ID") history = history[history["magic"] == ID] performance = perf_analyzer(history, **kwargs) if performance.empty: self.logger.warning("No trades found for the current strategy.") return account_name = self.kwargs.get("account", "MT5 Account") timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") header = ( f"TRADE REPORT\n\n" f"ACCOUNT: {account_name}\n" f"STRATEGY: {self.NAME}\n" f"ID: {ID}\n" f"DESCRIPTION: {self.DESCRIPTION}\n" f"TIMESTAMP: {timestamp}\n\n" f"📊 PERFORMANCE:\n" ) metrics = performance.iloc[0].to_dict() lines = [] for key, value in metrics.items(): if isinstance(value, float): value = round(value, 4) lines.append(f"{key:<15}: {value}") performance_str = "\n".join(lines) message = f"{header}{performance_str}" send_message( message=message, telegram=True, token=self.kwargs.get("bot_token"), chat_id=self.kwargs.get("chat_id"), )