个人笔记-SAPINNs-1DAC

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import time
import scipy.io
import math
import matplotlib.gridspec as gridspec
from plotting import newfig
from mpl_toolkits.axes_grid1 import make_axes_locatable
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import layers, activations
from scipy.interpolate import griddata
from eager_lbfgs import lbfgs, Struct
from pyDOE import lhs
#define size of the network   (设置全连接神经网络的各层神经元的结构)
#全连接的神经网络的结构如下,输入的神经元是2个,输出的是1个。
#因此模型的输入应该是(batchsize,2),也就是说有batchsize那么多个(x,t),而输出则是(batchsize,1)。
layer_sizes = [2, 128, 128, 128, 128, 1]
#权重
sizes_w = []
sizes_b = []

for i, width in enumerate(layer_sizes):
    if i != 1:
        sizes_w.append(int(width * layer_sizes[1]))
        sizes_b.append(int(width if i != 0 else layer_sizes[1]))
        
#L-BFGS weight getting and setting from https://github.com/pierremtb/PINNs-TF2.0
def set_weights(model, w, sizes_w, sizes_b): 
        for i, layer in enumerate(model.layers[0:]):
            start_weights = sum(sizes_w[:i]) + sum(sizes_b[:i])
            end_weights = sum(sizes_w[:i+1]) + sum(sizes_b[:i])
            weights = w[start_weights:end_weights]
            w_div = int(sizes_w[i] / sizes_b[i])
            weights = tf.reshape(weights, [w_div, sizes_b[i]])
            biases = w[end_weights:end_weights + sizes_b[i]]
            weights_biases = [weights, biases]
            layer.set_weights(weights_biases)


def get_weights(model): 
        w = []
        for layer in model.layers[0:]:
            weights_biases = layer.get_weights()
            weights = weights_biases[0].flatten()
            biases = weights_biases[1]
            w.extend(weights)
            w.extend(biases)

        w = tf.convert_to_tensor(w)
        return w
#define the neural network model(构建神经网络的计算关系)
def neural_net(layer_sizes):
    model = Sequential() #序列模型。通过堆叠许多层,构建出深度神经网络。
    model.add(layers.InputLayer(input_shape=(layer_sizes[0],)))
    for width in layer_sizes[1:-1]:   #用循环的方式构建神经网络的计算关系,激活函数是tanh
        model.add(layers.Dense(
            width, activation=tf.nn.tanh,
            kernel_initializer="glorot_normal"))
    model.add(layers.Dense(
            layer_sizes[-1], activation=None,
            kernel_initializer="glorot_normal"))
    return model
#define the loss (定义损失函数的计算关系,使用的是RMSE)
def loss(x_f_batch, t_f_batch,
         x0, t0, u0, x_lb,
         t_lb, x_ub, t_ub,
         col_weights, u_weights):

    f_u_pred = f_model(x_f_batch, t_f_batch)
    u0_pred = u_model(tf.concat([x0, t0], 1))

    u_lb_pred, u_x_lb_pred, = u_x_model(u_model, x_lb, t_lb)
    u_ub_pred, u_x_ub_pred, = u_x_model(u_model, x_ub, t_ub)
    
	# mse_0_u 和mse_b_u是初边值的监督学习
    mse_0_u = tf.reduce_mean(tf.square(u_weights*(u0 - u0_pred))) #对初值加权u_weights
    mse_b_u = tf.reduce_mean(tf.square(tf.math.subtract(u_lb_pred, u_ub_pred))) + 
              tf.reduce_mean(tf.square(tf.math.subtract(u_x_lb_pred, u_x_ub_pred)))
              
	#mse_f_u是用方程约束的非监督学习
    mse_f_u = tf.reduce_mean(tf.square(col_weights * f_u_pred[0])) #对内部点加权col_weights

    return  mse_0_u + mse_b_u + mse_f_u , tf.reduce_mean(tf.square((u0 - u0_pred))), mse_b_u, tf.reduce_mean(tf.square(f_u_pred)) 
    #分别输出对初值和内部点都加权的total loss,没有加权的初值loss,没有加权的边值loss,没有加权的内部点loss

