源码解读系列将和大家一起来读megatron的pretrain部分代码。
在源码解读第一篇中,我们讲解了如何做「分布式环境初始化」,即按照dp/tp/pp对进程进行分组,并为每个进程指定gpu。在这一章中,我们将一起读「模型并行部分」:如何切分模型,并搬入分布式环境定义好的dp/tp/pp组中。
「本文将提供:」
详细的图解。画图说明代码的设计架构,讲清代码想做一件什么事。 详细的代码注释。在图解的基础上,提取核心代码部分,并附上注释。 「如何利用本文提高源码阅读效率:」
先看一~三部分。了解模型并行的设计思想、整体框架及入口函数。 打开megatron源码,找到入口函数,开始阅读。 阅读中的每一块细节,可参考四~八部分。 「阅读本文前置知识:」
图解大模型训练之:张量模型并行,megatron-lm 图解大模型训练之:megatron源码解读1,分布式环境初始化 「本文目录:」
一、模型概述
二、模型切割在做一件什么事
2.1 模型切割设计思想 2.2 随机种子 三、模型并行框架
3.1 模型并行入口函数 3.2 定义并搬运模型 3.3 分布式模型:codegeex 四、megatronmodule
五、emebdding
六、vocabparallelemebdding
七、parallelselfattention:分布式block的一般套路
7.1 列切割:columnparallellinear 7.2 行切割:rowparallellinear 7.3 parallelselfattention 八、crossentropy
8.1 计算logit 8.2 计算交叉熵 九、筋疲力尽的总结
十、参考(本文相关源码与论文)
一、模型概述 前文说过,用megatron做分布式训练的开源大模型有很多,我们选用的是thudm开源的codegeex(代码生成式大模型,类比于openai codex)。选用它的原因是“完全开源”与“清晰的模型架构和预训练配置图”,能帮助我们高效阅读源码。我们再来回顾下这两张图。
「模型架构」
「预训练配置」
由图可知,codegeex在预训练中采用的是8头tp(同一个node内的8张卡做tp,8张卡组成一个完整的模型),192头dp(192个node间做dp),一共1536块gpu进行训练。
「【阅读提醒】:如果你对gpt模型比较熟悉,则不需要花时间细看codegeex架构图也能无障碍阅读本文。架构图只是在涉及模型细节时,可以对照着看。」
二、模型切割在做一件什么事 2.1 模型切割设计思想 回顾一下,在初始化分布式环境中,我们根据dp/tp/pp组设置并划分了进程,确定了模型的切割方法,如下图:(注意:这并不是codegeex的划分框架,而是一个更广义的例子,细节可阅读上篇讲解)
接下来,我们就可以根据这个框架来切割模型了。pytorch默认将模型(nn.module)定义在cpu上,因此,我们在cpu上定义并初始化模型,然后将其搬运到当前进程所对应的gpu上,整个过程如下图:首先,我们是面向进程编程的,也就是整份脚本处理的是发生在1个进程上的事情。这样做的好处是,我们只需要维护1份脚本,然后将其发去不同机器的各张卡上执行,就能实现全局的并行。
但是,1个进程处理的是模型的不同部分,比如gpt模型,它的pre层涉及到embedding计算,post层涉及到softmax和loss的计算,这样每个进程上处理的模型是不一样的,这时怎么办呢?别忘了,我们能够取到进程id(全局或dp/tp/pp组内的),这样我们就能通过进程id,写if...else...来解决模型差异化问题了。
明确了这个思想,现在我们可以开始写代码了,我们有两种方式对模型进行切割:
「方案一:」先定义出完整的模型,并对模型参数做初始化,然后根据进程id取出相应子模型,搬运到gpu上 「方案二:」直接根据进程id,设计好当前子模型,做参数初始化,搬运到gpu上 这两者的核心差别,在于“随机种子”的设定。
2.2 随机种子 在分布式训练中,「随机种子是非常重要的,它关系到模型是否能够复现」。例如我们采取activation checkpoint的技术来节省显存时,在backward过程中我们需要重算forward得到activation,这时候就需要我们完整复现之前forward的过程,各类参数的初始化结果也要和之前完全一致。
我们来看几个例子:
例1: word embedding we1和we2间需要采用不同的随机种子。因为若采用相同的随机种子,则we1和we2的结果完全一样,这不等价于先随机初始化we,再将它进行切割。
例2: dropout 左侧方框中的2个dropout,在初始化时需要用不同的随机种子。因为这样才等价于对完整的dropout做初始化,然后再切割。右侧方框中的dropout,需要用相同的随机种子(虽然右边只画了1个dropout,但其实是2个dropout,每块gpu上各一个,因为此时两块gpu上的输出已经allreduce,是完全一致的。做完allreduce后,两块gpu继续独立计算,因此实际上有两个dropout)。
关于随机种子设定的一般结论 从例子中,我们可以得出一个结论:「一般在tp/pp组内,设定不同的随机种子。而在dp组内,设定相同的随机种子。」 这只是一个一般结论,我们可以根据实际情况去调整。
最后,回到模型切割上,方案1(先做整体初始化再切割)在代码里被称为“cpu上的初始化”(_initialize_affine_weight_cpu),方案2(直接对局部初始化)被称为“在gpu上的初始化”(_initialize_affine_weight_gpu)。我们会在切割部分的代码里经常看见它们。
三、模型并行框架 现在,我们可以来看具体的代码了
3.1 模型并行入口函数 模型并行部分的代码入口依然在megatron/training.py的pretrain函数下,代码如下:
def pretrain( train_valid_test_dataset_provider, model_provider, forward_step_func, valid_forward_step_func=none, extra_args_provider=none, args_defaults={},): # 1.初始化分布式环境(源码解读1内容) initialize_megatron( extra_args_provider=extra_args_provider, args_defaults=args_defaults ) ... # 2、模型并行:定义模型架构,并切割模型(本文重点) model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider) ... # 3、构造train/val/test数据集(下一篇将讲述) ... ( train_data_iterator, valid_data_iterator, test_data_iterator, ) = build_train_valid_test_data_iterators(train_valid_test_dataset_provider) ... # 4、训练(下下一篇将讲述) iteration = train( forward_step_func, valid_forward_step_func, model, optimizer, lr_scheduler, train_data_iterator, valid_data_iterator, ) ... 由代码可知,setup_model_and_optimizer是整个模型并行的入口函数,如下图,它主要由”「定义模型架构并切割模型」“,“「设置optimizer」”和“「设置学习率」”三部分组成。我们关注的重点在第一部分上(get_model)。
3.2 定义并搬运模型 get_model的内容可简化成下图:
get_model函数主要做了两件事:
在cpu上定义模型。pytorch默认在cpu上定义模型(nn.module)。model_provider是一个函数,调用它即可返回cpu版的模型,也就是一个codegeex类,这个将是下文要介绍的重点。
把模型从cpu搬运至gpu上。这里有两种方法可供选择:
「显式搬运。」即手动将模型搬运到当前进程所对应的gpu上 「权重精度设定。」由zero的思想可知,在模型训练中,把权重精度从fp32降至fp16,是一种节省显存的好办法。如果确定使用这种优化办法,将模型搬运到gpu上后,我们需要修改精度。 「初始化dp组」。这里指的是定义dp组间forward、backward和梯度计算与通讯等方法。在megatron中,tp和pp组的这些方法是人为定义的(在定义cpu模型时已设置好,我们将在下文讲codegeex细节时看到),而dp组则是可以用现成的(torch的distributeddataparallel)。在具体使用时,我们可以:(1)直接调用distributeddataparallel。或(2)在distributeddataparallel这个类的基础上做一些改进,例如增加对碎片化内存的管理,对计算梯度时的精度控制等。 「方案一:借助deepspeed进行管理」。在源码解读1中我们提过,秉持着万物皆可wrap的原则,按照deepspeed官网教程,只需要在megatron的某些文件中插入相应代码,就可以让deepspeed来管理模型的分布式、dp组间的显存优化等,这里同理。 「方案二:手动搬运管理。」这里需要我们以下事情: get_model函数的核心代码如下(一切尽在注释中):
def get_model(model_provider_func): build the model. args = get_args() # 1、定义并构建cpu版模型 if ( # 1.1、当分布式进行框架采用virtual pipeline (是nvdia后续提出的对megatron的优化方法,可先忽略不看) mpu.get_pipeline_model_parallel_world_size() > 1 and args.virtual_pipeline_model_parallel_size is not none ): model = [] for i in range(args.virtual_pipeline_model_parallel_size): mpu.set_virtual_pipeline_model_parallel_rank(i) # set pre_process and post_process only after virtual rank is set. pre_process = mpu.is_pipeline_first_stage() post_process = mpu.is_pipeline_last_stage() this_model = model_provider_func( pre_process=pre_process, post_process=post_process ) model.append(this_model) else: # 1.2 其余情况 # 判断当前进程是否是pp组的第一个进程(例如第一部分图例中pp组的g0) pre_process = mpu.is_pipeline_first_stage() # 判断当前进程是否是pp组的最后一个进程(例如第一部分图例中pp组的g12) post_process = mpu.is_pipeline_last_stage() # 构建cpu版codegeex模型 model = model_provider_func(pre_process=pre_process, post_process=post_process) ... # 2、将模型从cpu搬运到gpu上 # 2.1 如果采用megatron-deepspeed的方式,则直接返回模型,后面的搬运,数据并行等工作将由deepspeed来完成 # ref:https://www.deepspeed.ai/tutorials/megatron/ if args.deepspeed: return model # 将当前进程所维护的模型,从cpu搬运到gpu上(gpu即为在初始化时为当前进程分配的那块gpu) print(f > moving model to gpu ..., flush=true) for model_module in model: model_module.cuda(torch.cuda.current_device()) print(f > moving to gpu done, flush=true) # fp16转换(pytorch默认模型参数精度为fp32,依需决定计算过程中是否要转成fp16,节省显存) if args.fp16 or args.bf16: print(f > converting model to fp16 ..., flush=true) model = [float16module(model_module, args) for model_module in model] print(f > converting to fp16 done, flush=true) # 采用pytorch定义的distributeddataparallel管理数据并行 if args.ddp_impl == torch: i = torch.cuda.current_device() model = [ torchddp( model_module, device_ids=[i], output_device=i, process_group=mpu.get_data_parallel_group(), # 数据并行的组 ) for model_module in model ] return model # 采用自定义的distributeddataparallel管理数据并行 # 即在pytorch的distributeddataparallel的基础上,自己再定义内存管理、梯度精度等计算方式,更有效利用显存 if args.ddp_impl == local: # 自定义的数据并行类在megatron/model/distributed.py下 print(f > creating ddp model ..., flush=true) model = [ localddp( model_module, args.accumulate_allreduce_grads_in_fp32, args.use_contiguous_buffers_in_ddp, ) for model_module in model ] print(f > creating ddp model done, flush=true) return model raise notimplementederror( unknown ddp implementation specified: {}. exiting..format(args.ddp_impl) ) 特别说明的是,前文提过模型的首尾两层和中间层的架构可能不一样,因此我们通过pre_process 和post_process来做区分。(当然你也能选择用进程序id,只是首尾两层经常被q到,所以这里单独明确了下)。对codegeex来说,由它预训练配置可知,它的pp并行度为1,也就是1块gpu上涵盖了模型的第一层至最后一层,所以pre_process和post_process实际上没有用到。感兴趣的朋友可以阅读nvidia megatron源码下关于bert、gpt2的预训练代码,具体了解pre_process和post_process在定义模型时起的作用。
3.3 分布式模型:codegeex 现在,我们来看最核心的分布式模型:codegeex类。
前文说过,1个脚本处理的是1个进程上发生的事情,而1个进程对应的是模型的一部分。单进程的架构如下:
图中每个方框都表示源码里定义的一个nn.module 类(除了最上的方框外)具体定义为:
codegeex: 定义一块gpu上的模型。它由transformerlanguagemodel 和_vocabparallelcrossentropy这两个核心类组成。 transformerlanguagemodel:定义每块gpu上输入层embedding和中间block层的结构 embedding: 定义每块gpu上输入层embedding结构及相关计算,输出结果已allreduce(tp组间) paralleltransformer:定义每块gpu上所有中间blocks的结构及相关计算,输出结果已allreduce(tp组间) paralleltransformerlayer: 定义每块gpu上单个block的结构及相关计算,输出结果已allreduce(tp组间) parallelselfattention: 定义每块gpu上单个block中,attention的结构及相关计算,输出结果已allreduce(tp组间) parallelmlp: 定义每块gpu上单个block中,mlp层的结构及相关计算,输出结果已allreduce(tp组间)。 _vocabparallelcrossentropy: torch.autograd.function,定义每块gpu上,输出层embedding、softmax和loss等结构及相关计算。 「为什么需要对输出做allreduce?」回顾megtron理论部分的讲解,在纵向切割模型时,megatron是在输入x完整的情况下,设计模型切割的方式的。因此,对于模型的每一层输出,我们都要在tp组间做allreduce,来保证下一层拿到的输入也是完整的。类名字中的parallel,也是指在tp组中做并行,如下图所示:
到这一步,我们终于把模型切割部分的整体流程讲完了。「虽然我们是以codegeex为例,但这个流程图可以看作是通用的。」不同模型间只有模型具体结构、dp/tp/pp组设置这些方面的差别,整个并行框架是通用的。下面,我们来探究图中所绘的各个类的细节。
四、megatronmodule 上面所绘制的几类,并不是直接继承自nn.module ,而是皆继承于自定义的class megatronmodule(torch.nn.module)。我们说过,gpt类模型,输入和输出层共用一个word embedding。因此,这个类的主要作用,就是令pp组的第一个进程和最后一个进程满足这个条件(不过我不懂为什么要把这个限制放在一个大母类中去做,设计上感觉有点奇怪)。megatronmodule类的整体架构如下:
特别说明,「initialize_word_embedding 并不是某一具体的初始化we方法,它只是起到如图所说的强制作用。」
megatronmodule的代码如下(一切尽在注释中):
class megatronmodule(torch.nn.module): megatron specific extensions of torch module with support for pipelining. def __init__(self, share_word_embeddings=true): super(megatronmodule, self).__init__() # input和output是否要共享一套we self.share_word_embeddings = share_word_embeddings def state_dict_for_save_checkpoint( self, destination=none, prefix=, keep_vars=false ): use this function to override the state dict for saving checkpoints. # 模型训练中,及时将参数保存到指定位置(设置checkpoint), # 这样在训练出问题时,可以从checkpoint点重新load参数,继续训练 return self.state_dict(destination, prefix, keep_vars) def word_embeddings_weight(self): 获取word_embedding if mpu.is_pipeline_first_stage(ignore_virtual=true): return self.language_model.embedding.word_embeddings.weight if mpu.is_pipeline_last_stage(ignore_virtual=true): if not self.share_word_embeddings: raise exception( # 强制要求共享一套embedding word_embeddings_weight() called for last stage, but share_word_embeddings is false ) return self.word_embeddings.weight # 参见initialize_word_embeddings中we的定义 raise exception( # 如果当前进程是pp组的中间进程,则其上未维护we,因此当然获取不到 word_embeddings_weight() should be called for first and last stage only ) def initialize_word_embeddings(self, init_method_normal): 强制pp组最后一个进程初始化we时,直接使用pp组第一个进程的we args = get_args() if not self.share_word_embeddings: # 强制share embeddingg raise exception( initialize_word_embeddings() was called but share_word_embeddings is false ) # pp组并行度为1时,第一层和最后一层都在一块gpu上,天然共享we,无需做强制 if args.pipeline_model_parallel_size == 1: return # --------------------------------------------------- # 如果流水线并行的度不为1时,依次做三件事: # 【初始化时】: # 1、在pp组最后一个进程上初始化一个we,令其取值全为0 # 2、在pp组第一个进程与最后一个进程间做一次allreduce,保证两者的we完全一致 # 【训练时】: # 3、每次想在pp组第一个/最后一个进程上使用we时,要做一次通信,保证两者用的we完全一致 if mpu.is_pipeline_last_stage(): # 若当前进程是pp组最后一个进程 assert not mpu.is_pipeline_first_stage() self._word_embeddings_for_head_key = word_embeddings_for_head # 初始化一个we(已按vocab_size维度切割,可参见megatron原理篇对we的讲解) # vocabparallelembedding将在下文详细讲解 self.word_embeddings = mpu.vocabparallelembedding( args.padded_vocab_size, # vocab_size args.hidden_size, # embed_dim init_method=init_method_normal(args.init_method_std), # 初始化方法(在model/utils.py下) ) # 用0填充we(等待下面做allreduce后取得第一个进程上的we) self.word_embeddings.weight.data.fill_(0) self.word_embeddings.weight.shared = true if torch.distributed.is_initialized(): if mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage(): # 若当前进程是pp组第一个或最后一个进程 # 在两进程间做allreduce,保证它们使用的we完全一致 # mpu.get_embedding_group:在源码解读1中讲过,是除dp/tp/pp之外设置的又一进程组, # 主要就是用来做关于we的通讯 torch.distributed.all_reduce( self.word_embeddings_weight().data, group=mpu.get_embedding_group() ) else: print( warning! distributed processes aren't initialized, so word embeddings in the last layer are not initialized. if you are just manipulating a model this is fine, but this needs to be handled manually. if you are training something is definitely wrong. ) 五、embedding emebdding类定义了word/position/segment embedding,并定义输入x过embedding层的计算方法。关键属性和方法如下图:
self.word_embeddings:来自自定义的vocabparallelembedding (下面会详述) 。「含“parallel”则意味着参数在tp组间做了切割」。因此self.word_embeddings 是切割好的we。每个进程上维护根据自己进程序号所取下的那块we(例如下图中的we1,we2,图片来自megatron原理篇): self.position_embeddings和self.tokentype_embeddings 这两者都和输入x相关,而输入x是不做切割的,因此这两者也无需切割。
state_dict_for_save_checkpoint和load_state_dict。在源码注解里,这两个函数分别给出了easy load 和customize load的注释,这个注释不是很贴切。实际上,前者用于在模型训练过程中及时读取当前参数,及时保存(做checkpoint);后者则一般用于模型的重载,例如训到一半挂掉了,我们就重新初始化一个新模型,重载上个checkpoint保存下的权重。
embedding层代码如下(一切尽在注释中):
class embedding(megatronmodule): language model embeddings. arguments: hidden_size: hidden size vocab_size: vocabulary size max_sequence_length: maximum size of sequence. this is used for positional embedding embedding_dropout_prob: dropout probability for embeddings init_method: weight initialization method num_tokentypes: size of the token-type embeddings. 0 value will ignore this embedding def __init__( self, hidden_size, # 每个token的向量维度 vocab_size, # 词表大小 max_sequence_length, # 最长序列长度 embedding_dropout_prob, # dropout probability for embeddings init_method, # 初始化权重的方法 num_tokentypes=0, # 类似于bert中的segment type ): super(embedding, self).__init__() args = get_args() self.hidden_size = hidden_size self.init_method = init_method self.num_tokentypes = num_tokentypes self.max_sequence_length = max_sequence_length # we size: (vocab_size//tp_n, hidden_size) # tp_n表示tp组模型并行度 self.word_embeddings = mpu.vocabparallelembedding( vocab_size, self.hidden_size, init_method=self.init_method) self._word_embeddings_key = 'word_embeddings' self.vocab_size = vocab_size # pe size: (max_seq_len, hidden_size) self.position_embeddings = torch.nn.embedding( max_sequence_length, self.hidden_size) self.position_embeddings = self.position_embeddings.half() self._position_embeddings_key = 'position_embeddings' # initialize the position embeddings. self.init_method(self.position_embeddings.weight) # te_size:(num_tokentypes, hidden_size) # te类似于bert中的segment embedding self._tokentype_embeddings_key = 'tokentype_embeddings' if self.num_tokentypes > 0: self.tokentype_embeddings = torch.nn.embedding(self.num_tokentypes, self.hidden_size) # initialize the token-type embeddings. self.init_method(self.tokentype_embeddings.weight) else: self.tokentype_embeddings = none # embeddings dropout self.embedding_dropout = torch.nn.dropout(embedding_dropout_prob) def add_tokentype_embeddings(self, num_tokentypes): 如果在pretrain阶段未定义te,而在fine-tune阶段te,则可通过此函数添加 if self.tokentype_embeddings is not none: raise exception('tokentype embeddings is already initialized') if torch.distributed.get_rank() == 0: print('adding embedding for {} tokentypes'.format(num_tokentypes), flush=true) self.num_tokentypes = num_tokentypes self.tokentype_embeddings = torch.nn.embedding(num_tokentypes, self.hidden_size) # initialize the token-type embeddings. self.init_method(self.tokentype_embeddings.weight) def forward(self, input_ids, position_ids, tokentype_ids=none): 定义输入x过embedding层的计算方法 # words_embeddings size = (b, seq_len, hidden_size) # 再次注意:self.word_embeddings做forward时,最终的输出结果时allreduce的(见上图) words_embeddings = self.word_embeddings(input_ids) # position_embeddings size = (b, seq_len, hidden_size) position_embeddings = self.position_embeddings(position_ids) # embedding = we + pe # embedding size = (b, seq_len, hidden_size) embeddings = words_embeddings + position_embeddings # 依需要决定是否增加te if tokentype_ids is not none: assert self.tokentype_embeddings is not none embeddings = embeddings + self.tokentype_embeddings(tokentype_ids) else: assert self.tokentype_embeddings is none # dropout. embeddings = self.embedding_dropout(embeddings) return embeddings def state_dict_for_save_checkpoint( self, destination=none, prefix='', keep_vars=false, ): for easy load. 在模型训练过程中及时读取当前参数,方便及时保存(做checkpoint) 篇幅限制,这里不展示细节 ... def load_state_dict(self, state_dict, strict=true): customized load. 用于模型的重载。例如训到一半挂掉了,我们就重新初始化一个新模型, 重载上个checkpoint保存下的权重。 篇幅限制,这里不展示细节 ... 六、vocabparallelembedding 该类用于定义分布式的word embedding,整体架构如下,同样只列举了核心属性和方法:
具体代码如下,可以特别关注「初始化和forward」部分,同时建议大家阅读理论篇中关于这一过程的详细讲解(一切尽在注释中)
class vocabparallelembedding(torch.nn.module): embedding parallelized in the vocabulary dimension. this is mainly adapted from torch.nn.embedding and all the default values are kept. arguments: num_embeddings: vocabulary size. embedding_dim: size of hidden state. init_method: method to initialize weights. def __init__(self, num_embeddings, embedding_dim, init_method=init.xavier_normal_): super(vocabparallelembedding, self).__init__() # keep the input dimensions. self.num_embeddings = num_embeddings # vocab_size self.embedding_dim = embedding_dim # hidden_state. # set the detauls for compatibility. self.padding_idx = none self.max_norm = none self.norm_type = 2.0 self.scale_grad_by_freq = false self.sparse = false self._weight = none # 当前进程所在tp组进程总数 self.tensor_model_parallel_size = get_tensor_model_parallel_world_size() # 根据当前进程在tp组中的序号,确定其所需维护的we部分,沿着vocab维度对we进行切割 # 例如,进程id=0, 维护词表序号[0,5)范围内的数据;进程id=1,维护[5,10) ( self.vocab_start_index, self.vocab_end_index, ) = vocabutility.vocab_range_from_global_vocab_size( self.num_embeddings, get_tensor_model_parallel_rank(), self.tensor_model_parallel_size, ) # 计算当前进程维护的词表大小 self.num_embeddings_per_partition = ( self.vocab_end_index - self.vocab_start_index ) # 对we做初始化 args = get_args() # 读取预训练参数配置 if args.use_cpu_initialization: # cpu上做初始化 self.weight = parameter( # 在cpu上先生成一个完整的we torch.empty( self.num_embeddings_per_partition, self.embedding_dim, dtype=args.params_dtype, # dtype=torch.float32, ) ) # 对cpu上的we做切割(随机种子在初始化分布式中已设定好,不用变) _initialize_affine_weight_cpu( self.weight, self.num_embeddings, self.embedding_dim, self.num_embeddings_per_partition, 0, init_method, # 初始化权重的方法,例如xavier之类 ) else: # 在gpu上做初始化 self.weight = parameter( # 生成一个切割好的we torch.empty( self.num_embeddings_per_partition, self.embedding_dim, device=torch.cuda.current_device(), dtype=args.params_dtype, # dtype=torch.float32, ) ) # 在gpu上做初始化,注意tp组内不同进程采用不同的随机种子 _initialize_affine_weight_gpu( self.weight, init_method, partition_dim=0, stride=1 ) def forward(self, input_): 定义输入x过we的计算方法,输出结果已经过allreduce if self.tensor_model_parallel_size > 1: # 如果使用tp # 如果在当前进程维护的we上,找不到对应的单词,那么对应位置就赋0 # 例如当前的数据的tokenid是:[2,7,1,5],当前维护的词表是[0,1,2](start_index=0, end_index = 3), # 则mask之后的数据为[2,0,1,0] # build the mask. input_mask = (input_ = self.vocab_end_index ) # mask the input. masked_input = input_.clone() - self.vocab_start_index masked_input[input_mask] = 0 else: masked_input = input_ # 输入x,过当前进程维护的部分we的结果 output_parallel = f.embedding( masked_input, # tensor containing indices into the embedding matrix self.weight, # 切割好的word embedding的权重 self.padding_idx, self.max_norm, self.norm_type, self.scale_grad_by_freq, self.sparse, ) # 当前词表不维护的部分,都设为0 if self.tensor_model_parallel_size > 1: output_parallel[input_mask, :] = 0.0 # # 将tp组各gpu上的结果做allreduce output = reduce_from_tensor_model_parallel_region(output_parallel) return outputdef _initialize_affine_weight_cpu(...): cpu版权重初始化。这个不难,大家可以自己阅读 ... def _initialize_affine_weight_gpu(...): gpu版权重初始化。特别关注设置随机种子部分 ... # 借助deepspeed或自定义的get_cuda_rng_tracker方法,对随机种子进行操作 # get_cuda_rng_tracker细节,大家可自行阅读源码 if ds_checkpointing.is_configured(): global get_cuda_rng_tracker get_cuda_rng_tracker = ds_checkpointing.get_cuda_rng_tracker with get_cuda_rng_tracker().fork(): init_method(weight) 七、parallelselfattention:分布式block的一般套路 【阅读提示】:阅读本节时可:
对照第一部分codegeex框架图 对照megatron理论篇对矩阵切分的讲解 首先来看切割attention的示意图,由图可知,「对qkv矩阵,采用“列切割”,对线性矩阵b,采用“行切割”」。这样设计的好处是,在经过qkv的计算后,各进程在不用通讯的前提下,继续做线性计算,直到最后一步才allreduce,起到降低通讯成本的作用:
我们先单独来看“列切割”与“行切割”的实现代码。megatron将它们定义成了两个nn.module类。
7.1 列切割:columnparallellinear 列切割示意图如下:
f和g是两个共轭算子,可理解为两个torch.autograd.function类。在这个类下,我们可以「根据需要重写forward和backward方法」。
f: 「forward中,直接copy输入;backward中,对梯度做allreduce」。在代码里定义为class _copytomodelparallelregion(torch.autograd.function)
g: 「forward中,all-gather输出;backward中,对梯度做split」(每张卡经过all-gather已有完整的y了,因此以y为起点计算梯度后,沿着列做split就可得到y1和y2的梯度)。在代码里定义为class _gatherfrommodelparallelregion(torch.autograd.function)
class columnparallellinear(torch.nn.module): linear layer with column parallelism. the linear layer is defined as y = xa + b. a is parallelized along its second dimension as a = [a_1, ..., a_p]. arguments: input_size: first dimension of matrix a. output_size: second dimension of matrix a. bias: if true, add bias gather_output: if true, call all-gether on output and make y avaiable to all gpus, otherwise, every gpu will have its output which is y_i = xa_i init_method: method to initialize weights. note that bias is always set to zero. stride: for the strided linear layers. keep_master_weight_for_test: this was added for testing and should be set to false. it returns the master weights used for initialization. skip_bias_add: this was added to enable performance optimations where bias can be fused with other elementwise operations. we skip adding bias but instead return it. # 该类定义了切割后的权重w,例如对上图来说,w1和w2都可分别视为该类的一个实例 def __init__( self, input_size, # w的第一个维度 output_size, # w的第二个维度 bias=true, # 是否需要引入bias gather_output=true, # 决定是否要将y1和y2做all-gather init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=false, skip_bias_add=false, params_dtype=none, skip_init=false, device=none, ): super(columnparallellinear, self).__init__() # keep input parameters self.input_size = input_size self.output_size = output_size self.gather_output = gather_output # divide the weight matrix along the last dimension. # 当前进程所在tp组的总进程数 world_size = get_tensor_model_parallel_world_size() # 每块gpu上维护的hidden_size的大小,等于 原hidden_zize // tp组总进程数 self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add self.params_dtype = params_dtype self.device = device # parameters. # note: torch.nn.functional.linear performs xa^t + b and as a result # initialize weight. args = get_args() # 取得命令行所有的参数 if not skip_init: if args.use_cpu_initialization: # cpu上初始化 self.weight = parameter( torch.empty( self.output_size_per_partition, self.input_size, dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) self.master_weight = _initialize_affine_weight_cpu( # self.weight, self.output_size, self.input_size, self.output_size_per_partition, 0, init_method, stride=stride, return_master_weight=keep_master_weight_for_test, ) else: # gpu上初始化 self.weight = parameter( torch.empty( self.output_size_per_partition, self.input_size, device=self.device if self.device is not none else torch.cuda.current_device(), dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) _initialize_affine_weight_gpu( self.weight, init_method, partition_dim=0, stride=stride ) else: self.register_parameter(weight, none) # 对bias做处理,道理同weight if bias and not skip_init: if args.use_cpu_initialization: # cpu上初始化 self.bias = parameter( torch.empty(self.output_size_per_partition, dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype) ) else: self.bias = parameter( # gpu上初始化 torch.empty( self.output_size_per_partition, device=self.device if self.device is not none else torch.cuda.current_device(), dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) set_tensor_model_parallel_attributes(self.bias, true, 0, stride) # always initialize bias to zero. with torch.no_grad(): self.bias.zero_() else: self.register_parameter(bias, none) def forward(self, input_): # 定义列切割中的f算子 # 调用copy_to_tensor_model_parallel_region则新建一个_copytomodelparallelregion实例(见下) input_parallel = copy_to_tensor_model_parallel_region(input_) bias = self.bias if not self.skip_bias_add else none # 定义bias output_parallel = f.linear(input_parallel, self.weight, bias) # x * 切割好的权重 # 决定是否要对每个进程上的输出结果做all-reduce if self.gather_output: # 定义列切割中的g算子 # 调用gather_from_tensor_model_parallel_region则新建一个_gatherfrommodelparallelregion实例(见下) output = gather_from_tensor_model_parallel_region(output_parallel) # 把各gpu上的输出按照列gather起来后,作为最终输出 else: output = output_parallel # 否则最终输出还是自己算的那块gpu output_bias = self.bias if self.skip_bias_add else none return output, output_bias# 列切割中的f与gclass _copytomodelparallelregion(torch.autograd.function): pass the input to the model parallel region. # 列切割下的f算子 # forward:copy输入 # backward:对梯度做allreduce @staticmethod def symbolic(graph, input_): return input_ @staticmethod def forward(ctx, input_): return input_ @staticmethod def backward(ctx, grad_output): return _reduce(grad_output)class _gatherfrommodelparallelregion(torch.autograd.function): gather the input from model parallel region and concatinate. # 列切割中的g算子 # forward:all-gather输出 # backward:对梯度,沿着列方向做split @staticmethod def symbolic(graph, input_): return _gather(input_) @staticmethod def forward(ctx, input_): return _gather(input_) @staticmethod def backward(ctx, grad_output): return _split(grad_output) 7.2 行切割:rowparallellinear f: forward中,按列split输入;backward中,all-gather梯度 g: forward中,allreduce输出;backward中,直接输出梯度,无需做任何通讯(因为经过g的foward,每块gpu上已拥有了yi和y,则根据图中g的backward公式可知,每块gpu可独立计算梯度) 代码如下:
class rowparallellinear(torch.nn.module): linear layer with row parallelism. the linear layer is defined as y = xa + b. a is parallelized along its first dimension and x along its second dimension as: - - | a_1 | | . | a = | . | x = [x_1, ..., x_p] | . | | a_p | - - arguments: input_size: first dimension of matrix a. output_size: second dimension of matrix a. bias: if true, add bias. note that bias is not parallelized. input_is_parallel: if true, we assume that the input is already split across the gpus and we do not split again. init_method: method to initialize weights. note that bias is always set to zero. stride: for the strided linear layers. keep_master_weight_for_test: this was added for testing and should be set to false. it returns the master weights used for initialization. skip_bias_add: this was added to enable performance optimations where bias can be fused with other elementwise operations. we skip adding bias but instead return it. def __init__( self, input_size, output_size, bias=true, input_is_parallel=false, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=false, skip_bias_add=false, params_dtype=none, skip_init=false, device=none, ): super(rowparallellinear, self).__init__() # keep input parameters self.input_size = input_size self.output_size = output_size self.input_is_parallel = input_is_parallel # divide the weight matrix along the last dimension. world_size = get_tensor_model_parallel_world_size() self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add self.params_dtype = params_dtype self.device = device # parameters. # note: torch.nn.functional.linear performs xa^t + b and as a result # we allocate the transpose. # initialize weight. args = get_args() if not skip_init: if args.use_cpu_initialization: self.weight = parameter( torch.empty( self.output_size, self.input_size_per_partition, dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) self.master_weight = _initialize_affine_weight_cpu( self.weight, self.output_size, self.input_size, self.input_size_per_partition, 1, init_method, stride=stride, return_master_weight=keep_master_weight_for_test, ) else: self.weight = parameter( torch.empty( self.output_size, self.input_size_per_partition, device=self.device if self.device is not none else torch.cuda.current_device(), dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) _initialize_affine_weight_gpu( self.weight, init_method, partition_dim=1, stride=stride ) else: self.register_parameter(weight, none) if bias and not skip_init: if args.use_cpu_initialization: self.bias = parameter( torch.empty(self.output_size, dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype) ) else: self.bias = parameter( torch.empty( self.output_size, device=self.device if self.device is not none else torch.cuda.current_device(), dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype, ) ) # always initialize bias to zero. with torch.no_grad(): self.bias.zero_() else: self.register_parameter(bias, none) def forward(self, input_): # set up backprop all-reduce. if self.input_is_parallel: input_parallel = input_ else: input_parallel = scatter_to_tensor_model_parallel_region(input_) # matrix multiply. output_parallel = f.linear(input_parallel, self.weight) # all-reduce across all the partitions. output_ = reduce_from_tensor_model_parallel_region(output_parallel) if not self.skip_bias_add: output = output_ + self.bias if self.bias is not none else output_ output_bias = none else: output = output_ output_bias = self.bias return output, output_bias# 行切割中的f和g算子class _scattertomodelparallelregion(torch.autograd.function): split the input and keep only the corresponding chuck to the rank. # 行切割中的f算子 # forward:沿列split输入 # backward:all-gather梯度 @staticmethod def symbolic(graph, input_): return _split(input_) @staticmethod def forward(ctx, input_): return _split(input_) @staticmethod def backward(ctx, grad_output): return _gather(grad_output) class _reducefrommodelparallelregion(torch.autograd.function): all-reduce the input from the model parallel region. # 行切割中的g算子 # forward:allreduce输出 # backward:正常计算梯度,gpu间无需做任何通讯 @staticmethod def symbolic(graph, input_): return _reduce(input_) @staticmethod def forward(ctx, input_): return _reduce(input_) @staticmethod def backward(ctx, grad_output): return grad_output 7.3 parallelselfattention 该类的构造如下图:
这张图中透露的核心含义是,「每个进程上维护的都是按列切割完的qkv矩阵」,进程间独立计算,qkv矩阵的输出结果一般不做allreduce。同时,「每个进程上维护的是按行切割完的dense(线型层)矩阵」,attention输出过线性层后的结果,做allreduce。另外,在设置attention_dropout时,同样调用了get_cuda_rng_tracker 方法,令tp组内的进程拥有不同的随机种子。「最后,你可能想问,dense后的dropout去哪里了」?代码里把它定义到了paralleltransformerlayer 下(等于attention + mlp)。
相信有了上面的说明,看这块代码就不难了。篇幅限制,这里不展示代码了。大家可以对照着codegeex架构图,来看这里multi-head attention的计算方式。
parallelmlp,paralleltransformerlayer和paralleltransformer都采用的是一样的套路,也略过不言。
八、crossentropy 现在,终于可以来看模型的最后一层:交叉熵的平行计算。核心类为_vocabparallelcrossentropy
我们在原理篇中讲过交叉熵的并行计算,其优化核心是将通讯量从b*s*v降至b*s。但是megatron代码中定义的交叉熵计算方式,稍微复杂一些,也和我们一般理解的交叉熵有些许差异。所以我们先用图解,来看下代码定义的交叉熵计算流程:
【注】:
对x和y_i来说,(b, s, h)维度下应该画成一个立方体,为了表达简练,这里将b拍平了。 对其余维度中含b的矩阵,b正常表示,即row=b 8.1 计算logit 首先,在使用_vocabparallelcrossentropy 计算交叉熵前,我们需要计算logit。这时我们调用parallel_lm_logits 函数,将模型最后一层的输出x(复习一下,这个x已经在tp组内allreduce了),乘上当前进程上维护的输入层we的转置(复习一下,输入层和输出层共用一套embedding),得到当前进程的logit y_i,「同时我们选择不对输出logit做allreduce」。
你可能会有一个疑惑:「在transformer中,输出层会额外训练一个线性矩阵,来计算logit;为什么在gpt中,可以用输入层we的转置来代替这个线性矩阵?」
这个问题的答案,对理解megatron交叉熵计算也至关重要。我们可「将x*we^t结果理解成“x与we间的相似度”」,例如对y1来说,它的第一行中的每个logit,表示第一个token与词表里每个词的相似度。
注意到每个进程上只维护部分we。例如,假设词表共有5个单词,we1维护前5个单词,we2维护后5个单词。因此再严格来说:「对y1,它的第一行中的每个logit,表示第一个token与词表中前5个词的相似度;对y2,它的第一行中的每个logit,表示第一个token与词表中后5个词的相似度。我们要记住这个含义。」
8.2 计算交叉熵 知道了logit的含义,我们来看交叉熵计算。
首先做了一系列求max的计算,得到基于全局的max(logit),再将orig_logit - max(logit),得到处理后的结果。这步理解起来不难,主要目的是为了防止计算溢出。
「接下来,就是基于logit算loss了。」
每个进程上都有一份(b, s)维度的真值,它表示每个token的真值是哪个词(词用id表示)。我们基于这份真值,在y_i上找出真值位置的logit。例如:seq_length = 3,即我们需要对3个token去做预测,假设前两个token的真值在第1个进程所维护的we1中,最后一个token的真值在第2个进程所维护的we2中。那么我们去y1的前两行里,取出真值位置的logit,这个logit表示“token与真值的相似度”,去y2的最后一行里做同样操作。
这样,我们就能得到l1和l2,和真值位置不对应的地方,统一填充0。随后对l1和l2做allreduce,得到l。「l中的每行表示“token与真值间的相似度」
现在,我们回来对y1和y2的每一行求sum(e^logit),得到e1和e2。将e1和e2做allreduce,得到e。「e中的每行表示“token和词表中所有词相似度的总和”」
我们希望「(token和词表中所有词相似度的总和-token与真值间的相似度) /token和词表中所有词相似度的总和」这个值最小,这个差值就是最终的loss。
8.3 代码 理清了这点,现在可以来看代码了(一切尽在注释中),建议对这块还有疑问的朋友,可以写个test脚本把中间结果打印出来,方便理解:
class _vocabparallelcrossentropy(torch.autograd.function): 分布式计算loss @staticmethod def forward(ctx, vocab_parallel_logits, target): # 1. logit - global max(logit)操作,主要目的是防溢出 logits_max = torch.max(vocab_parallel_logits, dim=-1)[0] # (b, s, 1) torch.distributed.all_reduce( # (b, s, 1) logits_max, op=torch.distributed.reduceop.max, # 找全局最大值 group=get_tensor_model_parallel_group(), ) # subtract the maximum value. vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1)) # 原始gpu上维护的logits减去每行最大值(防止溢出) # 2、根据当前进程id,取出当前进程所维护词表序号等信息 # 函数,能够获取当前进程所维护词表的start_index和end_index get_vocab_range = vocabutility.vocab_range_from_per_partition_vocab_size # 这块gpu上logits最后一维的大小,等于所维护的词表的大小(v/n) partition_vocab_size = vocab_parallel_logits.size()[-1] # 取得当前进程所在tp组中的序号 rank = get_tensor_model_parallel_rank() # 取得当前进程所在tp组的总进程数 world_size = get_tensor_model_parallel_world_size() # 取得当前进程所维护的词表的start_index和end_index vocab_start_index, vocab_end_index = get_vocab_range( partition_vocab_size, rank, world_size ) # 3. 基于真值,取出每个token在真值位置上的logit(即和真值的相似度) # create a mask of valid vocab ids (1 means it needs to be masked) target_mask = (target = vocab_end_index) # target = (b, s) masked_target = target.clone() - vocab_start_index masked_target[target_mask] = 0 # get predicted-logits = logits[target]. # for simplicity, we convert logits to a 2-d tensor with size # [*, partition-vocab-size] and target to a 1-d tensor of size [*]. logits_2d = vocab_parallel_logits.view(-1, partition_vocab_size) # (b*s, v/n) masked_target_1d = masked_target.view(-1) # (b*s) arange_1d = torch.arange( # [b*s] start=0, end=logits_2d.size()[0], device=logits_2d.device ) # logits_2d[arange_1d, masked_target_1d]: # tensor的切片操作。arange_1d:取出所有的行。masked_target_1d:取出logit predicted_logits_1d = logits_2d[arange_1d, masked_target_1d] # (b*s) predicted_logits_1d = predicted_logits_1d.clone().contiguous() predicted_logits = predicted_logits_1d.view_as(target) # (b, s) predicted_logits[target_mask] = 0.0 # all reduce is needed to get the chunks from other gpus. torch.distributed.all_reduce( # allreduce之后得到的logit矩阵为(b, s),每一个位置表示对应真值位置的预测logit predicted_logits, op=torch.distributed.reduceop.sum, group=get_tensor_model_parallel_group(), ) # sum of exponential of logits along vocab dimension across all gpus. exp_logits = vocab_parallel_logits # (b, s, v/n) torch.exp(vocab_parallel_logits, out=exp_logits) sum_exp_logits = exp_logits.sum(dim=-1) # (b, s) torch.distributed.all_reduce( sum_exp_logits, op=torch.distributed.reduceop.sum, group=get_tensor_model_parallel_group(), ) # 4. 计算loss = log(sum(exp(logits))) - predicted-logit. loss = torch.log(sum_exp_logits) - predicted_logits # (b, s) # store softmax, target-mask and masked-target for backward pass. exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1)) ctx.save_for_backward(exp_logits, target_mask, masked_target_1d) return loss @staticmethod def backward(ctx, grad_output): # retreive tensors from the forward path. softmax, target_mask, masked_target_1d = ctx.saved_tensors # all the inputs have softmax as their gradient. grad_input = softmax # for simplicity, work with the 2d gradient. partition_vocab_size = softmax.size()[-1] grad_2d = grad_input.view(-1, partition_vocab_size) # add the gradient from matching classes. arange_1d = torch.arange(start=0, end=grad_2d.size()[0], device=grad_2d.device) grad_2d[arange_1d, masked_target_1d] -= 1.0 - target_mask.view(-1).float() # finally elementwise multiplication with the output gradients. grad_input.mul_(grad_output.unsqueeze(dim=-1)) return grad_input, none 九、总结 啊这总结怎么写呢,呕心沥血终于写完了。希望能给到大家帮助!
德国工业4.0子体系统与工业互联网解析
胶囊生产设备远程维护解决方案,在家也能控制设备
机器视觉检测的概念及应用优势
KINAX角位变送器在水位控制的应用
孙丕恕:工业互联网必将为实体经济振兴、为中国经济腾飞插上翅膀
图解大模型训练之:Megatron源码解读2,模型并行
SWM32S基于GT9157的触摸芯片驱动
均衡电流,实现车规智能驱动器的最佳性能
酷派老将遭遇乐视清洗 众高管离职加盟ivvi
微软MR原型创作工具Maquette:针对PC VR头显,能在VR中创作3D内容
梁孟松:借助新封装技术获得高性能芯片
关于高频淬火的常见缺陷,它的产生原因是什么
马化腾马云布局VR产业 VR技术基础有望在5G时代迎来质变
电动汽车特斯拉股价大跌8.6% 马斯克再次跌落世界首富宝座
声发射的基本原理、特点及应用
双声道d类音频功放芯片的详细介绍
PESD1LIN供应商 杭州东沃电子
一起来了解Self-test的重要性!
黑鲨2Pro 8GB+128GB版开启预售 售价2699元
奥迪一汽新能源汽车有限公司CIO等一行到访启明信息调研