本文使用Tensorflow1.14.0搭建超简单的网络进行mnist手写体识别。第一节代码片段讲解,第二节附完整代码。尊重原文----原文连接

代码片段讲解

首先,导入依赖的两个模块,一个是tensorflow,另一个是tensorflow.keras.datasets,我们要的数据集MNIST就是由这个datasets管理下载的。该网址中列出了datasets管理的所有数据集。

1
2
3
4
import tensorflow as tf
tf.enable_eager_execution()
from tensorflow.keras import datasets
import time

导入数据集,用(x,y)存储训练图片和标签,用(val_x,val_y)存储测试图片和标签。

1
(x, y), (val_x, val_y) = datasets.mnist.load_data()

在导入后,需要对数据形式进行初步查看,对于图像识别来说,图片的数量、大小、通道数量和数据范围、类型是必须了解的。以一下程序打印出这些信息,注释为输出结果。datasets导出的数据是Numpy数组,类型为uint8,训练图片共60k张,大小为28*28,为灰度图像,灰度范围0~255;测试图片共10k张。

1
2
3
4
5
6
7
# 数据集信息
print('type_x:', type(x), 'dtype_x:', x.dtype) #<class 'numpy.ndarray'> uint8
print('type_y:', type(y), 'dtype_y:', y.dtype) #<class 'numpy.ndarray'> uint8
print('shape_x:', x.shape, 'shape_y:', y.shape) #(60000, 28, 28) (60000,)
print('shape_val:', val_x.shape, 'shape_val:', val_y.shape) #(10000, 28, 28) (10000,)
print('max_x:', x.max(), 'min_x:', x.min()) #255 0
print('max_y:', y.max(), 'min_y:', y.min()) #9 0

在训练前必须先将数据转为Tensor。用tf.convert_to_tensor(value,dtype)函数可将value转为Tensor,并可指定数据类型(dtype)。将x,val_x转成浮点类型,而训练数据的标签y需要先转为Tensor整型再转化为独热码形式,测试数据val_y的标签转化为Tensor整型,独热码转换可用tf.one_hot(indices,depth,dtype),indices必须是整型,这也就是为什么先转成Tensor整型的原因,depth决定独热码位数,dtype默认是tf.float32。以下代码完成数据类型转换。

1
2
3
4
5
6
7
8
# 需要将数据转成Tensor
x = tf.convert_to_tensor(x, dtype=tf.float32)/255.5-0.5
y = tf.convert_to_tensor(y, dtype=tf.int32)
val_x = tf.convert_to_tensor(val_x, dtype=tf.float32)/255.5-0.5
val_y = tf.convert_to_tensor(val_y, dtype=tf.int64)
# 独热码
y = tf.one_hot(y, depth=10)
print('one_hot_y:', y.shape)

数据集一次性加载到内存计算是不现实的,所以采用批处理将数据集分批喂进网络,在分批前先对数据shuffle一下,以防网络发现顺序规律。

  • 把整个数据集一次喂给网络来更新参数的过程称为批梯度下降;
  • 而每次只喂一张图片,则称为随机梯度下降;
  • 每次将一小批图片喂给网络,称为小批量梯度下降。

简单来说,小批量梯度下降是最合适的,一般Batch设的较大,则达到最大准确率的速度变慢,但更容易收敛;Batch设小了,在一开始,准确率提高得非常快,但是最终收敛可能不太好。

1
2
3
# 生成批处理
test_db = tf.data.Dataset.from_tensor_slices((val_x, val_y)).shuffle(10000).batch(256)
train_db = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10000).batch(256)

train_db是可以直接迭代的,下面进行一次迭代,观察迭代结果,可以知道每一次迭代图片数量就是Batch大小。

1
2
3
4
# 批处理数据信息
train_iter = iter(train_db)
sample = next(train_iter)
print('sample_x_shape:', sample[0].shape, 'sample_y_shape:', sample[1].shape)