#define the physics-based residual, we want this to be 0(我们想要实现f_u=0)
# 非监督学习的点
@tf.function
def f_model(x,t):
	# 将x和t输入神经网络,得到神经网络的输出u
    u = u_model(tf.concat([x, t],1))
    # 将神经网络的输出u对神经网络的输入x和t求导,分别得到u_x, u_xx和u_t
    u_x = tf.gradients(u, x)
    u_xx = tf.gradients(u_x, x)  # 由于tf没有定义二阶导数的api,只需要求u_x对x实现u_xx
    u_t = tf.gradients(u,t)
   
   # 相加得到loss_f,如果f趋近于0则可认为神经网络的输入u和其导数满足方程f
    c1 = tf.constant(.0001, dtype = tf.float32)
    c2 = tf.constant(5.0, dtype = tf.float32)
    f_u = u_t - c1*u_xx + c2*u*u*u - c2*u 
    return f_u

# 初边值点
@tf.function
def u_x_model(u_model, x, t):
    u = u_model(tf.concat([x, t],1))
    u_x = tf.gradients(u, x)
    return u, u_x

@tf.function
def grad(model, x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights):
    with tf.GradientTape(persistent=True) as tape:
        loss_value, mse_0, mse_b, mse_f = loss(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights)
        grads = tape.gradient(loss_value, u_model.trainable_variables)
        #print(grads)
        grads_col = tape.gradient(loss_value, col_weights)
        grads_u = tape.gradient(loss_value, u_weights)
        gradients_u = tape.gradient(mse_0, u_model.trainable_variables)
        gradients_f = tape.gradient(mse_f, u_model.trainable_variables)

    return loss_value, mse_0, mse_b, mse_f, grads, grads_col, grads_u, gradients_u, gradients_f
#训练模型
def fit(x_f, t_f, x0, t0, u0, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights, tf_iter, newton_iter):

    #Can adjust batch size for collocation points, here we set it to N_f
    batch_sz = N_f
    n_batches =  N_f // batch_sz

    start_time = time.time()
    #create optimizer s for the network weights, collocation point mask, and initial boundary mask(Adam的优化器)
    tf_optimizer = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.99)
    tf_optimizer_weights = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.99)
    tf_optimizer_u = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.99)

    print("starting Adam training")

    # For mini-batch (if used)
    for epoch in range(tf_iter):
        for i in range(n_batches):

            x0_batch = x0
            t0_batch = t0
            u0_batch = u0

            x_f_batch = x_f[i*batch_sz:(i*batch_sz + batch_sz),]
            t_f_batch = t_f[i*batch_sz:(i*batch_sz + batch_sz),]

            loss_value, mse_0, mse_b, mse_f, grads, grads_col, grads_u, g_u, g_f = grad(u_model, x_f_batch, t_f_batch, x0_batch, t0_batch,  u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights)

            tf_optimizer.apply_gradients(zip(grads, u_model.trainable_variables))
            tf_optimizer_weights.apply_gradients(zip([-grads_col, -grads_u], [col_weights, u_weights]))

        if epoch % 1 == 0:
            elapsed = time.time() - start_time
            print('It: %d, Time: %.2f' % (epoch, elapsed))
            tf.print(f"mse_0: {mse_0}  mse_b  {mse_b}  mse_f: {mse_f}   total loss: {loss_value}")
            start_time = time.time()

    #l-bfgs-b optimization (L-BFGS-B的优化器)
    print("Starting L-BFGS training")

    loss_and_flat_grad = get_loss_and_flat_grad(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights)

    lbfgs(loss_and_flat_grad,
      get_weights(u_model),
      Struct(), maxIter=newton_iter, learningRate=0.8)
#L-BFGS implementation from https://github.com/pierremtb/PINNs-TF2.0
def get_loss_and_flat_grad(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights):
    def loss_and_flat_grad(w):
        with tf.GradientTape() as tape:
            set_weights(u_model, w, sizes_w, sizes_b)
            loss_value, _, _, _ = loss(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights)
        grad = tape.gradient(loss_value, u_model.trainable_variables)
        grad_flat = []
        for g in grad:
            grad_flat.append(tf.reshape(g, [-1]))
        grad_flat = tf.concat(grad_flat, 0)
        #print(loss_value, grad_flat)
        return loss_value, grad_flat

    return loss_and_flat_grad
def predict(X_star):
    X_star = tf.convert_to_tensor(X_star, dtype=tf.float32)
    u_star, _ = u_x_model(u_model, X_star[:,0:1],
                     X_star[:,1:2])

    f_u_star = f_model(X_star[:,0:1],
                 X_star[:,1:2])

    return u_star.numpy(), f_u_star.numpy()

