I have written code in Python with TensorFlow and Keras to predict stock prices. The opening, low, high, and closing prices of each day are given as input to an LSTM neural network, and to predict the price 5 days ahead I defined the close price of 5 days later as the output for both the training and testing stages. After training, I noticed that in the test stage the outputs are close to the close price of the same day rather than to the defined output (the close price 5 days later). The same issue occurs when predicting the price 10 or 15 days ahead. The code is shown below; does anyone know where it is wrong?
import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, Bidirectional
from keras.callbacks import ModelCheckpoint, TensorBoard
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from collections import deque
#import matplotlib.pyplot as plt
import os
import time
import numpy as np
import pandas as pd
import random
import openpyxl as Excel
from tkinter import filedialog as fd
# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)
def shuffle_in_unison(a, b):
# shuffle two arrays in the same way
state = np.random.get_state()
np.random.shuffle(a)
np.random.set_state(state)
np.random.shuffle(b)
def load_data(n_steps=50, scale=True, shuffle=True, lookup_step=1, split_by_date=True,
test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low','range','individual_buy_ratio',"individual_sell_ratio",'individual_buy_strange']):
    if os.path.isfile(ticker_tradeH_filename):
        trade = pd.read_csv(ticker_tradeH_filename, index_col=0)
    else:
        raise FileNotFoundError(f"trade history file not found: {ticker_tradeH_filename}")
    if os.path.isfile(ticker_clientH_filename):
        client = pd.read_csv(ticker_clientH_filename, index_col=0)
    else:
        raise FileNotFoundError(f"client history file not found: {ticker_clientH_filename}")
df = pd.concat([trade, client], axis=1,join="inner")
    # create new feature columns
    Rcol = (df["adjclose"] - df["yesterday"]) / df["adjclose"] * 100
    Rcol.iloc[0] = 0
    df["range"] = Rcol
    df["individual_buy_ratio"] = df["individual_buy_vol"] / df["individual_buy_count"]
    df["individual_buy_ratio"] = df["individual_buy_ratio"].replace([np.nan, -np.inf, np.inf], 0)
    df["individual_sell_ratio"] = df["individual_sell_vol"] / df["individual_sell_count"]
    df["individual_sell_ratio"] = df["individual_sell_ratio"].replace([np.nan, -np.inf, np.inf], 0)
    df["individual_buy_strange"] = df["individual_buy_ratio"] / df["individual_sell_ratio"]
    df["individual_buy_strange"] = df["individual_buy_strange"].replace([np.nan, -np.inf, np.inf], 0)
for col in df.columns:
if col not in feature_columns:
del df[col]
# this will contain all the elements we want to return from this function
result = {}
# we will also return the original dataframe itself
result['df'] = df.copy()
# make sure that the passed feature_columns exist in the dataframe
for col in feature_columns:
assert col in df.columns, f"'{col}' does not exist in the dataframe."
# add date as a column
if "date" not in df.columns:
df["date"] = df.index
if scale:
column_scaler = {}
# scale the data (prices) from 0 to 1
for column in feature_columns:
scaler = preprocessing.MinMaxScaler()
df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
column_scaler[column] = scaler
# add the MinMaxScaler instances to the result returned
result["column_scaler"] = column_scaler
# add the target column (label) by shifting by `lookup_step`
future=df['adjclose'].shift(-lookup_step)
df['future'] = future
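    # alignment note: after shift(-lookup_step), row t of `future` holds the
    # adjclose of row t + lookup_step, i.e. the close price `lookup_step`
    # days ahead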
    # the last `lookup_step` rows contain NaN in the `future` column;
    # grab them before dropping NaNs
last_sequence = np.array(df[feature_columns].tail(lookup_step))
# drop NaNs
df.dropna(inplace=True)
sequence_data = []
sequences = deque(maxlen=n_steps)
for entry, target in zip(df[feature_columns + ["date"]].values, df['future'].values):
sequences.append(entry)
if len(sequences) == n_steps:
sequence_data.append([np.array(sequences), target])
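    # each element of sequence_data pairs a window of the previous `n_steps`
    # rows (features plus the date column, stripped later) with the adjclose
    # `lookup_step` days after the window's last day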
    # get the last sequence by appending the final `n_steps` rows with the `lookup_step` rows
    # for instance, if n_steps=50 and lookup_step=10, last_sequence will be 60 (50 + 10) rows long
# this last_sequence will be used to predict future stock prices that are not available in the dataset
last_sequence = list([s[:len(feature_columns)] for s in sequences]) + list(last_sequence)
last_sequence = np.array(last_sequence).astype(np.float32)
# add to result
result['last_sequence'] = last_sequence
# construct the X's and Y's
X, Y = [], []
for seq, target in sequence_data:
X.append(seq)
Y.append(target)
# convert to numpy arrays
X = np.array(X)
Y = np.array(Y)
if split_by_date:
# split the dataset into training & testing sets by date (not randomly splitting)
train_samples = int((1 - test_size) * len(X))
result["X_train"] = X[:train_samples]
result["Y_train"] = Y[:train_samples]
result["X_test"] = X[train_samples:]
result["Y_test"] = Y[train_samples:]
if shuffle:
# shuffle the datasets for training (if shuffle parameter is set)
shuffle_in_unison(result["X_train"], result["Y_train"])
shuffle_in_unison(result["X_test"], result["Y_test"])
else:
# split the dataset randomly
result["X_train"], result["X_test"], result["Y_train"], result["Y_test"] = train_test_split(X, Y,
test_size=test_size, shuffle=shuffle)
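    # note: with split_by_date=True the oldest (1 - test_size) fraction of the
    # samples is used for training and the most recent fraction for testing;
    # otherwise train_test_split assigns samples randomly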
# get the list of test set dates
dates = result["X_test"][:, -1, -1]
# retrieve test features from the original dataframe
result["test_df"] = result["df"].loc[dates]
# remove duplicated dates in the testing dataframe
result["test_df"] = result["test_df"][~result["test_df"].index.duplicated(keep='first')]
xx=pd.DataFrame(result["X_test"][:,-1,:])
xx.index=dates
yy=pd.DataFrame()
yy["results"]=result["Y_test"]
yy.index=dates
testdata = pd.concat([xx, yy], axis=1,join="inner")
testdata.to_csv(test_data_filename)
# remove dates from the training/testing sets & convert to float32
result["X_train"] = result["X_train"][:, :, :len(feature_columns)].astype(np.float32)
result["X_test"] = result["X_test"][:, :, :len(feature_columns)].astype(np.float32)
return result
def create_model(sequence_length, n_features, units=256, cell=LSTM, n_layers=2, dropout=0.3,
loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
model = Sequential()
for i in range(n_layers):
if i == 0:
# first layer
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=True), batch_input_shape=(None, sequence_length, n_features)))
else:
model.add(cell(units, return_sequences=True, batch_input_shape=(None, sequence_length, n_features)))
elif i == n_layers - 1:
# last layer
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=False)))
else:
model.add(cell(units, return_sequences=False))
else:
# hidden layers
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=True)))
else:
model.add(cell(units, return_sequences=True))
# add dropout after each layer
model.add(Dropout(dropout))
model.add(Dense(1, activation="linear"))
model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
return model
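# for reference: with n_layers=2 the resulting stack is
#   cell(units, return_sequences=True) -> Dropout -> cell(units) -> Dropout -> Dense(1, "linear")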
def plot_graph(test_df):
"""
    This function plots the true close price along with the predicted close
    price, in blue and red respectively (plotting is currently disabled;
    uncomment the matplotlib import at the top to use it):
plt.plot(test_df[f'true_adjclose_{LOOKUP_STEP}'], c='b')
plt.plot(test_df[f'adjclose_{LOOKUP_STEP}'], c='r')
plt.xlabel("Days")
plt.ylabel("Price")
plt.legend(["Actual Price", "Predicted Price"])
plt.show()
"""
def get_test_prediction(model, data):
X_test = data["X_test"]
Y_test = data["Y_test"]
# perform prediction and get prices
Y_pred = model.predict(X_test)
if SCALE:
Y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(Y_test, axis=0)))
Y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(Y_pred))
else:
Y_pred=np.squeeze(Y_pred, axis=1)
test_df = data["test_df"]
precloseprice=test_df["adjclose"]
# add predicted future prices to the dataframe
test_df[f"adjclose_{LOOKUP_STEP}"] = Y_pred
# add true future prices to the dataframe
test_df[f"true_adjclose_{LOOKUP_STEP}"] = Y_test
    # calculate the predicted and actual percent change from the current close
predict_prufit=100*(Y_pred-precloseprice)/precloseprice
test_df["predict_prufit"] = predict_prufit
real_prufit=100*(Y_test-precloseprice)/precloseprice
test_df["real_prufit"] = real_prufit
predict_mistake=real_prufit-predict_prufit
test_df["predict_mistake"] = predict_mistake
relative_mistake=predict_mistake/abs(real_prufit)
    relative_mistake = relative_mistake.replace([np.nan, -np.inf, np.inf], 0)
test_df["relative_mistake"] = relative_mistake
# sort the dataframe by date
test_df.sort_index(inplace=True)
return test_df
def predict(model, data):
# retrieve the last sequence from data
last_sequence = data["last_sequence"][-N_STEPS:]
# expand dimension
last_sequence = np.expand_dims(last_sequence, axis=0)
# get the prediction (scaled from 0 to 1)
prediction = model.predict(last_sequence)
# get the price (by inverting the scaling)
if SCALE:
predicted_price = data["column_scaler"]["adjclose"].inverse_transform(prediction)[0][0]
else:
predicted_price = prediction[0][0]
return predicted_price
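# e.g. predict(model, data) feeds the most recent N_STEPS rows of features
# (shape (1, N_STEPS, n_features)) and returns a single price for
# LOOKUP_STEP days after the last date in the dataset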
#### initialize the model hyperparameters
# Window size or the sequence length
N_STEPS = 24 # 24 trading days, roughly 1 month
# Lookup step, 1 is the next day
LOOKUP_STEP = 15
# whether to scale feature columns & output price as well
##############
SCALE = True
scale_str = f"sc-{int(SCALE)}"
# whether to shuffle the dataset
#############
SHUFFLE = False
shuffle_str = f"sh-{int(SHUFFLE)}"
# whether to split the training/testing set by date
SPLIT_BY_DATE = True
###################
split_by_date_str = f"sbd-{int(SPLIT_BY_DATE)}"
# test ratio size, 0.2 is 20%
TEST_SIZE = 0.2
# features to use
FEATURE_COLUMNS = ["adjclose", "volume", "open", "high", "low","range","individual_buy_ratio","individual_sell_ratio","individual_buy_strange"]
# date now
date_now = time.strftime("%Y-%m-%d")
time_now = time.strftime("%H-%M")
### model parameters
N_LAYERS = 2
# LSTM cell
CELL = LSTM
# 256 LSTM neurons
UNITS = 256
# 20% dropout
DROPOUT = 0.2
# whether to use bidirectional RNNs
BIDIRECTIONAL = False
### training parameters
# mean absolute error loss
# LOSS = "mae"
# huber loss
LOSS = "huber_loss"
OPTIMIZER = "adam"
BATCH_SIZE = 64
EPOCHS = 10
filename=fd.askopenfilename()
name= os.path.splitext(os.path.basename(filename))[0]
cpath= os.path.dirname(filename)
tickerlistwb = Excel.load_workbook(filename)
tickerlistws = tickerlistwb['tickers']
# train result file
if not os.path.isdir(cpath + "\\Train-history"):
os.mkdir(cpath + "\\Train-history")
train_result_filename = cpath + f"\\Train-history\\Train-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
column_name=["loss","Mean Absolute Error",f"Future price after {LOOKUP_STEP} days","mistake Max","mistake Ave","mistake Min"]
train_result=pd.DataFrame(columns=column_name)
r = tickerlistws.max_row
for i in range(1,r+1,1):
# get stock name
tickerEn =tickerlistws.cell(row=i, column=1).value
tickerpath=cpath+f"\\Stocks\\{tickerEn}"
# model name to save, making it as unique as possible based on parameters
model_name = f"{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}"
#model_name = f"{date_now}_{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-seq-{N_STEPS}-step-{LOOKUP_STEP}-layers-{N_LAYERS}-units-{UNITS}"
if BIDIRECTIONAL:
model_name += "-b"
    # create these folders if they do not exist
if not os.path.isdir(tickerpath+"\\Train"):
os.mkdir(tickerpath+"\\Train")
if not os.path.isdir(tickerpath+"\\History"):
os.mkdir(tickerpath+"\\History")
ticker_data_filename = tickerpath + f"\\Train\\data-{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
test_data_filename= tickerpath + f"\\Train\\test-{tickerEn}-{shuffle_str}-{scale_str}-{split_by_date_str}-epoch{EPOCHS}-dropout{DROPOUT}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-layers-{N_LAYERS}-seq-{N_STEPS}-step-{LOOKUP_STEP}-units-{UNITS}.csv"
result_filename = tickerpath + f"\\Train\\{model_name}.csv"
log_directory=tickerpath + f"\\Train\\{model_name}"
model_filename = tickerpath + f"\\{model_name}.h5"
ticker_tradeH_filename = tickerpath + "\\History\\Trade_H.csv"
ticker_clientH_filename = tickerpath + "\\History\\Client_H.csv"
# load the data
data = load_data(N_STEPS, scale=SCALE, split_by_date=SPLIT_BY_DATE,
shuffle=SHUFFLE, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE,
feature_columns=FEATURE_COLUMNS)
# save the dataframe
data["df"].to_csv(ticker_data_filename)
# construct the model
model = create_model(N_STEPS, len(FEATURE_COLUMNS), loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
# some tensorflow callbacks
checkpointer = ModelCheckpoint(model_filename, save_weights_only=False, save_best_only=True, verbose=1)
tensorboard = TensorBoard(log_dir=log_directory)
# train the model and save the weights whenever we see
# a new optimal model using ModelCheckpoint
history = model.fit(data["X_train"], data["Y_train"],
batch_size=BATCH_SIZE,
epochs=EPOCHS,
validation_data=(data["X_test"], data["Y_test"]),
callbacks=[checkpointer, tensorboard],
verbose=1)
    # reload the best weights saved by ModelCheckpoint
model.load_weights(model_filename)
# evaluate the model
loss, mae = model.evaluate(data["X_test"], data["Y_test"], verbose=0)
    # convert the scaled MAE back to price units; an error magnitude should
    # only be multiplied by the data range (inverse_transform would also add
    # the data minimum back in and inflate the error)
    if SCALE:
        adj_scaler = data["column_scaler"]["adjclose"]
        mean_absolute_error = mae * (adj_scaler.data_max_[0] - adj_scaler.data_min_[0])
    else:
        mean_absolute_error = mae
# get the final dataframe for the testing set
test_data_predict = get_test_prediction(model, data)
# predict the future price
future_price = predict(model, data)
    result = [loss, mean_absolute_error, future_price,
              test_data_predict["relative_mistake"].max(),
              test_data_predict["relative_mistake"].mean(),
              test_data_predict["relative_mistake"].min()]
train_result.loc[tickerEn]=result
# save the final dataframe to csv-results folder
test_data_predict.to_csv(result_filename)
train_result.to_csv(train_result_filename)
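To rule out an off-by-one in the label construction, the shift logic can be checked in isolation. Below is a minimal sketch with toy values (hypothetical data, not my real dataset, with lookup_step=5), and it behaves as expected:

import pandas as pd

# toy close prices, one row per day (hypothetical values)
toy = pd.DataFrame({"adjclose": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0]})
# same target construction as in load_data()
toy["future"] = toy["adjclose"].shift(-5)
print(toy)
# row 0 is labeled 15.0 (the close 5 rows later), row 1 is labeled 16.0, and
# rows 2..6 get NaN and would be dropped, so the labeling itself looks correct;
# the test predictions nevertheless track the same-day close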