下面就可以开始构建全连接网络了。网络节点数为784(input)->256->128->10(output),加上输入输出一共4层,其中输入层是打平后的图片,共28*28=784个像素;输出层由类别数决定,这里手写数字0-9共10类,故输出有10个节点,这些节点表示属于该类的概率。构建网络需要有初始化的参数,可以利用高斯分布进行参数的初始化,即函数tf.random.normal(shape,mean=0.0,stddev=1.0),但是为了避免参数初始化过大,常采用截断型正态分布,即函数tf.random.truncated_normal(shape,mean=0.0,stddev=1.0),该函数将丢弃幅度大于平均值的2个标准偏差的值并重新选择,这里也就是说随机的值范围在-2~2之间。在初始化参数时也要注意参数的shape,例如784->256的参数shape应为(784,256),偏置shape应为(256,),这样还方便之后的矩阵运算。偏置一般都初始化为0。此外,所有参数都必须转为tf.Variable类型,才可以记录下梯度信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 参数初始化
# input(layer0)->layer1: nodes:784->256
theta_1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1)) # 因为后面要记录梯度信息,所以要用Varible
bias_1 = tf.Variable(tf.zeros([256]))
# print('theta_1:', theta_1)

# layer1->layer2: nodes:256->128
theta_2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
bias_2 = tf.Variable(tf.zeros([128]))

# layer2->out(layer3): nodes:128->10
theta_3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
bias_3 = tf.Variable(tf.zeros([10]))

初始化参数后,可以统计一下网络的参数量:784256+256128+128*10+256+128+10=235146。大约20万个参数,相比一些经典卷积网络,全连接网络的参数量还是比较少的。

对train_db进行迭代,套上enumerate()以便获取迭代批次。每一批数据,都要进行前向传播。首先,将shape为[256,28,28]图片打平为[256,784],这个可以借助tf.reshape(tensor,shape),在不改变元素个数的前提下,对维度进行分解或者合并。这样,h_1=x@theta1+bias1就可以得到下一层网络的节点值。h_1的shape为[256,256];同理,h_2的shape为[256,128],h_3的shape为[256,10]。每一层计算之后都应该加上一个激活函数,最常用的就是ReLu,通过激活函数可以增加网络的非线性表达能力,这里使用函数tf.nn.relu(features)。

由于更新参数需要得到各参数的梯度信息,因此前向传播要用with tf.GradientTape() as tape:包裹起来。此外,还得计算代价函数,就是Loss,一般采用差平方的均值来计算,差平方使用tf.math.square(x),均值采用tf.math.reduce_mean(input_tensor,axis=None),如果不指定axis就对所有元素求均值,返回值是标量,而如果指定axis,就仅对该axis做均值,结果的shape中该axis消失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 确定学习率
alpha = tf.constant(1e-3)
# 测试样本总数
total_val = val_y.shape[0]
# 训练样本总数
total_y = y.shape[0]
# 开始时间
start_time = time.time()

for echo in range(500):
# 前向传播
correct_cnt = 0 # 预测对的数量
for batch, (x, y) in enumerate(train_db):
# x:[256,28,28]
x = tf.reshape(x, [-1, 28 * 28]) # 最后一批<256个,用-1可以自动计算
with tf.GradientTape() as tape:
# 前向传播
# x:[256,784] theta_1:[784,256] bias_1:[256,] h_1:[256,256]
h_1 = x @ theta_1 + bias_1
h_1 = tf.nn.relu(h_1)
# h_1:[256,256] theta_2:[256,128] bias_2:[128,] h_2:[256,128]
h_2 = h_1 @ theta_2 + bias_2
h_2 = tf.nn.relu(h_2)
# h_2:[256,128] theta_3:[128,10] bias_2:[10,] out:[256,10]
out = h_2 @ theta_3 + bias_3
# 计算代价函数
# out:[256,10] y:[256,10]
loss = tf.math.square(y - out)
loss = tf.keras.losses.categorical_crossentropy(y, out, from_logits=True)
# loss:[256,10]->scalar
loss = tf.math.reduce_mean(loss)

上一部分对梯度信息进行了记录,我们要更新参数,必须先执行loss对各参数求导,之后根据学习率进行参数更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 获取梯度信息,grads为一个列表,顺序依据给定的参数列表
grads = tape.gradient(loss, [theta_1, bias_1, theta_2, bias_2, theta_3, bias_3])
# 根据给定列表顺序,对参数求导
theta_1.assign_sub(alpha * grads[0]) # 原地更新,类型不变
theta_2.assign_sub(alpha * grads[2])
theta_3.assign_sub(alpha * grads[4])
bias_1.assign_sub(alpha * grads[1])
bias_2.assign_sub(alpha * grads[3])
bias_3.assign_sub(alpha * grads[5])