--------------------------------------------------------------------从main这里开始看--------------------------------------------------------------------------------------
因此,我们可以理解为PINNs模型的输入就是自变量的坐标点,一个个坐标点摞起来,每行是一个坐标(x,t),而输出的值就是那个坐标所对应的u(x,t)。而PINNs的输入分为边界点和内部点,在源代码中用u和f区分。我们知道这类偏微分方程求解时,需要确定边界条件和初始条件才能让解唯一。因此边界和初始的值是事先给定的,对于PINNs模型来说,就是初边值的点是监督学习的部分。而内部的点由于实现不知道其解u的值,所以是非监督学习,我们的目标就是让神经网络的输出满足方程,来解出内部点中u的值。举个例子,初边值的点应该有label,如果用(x,t,u)的数据结构表示的话,即(0.5,0,-1),(-1,0.5,0),其中前一个点上用初始条件给出的,后一个点是边界条件给出的。对于内部点,则只需要给出(x,t)即可,如(0.3,0.5),应在定义域x ∈ [−1, 1], t ∈ [0, 1] 中取出足够多的点进行非监督学习。明白了模型的输入输出,就可以看明白该代码中main函数开头的数据预处理的部分了.

# Define constants and weight vectors

lb = np.array([-1.0])		#下界
ub = np.array([1.0])		#上界

N0 = 512  	 # 初值点的数量
N_b = 100   	# 边值点的数量
N_f = 20000	# 内部点的数量

col_weights = tf.Variable(tf.random.uniform([N_f, 1]))
u_weights = tf.Variable(100*tf.random.uniform([N0, 1]))

#initialize the NN (初始化神经网络)
u_model = neural_net(layer_sizes)

#view the NN(输出模型结构信息)
u_model.summary()

# Import data, same data as Raissi et al
# 初边值点的数据预处理
# 作者将初边值的结果存到了AC.mat文件中
data = scipy.io.loadmat('AC.mat')

# 将坐标x,t和对应的u都从AC.mat读取出来
t = data['tt'].flatten()[:,None]
x = data['x'].flatten()[:,None]
Exact = data['uu']
Exact_u = np.real(Exact)

#grab training points from domain
# 随机打乱一下点的排序,避免训练时输入有规律的输入
idx_x = np.random.choice(x.shape[0], N0, replace=False)
x0 = x[idx_x,:]
u0 = tf.cast(Exact_u[idx_x,0:1], dtype = tf.float32)
# 其中x0代表初边值点的坐标,u0代表初边值对应的具体的值,即x0的label
 
idx_t = np.random.choice(t.shape[0], N_b, replace=False)
tb = t[idx_t,:]

# Grab collocation points using latin hpyercube sampling
# 制作内部点的数据集X_f
# lhs这个函数是随机生成N_f个0-1的数据,2代表生成的维度,因为(x,y)所以设置为2
# 因此这一行代表在定义域内随机生成N_f个(x,y)坐标
X_f = lb + (ub-lb)*lhs(2, N_f)
# X_f代表内部点的坐标,lb, ub代表定义域的边界
 
x_f = tf.convert_to_tensor(X_f[:,0:1], dtype=tf.float32)
t_f = tf.convert_to_tensor(np.abs(X_f[:,1:2]), dtype=tf.float32)


X0 = np.concatenate((x0, 0*x0), 1) # (x0, 0)
X_lb = np.concatenate((0*tb + lb[0], tb), 1) # (lb[0], tb)
X_ub = np.concatenate((0*tb + ub[0], tb), 1) # (ub[0], tb)

x0 = tf.cast(X0[:,0:1], dtype = tf.float32)
t0 = tf.cast(X0[:,1:2], dtype = tf.float32)

x_lb = tf.convert_to_tensor(X_lb[:,0:1], dtype=tf.float32)
t_lb = tf.convert_to_tensor(X_lb[:,1:2], dtype=tf.float32)

x_ub = tf.convert_to_tensor(X_ub[:,0:1], dtype=tf.float32)
t_ub = tf.convert_to_tensor(X_ub[:,1:2], dtype=tf.float32)


#train loop (调用model的fit方法开始进行训练) 
fit(x_f, t_f, x0, t0, u0, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights, tf_iter = 10000, newton_iter = 10000)


