This article (about 10,000 words, a roughly 10-minute read) introduces stock prediction with deep reinforcement learning.
Deep reinforcement learning (DRL) combines deep learning with reinforcement learning: deep learning excels at learning complex representations from raw data, while reinforcement learning lets an agent learn the best actions in a given environment through trial and error. With DRL, researchers and investors can build models that analyze historical data, capture complex market dynamics, and make informed decisions about buying, selling, or holding stocks.
Below, we introduce the relevant concepts as we write the code.
Dataset

```python
import copy
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import chainer
import chainer.functions as F
import chainer.links as L
from plotly import tools
from plotly.graph_objs import *
from plotly.offline import init_notebook_mode, iplot, iplot_mpl
from tqdm import tqdm_notebook as tqdm

init_notebook_mode()
```

We mainly use a Jupyter notebook with Plotly for visualization, which needs a little extra setup. Now let's read the data.
```python
try:
    data = pd.read_csv('../input/Data/Stocks/goog.us.txt')
    data['Date'] = pd.to_datetime(data['Date'])
    data = data.set_index('Date')
except FileNotFoundError:
    import datetime
    import pandas_datareader as pdr

    start = datetime.datetime(2010, 1, 1)
    end = datetime.datetime(2017, 1, 11)
    data = pdr.get_data_yahoo("AAPL", start, end)

print(data.index.min(), data.index.max())

date_split = '2016-01-01'
train = data[:date_split]
test = data[date_split:]
print(len(data), len(train), len(test))
display(data)
```

The code reads the data, splits it into a training set and a test set at the chosen date, and then shows the imported data in the Jupyter notebook with `display`.
```python
def plot_train_test(train, test, date_split):
    data = [
        Candlestick(x=train.index, open=train['Open'], high=train['High'], low=train['Low'], close=train['Close'], name='train'),
        Candlestick(x=test.index, open=test['Open'], high=test['High'], low=test['Low'], close=test['Close'], name='test')
    ]
    layout = {
        'shapes': [
            {'x0': date_split, 'x1': date_split, 'y0': 0, 'y1': 1, 'xref': 'x', 'yref': 'paper',
             'line': {'color': 'rgb(0,0,0)', 'width': 1}}
        ],
        'annotations': [
            {'x': date_split, 'y': 1.0, 'xref': 'x', 'yref': 'paper', 'showarrow': False,
             'xanchor': 'left', 'text': ' test data'},
            {'x': date_split, 'y': 1.0, 'xref': 'x', 'yref': 'paper', 'showarrow': False,
             'xanchor': 'right', 'text': 'train data '}
        ]
    }
    figure = Figure(data=data, layout=layout)
    iplot(figure)
```

This defines a function named plot_train_test, which uses the Plotly graphing library to create the visualization. The chart splits the stock data into training and test sets at the given date. Its inputs are train, test, and date_split.
The visualization looks like this:
```python
plot_train_test(train, test, date_split)
```

Environment

Next we write the reinforcement-learning part, starting with the environment.

```python
class Environment:

    def __init__(self, data, history_t=90):
        self.data = data
        self.history_t = history_t
        self.reset()

    def reset(self):
        self.t = 0
        self.done = False
        self.profits = 0
        self.positions = []
        self.position_value = 0
        self.history = [0 for _ in range(self.history_t)]
        return [self.position_value] + self.history  # obs

    def step(self, act):
        reward = 0

        # act = 0: stay, 1: buy, 2: sell
        if act == 1:
            self.positions.append(self.data.iloc[self.t, :]['Close'])
        elif act == 2:  # sell
            if len(self.positions) == 0:
                reward = -1
            else:
                profits = 0
                for p in self.positions:
                    profits += (self.data.iloc[self.t, :]['Close'] - p)
                reward += profits
                self.profits += profits
                self.positions = []

        # set next time
        self.t += 1
        self.position_value = 0
        for p in self.positions:
            self.position_value += (self.data.iloc[self.t, :]['Close'] - p)
        self.history.pop(0)
        self.history.append(self.data.iloc[self.t, :]['Close'] - self.data.iloc[(self.t-1), :]['Close'])

        # clipping reward
        if reward > 0:
            reward = 1
        elif reward < 0:
            reward = -1

        return [self.position_value] + self.history, reward, self.done, self.profits  # obs, reward, done, profits
```
First we define the reinforcement-learning environment. The Environment class simulates a simple trading environment: given historical stock price data, the agent decides whether to buy, sell, or hold.

__init__() takes two arguments: data, the stock price data, and history_t, the number of time steps the environment keeps as history. The constructor stores data and history_t and calls reset() to initialize the environment.

reset() initializes (or resets) the environment's internal state, including the current time step (self.t), the done flag, total profits, open positions, position value, and price history. The method returns an observation consisting of the position value followed by the price history.

step() updates the environment's state based on an action, encoded as an integer: 0 for hold, 1 for buy, 2 for sell. If the agent decides to buy, the stock's current closing price is appended to the position list. When the agent decides to sell, the method computes the profit or loss on each open position, updates the profits variable accordingly, and closes all positions. Finally, the reward from the sell action is clipped to -1, 0, or 1 according to its sign.
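The clipping at the end of step() is just the sign function on the raw profit; a minimal standalone check (plain Python, independent of the class):

```python
import numpy as np

# The reward clipping in step() maps any positive profit to +1, any loss to -1,
# and leaves 0 unchanged -- i.e. it behaves exactly like np.sign on the raw value.
def clip_reward(raw):
    if raw > 0:
        return 1
    elif raw < 0:
        return -1
    return 0

for raw in (12.3, 0.0, -0.5):
    assert clip_reward(raw) == int(np.sign(raw))
```

This keeps rewards on a uniform scale regardless of position size, which stabilizes Q-learning on noisy price data.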
With the Environment class, which simulates a stock-trading environment, an agent can learn and make decisions from historical price data. In this controlled setting we can train a reinforcement-learning agent to develop trading strategies.
```python
env = Environment(train)
print(env.reset())
for _ in range(3):
    pact = np.random.randint(3)
    print(env.step(pact))
```

DQN

```python
def train_dqn(env, epoch_num=50):

    class Q_Network(chainer.Chain):
        def __init__(self, input_size, hidden_size, output_size):
            super(Q_Network, self).__init__(
                fc1=L.Linear(input_size, hidden_size),
                fc2=L.Linear(hidden_size, hidden_size),
                fc3=L.Linear(hidden_size, output_size)
            )
        def __call__(self, x):
            h = F.relu(self.fc1(x))
            h = F.relu(self.fc2(h))
            y = self.fc3(h)
            return y
        def reset(self):
            self.zerograds()

    Q = Q_Network(input_size=env.history_t+1, hidden_size=100, output_size=3)
    Q_ast = copy.deepcopy(Q)
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(Q)

    step_max = len(env.data)-1
    memory_size = 200
    batch_size = 20
    epsilon = 1.0
    epsilon_decrease = 1e-3
    epsilon_min = 0.1
    start_reduce_epsilon = 200
    train_freq = 10
    update_q_freq = 20
    gamma = 0.97
    show_log_freq = 5

    memory = []
    total_step = 0
    total_rewards = []
    total_losses = []

    start = time.time()
    for epoch in range(epoch_num):
        pobs = env.reset()
        step = 0
        done = False
        total_reward = 0
        total_loss = 0

        while not done and step < step_max:
            # select act
            pact = np.random.randint(3)
            if np.random.rand() > epsilon:
                pact = Q(np.array(pobs, dtype=np.float32).reshape(1, -1))
                pact = np.argmax(pact.data)

            # act
            obs, reward, done, profit = env.step(pact)

            # add memory
            memory.append((pobs, pact, reward, obs, done))
            if len(memory) > memory_size:
                memory.pop(0)

            # train or update q
            if len(memory) == memory_size:
                if total_step % train_freq == 0:
                    shuffled_memory = np.random.permutation(memory)
                    memory_idx = range(len(shuffled_memory))
                    for i in memory_idx[::batch_size]:
                        batch = np.array(shuffled_memory[i:i+batch_size])
                        b_pobs = np.array(batch[:, 0].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_pact = np.array(batch[:, 1].tolist(), dtype=np.int32)
                        b_reward = np.array(batch[:, 2].tolist(), dtype=np.int32)
                        b_obs = np.array(batch[:, 3].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_done = np.array(batch[:, 4].tolist(), dtype=bool)

                        q = Q(b_pobs)
                        maxq = np.max(Q_ast(b_obs).data, axis=1)
                        target = copy.deepcopy(q.data)
                        for j in range(batch_size):
                            target[j, b_pact[j]] = b_reward[j] + gamma * maxq[j] * (not b_done[j])
                        Q.reset()
                        loss = F.mean_squared_error(q, target)
                        total_loss += loss.data
                        loss.backward()
                        optimizer.update()

                if total_step % update_q_freq == 0:
                    Q_ast = copy.deepcopy(Q)

            # epsilon
            if epsilon > epsilon_min and total_step > start_reduce_epsilon:
                epsilon -= epsilon_decrease

            # next step
            total_reward += reward
            pobs = obs
            step += 1
            total_step += 1

        total_rewards.append(total_reward)
        total_losses.append(total_loss)

        if (epoch+1) % show_log_freq == 0:
            log_reward = sum(total_rewards[((epoch+1)-show_log_freq):]) / show_log_freq
            log_loss = sum(total_losses[((epoch+1)-show_log_freq):]) / show_log_freq
            elapsed_time = time.time() - start
            print('\t'.join(map(str, [epoch+1, epsilon, total_step, log_reward, log_loss, elapsed_time])))
            start = time.time()

    return Q, total_losses, total_rewards
```

This code defines a function train_dqn(), which trains a Deep Q-Network (DQN) on the simple trading environment. The function takes two arguments: env, the trading environment, and epoch_num, the number of epochs to train for.
The code defines a Q_Network class, a subclass of Chainer's Chain. The Q-network has three fully connected layers, with ReLU activations after the first two. The reset() method zeroes the model's gradients.

Two instances of the Q-network are created, Q and Q_ast, along with an Adam optimizer for updating the model parameters. Several hyperparameters are defined for DQN training, including the replay-buffer size, batch size, epsilon, gamma, and the update frequencies.

To track the model's performance during training, total_rewards and total_losses lists are created alongside the replay memory. At the start of each epoch the environment is reset and the per-epoch variables are initialized.
The agent selects an action (hold, buy, or sell) from the current state using an epsilon-greedy exploration strategy. It then executes the action in the environment, receives a reward, and observes the new state. The experience tuple (previous state, action, reward, new state, done flag) is stored in the replay buffer.
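The selection rule can be sketched on its own. The `epsilon_greedy` helper below is a hypothetical stand-in for illustration (the training loop above calls the Chainer network directly):

```python
import numpy as np

def epsilon_greedy(q_values, epsilon, rng=np.random):
    # Explore with probability epsilon: pick a random action (0=stay, 1=buy, 2=sell).
    if rng.rand() < epsilon:
        return rng.randint(len(q_values))
    # Exploit otherwise: pick the action with the highest estimated Q-value.
    return int(np.argmax(q_values))

q = np.array([0.1, 0.5, -0.2])
assert epsilon_greedy(q, epsilon=0.0) == 1  # purely greedy when epsilon is 0
```

Starting with epsilon = 1.0 and decaying it toward 0.1 (as the hyperparameters above do) moves the agent gradually from pure exploration to mostly exploitation.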
To train the DQN, once the buffer is full a batch of experiences is sampled from memory. Target Q-values are computed from the Q_ast network via the Bellman equation. The loss is the mean squared error between the predicted and target Q-values; the gradients are computed and the optimizer updates the model parameters.
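As a concrete illustration of that target computation, here is the same loop on a hypothetical two-transition mini-batch (the numbers are made up; the structure mirrors train_dqn above):

```python
import numpy as np

gamma = 0.97
q_pred = np.array([[0.2, 0.5, 0.1],   # Q(s) for two sampled transitions
                   [0.0, 0.3, 0.4]])
q_next = np.array([[0.1, 0.6, 0.2],   # Q_ast(s') for the same transitions
                   [0.5, 0.2, 0.1]])
actions = np.array([1, 2])            # actions actually taken
rewards = np.array([1.0, -1.0])
dones   = np.array([False, True])     # the second transition ends the episode

target = q_pred.copy()
max_next = q_next.max(axis=1)
for j in range(len(actions)):
    # Only the taken action's entry moves toward r + gamma * max Q_ast(s');
    # the other entries keep their predicted values, so their loss term is zero.
    target[j, actions[j]] = rewards[j] + gamma * max_next[j] * (not dones[j])

# target[0, 1] == 1.0 + 0.97 * 0.6 == 1.582; target[1, 2] == -1.0 (terminal)
```

The MSE between q_pred and target then penalizes only the entries that were actually updated, which is exactly what the `copy.deepcopy(q.data)` trick in the training loop achieves.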
The target network Q_ast is periodically refreshed with the weights of the main network Q. As the agent learns, epsilon is decreased linearly, shifting it toward exploitation. Each epoch, the total reward and loss are accumulated and the results are logged.

At the end of training, train_dqn() returns the trained Q-network along with the total losses and rewards. The DQN model can then drive a trading strategy on the input stock price data in the simulated trading environment.
```python
dqn, total_losses, total_rewards = train_dqn(Environment(train), epoch_num=25)
```

This trains the DQN model on the training environment with train_dqn and returns the trained model together with the total loss and reward for each training epoch.
```python
def plot_loss_reward(total_losses, total_rewards):
    figure = tools.make_subplots(rows=1, cols=2, subplot_titles=('loss', 'reward'), print_grid=False)
    figure.append_trace(Scatter(y=total_losses, mode='lines', line=dict(color='skyblue')), 1, 1)
    figure.append_trace(Scatter(y=total_rewards, mode='lines', line=dict(color='orange')), 1, 2)
    figure['layout']['xaxis1'].update(title='epoch')
    figure['layout']['xaxis2'].update(title='epoch')
    figure['layout'].update(height=400, width=900, showlegend=False)
    iplot(figure)
```

plot_loss_reward creates a figure with two subplots using Plotly's make_subplots. It shows how the loss and reward values trend over the training epochs, giving insight into the DQN model's learning progress.
```python
plot_loss_reward(total_losses, total_rewards)
```

This plots the loss and reward curves over the training epochs for the DQN model.
```python
def plot_train_test_by_q(train_env, test_env, Q, algorithm_name):
    # train
    pobs = train_env.reset()
    train_acts = []
    train_rewards = []
    train_ongoing_profits = []
    for _ in range(len(train_env.data)-1):
        pact = Q(np.array(pobs, dtype=np.float32).reshape(1, -1))
        pact = np.argmax(pact.data)
        train_acts.append(pact)
        obs, reward, done, profit = train_env.step(pact)
        train_rewards.append(reward)
        train_ongoing_profits.append(profit)
        pobs = obs
    train_profits = train_env.profits

    # test
    pobs = test_env.reset()
    test_acts = []
    test_rewards = []
    test_ongoing_profits = []
    for _ in range(len(test_env.data)-1):
        pact = Q(np.array(pobs, dtype=np.float32).reshape(1, -1))
        pact = np.argmax(pact.data)
        test_acts.append(pact)
        obs, reward, done, profit = test_env.step(pact)
        test_rewards.append(reward)
        test_ongoing_profits.append(profit)
        pobs = obs
    test_profits = test_env.profits

    # plot
    train_copy = train_env.data.copy()
    test_copy = test_env.data.copy()
    train_copy['act'] = train_acts + [np.nan]
    train_copy['reward'] = train_rewards + [np.nan]
    test_copy['act'] = test_acts + [np.nan]
    test_copy['reward'] = test_rewards + [np.nan]
    train0 = train_copy[train_copy['act'] == 0]
    train1 = train_copy[train_copy['act'] == 1]
    train2 = train_copy[train_copy['act'] == 2]
    test0 = test_copy[test_copy['act'] == 0]
    test1 = test_copy[test_copy['act'] == 1]
    test2 = test_copy[test_copy['act'] == 2]
    act_color0, act_color1, act_color2 = 'gray', 'cyan', 'magenta'

    data = [
        Candlestick(x=train0.index, open=train0['Open'], high=train0['High'], low=train0['Low'], close=train0['Close'],
                    increasing=dict(line=dict(color=act_color0)), decreasing=dict(line=dict(color=act_color0))),
        Candlestick(x=train1.index, open=train1['Open'], high=train1['High'], low=train1['Low'], close=train1['Close'],
                    increasing=dict(line=dict(color=act_color1)), decreasing=dict(line=dict(color=act_color1))),
        Candlestick(x=train2.index, open=train2['Open'], high=train2['High'], low=train2['Low'], close=train2['Close'],
                    increasing=dict(line=dict(color=act_color2)), decreasing=dict(line=dict(color=act_color2))),
        Candlestick(x=test0.index, open=test0['Open'], high=test0['High'], low=test0['Low'], close=test0['Close'],
                    increasing=dict(line=dict(color=act_color0)), decreasing=dict(line=dict(color=act_color0))),
        Candlestick(x=test1.index, open=test1['Open'], high=test1['High'], low=test1['Low'], close=test1['Close'],
                    increasing=dict(line=dict(color=act_color1)), decreasing=dict(line=dict(color=act_color1))),
        Candlestick(x=test2.index, open=test2['Open'], high=test2['High'], low=test2['Low'], close=test2['Close'],
                    increasing=dict(line=dict(color=act_color2)), decreasing=dict(line=dict(color=act_color2)))
    ]
    title = '{}: train s-reward {}, profits {}, test s-reward {}, profits {}'.format(
        algorithm_name,
        int(sum(train_rewards)), int(train_profits),
        int(sum(test_rewards)), int(test_profits)
    )
    layout = {
        'title': title,
        'showlegend': False,
        'shapes': [
            {'x0': date_split, 'x1': date_split, 'y0': 0, 'y1': 1, 'xref': 'x', 'yref': 'paper',
             'line': {'color': 'rgb(0,0,0)', 'width': 1}}
        ],
        'annotations': [
            {'x': date_split, 'y': 1.0, 'xref': 'x', 'yref': 'paper', 'showarrow': False,
             'xanchor': 'left', 'text': ' test data'},
            {'x': date_split, 'y': 1.0, 'xref': 'x', 'yref': 'paper', 'showarrow': False,
             'xanchor': 'right', 'text': 'train data '}
        ]
    }
    figure = Figure(data=data, layout=layout)
    iplot(figure)
    return train_ongoing_profits, test_ongoing_profits
```

plot_train_test_by_q() visualizes the trading actions and performance of the trained DQN model on the training and test datasets.
Using the trained Q-network, the function resets each environment and iterates over the training and test data, accumulating the actions, rewards, and running profits for both datasets.

The function returns the running profits on the training and test datasets, so the performance of the algorithms can be analyzed and compared.
```python
train_profits, test_profits = plot_train_test_by_q(Environment(train), Environment(test), dqn, 'DQN')
```

Based on the DQN model's predictions, train_profits receives the profits obtained on the training data, and test_profits the profits obtained on the test data.
The code evaluates the trained DQN model on the training and test data and computes the profit obtained on each dataset, which helps gauge the model's accuracy and effectiveness.

We can also compare the DQN model's performance against a buy-and-hold strategy for the stock data. Matplotlib's plt module is used to generate the plot.
```python
plt.figure(figsize=(23, 8))
plt.plot(data.index, (data['Close'] - data['Close'].iloc[0]) / data['Close'].iloc[-1], label='buy and hold')
plt.plot(train.index, np.array([0] + train_profits) / data['Close'].iloc[-1], label='rl (train)')
plt.plot(test.index, (np.array([0] + test_profits) + train_profits[-1]) / data['Close'].iloc[-1], label='rl (test)')
plt.ylabel('relative gain')
plt.legend()
plt.show()
```

The code plots the profits obtained on the training data: the x-axis is the date index and the y-axis the relative gain of the DQN strategy, computed by dividing the profits by the last closing price in the data.
For the test data, the cumulative training profit is added to the test profits before dividing by the last closing price, so the curve labeled 'rl (test)' continues from where the training curve ends.

The figure, displayed with Matplotlib's show, compares the relative gains of the buy-and-hold strategy and the DQN strategy on the training and test data. Comparing the DQN model against a simple baseline such as buy-and-hold gives insight into its effectiveness.
Double DQN

Double Deep Q-Network (DDQN) is a deep reinforcement-learning algorithm that is particularly effective for Q-learning problems with discrete action spaces. DDQN is an improvement on the traditional Deep Q-Network (DQN), designed to address DQN's tendency to overestimate Q-values.

DDQN splits the roles of the two networks when forming the target: instead of updating toward the maximum Q-value predicted by the target network directly, it updates toward the target network's Q-value for the action selected by the current (online) network. Decoupling action selection from target evaluation reduces the correlation between the two and mitigates the overestimation of Q-values.
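The single change this makes to the target can be shown numerically. The values below are hypothetical, chosen so the two networks disagree about the best action:

```python
import numpy as np

gamma = 0.97
q_next_online = np.array([0.9, 0.1, 0.0])  # Q(s') from the online network
q_next_target = np.array([0.2, 0.8, 0.0])  # Q_ast(s') from the target network
reward, done = 1.0, False

# DQN: the target network both selects and evaluates (max of its own row)
dqn_target = reward + gamma * q_next_target.max() * (not done)

# Double DQN: the online network selects, the target network evaluates
a_star = int(np.argmax(q_next_online))
ddqn_target = reward + gamma * q_next_target[a_star] * (not done)

# DQN bootstraps from 0.8, Double DQN from 0.2: when the networks disagree,
# decoupling selection from evaluation damps the upward bias of the max operator.
```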
```python
def train_ddqn(env, epoch_num=50):

    class Q_Network(chainer.Chain):
        def __init__(self, input_size, hidden_size, output_size):
            super(Q_Network, self).__init__(
                fc1=L.Linear(input_size, hidden_size),
                fc2=L.Linear(hidden_size, hidden_size),
                fc3=L.Linear(hidden_size, output_size)
            )
        def __call__(self, x):
            h = F.relu(self.fc1(x))
            h = F.relu(self.fc2(h))
            y = self.fc3(h)
            return y
        def reset(self):
            self.zerograds()

    Q = Q_Network(input_size=env.history_t+1, hidden_size=100, output_size=3)
    Q_ast = copy.deepcopy(Q)
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(Q)

    step_max = len(env.data)-1
    memory_size = 200
    batch_size = 50
    epsilon = 1.0
    epsilon_decrease = 1e-3
    epsilon_min = 0.1
    start_reduce_epsilon = 200
    train_freq = 10
    update_q_freq = 20
    gamma = 0.97
    show_log_freq = 5

    memory = []
    total_step = 0
    total_rewards = []
    total_losses = []

    start = time.time()
    for epoch in range(epoch_num):
        pobs = env.reset()
        step = 0
        done = False
        total_reward = 0
        total_loss = 0

        while not done and step < step_max:
            # select act
            pact = np.random.randint(3)
            if np.random.rand() > epsilon:
                pact = Q(np.array(pobs, dtype=np.float32).reshape(1, -1))
                pact = np.argmax(pact.data)

            # act
            obs, reward, done, profit = env.step(pact)

            # add memory
            memory.append((pobs, pact, reward, obs, done))
            if len(memory) > memory_size:
                memory.pop(0)

            # train or update q
            if len(memory) == memory_size:
                if total_step % train_freq == 0:
                    shuffled_memory = np.random.permutation(memory)
                    memory_idx = range(len(shuffled_memory))
                    for i in memory_idx[::batch_size]:
                        batch = np.array(shuffled_memory[i:i+batch_size])
                        b_pobs = np.array(batch[:, 0].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_pact = np.array(batch[:, 1].tolist(), dtype=np.int32)
                        b_reward = np.array(batch[:, 2].tolist(), dtype=np.int32)
                        b_obs = np.array(batch[:, 3].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_done = np.array(batch[:, 4].tolist(), dtype=bool)

                        q = Q(b_pobs)
                        """ <<< DQN -> Double DQN
                        maxq = np.max(Q_ast(b_obs).data, axis=1)
                        === """
                        indices = np.argmax(q.data, axis=1)
                        maxqs = Q_ast(b_obs).data
                        """ >>> """
                        target = copy.deepcopy(q.data)
                        for j in range(batch_size):
                            """ <<< DQN -> Double DQN
                            target[j, b_pact[j]] = b_reward[j]+gamma*maxq[j]*(not b_done[j])
                            === """
                            target[j, b_pact[j]] = b_reward[j] + gamma * maxqs[j, indices[j]] * (not b_done[j])
                            """ >>> """
                        Q.reset()
                        loss = F.mean_squared_error(q, target)
                        total_loss += loss.data
                        loss.backward()
                        optimizer.update()

                if total_step % update_q_freq == 0:
                    Q_ast = copy.deepcopy(Q)

            # epsilon
            if epsilon > epsilon_min and total_step > start_reduce_epsilon:
                epsilon -= epsilon_decrease

            # next step
            total_reward += reward
            pobs = obs
            step += 1
            total_step += 1

        total_rewards.append(total_reward)
        total_losses.append(total_loss)

        if (epoch+1) % show_log_freq == 0:
            log_reward = sum(total_rewards[((epoch+1)-show_log_freq):]) / show_log_freq
            log_loss = sum(total_losses[((epoch+1)-show_log_freq):]) / show_log_freq
            elapsed_time = time.time() - start
            print('\t'.join(map(str, [epoch+1, epsilon, total_step, log_reward, log_loss, elapsed_time])))
            start = time.time()

    return Q, total_losses, total_rewards
```

The code above defines train_ddqn(), which trains a Double Deep Q-Network (DDQN) on the trading environment. It differs from train_dqn() only in how the target Q-values are computed, as marked in the `<<< ... >>>` blocks.
```python
ddqn, total_losses, total_rewards = train_ddqn(Environment(train), epoch_num=50)
plot_loss_reward(total_losses, total_rewards)
```

plot_loss_reward visualizes how the loss and reward values trend over the DDQN training epochs.
```python
train_profits, test_profits = plot_train_test_by_q(Environment(train), Environment(test), ddqn, 'Double DQN')
```

This evaluates the trained DDQN model on the training and test data, obtaining the profit for each dataset. The result is useful for stock-market prediction and other decision-making tasks that call for reinforcement learning.
```python
plt.figure(figsize=(23, 8))
plt.plot(data.index, (data['Close'] - data['Close'].iloc[0]) / data['Close'].iloc[-1], label='buy and hold')
plt.plot(train.index, np.array([0] + train_profits) / data['Close'].iloc[-1], label='rl (train)')
plt.plot(test.index, (np.array([0] + test_profits) + train_profits[-1]) / data['Close'].iloc[-1], label='rl (test)')
plt.ylabel('relative gain')
plt.legend()
plt.show()
```

We can see that Double DQN scores somewhat higher. This is consistent with the description of DDQN above: by using separate networks to select the action (the current network) and to evaluate it (the target network), DDQN effectively addresses the Q-value overestimation of traditional DQN and improves performance and stability in discrete action spaces.
Dueling Double DQN

Dueling Double Deep Q-Network (Dueling DDQN) is a reinforcement-learning algorithm that combines two techniques: the dueling network architecture and Double DQN. It aims to further improve the efficiency and stability of Q-learning, and is particularly effective for problems with discrete action spaces.
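Before the Chainer version below, the aggregation the dueling head performs can be shown with plain NumPy (the numbers are hypothetical):

```python
import numpy as np

# Dueling aggregation: Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).
state_value = np.array([[1.0]])              # V(s): one scalar per state
advantage   = np.array([[0.5, -0.1, -0.4]])  # A(s, a): one value per action
q_value = state_value + (advantage - advantage.mean(axis=1, keepdims=True))
# Centering the advantages makes the decomposition identifiable: they sum to
# zero, so V(s) absorbs the level shared by all three actions.
```

Because V(s) is learned from every transition regardless of which action was taken, states whose value barely depends on the action (common in trading, where "stay" dominates) are estimated more efficiently.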
```python
def train_dddqn(env, epoch_num=50):

    """ <<< Double DQN -> Dueling Double DQN
    class Q_Network(chainer.Chain):
        def __init__(self, input_size, hidden_size, output_size):
            super(Q_Network, self).__init__(
                fc1=L.Linear(input_size, hidden_size),
                fc2=L.Linear(hidden_size, hidden_size),
                fc3=L.Linear(hidden_size, output_size)
            )
        def __call__(self, x):
            h = F.relu(self.fc1(x))
            h = F.relu(self.fc2(h))
            y = self.fc3(h)
            return y
        def reset(self):
            self.zerograds()
    === """
    class Q_Network(chainer.Chain):
        def __init__(self, input_size, hidden_size, output_size):
            super(Q_Network, self).__init__(
                fc1=L.Linear(input_size, hidden_size),
                fc2=L.Linear(hidden_size, hidden_size),
                fc3=L.Linear(hidden_size, hidden_size//2),
                fc4=L.Linear(hidden_size, hidden_size//2),
                state_value=L.Linear(hidden_size//2, 1),
                advantage_value=L.Linear(hidden_size//2, output_size)
            )
            self.input_size = input_size
            self.hidden_size = hidden_size
            self.output_size = output_size
        def __call__(self, x):
            h = F.relu(self.fc1(x))
            h = F.relu(self.fc2(h))
            hs = F.relu(self.fc3(h))
            ha = F.relu(self.fc4(h))
            state_value = self.state_value(hs)
            advantage_value = self.advantage_value(ha)
            advantage_mean = (F.sum(advantage_value, axis=1) / float(self.output_size)).reshape(-1, 1)
            q_value = F.concat([state_value for _ in range(self.output_size)], axis=1) + (
                advantage_value - F.concat([advantage_mean for _ in range(self.output_size)], axis=1))
            return q_value
        def reset(self):
            self.zerograds()
    """ >>> """

    Q = Q_Network(input_size=env.history_t+1, hidden_size=100, output_size=3)
    Q_ast = copy.deepcopy(Q)
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(Q)

    step_max = len(env.data)-1
    memory_size = 200
    batch_size = 50
    epsilon = 1.0
    epsilon_decrease = 1e-3
    epsilon_min = 0.1
    start_reduce_epsilon = 200
    train_freq = 10
    update_q_freq = 20
    gamma = 0.97
    show_log_freq = 5

    memory = []
    total_step = 0
    total_rewards = []
    total_losses = []

    start = time.time()
    for epoch in range(epoch_num):
        pobs = env.reset()
        step = 0
        done = False
        total_reward = 0
        total_loss = 0

        while not done and step < step_max:
            # select act
            pact = np.random.randint(3)
            if np.random.rand() > epsilon:
                pact = Q(np.array(pobs, dtype=np.float32).reshape(1, -1))
                pact = np.argmax(pact.data)

            # act
            obs, reward, done, profit = env.step(pact)

            # add memory
            memory.append((pobs, pact, reward, obs, done))
            if len(memory) > memory_size:
                memory.pop(0)

            # train or update q
            if len(memory) == memory_size:
                if total_step % train_freq == 0:
                    shuffled_memory = np.random.permutation(memory)
                    memory_idx = range(len(shuffled_memory))
                    for i in memory_idx[::batch_size]:
                        batch = np.array(shuffled_memory[i:i+batch_size])
                        b_pobs = np.array(batch[:, 0].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_pact = np.array(batch[:, 1].tolist(), dtype=np.int32)
                        b_reward = np.array(batch[:, 2].tolist(), dtype=np.int32)
                        b_obs = np.array(batch[:, 3].tolist(), dtype=np.float32).reshape(batch_size, -1)
                        b_done = np.array(batch[:, 4].tolist(), dtype=bool)

                        q = Q(b_pobs)
                        """ <<< DQN -> Double DQN
                        maxq = np.max(Q_ast(b_obs).data, axis=1)
                        === """
                        indices = np.argmax(q.data, axis=1)
                        maxqs = Q_ast(b_obs).data
                        """ >>> """
                        target = copy.deepcopy(q.data)
                        for j in range(batch_size):
                            """ <<< DQN -> Double DQN
                            target[j, b_pact[j]] = b_reward[j]+gamma*maxq[j]*(not b_done[j])
                            === """
                            target[j, b_pact[j]] = b_reward[j] + gamma * maxqs[j, indices[j]] * (not b_done[j])
                            """ >>> """
                        Q.reset()
                        loss = F.mean_squared_error(q, target)
                        total_loss += loss.data
                        loss.backward()
                        optimizer.update()

                if total_step % update_q_freq == 0:
                    Q_ast = copy.deepcopy(Q)

            # epsilon
            if epsilon > epsilon_min and total_step > start_reduce_epsilon:
                epsilon -= epsilon_decrease

            # next step
            total_reward += reward
            pobs = obs
            step += 1
            total_step += 1

        total_rewards.append(total_reward)
        total_losses.append(total_loss)

        if (epoch+1) % show_log_freq == 0:
            log_reward = sum(total_rewards[((epoch+1)-show_log_freq):]) / show_log_freq
            log_loss = sum(total_losses[((epoch+1)-show_log_freq):]) / show_log_freq
            elapsed_time = time.time() - start
            print('\t'.join(map(str, [epoch+1, epsilon, total_step, log_reward, log_loss, elapsed_time])))
            start = time.time()

    return Q, total_losses, total_rewards
```

In the __call__ method, the first two layers are shared between the two streams, which then split. The state-value stream has an extra linear layer (state_value) that outputs a single value, while the advantage stream has an extra linear layer (advantage_value) that outputs one value per action. The final Q-values combine the state value and the advantages, subtracting the mean advantage for stability. The rest of the code is very similar to the Double DQN implementation.
```python
dddqn, total_losses, total_rewards = train_dddqn(Environment(train), epoch_num=25)
plot_loss_reward(total_losses, total_rewards)
train_profits, test_profits = plot_train_test_by_q(Environment(train), Environment(test), dddqn, 'Dueling Double DQN')

plt.figure(figsize=(23, 8))
plt.plot(data.index, (data['Close'] - data['Close'].iloc[0]) / data['Close'].iloc[-1], label='buy and hold')
plt.plot(train.index, np.array([0] + train_profits) / data['Close'].iloc[-1], label='rl (train)')
plt.plot(test.index, (np.array([0] + test_profits) + train_profits[-1]) / data['Close'].iloc[-1], label='rl (test)')
plt.ylabel('relative gain')
plt.legend()
plt.show()
```

Dueling DDQN decomposes the Q-function into a state-value function and an advantage function to improve efficiency, while using the Double DQN idea to reduce Q-value overestimation, so it performs well on reinforcement-learning problems with discrete action spaces.
Summary

Let's compare the traditional Deep Q-Network (DQN), Double DQN, and Dueling Double DQN, and look at their respective strengths and weaknesses.

1. Deep Q-Network (DQN)

Features: uses a deep neural network to estimate the Q-function, learning the value of each action in each state; uses experience replay and a fixed target Q-network to improve stability and convergence.
Pros: deep learning greatly increases the representational power of the Q-function, allowing it to handle complex state and action spaces; experience replay and the fixed target network stabilize training and reduce the correlation between samples.
Cons: suffers from Q-value overestimation, especially with larger action spaces, which can destabilize training and hurt performance.

2. Double Deep Q-Network (Double DQN)

Features: addresses the Q-value overestimation of DQN by using the target Q-network to evaluate the action selected by the online network, reducing correlation in the updates.
Pros: reduces Q-value overestimation, improving training stability and convergence.
Cons: the architecture is relatively simple, and some complex problems may need more representational power.

3. Dueling Double Deep Q-Network (Dueling DDQN)

Features: combines the strengths of the dueling architecture and Double DQN; the dueling structure decomposes the Q-function to improve efficiency and representational power, while the Double DQN idea reduces Q-value overestimation.
Pros: combines the advantages of both techniques and performs well across a wider range of problems; improves training stability and efficiency, helping the agent converge faster to a good policy.
Cons: implementation and tuning are more involved than for plain DQN or its individual improvements.

Overall comparison:
- Effectiveness and stability: Dueling DDQN is more efficient and stable on problems with larger action spaces, because it separates the state value from the action advantages.
- Overestimation: Dueling DDQN addresses the overestimation of traditional DQN: the Double DQN part reduces correlation via the target network, while the dueling structure reduces overestimation through the advantage function.
- Complexity: Dueling DDQN is more complex than DQN and Double DQN and costs more to implement and understand, but it delivers better performance.

Traditional DQN suits simple reinforcement-learning tasks, while Double DQN and Dueling DDQN suit more complex and challenging problems; choosing the right algorithm for the problem can substantially improve training efficiency and performance.
Finally, we have seen that predicting stocks with deep reinforcement learning is feasible, because it no longer predicts specific stock prices but instead predicts buy, sell, and hold actions against the resulting profit. Here we only used the stock's own price data; with more exogenous data, reinforcement learning should be able to approximate manual trading even more closely.
Dataset:
https://www.kaggle.com/datasets/borismarjanovic/price-volume-data-for-all-us-stocks-etfs