timeframe-converter

PHOTO EMBED

Sun Dec 04 2022 21:44:14 GMT+0000 (Coordinated Universal Time)

Saved by @savabeh191 #0

"""This is a financial library useful to convert Candle Data from
financial time series datasets (Open, Close, High, Low, Volume).
It is built on Pandas and Numpy.
.. moduleauthor:: Joao Pedro Aguilera Cardoso
"""

from datetime import datetime, timedelta

import pandas as pd


def convertcandle(
        time: pd.Series,
        open: pd.Series,
        high: pd.Series,
        low: pd.Series,
        close: pd.Series,
        timeframe: str,
        fromtime: str,
        totime: str,
        dtformat: str) -> pd.DataFrame:
    """OHLC Candle Converter

    Aggregate small candles into bigger ones. The first output candle starts
    at the first input row that is not earlier than ``fromtime`` and falls on
    a boundary of the requested timeframe; every candle that starts before
    ``totime`` is emitted. Input rows must be in chronological order.

    Args:
        time(pandas.Series): dataset 'Time' column (strings in ``dtformat``).
        open(pandas.Series): dataset 'Open' column.
        high(pandas.Series): dataset 'High' column.
        low(pandas.Series): dataset 'Low' column.
        close(pandas.Series): dataset 'Close' column.
        timeframe(str): output candle time period (see reference below).
        fromtime(str): begin of conversion, formatted as ``dtformat``.
        totime(str): end of conversion (exclusive), formatted as ``dtformat``.
        dtformat(str): ``strptime`` format shared by all date/time inputs.

    Returns:
        pandas.DataFrame: columns ``time``, ``open``, ``high``, ``low``,
        ``close``; one row per output candle, ``time`` formatted with
        ``dtformat``. Empty when no input row starts a candle in range.

    Raises:
        KeyError: if ``timeframe`` is not one of the accepted values.
        ValueError: if the input columns differ in length, hold fewer than
            two rows, or the input timeframe is not smaller than the output.

    timeframe accepted output:
        ['1m', '5m', '15m', '30m', '1h', '2h', '3h', '4h', '6h', '12h', '1d', '1w', '1M', '1y']
    """
    columns = ['time', 'open', 'high', 'low', 'close']

    # timeframe label -> (calendar kind, number of units of that kind)
    frames = {
        '1m': ('minute', 1), '5m': ('minute', 5), '15m': ('minute', 15), '30m': ('minute', 30),
        '1h': ('hour', 1), '2h': ('hour', 2), '3h': ('hour', 3), '4h': ('hour', 4),
        '6h': ('hour', 6), '12h': ('hour', 12),
        '1d': ('day', 1), '1w': ('week', 1), '1M': ('month', 1), '1y': ('year', 1),
    }
    kind, size = frames[timeframe]

    # Nominal candle length. Months and years vary in length, so they use an
    # upper bound (32 / 370 days); it is only used for the input-vs-output
    # size check and as slack when generating candle boundaries.
    if kind == 'minute':
        span = timedelta(minutes=size)
    elif kind == 'hour':
        span = timedelta(hours=size)
    elif kind == 'day':
        span = timedelta(days=size)
    elif kind == 'week':
        span = timedelta(weeks=size)
    elif kind == 'month':
        span = timedelta(days=32)
    else:
        span = timedelta(days=370)

    def as_list(column):
        """Return the column as a plain list, by position (index labels are ignored)."""
        return column.tolist() if hasattr(column, 'tolist') else list(column)

    def is_candle_start(t):
        """Tell whether ``t`` lies exactly on a boundary of the output timeframe."""
        if t.second != 0:
            return False
        if kind == 'minute':
            return t.minute % size == 0
        if t.minute != 0:
            return False
        if kind == 'hour':
            return t.hour % size == 0
        if t.hour != 0:
            return False
        if kind == 'day':
            return True
        if kind == 'week':
            return t.weekday() == 0  # weekly candles start on Monday
        if t.day != 1:
            return False
        if kind == 'month':
            return True
        return t.month == 1  # yearly candles start on January 1st

    def next_start(t):
        """Return the start of the candle that follows the one starting at ``t``."""
        if kind == 'month':
            if t.month < 12:
                return t.replace(month=t.month + 1)
            return t.replace(year=t.year + 1, month=1)
        if kind == 'year':
            return t.replace(year=t.year + 1)
        return t + span

    time_lst = as_list(time)
    open_lst = as_list(open)
    high_lst = as_list(high)
    low_lst = as_list(low)
    close_lst = as_list(close)

    n_rows = len(time_lst)
    if any(len(col) != n_rows for col in (open_lst, high_lst, low_lst, close_lst)):
        raise ValueError("time, open, high, low and close must have the same length.")
    if n_rows < 2:
        raise ValueError("At least two input candles are required to detect the input timeframe.")

    # Parse every timestamp once instead of re-parsing inside the loops.
    stamps = [datetime.strptime(s, dtformat) for s in time_lst]
    begin = datetime.strptime(fromtime, dtformat)
    end = datetime.strptime(totime, dtformat)

    # The input timeframe is inferred from the first two rows.
    if stamps[1] - stamps[0] >= span:
        raise ValueError("Output timeframe must be bigger than input timeframe.")

    # The first aligned row at or after `fromtime` anchors the output grid.
    anchor = next(
        (k for k, t in enumerate(stamps) if t >= begin and is_candle_start(t)),
        None,
    )
    if anchor is None:
        return pd.DataFrame(columns=columns)

    # Candle start times; the last one is at or after `totime` and only
    # serves as the closing boundary of the final emitted candle.
    boundaries = []
    t = stamps[anchor]
    while t < end + span:
        boundaries.append(t)
        t = next_start(t)
    if not boundaries:
        return pd.DataFrame(columns=columns)

    candles = []
    c = 0        # index of the boundary window the current row belongs to
    last_c = -1  # window of the candle currently being built
    rows = zip(stamps[anchor:], open_lst[anchor:], high_lst[anchor:],
               low_lst[anchor:], close_lst[anchor:])
    for stamp, o, h, l, cl in rows:
        # Bucketing by window (not by exact timestamp match) keeps the
        # conversion going when an input row is missing on a boundary.
        while c + 1 < len(boundaries) and stamp >= boundaries[c + 1]:
            c += 1
        if boundaries[c] >= end:
            break  # candles starting at or after `totime` are not emitted
        if c != last_c:
            candles.append([boundaries[c].strftime(dtformat), o, h, l, cl])
            last_c = c
        else:
            candle = candles[-1]
            candle[2] = max(candle[2], h)
            candle[3] = min(candle[3], l)
            candle[4] = cl

    return pd.DataFrame(candles, columns=columns)
content_copyCOPY