#generate meshgrid for forward pass of u_pred
# 将这些点摞起来,变成(N_u,x,t)的数据结构
X, T = np.meshgrid(x,t)

X_star = np.hstack((X.flatten()[:,None], T.flatten()[:,None]))
u_star = Exact_u.T.flatten()[:,None]

lb = np.array([-1.0, 0.0]) #这里的lb,ub应该是给下面画图用的吧...
ub = np.array([1.0, 1])

# 调用model的predict方法传入坐标X_star获得对应的结果u_pred和方程偏差 f_u_pred
u_pred, f_u_pred = predict(X_star)

# 计算error
error_u = np.linalg.norm(u_star-u_pred,2)/np.linalg.norm(u_star,2)

# 打印error
print('Error u: %e' % (error_u))


U_pred = griddata(X_star, u_pred.flatten(), (X, T), method='cubic')

FU_pred = griddata(X_star, f_u_pred.flatten(), (X, T), method='cubic')

######################################################################
############################# Plotting ###############################
######################################################################

X0 = np.concatenate((x0, 0*x0), 1) # (x0, 0)
X_lb = np.concatenate((0*tb + lb[0], tb), 1) # (lb[0], tb)
X_ub = np.concatenate((0*tb + ub[0], tb), 1) # (ub[0], tb)
X_u_train = np.vstack([X0, X_lb, X_ub])

fig, ax = newfig(1.3, 1.0)
ax.axis('off')

####### Row 0: h(t,x) ##################
gs0 = gridspec.GridSpec(1, 2)
gs0.update(top=1-0.06, bottom=1-1/3, left=0.15, right=0.85, wspace=0)
ax = plt.subplot(gs0[:, :])

h = ax.imshow(U_pred.T, interpolation='nearest', cmap='YlGnBu',
              extent=[lb[1], ub[1], lb[0], ub[0]],
              origin='lower', aspect='auto')
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(h, cax=cax)


line = np.linspace(x.min(), x.max(), 2)[:,None]
ax.plot(t[25]*np.ones((2,1)), line, 'k--', linewidth = 1)
ax.plot(t[50]*np.ones((2,1)), line, 'k--', linewidth = 1)
ax.plot(t[100]*np.ones((2,1)), line, 'k--', linewidth = 1)
ax.plot(t[150]*np.ones((2,1)), line, 'k--', linewidth = 1)

ax.set_xlabel('$t$')
ax.set_ylabel('$x$')
leg = ax.legend(frameon=False, loc = 'best')
#    plt.setp(leg.get_texts(), color='w')
ax.set_title('$u(t,x)$', fontsize = 10)

####### Row 1: h(t,x) slices ##################
gs1 = gridspec.GridSpec(1, 3)
gs1.update(top=1-1/3, bottom=0, left=0.1, right=0.9, wspace=0.5)

ax = plt.subplot(gs1[0, 0])
ax.plot(x,Exact_u[:,50], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[50,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.set_title('$t = %.2f$' % (t[50]), fontsize = 10)
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])

ax = plt.subplot(gs1[0, 1])
ax.plot(x,Exact_u[:,100], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[100,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])
ax.set_title('$t = %.2f$' % (t[100]), fontsize = 10)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.3), ncol=5, frameon=False)

ax = plt.subplot(gs1[0, 2])
ax.plot(x,Exact_u[:,150], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[150,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])
ax.set_title('$t = %.2f$' % (t[150]), fontsize = 10)

#show u_pred across domain
fig, ax = plt.subplots()

h = plt.imshow(U_pred.T, interpolation='nearest', cmap='rainbow',
            extent=[0.0, 1.0, -1.0, 1.0],
            origin='lower', aspect='auto')
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(h, cax=cax)

plt.legend(frameon=False, loc = 'best')

plt.show()

fig, ax = plt.subplots()

ec = plt.imshow(FU_pred.T, interpolation='nearest', cmap='rainbow',
            extent=[0.0, math.pi/2, -5.0, 5.0],
            origin='lower', aspect='auto')

#ax.add_collection(ec)
ax.autoscale_view()
ax.set_xlabel('$x$')
ax.set_ylabel('$t$')
cbar = plt.colorbar(ec)
cbar.set_label('$overline{f}_u$ prediction')
plt.show()

plt.scatter(t_f, x_f, c = col_weights.numpy(), s = col_weights.numpy()/10)
plt.show()

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>