TensorFlow

1 架构

阅读大神的 《TensorFlow 内核剖析》 对 TensorFlow 的整个代码框架有了一些了解,以下是读书笔记。

Graph (计算图)是 TensorFlow 领域模型的核心。计算图就是节点与边的集合,是一个 DAG (有向无环图)图。

Node(节点)持有零条或多条输入/输出的边,分别使用 inedges, outedges 表示。

Edge(边) 持有前驱节点与后驱节点,从而实现了计算图的连接,也是计算图前向遍历,后向遍历的衔接点。边上的数据以 Tensor 的形 式传递。计算图中存在两类边:

  • 普通边:用于承载 Tensor,常用实线表示;
  • 控制依赖:控制节点的执行顺序,常用虚线表示。

TensorFlow 计算的单位是 OP,它表示了某种抽象计算。通过定义 OP 来构建 DAG 图。OP 拥有 0 个或多个「输入/输出」,及其 0 个 或多个「属性」。其中,输入/输出以Tensor 的形式存在。在系统实现中,OP 的元数据使用 Protobuf 格式的 OpDef 描述,实现前端与 后端的数据交换,及其领域模型的统一。OpDef 定义包括 OP 的名字,输入输出列表,属性列表,优化选项等。其中,属性常常用于描述 输入/输出的类型,大小,默认值,约束,及OP 的其他特性。

计算图的执行过程将按照 DAG 的拓扑排序,依次启动 OP 的运算。其中,如果存在多个入度为 0 的节点,TensorFlow 运行时可以实现 并发,同时执行多个 OP 的运算,提高执行效率。

1.1 架构设计

TensorFlow 遵循良好的分层架构:

  1. front end : 用户接口,负责构造计算图
  2. runtime : 实现计算图的拆分。提供本地运行模式和分布式运行模式,两者共享大部分设计和实现
  3. 计算层 : 基于 Eigen 实现计算的逻辑实现;同时支持各种硬件的并行加速
  4. 通信层 : 基于 gRPC 实现组件间的数据交换。同时支持 RDMA
  5. 设备层 : 支持多种异构计算设备。实际执行计算的载体

1.1.1 前端系统

Client 是前端系统的主要组成部分,它是一个支持多语言的编程环境,且对 Python 和 C++ 的支持比较完善。实现时通过 Swig 完成对 后端 C++ 的调用。基于这些编程接口来构造计算图。

此时,TensorFlow 并未执行任何的图计算,直至与后台计算引擎建立 Session,并以 Session 为桥梁,建立 Client 与 Master 之间的 通道,并将 Protobuf 格式的 GraphDef 序列化后传递给 Master,启动计算图的执行过程。

1.1.2 runtime

  1. master

    在分布式的运行时环境中,Client 执行 Session.run 时,传递整个计算图给后端的 Master。此时,计算图是完整的,常称为 Full Graph。随后,Master 根据 Session.run 传参数列表递给它的 fetches, feeds ,反向遍历 Full Graph ,并按照依赖关系,对其实施 剪枝,最终计算得到最小的依赖子图,常称为 Client Graph。

    接着,Master 负责将 Client Graph 按照任务的名称分裂 ( SplitByTask ) 为多个 Graph Partition;其中,每个 Worker 对应一个 Graph Partition。随后,Master 将 Graph Partition 分别注册到相应的 Worker 上,以便在不同的 Worker 上并发执行这些 Graph Partition。最后,Master 将通知所有 Work 启动相应 Graph Partition 的执行过程。

    其中,Work 之间可能存在数据依赖关系,Master 并不参与两者之间的数据交换,它们两两之间互相通信,独立地完成交换数据,直至完 成所有计算。

  2. worker

    对于每一个任务,TensorFlow 都将启动一个 Worker 实例。Worker 主要负责如下 4 个方面的职责:

    1. 处理来自 Master 的请求;
    2. 对注册的 Graph Partition 按照本地计算设备集实施二次分裂 ( SplitByDevice ) ,并通知各个计算设备并发执行各个 Graph Partition; 在分布式运行时,图分裂经过两级分裂过程。在 Master 上按照任务分裂,而在 Worker 按照设备分裂。因此,得到结果都 称为子图片段,它们仅存在范围,及其大小的差异。
    3. 按照拓扑排序算法在某个计算设备上执行本地子图,并调度 OP 的 Kernel 实现;
    4. 协同任务之间的数据通信。

1.1.3 kernel

Kernel 是 OP 在 某 种 硬 件 设 备 的 特 定 实 现, 它 负 责 执 行 OP 的 具 体 运 算。 目 前, TensorFlow 系统中包含 200 多 个标准的 OP,包括数值计算,多维数组操作,控制流,状态管理等。

