-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
132 lines (103 loc) · 4.64 KB
/
main.py
File metadata and controls
132 lines (103 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Load the dataset.
# NOTE(review): this runs at import time — the CSV must exist in the working
# directory or the module fails to import. Presumably one row per company per
# trading period; verify against the data producer.
data = pd.read_csv('trading_summary.csv')
# Initialize a dictionary to store models for each company.
# Populated by the training loop below; keys are company names,
# values are trained Keras models. Used by the /report/prediction endpoint.
company_models = {}
# Define features and target variables.
# Input columns fed to the LSTM, in this fixed order:
features = ['company_share_volume', 'company_trade_volume', 'company_previous_close',
            'company_open_price', 'company_highest_price', 'company_lowest_price']
# Column the model learns to predict:
target = 'company_last_trade_price'
# Function to create the time series dataset
def create_dataset(features, target, time_step=1):
    """Build sliding-window samples for a sequence model.

    Each sample is a window of `time_step` consecutive feature rows; its
    label is the target entry immediately following that window.

    Args:
        features: 2-D array of shape (n_rows, n_features), sliceable as
            ``features[a:b, :]`` (e.g. a numpy array).
        target: per-row target values, indexable by row.
        time_step: window length; also the offset of each label.

    Returns:
        Tuple ``(X, y)`` of numpy arrays — ``X`` with shape
        (n_rows - time_step, time_step, n_features) and ``y`` holding
        the corresponding labels.
    """
    n_windows = len(features) - time_step
    windows = [features[start:start + time_step, :] for start in range(n_windows)]
    labels = [target[start + time_step] for start in range(n_windows)]
    return np.array(windows), np.array(labels)
# Set the time step (window length for the LSTM input).
time_step = 1  # Adjusted time step — one row of history per prediction
# Initialize FastAPI application instance (routes registered below).
app = FastAPI()
# Define a request model for the API
class CompanyRequest(BaseModel):
    """Request body for /report/prediction: the company to predict for."""
    # Must match a 'company_name' value seen during training.
    company_name: str
# Train a separate model for each company.
# NOTE(review): this runs at import time, so every server start retrains all
# models even though they are also saved to disk; consider loading existing
# .h5 files instead — confirm intended workflow.
for company_name, company_data in data.groupby('company_name'):
    # Skip companies with too few rows to build even one training window.
    # Checked BEFORE fitting the scalers — the original fitted both scalers
    # first and then discarded them for skipped companies (wasted work).
    if len(company_data) <= time_step:
        continue
    # Normalize the features and target to [0, 1], per company.
    scaler_features = MinMaxScaler(feature_range=(0, 1))
    scaler_target = MinMaxScaler(feature_range=(0, 1))
    scaled_features = scaler_features.fit_transform(company_data[features])
    scaled_target = scaler_target.fit_transform(company_data[[target]])
    # Build the sliding-window training set.
    X, y = create_dataset(scaled_features, scaled_target, time_step)
    # Stacked LSTM regressor: two LSTM layers, dropout, then dense head
    # producing a single scaled price.
    model = Sequential()
    model.add(LSTM(50, return_sequences=True, input_shape=(time_step, len(features))))
    model.add(LSTM(50, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(25))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    # batch_size=1, epochs=1 keeps startup fast at the cost of model quality.
    model.fit(X, y, batch_size=1, epochs=1)
    # Persist the model for the prediction endpoint.
    # NOTE(review): company_name is interpolated into the path unsanitized —
    # a name containing '/' or other special characters would break the save;
    # confirm names are plain identifiers.
    model.save(f"lstm_model_{company_name}.h5")
    # Register so the endpoint can validate requested names.
    company_models[company_name] = model
# Define the prediction endpoint.
@app.post("/report/prediction")
def predict_next_prices(request: CompanyRequest):
    """Predict the next 5 last-trade prices for a company.

    Returns the company's recent actual last-trade prices (read from the
    CSV) alongside five iteratively predicted prices from its saved LSTM.

    Raises:
        HTTPException: 404 if the company is unknown or has too few rows.
    """
    company_name = request.company_name
    if company_name not in company_models:
        raise HTTPException(status_code=404, detail="Company not found")
    # Load the persisted model. The name was validated against
    # company_models above, so this path is one we wrote ourselves.
    model = load_model(f"lstm_model_{company_name}.h5")
    # Select this company's rows from the globally loaded dataset.
    company_data = data[data['company_name'] == company_name]
    # BUG FIX: was `< time_step`. With exactly time_step rows,
    # create_dataset returns empty arrays and company_X[-1] below raised
    # IndexError (HTTP 500) instead of this clean 404; `<=` matches the
    # guard used during training.
    if len(company_data) <= time_step:
        raise HTTPException(status_code=404, detail="Not enough data to make prediction")
    # Re-fit per-company scalers exactly as done at training time.
    scaler_features = MinMaxScaler(feature_range=(0, 1))
    scaler_target = MinMaxScaler(feature_range=(0, 1))
    scaled_features = scaler_features.fit_transform(company_data[features])
    scaled_target = scaler_target.fit_transform(company_data[[target]])
    # Only the windows are needed here; the labels were unused (company_y).
    company_X, _ = create_dataset(scaled_features, scaled_target, time_step)
    # Predicted prices, in original (unscaled) units.
    predicted_prices = []
    # Seed the rolling model input with the most recent window.
    last_steps = company_X[-1]
    last_steps = np.reshape(last_steps, (1, time_step, len(features)))
    # Collect the actual recent trade prices for this company.
    # BUG FIX: the file handle was opened and never closed; use a context
    # manager. Parsing kept identical to the original (naive comma split,
    # price assumed to be the third-from-last column).
    current_last_trade_prices = []
    with open("trading_summary.csv", "r") as f:
        for line in f:
            if company_name in line:
                if company_name == line.split(",")[0]:
                    current_last_trade_prices.append(float(line.split(",")[-3]))
    # Predict the next 5 prices iteratively, feeding each prediction back in.
    for _ in range(5):
        next_price_scaled = model.predict(last_steps)
        predicted_prices.append(float(scaler_target.inverse_transform(next_price_scaled)[0][0]))
        # Shift the window one step and write the new prediction as the
        # newest row.
        # NOTE(review): next_price_scaled is a single value, so this
        # assignment BROADCASTS the predicted price into all len(features)
        # feature slots of the new row — almost certainly not the intended
        # feature vector, but preserved here so predictions are unchanged;
        # confirm before altering.
        last_steps = np.roll(last_steps, -1, axis=1)
        last_steps[0, -1] = next_price_scaled
        last_steps = np.reshape(last_steps, (1, time_step, len(features)))
    return {
        "company_name": company_name,
        "current_last_trade_prices": current_last_trade_prices,
        "predicted_last_trade_prices": predicted_prices
    }
# Run the API server only when executed directly (not when imported).
if __name__ == "__main__":
    import uvicorn
    # Local-only bind on port 8003; use host "0.0.0.0" to expose externally.
    uvicorn.run(app, host="127.0.0.1", port=8003)