Reinforcement Learning (4): Actor-Critic
1. Network Structure
- State-value function: $V_\pi(s_t)=\sum_a Q_\pi(s_t,a)\cdot\pi(a\mid s_t)$
- The policy function is approximated by a policy network: $\pi(a\mid s)\approx\pi(a\mid s;\theta)$
- The action-value function is approximated by a value network: $q(s,a;W)\approx Q_\pi(s,a)$
- State-value function after the neural-network approximation (a short code sketch follows this list): $V(s;\theta,W)=\sum_a q(s,a;W)\cdot\pi(a\mid s;\theta)$
- The policy network is updated repeatedly so as to increase the state value.
- The value network is updated repeatedly so as to predict the obtained return more accurately.
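To make the last formula concrete, here is a minimal sketch that evaluates $V(s;\theta,W)=\sum_a q(s,a;W)\cdot\pi(a\mid s;\theta)$ with two small Keras networks. The network sizes, the 4-dimensional state, and the 2-action space are illustrative assumptions (they mirror the CartPole setup used in Section 6), not part of the derivation.

```python
import numpy as np
import tensorflow as tf

# Hypothetical networks for illustration: 4-dimensional state, 2 discrete actions.
policy_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                  tf.keras.layers.Dense(2)])   # logits of pi(a | s; theta)
value_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                 tf.keras.layers.Dense(1)])    # q(s, a; W), input = [one-hot(a), s]

def state_value(state):
    """V(s; theta, W) = sum_a q(s, a; W) * pi(a | s; theta)."""
    s = tf.convert_to_tensor([state], dtype=tf.float32)
    probs = tf.nn.softmax(policy_net(s))[0]                     # pi(a | s; theta) for each action a
    q_values = []
    for a in range(2):
        x = tf.concat([tf.one_hot([a], depth=2), s], axis=1)    # same input layout as the Critic in Section 6
        q_values.append(value_net(x)[0, 0])
    return float(tf.reduce_sum(probs * tf.stack(q_values)))

print(state_value(np.zeros(4, dtype=np.float32)))
```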
2. Network Functions
Policy Network
- The policy network approximates the policy function: $\pi(a\mid s_t)\approx\pi(a\mid s_t;\theta)$
- State-value function and its approximation: $V_\pi(s_t)=\sum_a\pi(a\mid s_t)\,Q_\pi(s_t,a)$, $V(s_t;\theta)=\sum_a\pi(a\mid s_t;\theta)\cdot Q_\pi(s_t,a)$
- Objective function maximized by policy learning: $J(\theta)=\mathbb{E}_S[V(S;\theta)]$
- The update is performed by policy-gradient ascent (see the sketch after this list): $\theta\gets\theta+\beta\cdot\frac{\partial V(s;\theta)}{\partial\theta}$
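As a sketch of the objective and the ascent rule above, the snippet below estimates $J(\theta)\approx\frac{1}{N}\sum_i V(s_i;\theta,W)$ over a batch of sampled states and takes one gradient-ascent step on $\theta$. The two networks, the batch of random states, and the step size $\beta$ are placeholder assumptions.

```python
import tensorflow as tf

# Illustrative networks: policy pi(a|s; theta) over 2 actions, value q(s,a; W) on [one-hot(a), s].
policy_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                  tf.keras.layers.Dense(2)])
policy_net.build(input_shape=[None, 4])
value_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                 tf.keras.layers.Dense(1)])
value_net.build(input_shape=[None, 6])
states = tf.random.normal([16, 4])      # a batch of sampled states S (assumed 4-dimensional)
beta = 1e-3                             # policy learning rate

with tf.GradientTape() as tape:
    probs = tf.nn.softmax(policy_net(states))                    # (16, 2), pi(a | s; theta)
    q_all = []
    for a in range(2):
        one_hot = tf.tile(tf.one_hot([a], depth=2), [16, 1])     # (16, 2)
        q_all.append(value_net(tf.concat([one_hot, states], axis=1))[:, 0])
    v = tf.reduce_sum(probs * tf.stack(q_all, axis=1), axis=1)   # V(s; theta, W) per state
    j = tf.reduce_mean(v)                                        # Monte-Carlo estimate of J(theta)

grads = tape.gradient(j, policy_net.trainable_variables)
# Gradient *ascent*: theta <- theta + beta * dJ/dtheta.
for var, g in zip(policy_net.trainable_variables, grads):
    var.assign_add(beta * g)
```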
3. Updating the Policy Network: Policy Gradient
Policy Network
- The policy gradient is: $g(a,\theta)=\frac{\partial\ln\pi(a\mid s;\theta)}{\partial\theta}\cdot q(s,a;W)$, with $\frac{\partial V(s;\theta,W)}{\partial\theta}=\mathbb{E}_A[g(A,\theta)]$
- The stochastic policy gradient (an unbiased estimate) can be used instead (a single-step sketch follows this list): $a\sim\pi(\cdot\mid s_t;\theta)$, $\theta_{t+1}=\theta_t+\beta\cdot g(a,\theta_t)$
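A minimal sketch of one stochastic policy-gradient step, assuming a small 2-action policy network and treating $q(s,a;W)$ as a fixed scalar supplied by the value network (here a placeholder constant):

```python
import numpy as np
import tensorflow as tf

# Illustrative 2-action policy network pi(a | s; theta).
policy_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                  tf.keras.layers.Dense(2)])
policy_net.build(input_shape=[None, 4])
beta = 1e-3

s = tf.random.normal([1, 4])                        # current state s_t (placeholder)
probs = tf.nn.softmax(policy_net(s)).numpy().ravel()
probs = probs / probs.sum()                         # guard against float32 rounding
a = np.random.choice(2, p=probs)                    # a ~ pi(. | s_t; theta)
q_sa = 1.0                                          # placeholder for q(s, a; W) from the value network

with tf.GradientTape() as tape:
    log_prob = tf.math.log(tf.nn.softmax(policy_net(s))[0, a])   # ln pi(a | s; theta)
grads = tape.gradient(log_prob, policy_net.trainable_variables)

# g(a, theta) = q(s, a; W) * d ln pi(a|s; theta) / d theta; ascent: theta <- theta + beta * g.
for var, g in zip(policy_net.trainable_variables, grads):
    var.assign_add(beta * q_sa * g)
```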
4. Updating the Value Network: Temporal Difference (TD)
- TD target: $y_t=r_t+\gamma\, q(s_{t+1},a_{t+1};W_t)$
- Loss function: $loss=\frac{1}{2}\left[q(s_t,a_t;W_t)-y_t\right]^2$
- Update by gradient descent (a sketch follows this list): $W_{t+1}=W_t-\alpha\cdot\frac{\partial\, loss}{\partial W}\Big|_{W=W_t}$
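The sketch below runs the three formulas above once for a value network $q(s,a;W)$: build the TD target, form the squared-error loss, and take one gradient-descent step. The network, the placeholder transition, and $\alpha$ are illustrative assumptions.

```python
import tensorflow as tf

value_net = tf.keras.Sequential([tf.keras.layers.Dense(32, activation="relu"),
                                 tf.keras.layers.Dense(1)])   # q(s, a; W), input = [one-hot(a), s]
value_net.build(input_shape=[None, 6])
alpha, gamma = 1e-3, 0.9

# Placeholder transition (s_t, a_t, r_t, s_{t+1}, a_{t+1}) for illustration.
x_t = tf.random.normal([1, 6])       # features of (s_t, a_t)
x_t1 = tf.random.normal([1, 6])      # features of (s_{t+1}, a_{t+1})
r_t = 1.0

with tf.GradientTape() as tape:
    q_t = value_net(x_t)[0, 0]
    y_t = r_t + gamma * tf.stop_gradient(value_net(x_t1)[0, 0])  # TD target, treated as a constant
    loss = 0.5 * tf.square(q_t - y_t)                            # loss = 1/2 [q_t - y_t]^2
grads = tape.gradient(loss, value_net.trainable_variables)

# Gradient descent: W <- W - alpha * d(loss)/dW.
for var, g in zip(value_net.trainable_variables, grads):
    var.assign_sub(alpha * g)
```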
5. Training Procedure
In each update, the Agent executes one action and receives one reward.
- Observe the state $s_t$ and randomly sample an action: $a_t\sim\pi(\cdot\mid s_t;\theta)$
- The Agent executes the action and receives the new state and reward from the environment: $s_{t+1}$, $r_t$
- Randomly sample another action from the new state (this action is not executed in the current iteration): $\tilde{a}_{t+1}\sim\pi(\cdot\mid s_{t+1};\theta)$
- Use the value network to evaluate both state-action pairs: $q_t=q(s_t,a_t;W_t)$, $q_{t+1}=q(s_{t+1},\tilde{a}_{t+1};W_t)$
- Compute the TD error: $\delta_t=q_t-(r_t+\gamma\, q_{t+1})$
- Differentiate the value network: $d_{W,t}=\frac{\partial q(s_t,a_t;W)}{\partial W}\Big|_{W=W_t}$
- Update the value network by gradient descent: $W_{t+1}=W_t-\alpha\cdot\delta_t\cdot d_{W,t}$
- Differentiate the policy network: $d_{\theta,t}=\frac{\partial\ln\pi(a_t\mid s_t;\theta)}{\partial\theta}\Big|_{\theta=\theta_t}$
- Update the policy network by gradient ascent. Equation (2) applies a baseline to the action value used in Equation (1); the expectation of the objective is unchanged, but the variance is smaller, so the network converges more easily (a compact single-transition sketch follows this list): $\theta_{t+1}=\theta_t+\beta\cdot q_t\cdot d_{\theta,t}$ (1), $\theta_{t+1}=\theta_t+\beta\cdot\delta_t\cdot d_{\theta,t}$ (2)
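As a compact companion to the class-based implementation in Section 6, the helper below performs one update for a single transition following the steps above. The names `policy_net`, `value_net`, `n_actions`, and the learning rates are illustrative assumptions; the TD error is stored as $r_t+\gamma q_{t+1}-q_t$, the same sign convention the Section 6 code uses, so the policy step ascends `td * d ln pi / d theta`.

```python
import tensorflow as tf

def ac_update(policy_net, value_net, s, a, r, s_next, a_next, done,
              alpha=1e-3, beta=1e-3, gamma=0.9, n_actions=2):
    """One Actor-Critic update for a single transition (illustrative helper, not from the post)."""
    s = tf.convert_to_tensor([s], dtype=tf.float32)
    s_next = tf.convert_to_tensor([s_next], dtype=tf.float32)
    x_t = tf.concat([tf.one_hot([a], depth=n_actions), s], axis=1)
    x_t1 = tf.concat([tf.one_hot([a_next], depth=n_actions), s_next], axis=1)

    with tf.GradientTape(persistent=True) as tape:
        q_t = value_net(x_t)[0, 0]                                   # q(s_t, a_t; W)
        q_t1 = tf.stop_gradient(value_net(x_t1)[0, 0])               # q(s_{t+1}, a~_{t+1}; W), target
        td = r + (0.0 if done else gamma) * q_t1 - q_t               # TD error, Section 6 convention
        value_loss = 0.5 * tf.square(td)                             # 1/2 [q_t - y_t]^2
        log_prob = tf.math.log(tf.nn.softmax(policy_net(s))[0, a])   # ln pi(a_t | s_t; theta)

    # Value network: gradient descent on the TD loss.
    for var, g in zip(value_net.trainable_variables,
                      tape.gradient(value_loss, value_net.trainable_variables)):
        var.assign_sub(alpha * g)
    # Policy network: gradient ascent weighted by the TD error (same convention as the Section 6 code).
    td_val = float(td)
    for var, g in zip(policy_net.trainable_variables,
                      tape.gradient(log_prob, policy_net.trainable_variables)):
        var.assign_add(beta * td_val * g)
    del tape
    return td_val
```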
6. Example
Convergence of this network is fairly sensitive to hyperparameters such as the model size and the choice of activation function.

```python
# -*- coding: utf-8 -*-
# @Time : 2022/3/29 21:51
# @Author : CyrusMay WJ
# @FileName: AC.py
# @Software: PyCharm
# @Blog :https://blog.csdn.net/Cyrus_May
import tensorflow as tf
import numpy as np
import logging
import sys
import gym
class Critic():
    def __init__(self, logger=None, input_dim=6, gamma=0.9):
        self.logger = logger
        self.__build_model(input_dim)
        self.gamma = gamma
        self.optimizer = tf.optimizers.Adam(learning_rate=0.001)

    def __build_model(self, input_dim):
        # Value network q(s, a; W): input is the one-hot action concatenated with the state.
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dense(1)
        ])
        self.model.build(input_shape=[None, input_dim])

    def predict(self, action, state):
        action = tf.one_hot([action], depth=2)
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        x = tf.concat([action, state], axis=1)
        return self.model(x)[0][0]

    def train(self, state, state_, action, action_, reward, done):
        action = tf.one_hot([action], depth=2)
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        action_ = tf.one_hot([action_], depth=2)
        state_ = tf.convert_to_tensor([state_], dtype=tf.float32)
        x = tf.concat([action, state], axis=1)
        x_ = tf.concat([action_, state_], axis=1)
        done = 0 if done else 1  # mask the bootstrap term at episode end
        with tf.GradientTape() as tape:
            q = self.model(x)
            q_ = self.model(x_)
            # TD error: r_t + gamma * q_{t+1} - q_t
            Td_error = (reward + done * self.gamma * q_ - q)
            loss = tf.square(Td_error)
        dt = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(dt, self.model.trainable_variables))
        return Td_error
class Actor():
    def __init__(self, logger=None, input_dim=4, gamma=0.9, output_dim=2):
        self.logger = logger
        self.__build_model(input_dim, output_dim)
        self.gamma = gamma
        self.optimizer = tf.optimizers.Adam(learning_rate=0.001)
        self.output_dim = output_dim

    def __build_model(self, input_dim, output_dim=2):
        # Policy network pi(a | s; theta): outputs one logit per action.
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dense(output_dim)
        ])
        self.model.build(input_shape=[None, input_dim])

    def predict(self, state):
        # Sample an action from the softmax distribution over the logits.
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        logits = self.model(state)
        prob = tf.nn.softmax(logits).numpy()
        action = np.random.choice([i for i in range(self.output_dim)], p=prob.ravel())
        return action

    def train(self, state, action, TD_error, done):
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        with tf.GradientTape() as tape:
            logits = self.model(state)
            # Cross entropy equals -ln pi(a_t | s_t; theta); weighting it by the TD error
            # and minimizing gives the baselined policy-gradient update.
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=[action], logits=logits)
            loss = tf.reduce_sum(tf.multiply(TD_error, loss))
        dt = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(dt, self.model.trainable_variables))
class Agent():
    def __init__(self, gamma=0.9, logger=None):
        self.gamma = gamma
        self.logger = logger
        self.env = gym.make("CartPole-v0")
        self.actor = Actor(logger=logger, input_dim=4, gamma=self.gamma, output_dim=2)
        self.critic = Critic(logger=logger, input_dim=6, gamma=self.gamma)

    def train(self, tran_epochs=1000, max_act=100):
        history_returns = []
        for epoch in range(tran_epochs):
            single_returns = 0
            state = self.env.reset()
            for iter in range(max_act):
                self.env.render()
                action = self.actor.predict(state)
                state_, reward, done, info = self.env.step(action)
                # Sample a_{t+1} for the TD target; it is not executed in this step.
                action_ = self.actor.predict(state_)
                TD_error = self.critic.train(state, state_, action, action_, reward, done)
                self.actor.train(state, action, TD_error, done)
                single_returns += reward
                state = state_
                if done:
                    break
            # Exponential moving average of the episode returns.
            if history_returns:
                history_returns.append(history_returns[-1] * 0.9 + 0.1 * single_returns)
            else:
                history_returns.append(single_returns)
            self.logger.info("epoch:{}/{} || epoch return:{:,.4f} || history return:{:,.4f}".format(
                epoch + 1, tran_epochs, single_returns, history_returns[-1]))
        self.env.close()

    def test(self, max_act=1000):
        state = self.env.reset()
        single_returns = 0
        for iter in range(max_act):
            self.env.render()
            action = self.actor.predict(state)
            state_, reward, done, info = self.env.step(action)
            single_returns += reward
            state = state_
            if done:
                self.logger.info("End in {} iterations".format(iter + 1))
                break
        if not done:
            self.logger.info("success and return is {}".format(single_returns))
if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    screen_handler = logging.StreamHandler(sys.stdout)
    screen_handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(module)s.%(funcName)s:%(lineno)d - %(levelname)s - %(message)s')
    screen_handler.setFormatter(formatter)
    logger.addHandler(screen_handler)
    agent = Agent(logger=logger)
    agent.train(tran_epochs=2000, max_act=500)
    agent.test()
```
Parts of this article are notes taken while following learning videos on Bilibili.
by CyrusMay 2022 03 29
Is a color you cannot touch called a rainbow?
Is an embrace you cannot see called a breeze?
(Mayday, "Starry Sky")