一般每一个 OP 根据设备类型都会存在一个优化了的 Kernel 实现。在运行时,运行时根据 OP 的设备约束规范,及其本地设备的类型,为 OP 选择特定的 Kernel 实现,完成该 OP 的计算。

其中,大多数 Kernel 基于 Eigen::Tensor 实现。 Eigen::Tensor 是一个使用 C++ 模板技术,为多核 CPU/GPU 生成高效的并发代码。但 是,TensorFlow 也可以灵活地直接使用 cuDNN, cuNCCL, cuBLAS 实现更高效的 Kernel。

此外,TensorFlow 实现了矢量化技术,在高吞吐量、以数据为中心的应用需求中,及其移动设备中,实现更高效的推理。如果对于复合 OP 的子计算过程很难表示,或执行效率低下, TensorFlow 甚至支持更高效的 Kernel 注册,其扩展性表现非常优越。

1.1.4 session

首先,Client 首次执行 CreateSessionRequest tf.Session.run 时,会将整个图序列化后,通过 gRPC 发送消息,将图传递给 Master。随 后, Master 创建一个 CreateSessionResponse MasterSession 实例,并用全局唯一的 handle 标识,最终通过返回给 Client。

Client 启动迭代执行的过程, 并称每次迭代为一次 Step。 此时, Client 发送 RunStepRequest MasterSession 消息给 Master,消息携 带 handle 标识,用于 Master 索引相应的实例。

Master 收到 RunStepRequest 消息后,将执行图剪枝,分裂,优化等操作。最终按照任务 (Task),将图划分为多个子图片段 (Graph Partition)。随后,Master 向各个 Worker 发送 RegisterGraphRequest 当 Worker 收到消息,将子图片段依次注册到各个 Worker 节点上。 RegisterGraphRequest 消息后,再次实施分裂操作,最终按照设备(Device),将图划分为多个子图片段 (Graph Partition)。

当 Worker 完成子图注册后,通过返回 RegisterGraphReponse 消息,并携带 graphhandle 标识。这是因为 Worker 可以并发注册并运行多 个子图,每个子图使用 graphhandle 唯一标识。

Master 完成子图注册后,将广播所有 Worker 并发执行所有子图。这个过程是通过Master 发送 RunGraphRequest 收到消息消息给 Worker 完成的。

Worker 收到 RunGraphRequest 消息后,Worker 根据 graphhandle 索引相应的子图。最终, Worker 启动本地所有计算设备并发执行所 有子图。其中,每个子图放置在单独的 Executor 中执行, Executor 将按照拓扑排序算法完成子图片段的计算。

如果两个设备之间需要交换数据,则通过插入 Send/Recv 节点完成的。特殊地,如果两个 Worker 之间需要交换数据,则需要涉及跨进程间 的通信。此时,需要通过接收端主动发送里取出对应的 Tensor,并通过 RecvTensorRequest RecvTensorResponse 消息到发送方,再从发送 方的信箱返回。

当计算完成后, Client 向 Master 发送始释放 MasterSession CloseSessionReq 消息。Master 收到消息后,开始释放所持有的所有资 源。

2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
x_train, y_train = load_data()
X = tf.placeholder(......) # 定义数据集占位符
Y = tf.placeholder(......)

W = tf.Variable(......) # 定义变量
b = tf.Variable(......)

init = tf.global_variables_initializer() # 定义初始化变量的方式

Z = tf.add(tf.matmul(W, X), b) # 定义计算图

loss = tf.losses...... # 定义损失函数,Z和Y作为参数

optimizer = tf.train.AdamOptimizer().minimize(loss) # 定义优化器

with tf.Session as sess:
sess.run(init) # 开始执行变量初始化
for _ in range(num_epochs): # 循环训练多轮
_ , cost = sess.run([optimizer, cost], feed_dict={X:x_train, Y:y_train }) # 执行训练

使用 batch normalation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def conv_layer(inpt, filter_shape, stride):
out_channels = filter_shape[3]

filter_ = weight_variable(filter_shape)
conv = tf.nn.conv2d(inpt, filter=filter_, strides=[1, stride, stride, 1], padding="SAME")
mean, var = tf.nn.moments(conv, axes=[0,1,2])#计算一阶矩(均值),以及二阶矩(方差)
beta = tf.Variable(tf.zeros([out_channels]), name="beta")
gamma = weight_variable([out_channels], name="gamma")

batch_norm = tf.nn.batch_norm_with_global_normalization(
conv, mean, var, beta, gamma, 0.001,
scale_after_normalization=True)

out = tf.nn.relu(batch_norm)

return out
0%