技术标签: DL框架实战与源码理解
pytorch
官方提供的数据并行类为:
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
当给定model
时,主要实现功能是将input
数据依据batch
的这个维度,将数据划分到指定的设备上。其他的对象(objects
)复制到每个设备上。在前向传播的过程中,module
被复制到每个设备上,每个复制的副本处理一部分输入数据。在反向传播过程中,每个副本module
的梯度被汇聚到原始的module
上计算(一般为第0
块GPU
)。
并且这里要注意的一点是,这里官方推荐是用
DistributedDataParallel
,因为DistributedDataParallel
使用的是多进程方式,而DataParallel
使用的是多线程的方式。如果使用的是DistributedDataParallel
,你需要使用torch.distributed.launch
去launch程序,参考Distributed Communication Package - Torch.Distributed。
batch size
的大小一定要大于GPU
的数量,我在实践过程中batch size
的大小一般设置为GPU
块数的倍数。在数据分配到不同的机器上的时候,传入module
的数据同样都可以传入DataParallel
(并行之后的module
类型)中,但是tensor
默认按照dim=0
分配到不同的机器上,tuple
, list
,dict
类型的数据被浅拷贝到不同的GPU
上,其它类型的数据将会被分配到不同的进程中。
在调用DataParallel
之前,module
必须要具有他自己的参数(能获取到模型的参数),还需要在指定的GPU
上具有buffer
(不然会报内存出错)。
在前向传播的过程中,
module
被复制到每个设备上,因此在前线传播过程中的任何更新都会丢失。举例来说,如果module
有一个counter
属性,在每次前线传播过程中都会加1
,它将会保留在初始值状态,因为更新在副本上,但是副本前线传播完就被销毁了。然而在DataParallel
中,device[0]
上的副本将其参数和内存数据与并行的module
共享,因此在device[0]
上更新数据将会被记录。
返回的结果是来自各个
device
上的数据的汇总。默认是dim 0
维度上的汇总。因此在处理RNN
时序数据时就需要注意这一点。My recurrent network doesn’t work with data parallelism
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
torch.nn.DataParallel()
函数的参数主要有module
,device_ids
,output_device
这三个。
module
为需要并行的module
。device_ids
为一个list
,默认为所有可操作的devices
。output_device
为需要输出汇总的指定GPU
,默认为device_ids[0]
号。简单的举例为:
>>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
>>> output = net(input_var) # input_var can be on any device, including CPU
data_parallel.py
的源码地址为:https://github.com/pytorch/pytorch/blob/master/torch/nn/parallel/data_parallel.py
源码注释
import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
_get_all_device_indices,
_get_available_device_type,
_get_device_index,
_get_devices_properties
)
def _check_balance(device_ids):
imbalance_warn = """
There is an imbalance between your GPUs. You may want to exclude GPU {} which
has less than 75% of the memory or cores of GPU {}. You can do so by setting
the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
environment variable."""
device_ids = [_get_device_index(x, True) for x in device_ids]
dev_props = _get_devices_properties(device_ids)
def warn_imbalance(get_prop):
values = [get_prop(props) for props in dev_props]
min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
if min_val / max_val < 0.75:
warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
return True
return False
if warn_imbalance(lambda props: props.total_memory):
return
if warn_imbalance(lambda props: props.multi_processor_count):
return
class DataParallel(Module):
# TODO: update notes/cuda.rst when this class handles 8+ GPUs well
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
# 通过调用torch.cuda.is_available()判断是返回“cuda”还是None。
device_type = _get_available_device_type()
if device_type is None: # 检查是否有GPU
# 如果没有GPU的话,module就不能够并行,直接赋值,设备id置空
self.module = module
self.device_ids = []
return
if device_ids is None: # 如果没有指定GPU,则默认使用所有可用的GPU
# 获取所有可用的设备ID,为一个list。
device_ids = _get_all_device_indices()
if output_device is None: # 判断输出设备是否指定
output_device = device_ids[0] # 默认为指定设备的第一个
self.dim = dim
self.module = module # self.module就是传入的module。
self.device_ids = [_get_device_index(x, True) for x in device_ids]
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
_check_balance(self.device_ids)
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
def forward(self, *inputs, **kwargs):
# 如果没有可用的GPU则使用原来的module来计算
if not self.device_ids:
return self.module(*inputs, **kwargs)
# 这里应该是判断模型的参数和buffer都要有。
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
# 用scatter函数将input平均分配到每个GPU上
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs = ((),)
kwargs = ({
},)
if len(self.device_ids) == 1: # 只有一个给定的GPU的话,就直接调用未并行的module,否者进入下一步
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # replicate函数主要讲模型复制到多个GPU上
outputs = self.parallel_apply(replicas, inputs, kwargs) # 并行地在多个GPU上计算模型。
return self.gather(outputs, self.output_device) # 将数据聚合到一起,传送到output_device上,默认也是dim 0维度聚合。
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
scatter
函数:def scatter(inputs, target_gpus, dim=0):
r"""
Slices tensors into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not tensors.
"""
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
return Scatter.apply(target_gpus, None, dim, obj)
if isinstance(obj, tuple) and len(obj) > 0:
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list) and len(obj) > 0:
return list(map(list, zip(*map(scatter_map, obj))))
if isinstance(obj, dict) and len(obj) > 0:
return list(map(type(obj), zip(*map(scatter_map, obj.items()))))
return [obj for targets in target_gpus]
# After scatter_map is called, a scatter_map cell will exist. This cell
# has a reference to the actual function scatter_map, which has references
# to a closure that has a reference to the scatter_map cell (because the
# fn is recursive). To avoid this reference cycle, we set the function to
# None, clearing the cell
try:
res = scatter_map(inputs)
finally:
scatter_map = None
return res
在前向传播中,数据需要通过scatter
函数分配到每个GPU
上,代码在scatter_gather.py
文件下,如果输入的类型不是tensor
的话,会依据数据类型处理一下变成tensor
,再递归调用scatter_map
,最后调用Scatter.apply
方法将数据依据给定的GPU
给划分好返回。
replicate
函数: replicate
函数需要将模型给复制到每个GPU
上。如果你定义的模型是ScriptModule
的话,也就是在编写自己model
的时候不是继承的nn.Module
,而是继承的nn.ScriptModule
,就不能复制,会报错。
这个函数主要就是将模型参数、buffer
等需要共享的信息,复制到每个GPU
上,感兴趣的自己看吧。
def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
r"""Evaluates module(input) in parallel across the GPUs given in device_ids.
This is the functional version of the DataParallel module.
Args:
module (Module): the module to evaluate in parallel
inputs (Tensor): inputs to the module
device_ids (list of int or torch.device): GPU ids on which to replicate module
output_device (list of int or torch.device): GPU location of the output Use -1 to indicate the CPU.
(default: device_ids[0])
Returns:
a Tensor containing the result of module(input) located on
output_device
"""
if not isinstance(inputs, tuple):
inputs = (inputs,) if inputs is not None else ()
device_type = _get_available_device_type()
if device_ids is None:
device_ids = _get_all_device_indices()
if output_device is None:
output_device = device_ids[0]
device_ids = [_get_device_index(x, True) for x in device_ids]
output_device = _get_device_index(output_device, True)
src_device_obj = torch.device(device_type, device_ids[0])
for t in chain(module.parameters(), module.buffers()):
if t.device != src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(src_device_obj, t.device))
inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
# for module without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not module_kwargs:
inputs = ((),)
module_kwargs = ({
},)
if len(device_ids) == 1:
return module(*inputs[0], **module_kwargs[0])
used_device_ids = device_ids[:len(inputs)]
replicas = replicate(module, used_device_ids)
outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
return gather(outputs, output_device, dim)
并行的模型也有了,数据也有了,之后就是利用并行的模型和并行的数据来做计算了。
parallel_apply
函数:def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
# 判断模型数和输入数据数是否相等
assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({
},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {
}
grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device), autocast(enabled=autocast_enabled):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception:
with lock:
results[i] = ExceptionWrapper(
where="in replica {} on device {}".format(i, device))
if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs = []
for i in range(len(inputs)):
output = results[i]
if isinstance(output, ExceptionWrapper):
output.reraise()
outputs.append(output)
return outputs
先判断一下数据的长度是否符合要求。之后利用多线程来处理数据。最后将所有的数据gather
在一起,默认是从第0
个维度gather
在一起。
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
class RandomDataset(Dataset):
def __init__(self, size, length):
self.len = length
self.data = torch.randn(length, size)
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return self.len
class Model(nn.Module):
def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)
self.sigmoid = nn.Sigmoid()
# self.modules = [self.fc, self.sigmoid]
def forward(self, input):
return self.sigmoid(self.fc(input))
if __name__ == '__main__':
# Parameters and DataLoaders
input_size = 5
output_size = 1
batch_size = 30
data_size = 100
rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
batch_size=batch_size, shuffle=True)
model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
print("Let's use", torch.cuda.device_count(), "GPUs!")
model = nn.DataParallel(model).cuda()
optimizer = optim.SGD(params=model.parameters(), lr=1e-3)
cls_criterion = nn.BCELoss()
for data in rand_loader:
targets = torch.empty(data.size(0)).random_(2).view(-1, 1)
if torch.cuda.is_available():
input = Variable(data.cuda())
with torch.no_grad():
targets = Variable(targets.cuda())
else:
input = Variable(data)
with torch.no_grad():
targets = Variable(targets)
output = model(input)
optimizer.zero_grad()
loss = cls_criterion(output, targets)
loss.backward()
optimizer.step()
文章浏览阅读3.6w次。想象一下,有一堆杂乱无章的数据,如何快速找到需要的信息呢?这不仅是一种技术挑战,更是一种艺术。在这篇文章中,将深入探讨Python中的数据选取和过滤技术。这就像是在一片星空中寻找那颗闪耀的星星,而Python提供的工具就像是望远镜,帮助聚焦并观察那些重要的数据点。从基于标签的索引到复杂的布尔型索引,每一种方法都像是一种魔法,让数据变得触手可及。文章将通过生动的例子,展示如何在数据的海洋中巧妙地航行,找到目标数据。无论是数据分析初学者,还是资深的数据科学家,都能在这里找到有价值的信息。_python pandas 返回index
文章浏览阅读908次。Flink作为一个大数据分布式流处理框架,必须要考虑系统的容错性,主要就是发生故障之后的恢复。Flink容错机制的核心就是检查点,它通过巧妙的分布式快照算法保证了故障恢复后的一致性,并且尽可能地降低对处理性能的影响。本章中我们详细介绍了Flink检查点的原理、算法和配置,并且结合一致性理论与Flink-Kafka的实际互连系统,阐述了如何用Flink实现流处理应用的端到端exactly-once状态一致性。这既是Flink底层原理的深入,也与之前的状态管理、水位线机制有联系和相通之处;_为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过source任务向外
文章浏览阅读4.6k次,点赞7次,收藏80次。成绩管理系统需求说明书1 引言1.1目的首先给出了整个系统的整体网络结构和功能结构的概貌,试图从总体架构上给出整个系统的轮廓,然后又对功能需求、性能需求和其它非功能性需求进行了详细的描述。其中对功能需求的描述采用了UML的用例模型方式,主要描述了每一用例的基本事件流,若有备选事件流则描述,否则则省略。而且还给出了非常直观的用例图。这些文字和图形都为了本文档能详细准确地描述用户的需求,..._学生成绩管理系统用例描述
文章浏览阅读9.4k次,点赞3次,收藏2次。现象:❯ git submodule updatefatal: not a git repository: D:/td/code/qemu/meson/../.git/modules/mesonFailed to clone 'meson'. Retry scheduledBUG: submodule considered for cloning, doesn't need cloning any more?fatal: could not get a repository handle for_fatal: could not get a repository handle for submodule
文章浏览阅读1.9w次,点赞13次,收藏52次。转自 https://blog.csdn.net/qq_23534759/article/details/804575572018年05月25日 22:18:00 curious_girl 阅读数:5947版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_23534759/article/details/80457557TSNE()参数解..._sklearn的t-sne参数
文章浏览阅读3.1k次。欢迎使用Markdown编辑器你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章,了解一下Markdown的基本语法知识。新的改变我们对Markdown编辑器进行了一些功能拓展与语法支持,除了标准的Markdown编辑器功能,我们增加了如下几点新功能,帮助你用它写博客:全新的界面设计 ,将会带来全新的写作体..._使用adb 提示报错000017
文章浏览阅读2.9k次。启动SVN服务器有两种方法,一个是命令行方式,一个是注册Windows服务。[3]命令行方式的缺陷是:只要运行服务器端程序的命令行窗口一关闭,服务就停止了,很不方便,而且每次开机都需要手动启动。此时查看当前系统中的服务,可以看到我们刚刚创建的服务,但此时它还没有启动,如果创建失败,需检查sc命令是否正确。[1]将SVN服务端程序注册为Windows服务,就可以让SVN服务随系统一起启动,克服了命令行方式的不足。如果启动失败,那很有可能是binpath中的内容有错误,此时只能将已经创建的服务删除,重新创建。_win11安装svn
文章浏览阅读348次。基本概念同步通信1、异步通信2同步和异步关注的是消息通信机制同步通信所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了;换句话说,就是由调用者主动等待这个调用的结果;异步通信调用在发出之后,这个调用就直接返回了,所以没有返回结果;换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果;而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用.阻塞 vs 非阻塞阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)._复用io模型解决了一个线程可以监控多个fd的问题
文章浏览阅读1.1w次,点赞33次,收藏141次。第七章、网络安全本章的习题计算机网络都面临哪几种威胁?主动攻击和被动攻击的区别是什么?对于计算机网 络的安全措施都有哪些? 计算机网络面临以下的四种威胁:截获(interception);中断(interruption);篡改 (modification);伪造(fabrication)。网络安全的威胁可以分为两大类:即被动攻击和主动攻击。 主动攻击是指攻击者对某个连接中通过的 PDU 进行各种处理。如有选择地更改、删除、延迟这些PDU。甚至还可将合成的或伪造的PDU 送入到一个连接中去。主动_ip过滤技术在谢希仁教材哪一章
文章浏览阅读3.6k次,点赞4次,收藏19次。项目中如果用到了缓存,就会涉及到数据库与缓存的双写,由于这两个操作不是原子性的,在并发的场景下,容易产生数据库与缓存不一致的情况。_缓存和数据库一致性怎么解决
文章浏览阅读2.1w次,点赞12次,收藏21次。文章目录1. 报错现象2. 排查过程2.1 Connection reset by peer 的原因2.2 syscall:read(..) failed: Connection reset by peer 错误3. 最终原因1. 报错现象组内一个服务从 spring-webmvc 框架切换到 spring-webflux,在线上跑了一段时间后偶现如下错误 log 。log 中 L:/10.0.168.212:8805 代表了本地服务所在的服务器 IP 和 端口,R:/10.0.168.38:473_readaddress(..) failed: connection reset by peer
文章浏览阅读582次。day5题目:剑指 Offer 04. 二维数组中的查找、11. 旋转数组的最小数字、50. 第一个只出现一次的字符,知识点:数组、二分、哈希,难度为中等、简单、简单