MCPcopy
hub / github.com/CryptoSignal/Crypto-Signal / get_historical_data

Method get_historical_data

app/exchange.py:42–112  ·  view source on GitHub ↗

Get historical OHLCV for a symbol pair Decorators: retry Args: market_pair (str): Contains the symbol pair to operate on i.e. BURST/BTC exchange (str): Contains the exchange to fetch the historical data from. time_unit (str): A string

(self, market_pair, exchange, time_unit, start_date=None, max_periods=100)

Source from the content-addressed store, hash-verified

40
41 @retry(retry=retry_if_exception_type(ccxt.NetworkError), stop=stop_after_attempt(3))
42 def get_historical_data(self, market_pair, exchange, time_unit, start_date=None, max_periods=100):
43 """Get historical OHLCV for a symbol pair
44
45 Decorators:
46 retry
47
48 Args:
49 market_pair (str): Contains the symbol pair to operate on i.e. BURST/BTC
50 exchange (str): Contains the exchange to fetch the historical data from.
51 time_unit (str): A string specifying the ccxt time unit i.e. 5m or 1d.
52 start_date (int, optional): Timestamp in milliseconds.
53 max_periods (int, optional): Defaults to 100. Maximum number of time periods
54 back to fetch data for.
55
56 Returns:
57 list: Contains a list of lists which contain timestamp, open, high, low, close, volume.
58 """
59
60 try:
61 if time_unit not in self.exchanges[exchange].timeframes:
62 raise ValueError(
63 "{} does not support {} timeframe for OHLCV data. Possible values are: {}".format(
64 exchange,
65 time_unit,
66 list(self.exchanges[exchange].timeframes)
67 )
68 )
69 except AttributeError:
70 self.logger.error(
71 '%s interface does not support timeframe queries! We are unable to fetch data!',
72 exchange
73 )
74 raise AttributeError(sys.exc_info())
75
76 if not start_date:
77 timeframe_regex = re.compile('([0-9]+)([a-zA-Z])')
78 timeframe_matches = timeframe_regex.match(time_unit)
79 time_quantity = timeframe_matches.group(1)
80 time_period = timeframe_matches.group(2)
81
82 timedelta_values = {
83 'm': 'minutes',
84 'h': 'hours',
85 'd': 'days',
86 'w': 'weeks',
87 'M': 'months',
88 'y': 'years'
89 }
90
91 timedelta_args = { timedelta_values[time_period]: int(time_quantity) }
92
93 start_date_delta = timedelta(**timedelta_args)
94
95 max_days_date = datetime.now() - (max_periods * start_date_delta)
96 start_date = int(max_days_date.replace(tzinfo=timezone.utc).timestamp() * 1000)
97
98 historical_data = self.exchanges[exchange].fetch_ohlcv(
99 market_pair,

Callers 1

_get_historical_dataMethod · 0.80

Calls

no outgoing calls

Tested by

no test coverage detected