pred = tf.math.argmax(out, axis=-1)
y_label = tf.math.argmax(y, axis=-1)
acc = tf.math.equal(pred, y_label)
acc = tf.cast(acc, dtype=tf.int32)
correct_cnt += tf.math.reduce_sum(acc)

# 每隔100个batch打印一次loss
# if batch % 100 == 0:
# print('loss of batch %d: %.2f'%(batch, loss))

到此为止,整个训练网络就完成了。为了测试网络的效果,我们需要对测试数据集进行预测,并且计算出准确率。关于测试的前向传播同之前的一样,但测试时并不需要对参数进行更新。网络的输出层有10个类别的概率,我们要取概率最大的作为预测的类别,这可以通过tf.math.argmax(input,axis=None)来实现,该函数可以返回数组中最大数的位置,axis的作用类似与reduce_mean。预测结果的正确与否可用tf.math.equal(x,y)来判别,它返回Bool型列表。由于一批次有256个图片,那么预测结果也有256个,可以用tf.math.reduce_sum(input_tensor,axis=None)进行求和,求和前通过tf.cast(x,dtype)将Bool类型转为整型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 测试数据预测
for (val_x, val_y) in test_db:
val_x = tf.reshape(val_x, [-1, 28 * 28])
val_h_1 = val_x @ theta_1 + bias_1
val_h_1 = tf.nn.relu(val_h_1)
val_h_2 = val_h_1 @ theta_2 + bias_2
val_h_2 = tf.nn.relu(val_h_2)
val_out = val_h_2 @ theta_3 + bias_3

# val_out:(256,10) pred:(256,)
pred = tf.math.argmax(val_out, axis=-1)
# acc:bool (256,)
acc = tf.math.equal(pred, val_y)
acc = tf.cast(acc, dtype=tf.int32)
correct_cnt += tf.math.reduce_sum(acc)

