Why don't my LSTM predictions match the defined output?

I have written code in Python with TensorFlow and Keras to predict stock prices. The open, low, high, and close prices of each day are given as input to an LSTM network. To predict the price 5 days ahead, I defined the close price of 5 days later as the output for both the training and testing stages. After training, I noticed that in the test stage the outputs are close to the close price of the same day instead of the defined output (the close price of 5 days later). The same issue appears when predicting the price 10 and 15 days ahead. The code is below; does anyone see where it is wrong?


import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, Bidirectional
from keras.callbacks import ModelCheckpoint, TensorBoard
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from collections import deque
import matplotlib.pyplot as plt
import os
import time
import numpy as np
import pandas as pd
import random
import openpyxl as Excel
from tkinter import filedialog as fd

# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)

def shuffle_in_unison(a, b):
    # shuffle two arrays in the same way
    state = np.random.get_state()
    np.random.shuffle(a)
    np.random.set_state(state)
    np.random.shuffle(b)

def load_data(n_steps=50, scale=True, shuffle=True, lookup_step=1, split_by_date=True,
                test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low','range','individual_buy_ratio',"individual_sell_ratio",'individual_buy_strange']):
   
    if os.path.isfile(ticker_tradeH_filename):
        trade = pd.read_csv(ticker_tradeH_filename, index_col=0)
    else:
        raise FileNotFoundError(f"trade history file not found: {ticker_tradeH_filename}")
    if os.path.isfile(ticker_clientH_filename):
        client = pd.read_csv(ticker_clientH_filename, index_col=0)
    else:
        raise FileNotFoundError(f"client history file not found: {ticker_clientH_filename}")
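    # combine the trade and client histories; the inner join keeps only dates present in both files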
    df = pd.concat([trade, client], axis=1,join="inner")
    
    # create new feature columns
    Rcol = (df["adjclose"] - df["yesterday"]) / df["adjclose"] * 100
    Rcol.iloc[0] = 0  # no previous day for the first row
    df["range"] = Rcol

    df["individual_buy_ratio"] = df["individual_buy_vol"]/df["individual_buy_count"]
    df["individual_buy_ratio"] = df["individual_buy_ratio"].replace([np.nan, -np.inf,np.Inf], 0)
    df["individual_sell_ratio"] = df["individual_sell_vol"]/df["individual_sell_count"]
    df["individual_sell_ratio"] = df["individual_sell_ratio"].replace([np.nan, -np.inf,np.Inf], 0)
    df["individual_buy_strange"] = df["individual_buy_ratio"]/df["individual_sell_ratio"]
    df["individual_buy_strange"] = df["individual_buy_strange"].replace([np.nan, -np.inf,np.Inf], 0)
    
    for col in df.columns:
        if col not in feature_columns:
            del df[col]
    
    # this will contain all the elements we want to return from this function
    result = {}
    # we will also return the original dataframe itself
    result['df'] = df.copy()
    # make sure that the passed feature_columns exist in the dataframe
    for col in feature_columns:
        assert col in df.columns, f"'{col}' does not exist in the dataframe."
    # add date as a column
    if "date" not in df.columns:
        df["date"] = df.index
    if scale:
        column_scaler = {}
        # scale the data (prices) from 0 to 1
        for column in feature_columns:
            scaler = preprocessing.MinMaxScaler()
            df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
            column_scaler[column] = scaler
        # add the MinMaxScaler instances to the result returned
        result["column_scaler"] = column_scaler
    # add the target column (label) by shifting by `lookup_step`
    future=df['adjclose'].shift(-lookup_step)
    df['future'] = future
    # the last `lookup_step` rows contain NaN in the `future` column;
    # grab them before dropping NaNs
    last_sequence = np.array(df[feature_columns].tail(lookup_step))
    # drop NaNs
    df.dropna(inplace=True)
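    # each sample below is a sliding window of `n_steps` consecutive feature rows;
    # its label is the (scaled) close price `lookup_step` rows past the window's last day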
    
    sequence_data = []
    sequences = deque(maxlen=n_steps)
    for entry, target in zip(df[feature_columns + ["date"]].values, df['future'].values):
        sequences.append(entry)
        if len(sequences) == n_steps:
            sequence_data.append([np.array(sequences), target])
    # get the last sequence by appending the final `lookup_step` rows to the last `n_steps` window
    # for instance, if n_steps=50 and lookup_step=10, last_sequence will have length 60 (50+10)
    # this last_sequence is used to predict future prices that are not yet in the dataset
    last_sequence = list([s[:len(feature_columns)] for s in sequences]) + list(last_sequence)
    last_sequence = np.array(last_sequence).astype(np.float32)
    # add to result
    result['last_sequence'] = last_sequence
    # construct the X's and Y's
    X, Y = [], []
    for seq, target in sequence_data:
        X.append(seq)
        Y.append(target)
    # convert to numpy arrays
    X = np.array(X)
    Y = np.array(Y)
    if split_by_date:
        # split the dataset into training & testing sets by date (not randomly splitting)
        train_samples = int((1 - test_size) * len(X))
        result["X_train"] = X[:train_samples]
        result["Y_train"] = Y[:train_samples]
        result["X_test"]  = X[train_samples:]
        result["Y_test"]  = Y[train_samples:]
        if shuffle:
            # shuffle the datasets for training (if shuffle parameter is set)
            shuffle_in_unison(result["X_train"], result["Y_train"])
            shuffle_in_unison(result["X_test"], result["Y_test"])
    else:    
        # split the dataset randomly
        result["X_train"], result["X_test"], result["Y_train"], result["Y_test"] = train_test_split(X, Y, 
                                                                                test_size=test_size, shuffle=shuffle)
    # get the list of test set dates
    dates = result["X_test"][:, -1, -1]
    # retrieve test features from the original dataframe
    result["test_df"] = result["df"].loc[dates]
    # remove duplicated dates in the testing dataframe
    result["test_df"] = result["test_df"][~result["test_df"].index.duplicated(keep='first')]
    x_last = pd.DataFrame(result["X_test"][:, -1, :], index=dates)
    y_df = pd.DataFrame({"results": result["Y_test"]}, index=dates)
    testdata = pd.concat([x_last, y_df], axis=1, join="inner")
    
    testdata.to_csv(test_data_filename)
    # remove dates from the training/testing sets & convert to float32
    result["X_train"] = result["X_train"][:, :, :len(feature_columns)].astype(np.float32)
    result["X_test"] = result["X_test"][:, :, :len(feature_columns)].astype(np.float32)
    return result

def create_model(sequence_length, n_features, units=256, cell=LSTM, n_layers=2, dropout=0.3,
                loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
    model = Sequential()
    for i in range(n_layers):
        if i == 0:
            # first layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True), batch_input_shape=(None, sequence_length, n_features)))
            else:
                model.add(cell(units, return_sequences=True, batch_input_shape=(None, sequence_length, n_features)))
        elif i == n_layers - 1:
            # last layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=False)))
            else:
                model.add(cell(units, return_sequences=False))
        else:
            # hidden layers
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True)))
            else:
                model.add(cell(units, return_sequences=True))
        # add dropout after each layer
        model.add(Dropout(dropout))
    model.add(Dense(1, activation="linear"))
    model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
    return model

def plot_graph(test_df):
    """
    Plot the true close price along with the predicted close price,
    in blue and red respectively.
    """
    plt.plot(test_df[f'true_adjclose_{LOOKUP_STEP}'], c='b')
    plt.plot(test_df[f'adjclose_{LOOKUP_STEP}'], c='r')
    plt.xlabel("Days")
    plt.ylabel("Price")
    plt.legend(["Actual Price", "Predicted Price"])
    plt.show()
    
def get_test_prediction(model, data):
    X_test = data["X_test"]
    Y_test = data["Y_test"]
    # perform prediction and get prices
    Y_pred = model.predict(X_test)
    if SCALE:
        Y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(Y_test, axis=1)))
        Y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(Y_pred))
    else:
        Y_pred=np.squeeze(Y_pred, axis=1)
    test_df = data["test_df"]
    precloseprice=test_df["adjclose"]
    # add predicted future prices to the dataframe
    test_df[f"adjclose_{LOOKUP_STEP}"] = Y_pred
    # add true future prices to the dataframe
    test_df[f"true_adjclose_{LOOKUP_STEP}"] = Y_test
    # calculate the prediction error
    predict_profit = 100 * (Y_pred - precloseprice) / precloseprice
    test_df["predict_profit"] = predict_profit
    real_profit = 100 * (Y_test - precloseprice) / precloseprice
    test_df["real_profit"] = real_profit
    predict_mistake = real_profit - predict_profit
    test_df["predict_mistake"] = predict_mistake
    relative_mistake = predict_mistake / abs(real_profit)
    relative_mistake = relative_mistake.replace([np.nan, -np.inf, np.inf], 0)
    test_df["relative_mistake"] = relative_mistake
    # sort the dataframe by date
    test_df.sort_index(inplace=True)
    return test_df

def predict(model, data):
    # retrieve the last sequence from data
    last_sequence = data["last_sequence"][-N_STEPS:]
    # expand dimension
    last_sequence = np.expand_dims(last_sequence, axis=0)
    # get the prediction (scaled from 0 to 1)
    prediction = model.predict(last_sequence)
    # get the price (by inverting the scaling)
    if SCALE:
        predicted_price = data["column_scaler"]["adjclose"].inverse_transform(prediction)[0][0]
    else:
        predicted_price = prediction[0][0]
    return predicted_price

#### initialize the model hyperparameters  
# Window size or the sequence length
N_STEPS = 24  # ~1 month of trading days
# Lookup step, 1 is the next day
LOOKUP_STEP = 15
# whether to scale feature columns & output price as well
SCALE = True
scale_str = f"sc-{int(SCALE)}"
# whether to shuffle the dataset
SHUFFLE = False
shuffle_str = f"sh-{int(SHUFFLE)}"
# whether to split the training/testing set by date
SPLIT_BY_DATE = True
split_by_date_str = f"sbd-{int(SPLIT_BY_DATE)}"
# test ratio size, 0.2 is 20%
TEST_SIZE = 0.2
# features to use
FEATURE_COLUMNS = ["adjclose", "volume", "open", "high", "low","range","individual_buy_ratio","individual_sell_ratio","individual_buy_strange"]
# date now
date_now = time.strftime("%Y-%m-%d")
time_now = time.strftime("%H-%M")
### model parameters
N_LAYERS = 2
# LSTM cell
CELL = LSTM
# 256 LSTM neurons
UNITS = 256
# 20% dropout
DROPOUT = 0.2
# whether to use bidirectional RNNs
BIDIRECTIONAL = False
### training parameters
# mean absolute error loss
# LOSS = "mae"
# huber loss
LOSS = "huber_loss"
OPTIMIZER = "adam"
BATCH_SIZE = 64
EPOCHS = 10

filename=fd.askopenfilename()
name= os.path.splitext(os.path.basename(filename))[0]
cpath= os.path.dirname(filename)
tickerlistwb = Excel.load_workbook(filename)
tickerlistws = tickerlistwb['tickers']

# train result file
if not os.path.isdir(cpath + "\\Train-history"):
    os.mkdir(cpath + "\\Train-history")
train_result_filename = cpath + f"\\Train-history\\Train-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
column_name=["loss","Mean Absolute Error",f"Future price after {LOOKUP_STEP} days","mistake Max","mistake Ave","mistake Min"]
train_result=pd.DataFrame(columns=column_name) 
    
r = tickerlistws.max_row
for i in range(1, r + 1):
    # get the stock name
    tickerEn = tickerlistws.cell(row=i, column=1).value
    tickerpath=cpath+f"\\Stocks\\{tickerEn}"
    # model name to save, making it as unique as possible based on parameters
    model_name = f"{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}"
    #model_name = f"{date_now}_{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-seq-{N_STEPS}-step-{LOOKUP_STEP}-layers-{N_LAYERS}-units-{UNITS}"
    if BIDIRECTIONAL:
        model_name += "-b"
    
    # create these folders if they do not exist
    if not os.path.isdir(tickerpath+"\\Train"):
        os.mkdir(tickerpath+"\\Train")
    if not os.path.isdir(tickerpath+"\\History"):
        os.mkdir(tickerpath+"\\History")
    ticker_data_filename = tickerpath + f"\\Train\\data-{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
    test_data_filename= tickerpath + f"\\Train\\test-{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
    result_filename = tickerpath + f"\\Train\\{model_name}.csv"
    log_directory=tickerpath + f"\\Train\\{model_name}"
    model_filename = tickerpath + f"\\{model_name}.h5"
    ticker_tradeH_filename = tickerpath + "\\History\\Trade_H.csv"
    ticker_clientH_filename = tickerpath + "\\History\\Client_H.csv"
    # load the data
    data = load_data(N_STEPS, scale=SCALE, split_by_date=SPLIT_BY_DATE, 
                    shuffle=SHUFFLE, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE, 
                    feature_columns=FEATURE_COLUMNS)
    # save the dataframe
    data["df"].to_csv(ticker_data_filename)
    # construct the model
    model = create_model(N_STEPS, len(FEATURE_COLUMNS), loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
                        dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
    # some tensorflow callbacks
    checkpointer = ModelCheckpoint(model_filename, save_weights_only=False, save_best_only=True, verbose=1)
    tensorboard = TensorBoard(log_dir=log_directory)
    # train the model and save the weights whenever we see 
    # a new optimal model using ModelCheckpoint
    history = model.fit(data["X_train"], data["Y_train"],
                        batch_size=BATCH_SIZE,
                        epochs=EPOCHS,
                        validation_data=(data["X_test"], data["Y_test"]),
                        callbacks=[checkpointer, tensorboard],
                        verbose=1)

    # load optimal model weights from results folder
    model.load_weights(model_filename)
    # evaluate the model
    loss, mae = model.evaluate(data["X_test"], data["Y_test"], verbose=0)
    # convert the MAE back to price units: an error on the [0, 1] scale corresponds to
    # error * (max - min); inverse_transform would wrongly add the minimum price back
    if SCALE:
        scaler = data["column_scaler"]["adjclose"]
        mean_absolute_error = mae * (scaler.data_max_[0] - scaler.data_min_[0])
    else:
        mean_absolute_error = mae
           
    # get the final dataframe for the testing set
    test_data_predict = get_test_prediction(model, data)
    # predict the future price
    future_price = predict(model, data)
    result = [loss, mean_absolute_error, future_price,
              test_data_predict["relative_mistake"].max(),
              test_data_predict["relative_mistake"].mean(),
              test_data_predict["relative_mistake"].min()]
    train_result.loc[tickerEn]=result

    # save the final dataframe to csv-results folder
    test_data_predict.to_csv(result_filename)
train_result.to_csv(train_result_filename)

I do not think there is necessarily a problem with your code. Historically, the best prediction of a stock price five days in the future is the current price: if a better prediction were possible, it would already have been made and incorporated into the price. Stock market prices are essentially random walks and can only be predicted with information that no one else has.
So I am afraid the problem is conceptual and not related to your Python code.
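To check this concretely, here is a minimal sketch (assuming the test_df returned by your get_test_prediction above, with LOOKUP_STEP = 5) comparing the model against a naive persistence baseline that simply predicts today's close for day t+5:

import numpy as np

y_true = test_df[f"true_adjclose_{LOOKUP_STEP}"].values   # actual close 5 days ahead
y_model = test_df[f"adjclose_{LOOKUP_STEP}"].values       # the network's prediction
y_naive = test_df["adjclose"].values                      # persistence: today's close

mae_model = np.mean(np.abs(y_true - y_model))
mae_naive = np.mean(np.abs(y_true - y_naive))
print(f"model MAE: {mae_model:.4f}  persistence MAE: {mae_naive:.4f}")
# if the two numbers are close, the network has learned little beyond the identity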


The code is wrong only in the sense that you expect an AI to predict the future. If that were possible, everybody would already be doing it and getting rich.


I don't believe that's entirely true. You need other information, though not necessarily inside information. But there is obviously no way to predict the future price of a particular stock based only on that stock's historical prices.


Thanks for your comment. I know the forecast has an error. But regardless of the nature of the data and whether the stock market can be predicted at all, my question is about the behavior of the neural network. If you look carefully at the figures, the green curve is the defined output, the orange curve is the output predicted by the network, and the blue curve is the close price of each day, which is given as an input in the test stage. As seen in diagram 1, instead of following the defined output, the predicted output follows the pattern of one of the inputs. When the forecast window is changed in graph 2, this shift is clearly visible.
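To quantify that shift, here is a minimal sketch (again assuming the test_df columns produced by get_test_prediction, in date order) that finds the forward shift at which the predicted curve best matches the defined output:

import numpy as np

y_true = test_df[f"true_adjclose_{LOOKUP_STEP}"].values  # defined output (green curve)
y_pred = test_df[f"adjclose_{LOOKUP_STEP}"].values       # network output (orange curve)

def shifted_mae(k):
    # error between the target and the prediction shifted forward by k days
    return np.mean(np.abs(y_true[:len(y_true) - k] - y_pred[k:]))

best_shift = min(range(2 * LOOKUP_STEP + 1), key=shifted_mae)
print(f"predictions best match the target when shifted by ~{best_shift} days")
# best_shift close to LOOKUP_STEP would confirm the network is echoing the current close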