图解大模型训练之:Megatron源码解读2,模型并行

源码解读系列将和大家一起来读megatron的pretrain部分代码。
在源码解读第一篇中,我们讲解了如何做「分布式环境初始化」,即按照dp/tp/pp对进程进行分组,并为每个进程指定gpu。在这一章中,我们将一起读「模型并行部分」:如何切分模型,并搬入分布式环境定义好的dp/tp/pp组中。
「本文将提供:」
详细的图解。画图说明代码的设计架构,讲清代码想做一件什么事。 详细的代码注释。在图解的基础上,提取核心代码部分,并附上注释。 「如何利用本文提高源码阅读效率:」
先看一~三部分。了解模型并行的设计思想、整体框架及入口函数。 打开megatron源码,找到入口函数,开始阅读。 阅读中的每一块细节,可参考四~八部分。 「阅读本文前置知识:」
图解大模型训练之:张量模型并行,megatron-lm 图解大模型训练之:megatron源码解读1,分布式环境初始化 「本文目录:」
一、模型概述
二、模型切割在做一件什么事
2.1 模型切割设计思想 2.2 随机种子 三、模型并行框架
3.1 模型并行入口函数 3.2 定义并搬运模型 3.3 分布式模型:codegeex 四、megatronmodule
五、emebdding
六、vocabparallelemebdding
七、parallelselfattention:分布式block的一般套路
7.1 列切割:columnparallellinear 7.2 行切割:rowparallellinear 7.3 parallelselfattention 八、crossentropy
8.1 计算logit 8.2 计算交叉熵 九、筋疲力尽的总结
十、参考(本文相关源码与论文)
一、模型概述 前文说过,用megatron做分布式训练的开源大模型有很多,我们选用的是thudm开源的codegeex(代码生成式大模型,类比于openai codex)。选用它的原因是“完全开源”与“清晰的模型架构和预训练配置图”,能帮助我们高效阅读源码。我们再来回顾下这两张图。
「模型架构」
「预训练配置」
由图可知,codegeex在预训练中采用的是8头tp(同一个node内的8张卡做tp,8张卡组成一个完整的模型),192头dp(192个node间做dp),一共1536块gpu进行训练。
「【阅读提醒】:如果你对gpt模型比较熟悉,则不需要花时间细看codegeex架构图也能无障碍阅读本文。架构图只是在涉及模型细节时,可以对照着看。」
二、模型切割在做一件什么事 2.1 模型切割设计思想 回顾一下,在初始化分布式环境中,我们根据dp/tp/pp组设置并划分了进程,确定了模型的切割方法,如下图:(注意:这并不是codegeex的划分框架,而是一个更广义的例子,细节可阅读上篇讲解)
接下来,我们就可以根据这个框架来切割模型了。pytorch默认将模型(nn.module)定义在cpu上,因此,我们在cpu上定义并初始化模型,然后将其搬运到当前进程所对应的gpu上,整个过程如下图:首先,我们是面向进程编程的,也就是整份脚本处理的是发生在1个进程上的事情。这样做的好处是,我们只需要维护1份脚本,然后将其发去不同机器的各张卡上执行,就能实现全局的并行。
但是,1个进程处理的是模型的不同部分,比如gpt模型,它的pre层涉及到embedding计算,post层涉及到softmax和loss的计算,这样每个进程上处理的模型是不一样的,这时怎么办呢?别忘了,我们能够取到进程id(全局或dp/tp/pp组内的),这样我们就能通过进程id,写if...else...来解决模型差异化问题了。
明确了这个思想,现在我们可以开始写代码了,我们有两种方式对模型进行切割:
「方案一:」先定义出完整的模型,并对模型参数做初始化,然后根据进程id取出相应子模型,搬运到gpu上 「方案二:」直接根据进程id,设计好当前子模型,做参数初始化,搬运到gpu上 这两者的核心差别,在于“随机种子”的设定。
2.2 随机种子 在分布式训练中,「随机种子是非常重要的,它关系到模型是否能够复现」。例如我们采取activation checkpoint的技术来节省显存时,在backward过程中我们需要重算forward得到activation,这时候就需要我们完整复现之前forward的过程,各类参数的初始化结果也要和之前完全一致。
我们来看几个例子:
例1: word embedding we1和we2间需要采用不同的随机种子。因为若采用相同的随机种子,则we1和we2的结果完全一样,这不等价于先随机初始化we,再将它进行切割。
例2: dropout 左侧方框中的2个dropout,在初始化时需要用不同的随机种子。因为这样才等价于对完整的dropout做初始化,然后再切割。右侧方框中的dropout,需要用相同的随机种子(虽然右边只画了1个dropout,但其实是2个dropout,每块gpu上各一个,因为此时两块gpu上的输出已经allreduce,是完全一致的。做完allreduce后,两块gpu继续独立计算,因此实际上有两个dropout)。
关于随机种子设定的一般结论 从例子中,我们可以得出一个结论:「一般在tp/pp组内,设定不同的随机种子。而在dp组内,设定相同的随机种子。」 这只是一个一般结论,我们可以根据实际情况去调整。
最后,回到模型切割上,方案1(先做整体初始化再切割)在代码里被称为“cpu上的初始化”(_initialize_affine_weight_cpu),方案2(直接对局部初始化)被称为“在gpu上的初始化”(_initialize_affine_weight_gpu)。我们会在切割部分的代码里经常看见它们。
三、模型并行框架 现在,我们可以来看具体的代码了
3.1 模型并行入口函数 模型并行部分的代码入口依然在megatron/training.py的pretrain函数下,代码如下:
def pretrain(    train_valid_test_dataset_provider,    model_provider,    forward_step_func,    valid_forward_step_func=none,    extra_args_provider=none,    args_defaults={},):      # 1.初始化分布式环境(源码解读1内容)    initialize_megatron(        extra_args_provider=extra_args_provider, args_defaults=args_defaults    )    ...    # 2、模型并行:定义模型架构,并切割模型(本文重点)    model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider)    ...        # 3、构造train/val/test数据集(下一篇将讲述)    ... (            train_data_iterator,            valid_data_iterator,            test_data_iterator,        ) = build_train_valid_test_data_iterators(train_valid_test_dataset_provider)         ...    # 4、训练(下下一篇将讲述)    iteration = train(            forward_step_func,            valid_forward_step_func,            model,            optimizer,            lr_scheduler,            train_data_iterator,            valid_data_iterator,        )        ... 由代码可知,setup_model_and_optimizer是整个模型并行的入口函数,如下图,它主要由”「定义模型架构并切割模型」“,“「设置optimizer」”和“「设置学习率」”三部分组成。我们关注的重点在第一部分上(get_model)。
3.2 定义并搬运模型 get_model的内容可简化成下图:
get_model函数主要做了两件事:
在cpu上定义模型。pytorch默认在cpu上定义模型(nn.module)。model_provider是一个函数,调用它即可返回cpu版的模型,也就是一个codegeex类,这个将是下文要介绍的重点。
把模型从cpu搬运至gpu上。这里有两种方法可供选择:
「显式搬运。」即手动将模型搬运到当前进程所对应的gpu上 「权重精度设定。」由zero的思想可知,在模型训练中,把权重精度从fp32降至fp16,是一种节省显存的好办法。如果确定使用这种优化办法,将模型搬运到gpu上后,我们需要修改精度。 「初始化dp组」。这里指的是定义dp组间forward、backward和梯度计算与通讯等方法。在megatron中,tp和pp组的这些方法是人为定义的(在定义cpu模型时已设置好,我们将在下文讲codegeex细节时看到),而dp组则是可以用现成的(torch的distributeddataparallel)。在具体使用时,我们可以:(1)直接调用distributeddataparallel。或(2)在distributeddataparallel这个类的基础上做一些改进,例如增加对碎片化内存的管理,对计算梯度时的精度控制等。 「方案一:借助deepspeed进行管理」。在源码解读1中我们提过,秉持着万物皆可wrap的原则,按照deepspeed官网教程,只需要在megatron的某些文件中插入相应代码,就可以让deepspeed来管理模型的分布式、dp组间的显存优化等,这里同理。 「方案二:手动搬运管理。」这里需要我们以下事情: get_model函数的核心代码如下(一切尽在注释中):
def get_model(model_provider_func):    build the model.    args = get_args()    # 1、定义并构建cpu版模型    if ( # 1.1、当分布式进行框架采用virtual pipeline (是nvdia后续提出的对megatron的优化方法,可先忽略不看)        mpu.get_pipeline_model_parallel_world_size() > 1         and args.virtual_pipeline_model_parallel_size is not none     ):        model = []        for i in range(args.virtual_pipeline_model_parallel_size):            mpu.set_virtual_pipeline_model_parallel_rank(i)             # set pre_process and post_process only after virtual rank is set.            pre_process = mpu.is_pipeline_first_stage()            post_process = mpu.is_pipeline_last_stage()             this_model = model_provider_func(                 pre_process=pre_process, post_process=post_process            )             model.append(this_model)     else: # 1.2 其余情况        # 判断当前进程是否是pp组的第一个进程(例如第一部分图例中pp组的g0)        pre_process = mpu.is_pipeline_first_stage()        # 判断当前进程是否是pp组的最后一个进程(例如第一部分图例中pp组的g12)        post_process = mpu.is_pipeline_last_stage()        # 构建cpu版codegeex模型        model = model_provider_func(pre_process=pre_process, post_process=post_process)       ...        # 2、将模型从cpu搬运到gpu上    # 2.1 如果采用megatron-deepspeed的方式,则直接返回模型,后面的搬运,数据并行等工作将由deepspeed来完成    # ref:https://www.deepspeed.ai/tutorials/megatron/    if args.deepspeed:         return model    # 将当前进程所维护的模型,从cpu搬运到gpu上(gpu即为在初始化时为当前进程分配的那块gpu)    print(f > moving model to gpu ..., flush=true)    for model_module in model:        model_module.cuda(torch.cuda.current_device())    print(f > moving to gpu done, flush=true)    # fp16转换(pytorch默认模型参数精度为fp32,依需决定计算过程中是否要转成fp16,节省显存)    if args.fp16 or args.bf16:        print(f > converting model to fp16 ..., flush=true)        model = [float16module(model_module, args) for model_module in model]        print(f > converting to fp16 done, flush=true)        # 采用pytorch定义的distributeddataparallel管理数据并行    if args.ddp_impl == torch:         i = torch.cuda.current_device()         model = [            torchddp(                model_module,                device_ids=[i],                output_device=i,                process_group=mpu.get_data_parallel_group(), # 数据并行的组            )            for model_module in model        ]        return model        # 采用自定义的distributeddataparallel管理数据并行    # 即在pytorch的distributeddataparallel的基础上,自己再定义内存管理、梯度精度等计算方式,更有效利用显存    if args.ddp_impl == local: # 自定义的数据并行类在megatron/model/distributed.py下        print(f > creating ddp model ..., flush=true)        model = [            localddp(                model_module,                args.accumulate_allreduce_grads_in_fp32,                args.use_contiguous_buffers_in_ddp,            )            for model_module in model        ]        print(f > creating ddp model done, flush=true)        return model    raise notimplementederror(        unknown ddp implementation specified: {}.  exiting..format(args.ddp_impl)    ) 特别说明的是,前文提过模型的首尾两层和中间层的架构可能不一样,因此我们通过pre_process 和post_process来做区分。(当然你也能选择用进程序id,只是首尾两层经常被q到,所以这里单独明确了下)。对codegeex来说,由它预训练配置可知,它的pp并行度为1,也就是1块gpu上涵盖了模型的第一层至最后一层,所以pre_process和post_process实际上没有用到。感兴趣的朋友可以阅读nvidia megatron源码下关于bert、gpt2的预训练代码,具体了解pre_process和post_process在定义模型时起的作用。
3.3 分布式模型:codegeex 现在,我们来看最核心的分布式模型:codegeex类。
前文说过,1个脚本处理的是1个进程上发生的事情,而1个进程对应的是模型的一部分。单进程的架构如下:
图中每个方框都表示源码里定义的一个nn.module 类(除了最上的方框外)具体定义为:
codegeex: 定义一块gpu上的模型。它由transformerlanguagemodel 和_vocabparallelcrossentropy这两个核心类组成。 transformerlanguagemodel:定义每块gpu上输入层embedding和中间block层的结构 embedding: 定义每块gpu上输入层embedding结构及相关计算,输出结果已allreduce(tp组间) paralleltransformer:定义每块gpu上所有中间blocks的结构及相关计算,输出结果已allreduce(tp组间) paralleltransformerlayer: 定义每块gpu上单个block的结构及相关计算,输出结果已allreduce(tp组间) parallelselfattention: 定义每块gpu上单个block中,attention的结构及相关计算,输出结果已allreduce(tp组间) parallelmlp: 定义每块gpu上单个block中,mlp层的结构及相关计算,输出结果已allreduce(tp组间)。 _vocabparallelcrossentropy: torch.autograd.function,定义每块gpu上,输出层embedding、softmax和loss等结构及相关计算。 「为什么需要对输出做allreduce?」回顾megtron理论部分的讲解,在纵向切割模型时,megatron是在输入x完整的情况下,设计模型切割的方式的。因此,对于模型的每一层输出,我们都要在tp组间做allreduce,来保证下一层拿到的输入也是完整的。类名字中的parallel,也是指在tp组中做并行,如下图所示:
到这一步,我们终于把模型切割部分的整体流程讲完了。「虽然我们是以codegeex为例,但这个流程图可以看作是通用的。」不同模型间只有模型具体结构、dp/tp/pp组设置这些方面的差别,整个并行框架是通用的。下面,我们来探究图中所绘的各个类的细节。
四、megatronmodule 上面所绘制的几类,并不是直接继承自nn.module ,而是皆继承于自定义的class megatronmodule(torch.nn.module)。我们说过,gpt类模型,输入和输出层共用一个word embedding。因此,这个类的主要作用,就是令pp组的第一个进程和最后一个进程满足这个条件(不过我不懂为什么要把这个限制放在一个大母类中去做,设计上感觉有点奇怪)。megatronmodule类的整体架构如下:
特别说明,「initialize_word_embedding 并不是某一具体的初始化we方法,它只是起到如图所说的强制作用。」
megatronmodule的代码如下(一切尽在注释中):
class megatronmodule(torch.nn.module):    megatron specific extensions of torch module with support    for pipelining.    def __init__(self, share_word_embeddings=true):        super(megatronmodule, self).__init__()        # input和output是否要共享一套we        self.share_word_embeddings = share_word_embeddings    def state_dict_for_save_checkpoint(        self, destination=none, prefix=, keep_vars=false    ):        use this function to override the state dict for        saving checkpoints.        # 模型训练中,及时将参数保存到指定位置(设置checkpoint),        # 这样在训练出问题时,可以从checkpoint点重新load参数,继续训练        return self.state_dict(destination, prefix, keep_vars)    def word_embeddings_weight(self):        获取word_embedding        if mpu.is_pipeline_first_stage(ignore_virtual=true):            return self.language_model.embedding.word_embeddings.weight         if mpu.is_pipeline_last_stage(ignore_virtual=true):             if not self.share_word_embeddings:                raise exception( # 强制要求共享一套embedding                    word_embeddings_weight() called for last                     stage, but share_word_embeddings is false                )            return self.word_embeddings.weight # 参见initialize_word_embeddings中we的定义        raise exception( # 如果当前进程是pp组的中间进程,则其上未维护we,因此当然获取不到            word_embeddings_weight() should be  called for first and last stage only        )    def initialize_word_embeddings(self, init_method_normal):        强制pp组最后一个进程初始化we时,直接使用pp组第一个进程的we        args = get_args()        if not self.share_word_embeddings: # 强制share embeddingg            raise exception(                initialize_word_embeddings() was called but                 share_word_embeddings is false            )        # pp组并行度为1时,第一层和最后一层都在一块gpu上,天然共享we,无需做强制        if args.pipeline_model_parallel_size == 1:            return        # ---------------------------------------------------        # 如果流水线并行的度不为1时,依次做三件事:        # 【初始化时】:        # 1、在pp组最后一个进程上初始化一个we,令其取值全为0        # 2、在pp组第一个进程与最后一个进程间做一次allreduce,保证两者的we完全一致        # 【训练时】:        # 3、每次想在pp组第一个/最后一个进程上使用we时,要做一次通信,保证两者用的we完全一致               if mpu.is_pipeline_last_stage(): # 若当前进程是pp组最后一个进程            assert not mpu.is_pipeline_first_stage()            self._word_embeddings_for_head_key = word_embeddings_for_head             # 初始化一个we(已按vocab_size维度切割,可参见megatron原理篇对we的讲解)            # vocabparallelembedding将在下文详细讲解            self.word_embeddings = mpu.vocabparallelembedding(                args.padded_vocab_size, # vocab_size                args.hidden_size, # embed_dim                init_method=init_method_normal(args.init_method_std), # 初始化方法(在model/utils.py下)            )            # 用0填充we(等待下面做allreduce后取得第一个进程上的we)            self.word_embeddings.weight.data.fill_(0)             self.word_embeddings.weight.shared = true                if torch.distributed.is_initialized():            if mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage(): # 若当前进程是pp组第一个或最后一个进程                # 在两进程间做allreduce,保证它们使用的we完全一致                # mpu.get_embedding_group:在源码解读1中讲过,是除dp/tp/pp之外设置的又一进程组,                # 主要就是用来做关于we的通讯                torch.distributed.all_reduce(                    self.word_embeddings_weight().data, group=mpu.get_embedding_group()                )        else:            print(                warning! distributed processes aren't initialized, so                 word embeddings in the last layer are not initialized.                 if you are just manipulating a model this is fine, but                 this needs to be handled manually. if you are training                 something is definitely wrong.            ) 五、embedding emebdding类定义了word/position/segment embedding,并定义输入x过embedding层的计算方法。关键属性和方法如下图:
self.word_embeddings:来自自定义的vocabparallelembedding (下面会详述) 。「含“parallel”则意味着参数在tp组间做了切割」。因此self.word_embeddings 是切割好的we。每个进程上维护根据自己进程序号所取下的那块we(例如下图中的we1,we2,图片来自megatron原理篇): self.position_embeddings和self.tokentype_embeddings 这两者都和输入x相关,而输入x是不做切割的,因此这两者也无需切割。
state_dict_for_save_checkpoint和load_state_dict。在源码注解里,这两个函数分别给出了easy load 和customize load的注释,这个注释不是很贴切。实际上,前者用于在模型训练过程中及时读取当前参数,及时保存(做checkpoint);后者则一般用于模型的重载,例如训到一半挂掉了,我们就重新初始化一个新模型,重载上个checkpoint保存下的权重。
embedding层代码如下(一切尽在注释中):
class embedding(megatronmodule):    language model embeddings.    arguments:        hidden_size: hidden size         vocab_size: vocabulary size         max_sequence_length: maximum size of sequence. this                             is used for positional embedding        embedding_dropout_prob: dropout probability for embeddings        init_method: weight initialization method        num_tokentypes: size of the token-type embeddings. 0 value                        will ignore this embedding        def __init__(        self,        hidden_size, # 每个token的向量维度        vocab_size, # 词表大小        max_sequence_length, # 最长序列长度        embedding_dropout_prob, # dropout probability for embeddings        init_method, # 初始化权重的方法        num_tokentypes=0, # 类似于bert中的segment type    ):        super(embedding, self).__init__()                args = get_args()                self.hidden_size = hidden_size        self.init_method = init_method        self.num_tokentypes = num_tokentypes        self.max_sequence_length = max_sequence_length                # we size: (vocab_size//tp_n, hidden_size)        # tp_n表示tp组模型并行度        self.word_embeddings = mpu.vocabparallelembedding(            vocab_size, self.hidden_size, init_method=self.init_method)         self._word_embeddings_key = 'word_embeddings'                    self.vocab_size = vocab_size        # pe size: (max_seq_len, hidden_size)        self.position_embeddings = torch.nn.embedding(            max_sequence_length, self.hidden_size)        self.position_embeddings = self.position_embeddings.half()        self._position_embeddings_key = 'position_embeddings'        # initialize the position embeddings.        self.init_method(self.position_embeddings.weight)        # te_size:(num_tokentypes, hidden_size)        # te类似于bert中的segment embedding        self._tokentype_embeddings_key = 'tokentype_embeddings'        if self.num_tokentypes > 0:            self.tokentype_embeddings = torch.nn.embedding(self.num_tokentypes,                                                           self.hidden_size)            # initialize the token-type embeddings.            self.init_method(self.tokentype_embeddings.weight)        else:            self.tokentype_embeddings = none        # embeddings dropout        self.embedding_dropout = torch.nn.dropout(embedding_dropout_prob)    def add_tokentype_embeddings(self, num_tokentypes):        如果在pretrain阶段未定义te,而在fine-tune阶段te,则可通过此函数添加                if self.tokentype_embeddings is not none:            raise exception('tokentype embeddings is already initialized')        if torch.distributed.get_rank() == 0:            print('adding embedding for {} tokentypes'.format(num_tokentypes),                  flush=true)        self.num_tokentypes = num_tokentypes        self.tokentype_embeddings = torch.nn.embedding(num_tokentypes,                                                       self.hidden_size)        # initialize the token-type embeddings.        self.init_method(self.tokentype_embeddings.weight)    def forward(self, input_ids, position_ids, tokentype_ids=none):        定义输入x过embedding层的计算方法                # words_embeddings size = (b, seq_len, hidden_size)        # 再次注意:self.word_embeddings做forward时,最终的输出结果时allreduce的(见上图)        words_embeddings = self.word_embeddings(input_ids)         # position_embeddings size = (b, seq_len, hidden_size)        position_embeddings = self.position_embeddings(position_ids)        # embedding = we + pe        # embedding size = (b, seq_len, hidden_size)        embeddings = words_embeddings + position_embeddings        # 依需要决定是否增加te        if tokentype_ids is not none:             assert self.tokentype_embeddings is not none            embeddings = embeddings + self.tokentype_embeddings(tokentype_ids)        else:            assert self.tokentype_embeddings is none        # dropout.        embeddings = self.embedding_dropout(embeddings)        return embeddings    def state_dict_for_save_checkpoint(        self, destination=none, prefix='', keep_vars=false,    ):        for easy load.        在模型训练过程中及时读取当前参数,方便及时保存(做checkpoint)        篇幅限制,这里不展示细节                ...    def load_state_dict(self, state_dict, strict=true):        customized load.        用于模型的重载。例如训到一半挂掉了,我们就重新初始化一个新模型,        重载上个checkpoint保存下的权重。        篇幅限制,这里不展示细节                ... 六、vocabparallelembedding 该类用于定义分布式的word embedding,整体架构如下,同样只列举了核心属性和方法:
具体代码如下,可以特别关注「初始化和forward」部分,同时建议大家阅读理论篇中关于这一过程的详细讲解(一切尽在注释中)
class vocabparallelembedding(torch.nn.module):    embedding parallelized in the vocabulary dimension.    this is mainly adapted from torch.nn.embedding and all the default    values are kept.    arguments:        num_embeddings: vocabulary size.        embedding_dim: size of hidden state.        init_method: method to initialize weights.        def __init__(self, num_embeddings, embedding_dim, init_method=init.xavier_normal_):        super(vocabparallelembedding, self).__init__()        # keep the input dimensions.        self.num_embeddings = num_embeddings # vocab_size        self.embedding_dim = embedding_dim # hidden_state.        # set the detauls for compatibility.        self.padding_idx = none        self.max_norm = none        self.norm_type = 2.0        self.scale_grad_by_freq = false        self.sparse = false        self._weight = none        # 当前进程所在tp组进程总数        self.tensor_model_parallel_size = get_tensor_model_parallel_world_size()        # 根据当前进程在tp组中的序号,确定其所需维护的we部分,沿着vocab维度对we进行切割        # 例如,进程id=0, 维护词表序号[0,5)范围内的数据;进程id=1,维护[5,10)        (            self.vocab_start_index,            self.vocab_end_index,        ) = vocabutility.vocab_range_from_global_vocab_size(            self.num_embeddings,            get_tensor_model_parallel_rank(),            self.tensor_model_parallel_size,        )        # 计算当前进程维护的词表大小        self.num_embeddings_per_partition = (            self.vocab_end_index - self.vocab_start_index        )        # 对we做初始化        args = get_args() # 读取预训练参数配置        if args.use_cpu_initialization: # cpu上做初始化            self.weight = parameter( # 在cpu上先生成一个完整的we                torch.empty(                    self.num_embeddings_per_partition,                    self.embedding_dim,                    dtype=args.params_dtype,                    # dtype=torch.float32,                )            )            # 对cpu上的we做切割(随机种子在初始化分布式中已设定好,不用变)            _initialize_affine_weight_cpu(                self.weight,                 self.num_embeddings,                 self.embedding_dim,                self.num_embeddings_per_partition,                0,                init_method, # 初始化权重的方法,例如xavier之类            )        else: # 在gpu上做初始化            self.weight = parameter( # 生成一个切割好的we                torch.empty(                    self.num_embeddings_per_partition,                    self.embedding_dim,                    device=torch.cuda.current_device(),                    dtype=args.params_dtype,                    # dtype=torch.float32,                )            )            # 在gpu上做初始化,注意tp组内不同进程采用不同的随机种子            _initialize_affine_weight_gpu(                self.weight, init_method, partition_dim=0, stride=1            )    def forward(self, input_):        定义输入x过we的计算方法,输出结果已经过allreduce        if self.tensor_model_parallel_size > 1: # 如果使用tp            # 如果在当前进程维护的we上,找不到对应的单词,那么对应位置就赋0            # 例如当前的数据的tokenid是:[2,7,1,5],当前维护的词表是[0,1,2](start_index=0, end_index = 3),            # 则mask之后的数据为[2,0,1,0]            # build the mask.            input_mask = (input_ = self.vocab_end_index            )            # mask the input.            masked_input = input_.clone() - self.vocab_start_index            masked_input[input_mask] = 0        else:            masked_input = input_                    # 输入x,过当前进程维护的部分we的结果        output_parallel = f.embedding(            masked_input, # tensor containing indices into the embedding matrix            self.weight, # 切割好的word embedding的权重            self.padding_idx,            self.max_norm,            self.norm_type,            self.scale_grad_by_freq,            self.sparse,        )        # 当前词表不维护的部分,都设为0        if self.tensor_model_parallel_size > 1:            output_parallel[input_mask, :] = 0.0 #                # 将tp组各gpu上的结果做allreduce        output = reduce_from_tensor_model_parallel_region(output_parallel)        return outputdef _initialize_affine_weight_cpu(...):    cpu版权重初始化。这个不难,大家可以自己阅读    ...    def _initialize_affine_weight_gpu(...):    gpu版权重初始化。特别关注设置随机种子部分    ...    # 借助deepspeed或自定义的get_cuda_rng_tracker方法,对随机种子进行操作    # get_cuda_rng_tracker细节,大家可自行阅读源码    if ds_checkpointing.is_configured():        global get_cuda_rng_tracker        get_cuda_rng_tracker = ds_checkpointing.get_cuda_rng_tracker    with get_cuda_rng_tracker().fork():         init_method(weight) 七、parallelselfattention:分布式block的一般套路 【阅读提示】:阅读本节时可:
对照第一部分codegeex框架图 对照megatron理论篇对矩阵切分的讲解 首先来看切割attention的示意图,由图可知,「对qkv矩阵,采用“列切割”,对线性矩阵b,采用“行切割”」。这样设计的好处是,在经过qkv的计算后,各进程在不用通讯的前提下,继续做线性计算,直到最后一步才allreduce,起到降低通讯成本的作用:
我们先单独来看“列切割”与“行切割”的实现代码。megatron将它们定义成了两个nn.module类。
7.1 列切割:columnparallellinear 列切割示意图如下:
f和g是两个共轭算子,可理解为两个torch.autograd.function类。在这个类下,我们可以「根据需要重写forward和backward方法」。
f: 「forward中,直接copy输入;backward中,对梯度做allreduce」。在代码里定义为class _copytomodelparallelregion(torch.autograd.function)
g: 「forward中,all-gather输出;backward中,对梯度做split」(每张卡经过all-gather已有完整的y了,因此以y为起点计算梯度后,沿着列做split就可得到y1和y2的梯度)。在代码里定义为class _gatherfrommodelparallelregion(torch.autograd.function)
class columnparallellinear(torch.nn.module):    linear layer with column parallelism.    the linear layer is defined as y = xa + b. a is parallelized along    its second dimension as a = [a_1, ..., a_p].    arguments:        input_size: first dimension of matrix a.         output_size: second dimension of matrix a.         bias: if true, add bias        gather_output: if true, call all-gether on output and make y avaiable                       to all gpus, otherwise, every gpu will have its output                       which is y_i = xa_i         init_method: method to initialize weights. note that bias is always set                     to zero.        stride: for the strided linear layers.        keep_master_weight_for_test: this was added for testing and should be                                     set to false. it returns the master weights                                     used for initialization.        skip_bias_add: this was added to enable performance optimations where bias                       can be fused with other elementwise operations. we skip                       adding bias but instead return it.         # 该类定义了切割后的权重w,例如对上图来说,w1和w2都可分别视为该类的一个实例    def __init__(        self,        input_size, # w的第一个维度        output_size, # w的第二个维度        bias=true, # 是否需要引入bias        gather_output=true, # 决定是否要将y1和y2做all-gather        init_method=init.xavier_normal_,        stride=1,        keep_master_weight_for_test=false,        skip_bias_add=false,        params_dtype=none,        skip_init=false,        device=none,    ):        super(columnparallellinear, self).__init__()        # keep input parameters        self.input_size = input_size         self.output_size = output_size         self.gather_output = gather_output         # divide the weight matrix along the last dimension.        # 当前进程所在tp组的总进程数        world_size = get_tensor_model_parallel_world_size()        # 每块gpu上维护的hidden_size的大小,等于 原hidden_zize // tp组总进程数        self.output_size_per_partition = divide(output_size, world_size)         self.skip_bias_add = skip_bias_add         self.params_dtype = params_dtype         self.device = device         # parameters.        # note: torch.nn.functional.linear performs xa^t + b and as a result         # initialize weight.        args = get_args() # 取得命令行所有的参数        if not skip_init:             if args.use_cpu_initialization: # cpu上初始化                self.weight = parameter(                      torch.empty(                        self.output_size_per_partition,                        self.input_size,                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )                self.master_weight = _initialize_affine_weight_cpu( #                     self.weight,                    self.output_size,                    self.input_size,                    self.output_size_per_partition,                    0,                    init_method,                    stride=stride,                    return_master_weight=keep_master_weight_for_test,                )            else: # gpu上初始化                self.weight = parameter(                     torch.empty(                        self.output_size_per_partition,                        self.input_size,                        device=self.device if self.device is not none else torch.cuda.current_device(),                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )                _initialize_affine_weight_gpu(                     self.weight, init_method, partition_dim=0, stride=stride                )        else:            self.register_parameter(weight, none)        # 对bias做处理,道理同weight        if bias and not skip_init:             if args.use_cpu_initialization: # cpu上初始化                self.bias = parameter(                    torch.empty(self.output_size_per_partition,                                 dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype)                )             else:                 self.bias = parameter( # gpu上初始化                    torch.empty(                        self.output_size_per_partition,                        device=self.device if self.device is not none else torch.cuda.current_device(),                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )                        set_tensor_model_parallel_attributes(self.bias, true, 0, stride)             # always initialize bias to zero.             with torch.no_grad():                self.bias.zero_()        else:            self.register_parameter(bias, none)    def forward(self, input_):        # 定义列切割中的f算子        # 调用copy_to_tensor_model_parallel_region则新建一个_copytomodelparallelregion实例(见下)        input_parallel = copy_to_tensor_model_parallel_region(input_)        bias = self.bias if not self.skip_bias_add else none # 定义bias        output_parallel = f.linear(input_parallel, self.weight, bias) # x * 切割好的权重        # 决定是否要对每个进程上的输出结果做all-reduce        if self.gather_output:            # 定义列切割中的g算子            # 调用gather_from_tensor_model_parallel_region则新建一个_gatherfrommodelparallelregion实例(见下)            output = gather_from_tensor_model_parallel_region(output_parallel) # 把各gpu上的输出按照列gather起来后,作为最终输出        else:            output = output_parallel # 否则最终输出还是自己算的那块gpu        output_bias = self.bias if self.skip_bias_add else none        return output, output_bias# 列切割中的f与gclass _copytomodelparallelregion(torch.autograd.function):    pass the input to the model parallel region.    # 列切割下的f算子    # forward:copy输入    # backward:对梯度做allreduce    @staticmethod    def symbolic(graph, input_):        return input_    @staticmethod    def forward(ctx, input_):        return input_    @staticmethod    def backward(ctx, grad_output):        return _reduce(grad_output)class _gatherfrommodelparallelregion(torch.autograd.function):    gather the input from model parallel region and concatinate.    # 列切割中的g算子    # forward:all-gather输出    # backward:对梯度,沿着列方向做split    @staticmethod    def symbolic(graph, input_):        return _gather(input_)    @staticmethod    def forward(ctx, input_):        return _gather(input_)    @staticmethod    def backward(ctx, grad_output):        return _split(grad_output) 7.2 行切割:rowparallellinear f: forward中,按列split输入;backward中,all-gather梯度 g: forward中,allreduce输出;backward中,直接输出梯度,无需做任何通讯(因为经过g的foward,每块gpu上已拥有了yi和y,则根据图中g的backward公式可知,每块gpu可独立计算梯度) 代码如下:
class rowparallellinear(torch.nn.module):    linear layer with row parallelism.    the linear layer is defined as y = xa + b. a is parallelized along    its first dimension and x along its second dimension as:               -   -              | a_1 |              | .   |          a = | .   |        x = [x_1, ..., x_p]              | .   |              | a_p |               -   -    arguments:        input_size: first dimension of matrix a.        output_size: second dimension of matrix a.        bias: if true, add bias. note that bias is not parallelized.        input_is_parallel: if true, we assume that the input is already                           split across the gpus and we do not split                           again.        init_method: method to initialize weights. note that bias is always set                     to zero.        stride: for the strided linear layers.        keep_master_weight_for_test: this was added for testing and should be                                     set to false. it returns the master weights                                     used for initialization.        skip_bias_add: this was added to enable performance optimations where bias                       can be fused with other elementwise operations. we skip                       adding bias but instead return it.        def __init__(        self,        input_size,        output_size,        bias=true,        input_is_parallel=false,        init_method=init.xavier_normal_,        stride=1,        keep_master_weight_for_test=false,        skip_bias_add=false,        params_dtype=none,        skip_init=false,        device=none,    ):        super(rowparallellinear, self).__init__()        # keep input parameters        self.input_size = input_size        self.output_size = output_size        self.input_is_parallel = input_is_parallel        # divide the weight matrix along the last dimension.        world_size = get_tensor_model_parallel_world_size()        self.input_size_per_partition = divide(input_size, world_size)        self.skip_bias_add = skip_bias_add        self.params_dtype = params_dtype        self.device = device                # parameters.        # note: torch.nn.functional.linear performs xa^t + b and as a result        # we allocate the transpose.        # initialize weight.        args = get_args()        if not skip_init:            if args.use_cpu_initialization:                self.weight = parameter(                    torch.empty(                        self.output_size,                        self.input_size_per_partition,                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )                self.master_weight = _initialize_affine_weight_cpu(                    self.weight,                    self.output_size,                    self.input_size,                    self.input_size_per_partition,                    1,                    init_method,                    stride=stride,                    return_master_weight=keep_master_weight_for_test,                )            else:                self.weight = parameter(                    torch.empty(                        self.output_size,                        self.input_size_per_partition,                        device=self.device if self.device is not none else torch.cuda.current_device(),                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )                _initialize_affine_weight_gpu(                    self.weight, init_method, partition_dim=1, stride=stride                )        else:            self.register_parameter(weight, none)                    if bias and not skip_init:            if args.use_cpu_initialization:                self.bias = parameter(                    torch.empty(self.output_size,                                 dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype)                )            else:                self.bias = parameter(                    torch.empty(                        self.output_size,                        device=self.device if self.device is not none else torch.cuda.current_device(),                        dtype=self.params_dtype if self.params_dtype is not none else args.params_dtype,                    )                )            # always initialize bias to zero.            with torch.no_grad():                self.bias.zero_()        else:            self.register_parameter(bias, none)    def forward(self, input_):        # set up backprop all-reduce.        if self.input_is_parallel:            input_parallel = input_        else:            input_parallel = scatter_to_tensor_model_parallel_region(input_)        # matrix multiply.        output_parallel = f.linear(input_parallel, self.weight)        # all-reduce across all the partitions.        output_ = reduce_from_tensor_model_parallel_region(output_parallel)        if not self.skip_bias_add:            output = output_ + self.bias if self.bias is not none else output_            output_bias = none        else:            output = output_            output_bias = self.bias        return output, output_bias# 行切割中的f和g算子class _scattertomodelparallelregion(torch.autograd.function):    split the input and keep only the corresponding chuck to the rank.    # 行切割中的f算子    # forward:沿列split输入    # backward:all-gather梯度    @staticmethod    def symbolic(graph, input_):        return _split(input_)    @staticmethod    def forward(ctx, input_):        return _split(input_)    @staticmethod    def backward(ctx, grad_output):        return _gather(grad_output) class _reducefrommodelparallelregion(torch.autograd.function):    all-reduce the input from the model parallel region.    # 行切割中的g算子    # forward:allreduce输出    # backward:正常计算梯度,gpu间无需做任何通讯    @staticmethod    def symbolic(graph, input_):        return _reduce(input_)    @staticmethod    def forward(ctx, input_):        return _reduce(input_)    @staticmethod    def backward(ctx, grad_output):        return grad_output 7.3 parallelselfattention 该类的构造如下图:
这张图中透露的核心含义是,「每个进程上维护的都是按列切割完的qkv矩阵」,进程间独立计算,qkv矩阵的输出结果一般不做allreduce。同时,「每个进程上维护的是按行切割完的dense(线型层)矩阵」,attention输出过线性层后的结果,做allreduce。另外,在设置attention_dropout时,同样调用了get_cuda_rng_tracker 方法,令tp组内的进程拥有不同的随机种子。「最后,你可能想问,dense后的dropout去哪里了」?代码里把它定义到了paralleltransformerlayer 下(等于attention + mlp)。
相信有了上面的说明,看这块代码就不难了。篇幅限制,这里不展示代码了。大家可以对照着codegeex架构图,来看这里multi-head attention的计算方式。
parallelmlp,paralleltransformerlayer和paralleltransformer都采用的是一样的套路,也略过不言。
八、crossentropy 现在,终于可以来看模型的最后一层:交叉熵的平行计算。核心类为_vocabparallelcrossentropy
我们在原理篇中讲过交叉熵的并行计算,其优化核心是将通讯量从b*s*v降至b*s。但是megatron代码中定义的交叉熵计算方式,稍微复杂一些,也和我们一般理解的交叉熵有些许差异。所以我们先用图解,来看下代码定义的交叉熵计算流程:
【注】:
对x和y_i来说,(b, s, h)维度下应该画成一个立方体,为了表达简练,这里将b拍平了。 对其余维度中含b的矩阵,b正常表示,即row=b 8.1 计算logit 首先,在使用_vocabparallelcrossentropy 计算交叉熵前,我们需要计算logit。这时我们调用parallel_lm_logits 函数,将模型最后一层的输出x(复习一下,这个x已经在tp组内allreduce了),乘上当前进程上维护的输入层we的转置(复习一下,输入层和输出层共用一套embedding),得到当前进程的logit y_i,「同时我们选择不对输出logit做allreduce」。
你可能会有一个疑惑:「在transformer中,输出层会额外训练一个线性矩阵,来计算logit;为什么在gpt中,可以用输入层we的转置来代替这个线性矩阵?」
这个问题的答案,对理解megatron交叉熵计算也至关重要。我们可「将x*we^t结果理解成“x与we间的相似度”」,例如对y1来说,它的第一行中的每个logit,表示第一个token与词表里每个词的相似度。
注意到每个进程上只维护部分we。例如,假设词表共有5个单词,we1维护前5个单词,we2维护后5个单词。因此再严格来说:「对y1,它的第一行中的每个logit,表示第一个token与词表中前5个词的相似度;对y2,它的第一行中的每个logit,表示第一个token与词表中后5个词的相似度。我们要记住这个含义。」
8.2 计算交叉熵 知道了logit的含义,我们来看交叉熵计算。
首先做了一系列求max的计算,得到基于全局的max(logit),再将orig_logit - max(logit),得到处理后的结果。这步理解起来不难,主要目的是为了防止计算溢出。
「接下来,就是基于logit算loss了。」
每个进程上都有一份(b, s)维度的真值,它表示每个token的真值是哪个词(词用id表示)。我们基于这份真值,在y_i上找出真值位置的logit。例如:seq_length = 3,即我们需要对3个token去做预测,假设前两个token的真值在第1个进程所维护的we1中,最后一个token的真值在第2个进程所维护的we2中。那么我们去y1的前两行里,取出真值位置的logit,这个logit表示“token与真值的相似度”,去y2的最后一行里做同样操作。
这样,我们就能得到l1和l2,和真值位置不对应的地方,统一填充0。随后对l1和l2做allreduce,得到l。「l中的每行表示“token与真值间的相似度」
现在,我们回来对y1和y2的每一行求sum(e^logit),得到e1和e2。将e1和e2做allreduce,得到e。「e中的每行表示“token和词表中所有词相似度的总和”」
我们希望「(token和词表中所有词相似度的总和-token与真值间的相似度) /token和词表中所有词相似度的总和」这个值最小,这个差值就是最终的loss。
8.3 代码 理清了这点,现在可以来看代码了(一切尽在注释中),建议对这块还有疑问的朋友,可以写个test脚本把中间结果打印出来,方便理解:
class _vocabparallelcrossentropy(torch.autograd.function):        分布式计算loss            @staticmethod    def forward(ctx, vocab_parallel_logits, target):        # 1. logit - global max(logit)操作,主要目的是防溢出        logits_max = torch.max(vocab_parallel_logits, dim=-1)[0] # (b, s, 1)        torch.distributed.all_reduce( # (b, s, 1)            logits_max,            op=torch.distributed.reduceop.max, # 找全局最大值            group=get_tensor_model_parallel_group(),        )        # subtract the maximum value.         vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1)) # 原始gpu上维护的logits减去每行最大值(防止溢出)        # 2、根据当前进程id,取出当前进程所维护词表序号等信息        # 函数,能够获取当前进程所维护词表的start_index和end_index        get_vocab_range = vocabutility.vocab_range_from_per_partition_vocab_size         # 这块gpu上logits最后一维的大小,等于所维护的词表的大小(v/n)        partition_vocab_size = vocab_parallel_logits.size()[-1]        # 取得当前进程所在tp组中的序号        rank = get_tensor_model_parallel_rank()        # 取得当前进程所在tp组的总进程数        world_size = get_tensor_model_parallel_world_size()        # 取得当前进程所维护的词表的start_index和end_index         vocab_start_index, vocab_end_index = get_vocab_range(             partition_vocab_size, rank, world_size        )        # 3. 基于真值,取出每个token在真值位置上的logit(即和真值的相似度)        # create a mask of valid vocab ids (1 means it needs to be masked)        target_mask = (target = vocab_end_index) # target = (b, s)        masked_target = target.clone() - vocab_start_index        masked_target[target_mask] = 0        # get predicted-logits = logits[target].        # for simplicity, we convert logits to a 2-d tensor with size        # [*, partition-vocab-size] and target to a 1-d tensor of size [*].        logits_2d = vocab_parallel_logits.view(-1, partition_vocab_size) # (b*s, v/n)        masked_target_1d = masked_target.view(-1) # (b*s)        arange_1d = torch.arange( # [b*s]            start=0, end=logits_2d.size()[0], device=logits_2d.device        )        # logits_2d[arange_1d, masked_target_1d]:         # tensor的切片操作。arange_1d:取出所有的行。masked_target_1d:取出logit        predicted_logits_1d = logits_2d[arange_1d, masked_target_1d] # (b*s)        predicted_logits_1d = predicted_logits_1d.clone().contiguous()        predicted_logits = predicted_logits_1d.view_as(target) # (b, s)        predicted_logits[target_mask] = 0.0        # all reduce is needed to get the chunks from other gpus.        torch.distributed.all_reduce( # allreduce之后得到的logit矩阵为(b, s),每一个位置表示对应真值位置的预测logit            predicted_logits,            op=torch.distributed.reduceop.sum,            group=get_tensor_model_parallel_group(),        )        # sum of exponential of logits along vocab dimension across all gpus.        exp_logits = vocab_parallel_logits # (b, s, v/n)        torch.exp(vocab_parallel_logits, out=exp_logits)        sum_exp_logits = exp_logits.sum(dim=-1) # (b, s)        torch.distributed.all_reduce(            sum_exp_logits,            op=torch.distributed.reduceop.sum,            group=get_tensor_model_parallel_group(),        )        # 4. 计算loss = log(sum(exp(logits))) - predicted-logit.        loss = torch.log(sum_exp_logits) - predicted_logits # (b, s)        # store softmax, target-mask and masked-target for backward pass.        exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))        ctx.save_for_backward(exp_logits, target_mask, masked_target_1d)        return loss    @staticmethod    def backward(ctx, grad_output):        # retreive tensors from the forward path.        softmax, target_mask, masked_target_1d = ctx.saved_tensors        # all the inputs have softmax as their gradient.        grad_input = softmax        # for simplicity, work with the 2d gradient.        partition_vocab_size = softmax.size()[-1]        grad_2d = grad_input.view(-1, partition_vocab_size)        # add the gradient from matching classes.        arange_1d = torch.arange(start=0, end=grad_2d.size()[0], device=grad_2d.device)        grad_2d[arange_1d, masked_target_1d] -= 1.0 - target_mask.view(-1).float()        # finally elementwise multiplication with the output gradients.        grad_input.mul_(grad_output.unsqueeze(dim=-1))        return grad_input, none 九、总结 啊这总结怎么写呢,呕心沥血终于写完了。希望能给到大家帮助!

德国工业4.0子体系统与工业互联网解析
胶囊生产设备远程维护解决方案,在家也能控制设备
机器视觉检测的概念及应用优势
KINAX角位变送器在水位控制的应用
孙丕恕:工业互联网必将为实体经济振兴、为中国经济腾飞插上翅膀
图解大模型训练之:Megatron源码解读2,模型并行
SWM32S基于GT9157的触摸芯片驱动
均衡电流,实现车规智能驱动器的最佳性能
酷派老将遭遇乐视清洗 众高管离职加盟ivvi
微软MR原型创作工具Maquette:针对PC VR头显,能在VR中创作3D内容
梁孟松:借助新封装技术获得高性能芯片
关于高频淬火的常见缺陷,它的产生原因是什么
马化腾马云布局VR产业 VR技术基础有望在5G时代迎来质变
电动汽车特斯拉股价大跌8.6% 马斯克再次跌落世界首富宝座
声发射的基本原理、特点及应用
双声道d类音频功放芯片的详细介绍
PESD1LIN供应商 杭州东沃电子
一起来了解Self-test的重要性!
黑鲨2Pro 8GB+128GB版开启预售 售价2699元
奥迪一汽新能源汽车有限公司CIO等一行到访启明信息调研