Multi-GPU Parallel Computing for Keras on the TensorFlow Backend


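When training deep learning models, especially large ones, we often run into two problems: computation is too slow, or GPU memory runs out. Shrinking the model or the input and output dimensions just to gain speed or save memory is usually not an option. In that situation, multi-GPU parallel computing can help. Keras does ship with some APIs for multi-GPU parallel computing, but in my experience they did not seem to work and frequently raised errors. Below is a module for multi-GPU parallel computing built on the TensorFlow backend. I have tested it myself with Keras, and it both spreads the memory load across several cards and delivers a real speedup.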

As we all know, in Python the following two lines restrict the process to a chosen GPU:

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
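As a small sketch of the same idea, the helper below builds the variable's value from a list of GPU ids and reads it back. The names `set_visible_gpus` and `listed_gpu_count` are hypothetical and used only for illustration. The sketch assumes the variable is set before TensorFlow or Keras is imported, since the CUDA runtime reads it when it initializes. Note that inside the process the visible devices are renumbered from 0.

```python
import os


def set_visible_gpus(gpu_ids):
    """Hypothetical helper: expose only the given physical GPU ids to this process."""
    value = ",".join(str(i) for i in gpu_ids)
    os.environ["CUDA_VISIBLE_DEVICES"] = value
    return value


def listed_gpu_count():
    """Count the ids listed in CUDA_VISIBLE_DEVICES.

    Returns None when the variable is unset, which means "no restriction".
    """
    value = os.environ.get("CUDA_VISIBLE_DEVICES")
    if value is None:
        return None
    return len([v for v in value.split(",") if v.strip()])


# Call this before importing tensorflow / keras.
set_visible_gpus([0, 1])
print(os.environ["CUDA_VISIBLE_DEVICES"])
print(listed_gpu_count())
```

The count obtained this way could be passed on as the number of GPUs to use, instead of hard-coding it in two places.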

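However, for TensorFlow and Keras, setting CUDA_VISIBLE_DEVICES only makes the process claim the specified hardware; it does not make the framework actually use all of it. For example, the following does not make Keras compute on multiple GPUs: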

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

This looks as if two cards are in use, but at most one of them does any work while the other merely looks on. To put both cards to work, we need the following module:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Thanks to the original author for the generous contribution.
Source:
https://github.com/matterport/Mask_RCNN/blob/master/mrcnn/parallel_model.py
'''

import tensorflow as tf
import keras
import keras.backend as K
import keras.layers as KL

class ParallelModel(keras.models.Model):
    """Subclasses the standard Keras Model and adds multi-GPU support.
    It works by creating a copy of the model on each GPU. Then it slices
    the inputs and sends a slice to each copy of the model, and then
    merges the outputs together and applies the loss on the combined
    outputs.
    """

    def __init__(self, keras_model, gpu_count):
        """Class constructor.
        keras_model: The Keras model to parallelize
        gpu_count: Number of GPUs. Must be > 1
        """
        super(ParallelModel, self).__init__() # Thanks to @greatken999 for fixing bugs
        self.inner_model = keras_model
        self.gpu_count = gpu_count
        merged_outputs = self.make_parallel()
        super(ParallelModel, self).__init__(inputs=self.inner_model.inputs,
                                            outputs=merged_outputs)

    def __getattribute__(self, attrname):
        """Redirect loading and saving methods to the inner model. That's where
        the weights are stored."""
        if 'load' in attrname or 'save' in attrname:
            return getattr(self.inner_model, attrname)
        return super(ParallelModel, self).__getattribute__(attrname)

    def summary(self, *args, **kwargs):
        """Override summary() to display summaries of both, the wrapper
        and inner models."""
        super(ParallelModel, self).summary(*args, **kwargs)
        self.inner_model.summary(*args, **kwargs)

    def make_parallel(self):
        """Creates a new wrapper model that consists of multiple replicas of
        the original model placed on different GPUs.
        """
        # Slice inputs. Slice inputs on the CPU to avoid sending a copy
        # of the full inputs to all GPUs. Saves on bandwidth and memory.
        input_slices = {name: tf.split(x, self.gpu_count)
                        for name, x in zip(self.inner_model.input_names,
                                           self.inner_model.inputs)}

        output_names = self.inner_model.output_names
        outputs_all = []
        for i in range(len(self.inner_model.outputs)):
            outputs_all.append([])

        # Run the model call() on each GPU to place the ops there
        for i in range(self.gpu_count):
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('tower_%d' % i):
                    # Run a slice of inputs through this replica
                    zipped_inputs = zip(self.inner_model.input_names,
                                        self.inner_model.inputs)
                    inputs = [
                        KL.Lambda(lambda s: input_slices[name][i],
                                  output_shape=lambda s: (None,) + s[1:])(tensor)
                        for name, tensor in zipped_inputs]
                    # Create the model replica and get the outputs
                    outputs = self.inner_model(inputs)
                    if not isinstance(outputs, list):
                        outputs = [outputs]
                    # Save the outputs for merging back together later
                    for l, o in enumerate(outputs):
                        outputs_all[l].append(o)

        # Merge outputs on CPU
        with tf.device('/cpu:0'):
            merged = []
            for outputs, name in zip(outputs_all, output_names):
                # If outputs are numbers without dimensions, add a batch dim.
                def add_dim(tensor):
                    """Add a dimension to tensors that don't have any."""
                    if K.int_shape(tensor) == ():
                        return KL.Lambda(lambda t: K.reshape(t, [1, 1]))(tensor)
                    return tensor
                outputs = list(map(add_dim, outputs))

                # Concatenate
                merged.append(KL.Concatenate(axis=0, name=name)(outputs))
        return merged
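The slice, run, and merge pattern of `make_parallel()` can be illustrated without any GPU. The following is a framework-free sketch, not the module itself: plain Python lists stand in for tensors, and `split_batch`, `run_data_parallel`, and `toy_model` are hypothetical names. It also makes one practical constraint visible: `tf.split` with an integer split count requires the batch dimension to be evenly divisible by that count, so the batch size should be a multiple of the number of GPUs.

```python
def split_batch(batch, gpu_count):
    """Split a batch (a list of samples) into gpu_count equal slices."""
    if len(batch) % gpu_count != 0:
        raise ValueError(
            "batch size %d is not divisible by gpu_count %d"
            % (len(batch), gpu_count))
    size = len(batch) // gpu_count
    return [batch[i * size:(i + 1) * size] for i in range(gpu_count)]


def run_data_parallel(model_fn, batch, gpu_count):
    """Run model_fn on each slice and concatenate the results along the batch axis.

    In the real module each call is placed on its own device ('/gpu:i');
    here the calls simply run one after another.
    """
    tower_outputs = [model_fn(s) for s in split_batch(batch, gpu_count)]
    merged = []
    for out in tower_outputs:
        merged.extend(out)
    return merged


def toy_model(samples):
    """Stand-in for a model replica: processes each sample independently."""
    return [2 * x for x in samples]


batch = [1, 2, 3, 4, 5, 6]
# The merged result matches what a single replica produces on the full batch.
assert run_data_parallel(toy_model, batch, 2) == toy_model(batch)
```

Because every replica shares the same weights and each sample is processed independently, merging the per-slice outputs gives the same result as one forward pass over the whole batch.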

Usage:

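Save the module as a file named "multi_gpu.py". Then import it in your training code and wrap the ordinary Keras model in the multi-GPU wrapper. The wrapping step must come after the Model object is created and before the Keras model is compiled.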

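from keras.models import Model
from multi_gpu import ParallelModel
...
NUM_GPU = 2
...
# "in" is a reserved word in Python, so the tensors are named x_in / y_out here
model = Model(inputs=x_in, outputs=y_out)
# Wrap after the Model is built and before compile()
model = ParallelModel(model, NUM_GPU)
...
model.compile(loss=loss_func, optimizer=adam)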

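Finally, start the multi-GPU training program and run the "nvidia-smi" command: both GPUs now take part in training the model. In my own tests, the speedup from computing on 2 GPUs was around 1.7.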
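To put the measured speedup in perspective, the short calculation below converts it into a parallel efficiency and, under Amdahl's law, into an implied parallel fraction. Amdahl's law is my own framing here, not something the measurement was based on, and the 4-GPU figure is a rough model extrapolation, not a measured result. All function names are hypothetical.

```python
def parallel_efficiency(speedup, gpu_count):
    """Fraction of the ideal linear speedup that was actually achieved."""
    return speedup / gpu_count


def amdahl_parallel_fraction(speedup, gpu_count):
    """Solve S = 1 / ((1 - p) + p / n) for the parallel fraction p."""
    return (1.0 - 1.0 / speedup) / (1.0 - 1.0 / gpu_count)


def amdahl_speedup(parallel_fraction, gpu_count):
    """Speedup predicted by Amdahl's law for a given parallel fraction."""
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / gpu_count)


eff = parallel_efficiency(1.7, 2)
p = amdahl_parallel_fraction(1.7, 2)
print(round(eff, 2))                   # 0.85
print(round(p, 3))                     # 0.824
print(round(amdahl_speedup(p, 4), 2))  # 2.62 (model estimate only)
```

In other words, a speedup of 1.7 on 2 GPUs corresponds to about 85% efficiency; the remainder goes to work that is not parallelized, such as slicing inputs and merging outputs.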

 

Application example: the ASRT speech recognition system (https://github.com/nl8590687/ASRT_SpeechRecognition)

 

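Copyright notice
Unless otherwise stated, the articles on this blog are original and copyrighted by the author. Reposting is welcome; please credit the author and link to the source. Thank you.
Article URL: https://blog.ailemon.me/2019/07/11/multi-gpu-parallel-compute-for-keras-by-tensorflow-backend/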
All articles are under Attribution-NonCommercial-ShareAlike 4.0

2 comments


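  1. Excuse me, how does this differ from the keras.utils.multi_gpu_model module in Keras?

    1. I originally wanted to use Keras's own multi-GPU support directly, but it kept failing and causing problems, so I searched online. Implementing it with this code avoids those problems, and it is also easy to adapt to different situations.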