# 测试准确度
test_percent = float(correct_cnt / total_val)
# print('val_acc: %.2f'%(test_percent))
print('time:', int(time.time() - start_time) // 60, ':', int(time.time() - start_time) % 60, 'loss: %.2f'%(loss), 'train_acc: %.2f'%(train_percent), 'val_acc: %.2f'%(test_percent))

自此所有的代码片段都已分析完毕。下一节将展示综合的代码和运行结果。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
@Description:
@Author : zhangyan
@Time : 2022/2/18 上午11:38
"""
import tensorflow as tf
tf.enable_eager_execution()
from tensorflow.keras import datasets
import time

# 导入数据集
(x, y), (val_x, val_y) = datasets.mnist.load_data()
# 数据集信息
print('type_x:', type(x), 'dtype_x:', x.dtype)
print('type_y:', type(y), 'dtype_y:', y.dtype)
print('shape_x:', x.shape, 'shape_y:', y.shape)
print('shape_val:', val_x.shape, 'shape_val:', val_y.shape)
print('max_x:', x.max(), 'min_x:', x.min())
print('max_y:', y.max(), 'min_y:', y.min())

# 需要将数据转成Tensor
x = tf.convert_to_tensor(x, dtype=tf.float32)/255.5-0.5
y = tf.convert_to_tensor(y, dtype=tf.int32)
val_x = tf.convert_to_tensor(val_x, dtype=tf.float32)/255.5-0.5
val_y = tf.convert_to_tensor(val_y, dtype=tf.int64)
# 独热码
y = tf.one_hot(y, depth=10)
print('one_hot_y:', y.shape)

# 生成批处理
test_db = tf.data.Dataset.from_tensor_slices((val_x, val_y)).shuffle(10000).batch(256)
train_db = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10000).batch(256)
# 批处理数据信息
train_iter = iter(train_db)
sample = next(train_iter)
print('sample_x_shape:', sample[0].shape, 'sample_y_shape:', sample[1].shape)

# 参数初始化
# input(layer0)->layer1: nodes:784->256
theta_1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1)) # 因为后面要记录梯度信息,所以要用Varible
bias_1 = tf.Variable(tf.zeros([256]))
# print('type of theta_1', theta_1)

# layer1->layer2: nodes:256->128
theta_2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
bias_2 = tf.Variable(tf.zeros([128]))

# layer2->out(layer3): nodes:128->10
theta_3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
bias_3 = tf.Variable(tf.zeros([10]))

# 确定学习率
alpha = tf.constant(1e-3)
# 测试样本总数
total_val = val_y.shape[0]
# 训练样本总数
total_y = y.shape[0]
# 开始时间
start_time = time.time()

for echo in range(500):
# 前向传播
correct_cnt = 0 # 预测对的数量
for batch, (x, y) in enumerate(train_db):
# x:[256,28,28]
x = tf.reshape(x, [-1, 28 * 28]) # 最后一批<256个,用-1可以自动计算
with tf.GradientTape() as tape:
# 前向传播
# x:[256,784] theta_1:[784,256] bias_1:[256,] h_1:[256,256]
h_1 = x @ theta_1 + bias_1
h_1 = tf.nn.relu(h_1)
# h_1:[256,256] theta_2:[256,128] bias_2:[128,] h_2:[256,128]
h_2 = h_1 @ theta_2 + bias_2
h_2 = tf.nn.relu(h_2)
# h_2:[256,128] theta_3:[128,10] bias_2:[10,] out:[256,10]
out = h_2 @ theta_3 + bias_3
# 计算代价函数
# out:[256,10] y:[256,10]
loss = tf.math.square(y - out)
loss = tf.keras.losses.categorical_crossentropy(y, out, from_logits=True)
# loss:[256,10]->scalar
loss = tf.math.reduce_mean(loss)

# 获取梯度信息,grads为一个列表,顺序依据给定的参数列表
grads = tape.gradient(loss, [theta_1, bias_1, theta_2, bias_2, theta_3, bias_3])
# 根据给定列表顺序,对参数求导
theta_1.assign_sub(alpha * grads[0]) # 原地更新,类型不变
theta_2.assign_sub(alpha * grads[2])
theta_3.assign_sub(alpha * grads[4])
bias_1.assign_sub(alpha * grads[1])
bias_2.assign_sub(alpha * grads[3])
bias_3.assign_sub(alpha * grads[5])

pred = tf.math.argmax(out, axis=-1)
y_label = tf.math.argmax(y, axis=-1)
acc = tf.math.equal(pred, y_label)
acc = tf.cast(acc, dtype=tf.int32)
correct_cnt += tf.math.reduce_sum(acc)

# 每隔100个batch打印一次loss
# if batch % 100 == 0:
# print('loss of batch %d: %.2f'%(batch, loss))

# 训练的准确度
train_percent = float(correct_cnt / total_y)
# print('train_acc: %.2f'%(train_percent))

correct_cnt = 0 # 预测对的数量

# 测试数据预测
for (val_x, val_y) in test_db:
val_x = tf.reshape(val_x, [-1, 28 * 28])
val_h_1 = val_x @ theta_1 + bias_1
val_h_1 = tf.nn.relu(val_h_1)
val_h_2 = val_h_1 @ theta_2 + bias_2
val_h_2 = tf.nn.relu(val_h_2)
val_out = val_h_2 @ theta_3 + bias_3

# val_out:(256,10) pred:(256,)
pred = tf.math.argmax(val_out, axis=-1)
# acc:bool (256,)
acc = tf.math.equal(pred, val_y)
acc = tf.cast(acc, dtype=tf.int32)
correct_cnt += tf.math.reduce_sum(acc)

# 测试准确度
test_percent = float(correct_cnt / total_val)
# print('val_acc: %.2f'%(test_percent))
print('time:', int(time.time() - start_time) // 60, ':', int(time.time() - start_time) % 60, 'loss: %.2f'%(loss), 'train_acc: %.2f'%(train_percent), 'val_acc: %.2f'%(test_percent))

小结

本文采用2070训练,10min左右训练结束,最终达到0.95测试集准确度。作者手动加了一层网络,最终达到0.97的准确度。大家可以各自尝试手写。