import os.path
from abc import ABCMeta, abstractmethod
from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import Any, Dict, Generator, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
import yfinance as yf
from eodhd import APIClient
from financetoolkit import Toolkit
from numpy.typing import NDArray
from pytz import timezone
from bbstrader.btengine.event import MarketEvent
from bbstrader.config import BBSTRADER_DIR
from bbstrader.metatrader.rates import download_historical_data
__all__ = [
"DataHandler",
"CSVDataHandler",
"MT5DataHandler",
"YFDataHandler",
"EODHDataHandler",
"FMPDataHandler",
]
[docs]
class DataHandler(metaclass=ABCMeta):
"""
One of the goals of an event-driven trading system is to minimise
duplication of code between the backtesting element and the live execution
element. Ideally it would be optimal to utilise the same signal generation
methodology and portfolio management components for both historical testing
and live trading. In order for this to work the Strategy object which generates
the Signals, and the `Portfolio` object which provides Orders based on them,
must utilise an identical interface to a market feed for both historic and live
running.
This motivates the concept of a class hierarchy based on a `DataHandler` object,
which givesall subclasses an interface for providing market data to the remaining
components within thesystem. In this way any subclass data handler can be "swapped out",
without affecting strategy or portfolio calculation.
Specific example subclasses could include `HistoricCSVDataHandler`,
`YFinanceDataHandler`, `FMPDataHandler`, `IBMarketFeedDataHandler` etc.
"""
@property
def symbols(self) -> List[str]:
pass
@property
def data(self) -> Dict[str, pd.DataFrame]:
pass
@property
def labels(self) -> List[str]:
pass
@property
def index(self) -> Union[str, List[str]]:
pass
[docs]
@abstractmethod
def get_latest_bar(self, symbol: str) -> pd.Series:
"""
Returns the last bar updated.
"""
raise NotImplementedError("Should implement get_latest_bar()")
[docs]
@abstractmethod
def get_latest_bars(
self, symbol: str, N: int = 1, df: bool = True
) -> Union[pd.DataFrame, List[pd.Series]]:
"""
Returns the last N bars updated.
"""
raise NotImplementedError("Should implement get_latest_bars()")
[docs]
@abstractmethod
def get_latest_bar_datetime(self, symbol: str) -> Union[datetime, pd.Timestamp]:
"""
Returns a Python datetime object for the last bar.
"""
raise NotImplementedError("Should implement get_latest_bar_datetime()")
[docs]
@abstractmethod
def get_latest_bar_value(self, symbol: str, val_type: str) -> float:
"""
Returns one of the Open, High, Low, Close, Adj Close, Volume or Returns
from the last bar.
"""
raise NotImplementedError("Should implement get_latest_bar_value()")
[docs]
@abstractmethod
def get_latest_bars_values(self, symbol: str, val_type: str, N: int = 1) -> NDArray:
"""
Returns the last N bar values from the
latest_symbol list, or N-k if less available.
"""
raise NotImplementedError("Should implement get_latest_bars_values()")
[docs]
@abstractmethod
def update_bars(self) -> None:
"""
Pushes the latest bars to the bars_queue for each symbol
in a tuple OHLCVI format: (datetime, Open, High, Low,
Close, Adj Close, Volume, Retruns).
"""
raise NotImplementedError("Should implement update_bars()")
class BaseCSVDataHandler(DataHandler):
"""
Base class for handling data loaded from CSV files.
"""
def __init__(
self,
events: "Queue[MarketEvent]",
symbol_list: List[str],
csv_dir: str,
columns: Optional[List[str]] = None,
index_col: Union[str, int, List[str], List[int]] = 0,
) -> None:
"""
Initialises the data handler by requesting the location of the CSV files
and a list of symbols.
Args:
events : The Event Queue.
symbol_list : A list of symbol strings.
csv_dir : Absolute directory path to the CSV files.
columns : List of column names to use for the data.
index_col : Column to use as the index.
"""
self.events = events
self.symbol_list = symbol_list
self.csv_dir = csv_dir
self.columns = columns
self.index_col = index_col
self.symbol_data: Dict[str, Union[pd.DataFrame, Generator]] = {}
self.latest_symbol_data: Dict[str, List[Any]] = {}
self.continue_backtest = True
self._index: Optional[Union[str, List[str]]] = None
self._load_and_process_data()
@property
def symbols(self) -> List[str]:
return self.symbol_list
@property
def data(self) -> Dict[str, pd.DataFrame]:
return self.symbol_data # type: ignore
@property
def datadir(self) -> str:
return self.csv_dir
@property
def labels(self) -> List[str]:
return self.columns # type: ignore
@property
def index(self) -> Union[str, List[str]]:
return self._index # type: ignore
def _load_and_process_data(self) -> None:
"""
Opens the CSV files from the data directory, converting
them into pandas DataFrames within a symbol dictionary.
"""
default_names = pd.read_csv(
os.path.join(self.csv_dir, f"{self.symbol_list[0]}.csv")
).columns.to_list()
new_names = self.columns or default_names
new_names = [name.strip().lower().replace(" ", "_") for name in new_names]
self.columns = new_names
assert "adj_close" in new_names or "close" in new_names, (
"Column names must contain 'Adj Close' and 'Close' or adj_close and close"
)
comb_index = None
for s in self.symbol_list:
# Load the CSV file with no header information,
# indexed on date
self.symbol_data[s] = pd.read_csv(
os.path.join(self.csv_dir, f"{s}.csv"),
header=0,
index_col=self.index_col,
parse_dates=True,
names=new_names,
)
self.symbol_data[s].sort_index(inplace=True)
# Combine the index to pad forward values
if comb_index is None:
comb_index = self.symbol_data[s].index
elif len(self.symbol_data[s].index) > len(comb_index):
comb_index = self.symbol_data[s].index
# Set the latest symbol_data to None
self.latest_symbol_data[s] = []
# Reindex the dataframes
for s in self.symbol_list:
self.symbol_data[s] = self.symbol_data[s].reindex( # type: ignore
index=comb_index, method="pad"
)
if "adj_close" not in new_names:
self.columns.append("adj_close")
self.symbol_data[s]["adj_close"] = self.symbol_data[s]["close"] # type: ignore
self.symbol_data[s]["returns"] = ( # type: ignore
self.symbol_data[s][ # type: ignore
"adj_close" if "adj_close" in new_names else "close"
]
.pct_change()
.dropna()
)
self._index = self.symbol_data[s].index.name # type: ignore
self.symbol_data[s].to_csv(os.path.join(self.csv_dir, f"{s}.csv")) # type: ignore
if self.events is not None:
self.symbol_data[s] = self.symbol_data[s].iterrows() # type: ignore
def _get_new_bar(self, symbol: str) -> Generator[Tuple[Any, Any], Any, None]:
"""
Returns the latest bar from the data feed.
"""
for b in self.symbol_data[symbol]: # type: ignore
yield b
def get_latest_bar(self, symbol: str) -> pd.Series:
"""
Returns the last bar from the latest_symbol list.
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print(f"{symbol} not available in the historical data set.")
raise
else:
return bars_list[-1]
def get_latest_bars(
self, symbol: str, N: int = 1, df: bool = True
) -> Union[pd.DataFrame, List[pd.Series]]:
"""
Returns the last N bars from the latest_symbol list,
or N-k if less available.
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print(f"{symbol} not available in the historical data set.")
raise
else:
if df:
df_ = pd.DataFrame([bar[1] for bar in bars_list[-N:]])
df_.index.name = self._index # type: ignore
return df_
return bars_list[-N:]
def get_latest_bar_datetime(self, symbol: str) -> Union[datetime, pd.Timestamp]:
"""
Returns a Python datetime object for the last bar.
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print(f"{symbol} not available in the historical data set.")
raise
else:
return bars_list[-1][0]
def get_latest_bars_datetime(
self, symbol: str, N: int = 1
) -> List[Union[datetime, pd.Timestamp]]:
"""
Returns a list of Python datetime objects for the last N bars.
"""
try:
bars_list = self.get_latest_bars(symbol, N) # type: ignore
except KeyError:
print(f"{symbol} not available in the historical data set for .")
raise
else:
return [b[0] for b in bars_list] # type: ignore
def get_latest_bar_value(self, symbol: str, val_type: str) -> float:
"""
Returns one of the Open, High, Low, Close, Volume or OI
values from the pandas Bar series object.
"""
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print(f"{symbol} not available in the historical data set.")
raise
else:
try:
return getattr(bars_list[-1][1], val_type)
except AttributeError:
print(
f"Value type {val_type} not available in the historical data set for {symbol}."
)
raise
def get_latest_bars_values(self, symbol: str, val_type: str, N: int = 1) -> NDArray:
"""
Returns the last N bar values from the
latest_symbol list, or N-k if less available.
"""
try:
bars_list = self.get_latest_bars(symbol, N, df=False)
except KeyError:
print(f"{symbol} not available in the historical data set.")
raise
else:
try:
return np.array([getattr(b[1], val_type) for b in bars_list])
except AttributeError:
print(
f"Value type {val_type} not available in the historical data set."
)
raise
def update_bars(self) -> None:
"""
Pushes the latest bar to the latest_symbol_data structure
for all symbols in the symbol list.
"""
for s in self.symbol_list:
try:
bar = next(self._get_new_bar(s))
except StopIteration:
self.continue_backtest = False
else:
if bar is not None:
self.latest_symbol_data[s].append(bar)
self.events.put(MarketEvent())
[docs]
class CSVDataHandler(BaseCSVDataHandler):
"""
`CSVDataHandler` is designed to read CSV files for
each requested symbol from disk and provide an interface
to obtain the "latest" bar in a manner identical to a live
trading interface.
This class is useful when you have your own data or you want
to cutomize specific data in some form based on your `Strategy()` .
"""
def __init__(
self, events: "Queue[MarketEvent]", symbol_list: List[str], **kwargs: Any
) -> None:
"""
Initialises the historic data handler by requesting
the location of the CSV files and a list of symbols.
It will be assumed that all files are of the form
`symbol.csv`, where `symbol` is a string in the list.
Args:
events (Queue): The Event Queue.
symbol_list (List[str]): A list of symbol strings.
csv_dir (str): Absolute directory path to the CSV files.
NOTE:
All csv fille can be stored in 'Home/.bbstrader/data/csv_data'
"""
csv_dir = kwargs.get("csv_dir")
csv_dir = csv_dir or BBSTRADER_DIR / "data" / "csv_data"
super().__init__(
events,
symbol_list,
str(csv_dir),
columns=kwargs.get("columns"),
index_col=kwargs.get("index_col", 0),
)
[docs]
class MT5DataHandler(BaseCSVDataHandler):
"""
Downloads historical data from MetaTrader 5 (MT5) and provides
an interface for accessing this data bar-by-bar, simulating
a live market feed for backtesting.
Data is downloaded from MT5, saved as CSV files, and then loaded
using the functionality inherited from `BaseCSVDataHandler`.
This class is useful when you need to get data from specific broker
for different time frames.
"""
def __init__(
self, events: "Queue[MarketEvent]", symbol_list: List[str], **kwargs: Any
) -> None:
"""
Args:
events (Queue): The Event Queue for passing market events.
symbol_list (List[str]): A list of symbol strings to download data for.
**kwargs: Keyword arguments for data retrieval:
time_frame (str): MT5 time frame (e.g., 'D1' for daily).
mt5_start (datetime): Start date for historical data.
mt5_end (datetime): End date for historical data.
data_dir (str): Directory for storing data .
Note:
Requires a working connection to an MT5 terminal.
See `bbstrader.metatrader.rates.Rates` for other arguments.
See `bbstrader.btengine.data.BaseCSVDataHandler` for other arguments.
"""
self.tf = kwargs.get("time_frame", "D1")
self.start = kwargs.get("mt5_start", datetime(2000, 1, 1))
self.end = kwargs.get("mt5_end", datetime.now())
self.use_utc = kwargs.get("use_utc", False)
self.filer = kwargs.get("filter", False)
self.fill_na = kwargs.get("fill_na", False)
self.lower_cols = kwargs.get("lower_cols", True)
self.data_dir = kwargs.get("data_dir")
self.symbol_list = symbol_list
self.kwargs = kwargs
self.kwargs["backtest"] = (
True # Ensure backtest mode is set to avoid InvalidBroker errors
)
csv_dir = self._download_and_cache_data(self.data_dir)
super().__init__(
events,
symbol_list,
str(csv_dir),
columns=kwargs.get("columns"),
index_col=kwargs.get("index_col", 0),
)
def _download_and_cache_data(self, cache_dir: Optional[str]) -> Path:
data_dir = (
Path(cache_dir) if cache_dir else BBSTRADER_DIR / "data" / "mt5" / self.tf
)
data_dir.mkdir(parents=True, exist_ok=True)
for symbol in self.symbol_list:
try:
data = download_historical_data(
symbol=symbol,
timeframe=self.tf,
date_from=self.start,
date_to=self.end,
utc=self.use_utc,
filter=self.filer,
fill_na=self.fill_na,
lower_colnames=self.lower_cols,
**self.kwargs,
)
if data is None:
raise ValueError(f"No data found for {symbol}")
data.to_csv(data_dir / f"{symbol}.csv")
except Exception as e:
raise ValueError(f"Error downloading {symbol}: {e}")
return data_dir
[docs]
class YFDataHandler(BaseCSVDataHandler):
"""
Downloads historical data from Yahoo Finance and provides
an interface for accessing this data bar-by-bar, simulating
a live market feed for backtesting.
Data is fetched using the `yfinance` library and optionally cached
to disk to speed up subsequent runs.
This class is useful when working with historical daily prices.
"""
def __init__(
self, events: "Queue[MarketEvent]", symbol_list: List[str], **kwargs: Any
) -> None:
"""
Args:
events (Queue): The Event Queue for passing market events.
symbol_list (list[str]): List of symbols to download data for.
yf_start (str): Start date for historical data (YYYY-MM-DD).
yf_end (str): End date for historical data (YYYY-MM-DD).
data_dir (str, optional): Directory for caching data .
Note:
See `bbstrader.btengine.data.BaseCSVDataHandler` for other arguments.
"""
self.symbol_list = symbol_list
self.start_date = kwargs.get("yf_start")
self.end_date = kwargs.get("yf_end", datetime.now())
self.cache_dir = kwargs.get("data_dir")
csv_dir = self._download_and_cache_data(self.cache_dir)
super().__init__(
events,
symbol_list,
str(csv_dir),
columns=kwargs.get("columns"),
index_col=kwargs.get("index_col", 0),
)
def _download_and_cache_data(self, cache_dir: Optional[str]) -> str:
"""Downloads and caches historical data as CSV files."""
_cache_dir = cache_dir or BBSTRADER_DIR / "data" / "yfinance" / "daily"
os.makedirs(_cache_dir, exist_ok=True)
for symbol in self.symbol_list:
filepath = os.path.join(_cache_dir, f"{symbol}.csv")
try:
data = yf.download(
symbol,
start=self.start_date,
end=self.end_date,
multi_level_index=False,
auto_adjust=True,
progress=False,
)
if "Adj Close" not in data.columns:
data["Adj Close"] = data["Close"]
if data.empty:
raise ValueError(f"No data found for {symbol}")
data.to_csv(filepath)
except Exception as e:
raise ValueError(f"Error downloading {symbol}: {e}")
return str(_cache_dir)
[docs]
class EODHDataHandler(BaseCSVDataHandler):
"""
Downloads historical data from EOD Historical Data.
Data is fetched using the `eodhd` library.
To use this class, you need to sign up for an API key at
https://eodhistoricaldata.com/ and provide the key as an argument.
"""
def __init__(
self, events: "Queue[MarketEvent]", symbol_list: List[str], **kwargs: Any
) -> None:
"""
Args:
events (Queue): The Event Queue for passing market events.
symbol_list (list[str]): List of symbols to download data for.
eodhd_start (str): Start date for historical data (YYYY-MM-DD).
eodhd_end (str): End date for historical data (YYYY-MM-DD).
data_dir (str, optional): Directory for caching data .
eodhd_period (str, optional): Time period for historical data (e.g., 'd', 'w', 'm', '1m', '5m', '1h').
eodhd_api_key (str, optional): API key for EOD Historical Data.
Note:
See `bbstrader.btengine.data.BaseCSVDataHandler` for other arguments.
"""
self.symbol_list = symbol_list
self.start_date = kwargs.get("eodhd_start")
self.end_date = kwargs.get("eodhd_end", datetime.now().strftime("%Y-%m-%d"))
self.period = kwargs.get("eodhd_period", "d")
self.cache_dir = kwargs.get("data_dir")
self.__api_key = kwargs.get("eodhd_api_key", "demo")
csv_dir = self._download_and_cache_data(self.cache_dir)
super().__init__(
events,
symbol_list,
str(csv_dir),
columns=kwargs.get("columns"),
index_col=kwargs.get("index_col", 0),
)
def _get_data(
self, symbol: str, period: str
) -> Union[pd.DataFrame, List[Dict[str, Any]]]:
if not self.__api_key:
raise ValueError("API key is required for EODHD data.")
client = APIClient(api_key=self.__api_key)
if period in ["d", "w", "m"]:
return client.get_historical_data(
symbol=symbol,
interval=period,
iso8601_start=self.start_date,
iso8601_end=self.end_date,
)
elif period in ["1m", "5m", "1h"]:
hms = " 00:00:00"
fmt = "%Y-%m-%d %H:%M:%S"
startdt = datetime.strptime(str(self.start_date) + hms, fmt)
enddt = datetime.strptime(str(self.end_date) + hms, fmt)
startdt = startdt.replace(tzinfo=timezone("UTC"))
enddt = enddt.replace(tzinfo=timezone("UTC"))
unix_start = int(startdt.timestamp())
unix_end = int(enddt.timestamp())
return client.get_intraday_historical_data(
symbol=symbol,
interval=period,
from_unix_time=unix_start,
to_unix_time=unix_end,
)
raise ValueError(f"Unsupported period: {period}")
def _format_data(
self, data: Union[List[Dict[str, Any]], pd.DataFrame]
) -> pd.DataFrame:
if isinstance(data, pd.DataFrame):
if data.empty or len(data) == 0:
raise ValueError("No data found.")
df = data.drop(labels=["symbol", "interval"], axis=1)
df = df.rename(columns={"adjusted_close": "adj_close"})
return df
elif isinstance(data, list):
if not data or len(data) == 0:
raise ValueError("No data found.")
df = pd.DataFrame(data)
df = df.drop(columns=["timestamp", "gmtoffset"], axis=1)
df = df.rename(columns={"datetime": "date"})
df["adj_close"] = df["close"]
df = df[["date", "open", "high", "low", "close", "adj_close", "volume"]]
df.date = pd.to_datetime(df.date)
df = df.set_index("date")
return df
raise TypeError(f"Unsupported data type: {type(data)}")
def _download_and_cache_data(self, cache_dir: Optional[str]) -> str:
"""Downloads and caches historical data as CSV files."""
_cache_dir = cache_dir or BBSTRADER_DIR / "data" / "eodhd" / self.period
os.makedirs(_cache_dir, exist_ok=True)
for symbol in self.symbol_list:
filepath = os.path.join(_cache_dir, f"{symbol}.csv")
try:
data = self._get_data(symbol, self.period)
data = self._format_data(data)
data.to_csv(filepath)
except Exception as e:
raise ValueError(f"Error downloading {symbol}: {e}")
return str(_cache_dir)
[docs]
class FMPDataHandler(BaseCSVDataHandler):
"""
Downloads historical data from Financial Modeling Prep (FMP).
Data is fetched using the `financetoolkit` library.
To use this class, you need to sign up for an API key at
https://financialmodelingprep.com/developer/docs/pricing and
provide the key as an argument.
"""
def __init__(
self, events: "Queue[MarketEvent]", symbol_list: List[str], **kwargs: Any
) -> None:
"""
Args:
events (Queue): The Event Queue for passing market events.
symbol_list (list[str]): List of symbols to download data for.
fmp_start (str): Start date for historical data (YYYY-MM-DD).
fmp_end (str): End date for historical data (YYYY-MM-DD).
data_dir (str, optional): Directory for caching data .
fmp_period (str, optional): Time period for historical data
(e.g. daily, weekly, monthly, quarterly, yearly, "1min", "5min", "15min", "30min", "1hour").
fmp_api_key (str): API key for Financial Modeling Prep.
Note:
See `bbstrader.btengine.data.BaseCSVDataHandler` for other arguments.
"""
self.symbol_list = symbol_list
self.start_date = kwargs.get("fmp_start")
self.end_date = kwargs.get("fmp_end", datetime.now().strftime("%Y-%m-%d"))
self.period = kwargs.get("fmp_period", "daily")
self.cache_dir = kwargs.get("data_dir")
self.__api_key = kwargs.get("fmp_api_key")
csv_dir = self._download_and_cache_data(self.cache_dir)
super().__init__(
events,
symbol_list,
str(csv_dir),
columns=kwargs.get("columns"),
index_col=kwargs.get("index_col", 0),
)
def _get_data(self, symbol: str, period: str) -> pd.DataFrame:
if not self.__api_key:
raise ValueError("API key is required for FMP data.")
toolkit = Toolkit(
symbol,
api_key=self.__api_key,
start_date=self.start_date,
end_date=self.end_date,
benchmark_ticker=None,
progress_bar=False,
)
if period in ["daily", "weekly", "monthly", "quarterly", "yearly"]:
return toolkit.get_historical_data(period=period, progress_bar=False)
elif period in ["1min", "5min", "15min", "30min", "1hour"]:
return toolkit.get_intraday_data(period=period, progress_bar=False)
raise ValueError(f"Unsupported period: {period}")
def _format_data(self, data: pd.DataFrame, period: str) -> pd.DataFrame:
if data.empty or len(data) == 0:
raise ValueError("No data found.")
if period[0].isnumeric():
data = data.drop(
columns=["Return", "Volatility", "Cumulative Return"], axis=1
)
else:
data = data.drop(
columns=[
"Dividends",
"Return",
"Volatility",
"Excess Return",
"Excess Volatility",
"Cumulative Return",
],
axis=1,
)
data = data.reset_index()
if "Adj Close" not in data.columns:
data["Adj Close"] = data["Close"]
data["date"] = data["date"].dt.to_timestamp() # type: ignore
data["date"] = pd.to_datetime(data["date"])
data.set_index("date", inplace=True)
return data
def _download_and_cache_data(self, cache_dir: Optional[str]) -> str:
"""Downloads and caches historical data as CSV files."""
_cache_dir = cache_dir or BBSTRADER_DIR / "data" / "fmp" / self.period
os.makedirs(_cache_dir, exist_ok=True)
for symbol in self.symbol_list:
filepath = os.path.join(_cache_dir, f"{symbol}.csv")
try:
data = self._get_data(symbol, self.period)
data = self._format_data(data, self.period)
data.to_csv(filepath)
except Exception as e:
raise ValueError(f"Error downloading {symbol}: {e}")
return str(_cache_dir)