# 引入必要的库 import os import sys import torch import logging import math import numpy as np from typing import Dict import transformers from transformers import ( AutoConfig, AutoTokenizer, LlamaForCausalLM, LlamaTokenizer, Trainer, DataCollatorWithPadding, AutoModelForCausalLM, BitsAndBytesConfig, ) # 将上级目录添加到系统路径中,这样可以引用上级目录中的模块 sys.path.append("..") # 引入自定义模块,包括模型配置、任务类型定义等 from peft import LoraConfig, PeftModel, TaskType, get_peft_model from pathlib import Path from datasets import load_dataset, concatenate_datasets from itertools import chain from utils.parser_args import parser_arguments from utils.metrics import compute_metrics_for_pair from utils.trainer import PeftTrainer, RMPeftTrainer from trl import AutoModelForCausalLMWithValueHead from utils.data_collator import PairDataCollatorWithPadding from utils.utils import PROMPT_TEMPLATE # 设置日志记录器 logger = logging.getLogger(__name__) # 定义一个忽略索引常量,通常用于计算交叉熵时忽略某些特定的标签 IGNORE_INDEX = -100 # 定义模型类别的字典,便于后续根据类型创建模型和分词器 MODEL_CLASSES = { "llama": (AutoConfig, LlamaTokenizer, LlamaForCausalLM), "auto": (AutoConfig, AutoTokenizer, AutoModelForCausalLM), } # 打印模型的可训练参数数量的函数 def print_trainable_params(model: torch.nn.Module) -> None: # 引用自:https://github.com/LLaMA-Efficient-Tuning-main/src/utils/other.py # 用于计算和打印模型可训练参数和总参数的数量 trainable_params, all_param = 0, 0 for param in model.parameters(): num_params = param.numel() # 如果使用了DS Zero 3并且权重初始化为空 if num_params == 0 and hasattr(param, "ds_numel"): num_params = param.ds_numel all_param += num_params if param.requires_grad: trainable_params += num_params print(f"可训练参数数量: {trainable_params} || 总参数数量: {all_param} || 可训练参数百分比: {100 * trainable_params / all_param:.4f}") # 创建模型的函数 def create_model(model_args, data_args, training_args): # 加载模型配置、分词器、模型类 config_class, tokenizer_class, model_class = MODEL_CLASSES[model_args.model_type] # 如果没有提供分词器的路径,则从预训练模型路径加载分词器 if model_args.tokenizer_name_or_path is None: tokenizer = tokenizer_class.from_pretrained(model_args.model_name_or_path, use_fast=model_args.use_fast_tokenizer) else: tokenizer = tokenizer_class.from_pretrained(model_args.tokenizer_name_or_path, use_fast=model_args.use_fast_tokenizer) # 设置pad的token id,如果分词器中没有设置pad_token_id,则默认为0 tokenizer.pad_token_id = 0 if tokenizer.pad_token_id is None else tokenizer.pad_token_id # 定义模型配置参数 config_kwargs = { "trust_remote_code": True, "torch_dtype": model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype), "low_cpu_mem_usage": True, } # 如果需要以4bit加载模型,设置相关配置 if model_args.load_in_4bit: config_kwargs["load_in_4bit"] = True config_kwargs["quantization_config"] = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", ) # 从预训练模型加载 model = model_class.from_pretrained( pretrained_model_name_or_path=model_args.model_name_or_path, from_tf=bool(".ckpt" in model_args.model_name_or_path), **config_kwargs ) # 如果提供了预训练模型路径,加载预训练模型 if model_args.peft_path is not None: logger.info(f"加载预训练模型: {model_args.peft_path}") model = PeftModel.from_pretrained(model, model_args.peft_path, is_trainable=True) else: logger.info("初始化新的PEFT模型") # 配置LoRA(Low-Rank Adaptation)的参数 lora_config = LoraConfig( task_type=TaskType.CAUSAL_LM, inference_mode=False, # 推理模式,这里设置为False,意味着在训练模式 target_modules=training_args.lora_target.split(','), r=training_args.lora_rank, # LoRA的秩(rank),影响参数数量和模型容量 lora_alpha=training_args.lora_alpha, # LoRA的alpha参数,用于调节LoRA适配器的学习速率 lora_dropout=training_args.lora_dropout, # LoRA适配器中使用的dropout比率 ) # 使用LoRA配置获取PEFT模型 # PEFT模型在保持预训练模型性能的同时,通过只更新LoRA适配器中的参数,减少了整体的参数量,这使得模型更适合于特定任务的微调,同时计算效率更高。 model = get_peft_model(model, peft_config=lora_config) # 从预训练模型加载含有值头(value head)的因果语言模型 # AutoModelForCausalLMWithValueHead是一个特殊的模型类型,用于强化学习或任务中需要同时预测动作和估计值(如价值函数)的情况。 # 这类模型结合了因果语言模型(Causal Language Model)的功能和额外的“值头”(Value Head),不仅生成下一个可能的词或序列(标准的语言模型功能), # 还输出一个额外的值,这个值通常用来估计某个状态或动作的预期回报。这种结构在强化学习中特别有用,其中模型需要评估其行动的长期效益。 # 这种模型的使用,特别是在结合了PEFT和LoRA适配器的情况下,可能是为了提高模型在特定任务上的性能,同时确保模型的决策是在考虑长期回报的基础上进行的。 # 这种方法在处理复杂决策问题时非常有效,尤其是在需要考虑未来回报的策略学习中。 # 加权组合损失: Loss=λ×LM Loss+(1−λ)×Value Head Loss model = AutoModelForCausalLMWithValueHead.from_pretrained(model) # 如果提供了预训练模型路径,并且需要加载adapter模型 if model_args.peft_path is not None: lora_state_dict = torch.load(os.path.join(model_args.peft_path, 'adapter_model.bin')) model.v_head.load_state_dict({ "summary.weight": lora_state_dict["v_head.summary.weight"], "summary.bias": lora_state_dict["v_head.summary.bias"], }) # 打印模型信息 print('*********************模型*******************') print_trainable_params(model) # 启用梯度检查点(可节省内存) model.gradient_checkpointing_enable() # 设置config.use_cache为False,这通常用于禁用transformers库中的某些缓存机制 model.config.use_cache = False return model, tokenizer # 处理数据的函数 def process_data(model_args, data_args, training_args, tokenizer): # 使用分词器对数据进行预处理和分词 def process_tokenize(examples): # 初始化模型输入 model_inputs = {"input_ids": [], "label_ids": []} # 获取数据列的名称 columns = list(examples.keys()) # logger.info(f"列名称: {columns}") # 根据数据参数获取提示模板 template = PROMPT_TEMPLATE[data_args.template] # 遍历每一个示例 for index in range(len(examples[columns[0]])): # 检查数据中是否包含特定的列 if 'chosen' not in columns or 'rejected' not in columns: # 断言数据中必须包含instruction、input和output这三个列 assert 'instruction' in columns and 'input' in columns and 'output' in columns # 获取instruction、input和output的内容 instruction, input, output = examples['instruction'][index], examples['input'][index], examples['output'][index] # 如果input非空,则将其添加到instruction中 if input is not None and input != "": instruction = instruction + '\n' + input # 确保output的长度大于1 assert len(output) > 1 # 分别获取prompt、chosen和rejected的内容 prompt, chosen, rejected = instruction, output[0], output[1] else: # 确保数据中包含prompt、rejected和chosen这三个列 assert 'prompt' in columns and 'rejected' in columns and 'chosen' in columns prompt, chosen, rejected = examples['prompt'][index], examples['chosen'][index], examples['rejected'][index] # 使用模板格式化prompt source = template.format_map({'instruction': prompt}) # 使用分词器编码source、chosen和rejected source_ids = tokenizer.encode(text=source, add_special_tokens=False) accepts_ids = tokenizer.encode(text=chosen, add_special_tokens=False) rejects_ids = tokenizer.encode(text=rejected, add_special_tokens=False) # 如果编码后的长度超过最大长度,进行截断 if len(source_ids) > training_args.max_prompt_length - 1: source_ids = source_ids[:training_args.max_prompt_length - 1] if len(accepts_ids) > training_args.max_response_length - 1: accepts_ids = accepts_ids[:training_args.max_response_length - 1] if len(rejects_ids) > training_args.max_response_length - 1: rejects_ids = rejects_ids[:training_args.max_response_length - 1] # 构造接受和拒绝的序列及其对应的标签 source_accepts_ids = source_ids + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id] source_accepts_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id] source_rejects_ids = source_ids + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id] source_rejects_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id] # 计算接受和拒绝序列的长度,并找到最大长度 source_accepts_length, source_rejects_length = len(source_accepts_ids), len(source_rejects_ids) max_length = max(source_accepts_length, source_rejects_length) # 填充序列到最大长度 source_accepts_ids = source_accepts_ids + [tokenizer.pad_token_id] * (max_length - source_accepts_length) source_accepts_labels = source_accepts_labels + [IGNORE_INDEX] * (max_length - source_accepts_length) source_rejects_ids = source_rejects_ids + [tokenizer.pad_token_id] * (max_length - source_rejects_length) source_rejects_labels = source_rejects_labels + [IGNORE_INDEX] * (max_length - source_rejects_length) # 合并接受和拒绝的序列以及标签 inputs_ids = source_accepts_ids + source_rejects_ids labels = source_accepts_labels + source_rejects_labels # 将处理后的序列和标签添加到模型输入中 model_inputs["input_ids"].append(inputs_ids) model_inputs["label_ids"].append(labels) return model_inputs # 处理数据集 logger.info("处理数据集") with training_args.main_process_first(desc="处理数据集"): # 如果指定了数据集目录 if data_args.dataset_dir is not None: all_datasets = [] path = Path(data_args.dataset_dir) files = [file.name for file in path.glob("*.json")] for file in files: data_path = os.path.join(path, file) # 从json文件加载数据集 raw_dataset = load_dataset( "json", data_files=data_path, ) columns = list(raw_dataset.column_names.values())[0] # 使用分词函数处理数据集 tokenized_data = raw_dataset.map( process_tokenize, batched=True, num_proc=training_args.dataloader_num_workers, remove_columns=columns, load_from_cache_file=True ) # 将处理后的数据集添加到列表中 all_datasets.append(tokenized_data['train']) # 如果只有一个数据集,则直接使用,否则将多个数据集合并 if len(all_datasets) == 1: all_datasets = all_datasets[0] else: all_datasets = concatenate_datasets(all_datasets) # 将数据集分割为训练集和测试集 all_datasets = all_datasets.train_test_split(test_size=data_args.split_ratio) # 如果指定了训练文件和验证文件的路径 elif data_args.train_file is not None and data_args.validation_file is not None: all_datasets = {} # 从json文件加载训练数据集 raw_train_datasets = load_dataset( "json", data_files=data_args.train_file, cache_dir=data_args.data_cache_dir ) columns = list(raw_train_datasets.column_names.values())[0] # 使用分词函数处理训练数据集 all_datasets['train'] = raw_train_datasets.map( process_tokenize, batched=True, num_proc=training_args.dataloader_num_workers, remove_columns=columns, load_from_cache_file=True )['train'] # 从json文件加载验证数据集 raw_valid_datasets = load_dataset( "json", data_files=data_args.validation_file, cache_dir=data_args.data_cache_dir ) # 使用分词函数处理验证数据集 all_datasets['test'] = raw_valid_datasets.map( process_tokenize, batched=True, num_proc=training_args.dataloader_num_workers, remove_columns=columns, load_from_cache_file=True )['train'] else: # 如果数据集文件路径不正确,则抛出错误 raise ValueError( "数据集文件路径不正确。 " "您可以提供 --dataset_dir 或提供两个文件 --train_file 和 --validation_file。 " ) return all_datasets def main(): # 主函数入口,解析参数,创建模型,处理数据,进行训练等 # 解析命令行参数 model_args, data_args, training_args = parser_arguments(logger) # 设置随机种子以保证实验的可重复性 transformers.set_seed(training_args.seed) # 创建模型和分词器 model, tokenizer = create_model(model_args, data_args, training_args) # 处理数据 all_datasets = process_data(model_args, data_args, training_args, tokenizer) # 创建训练器,并传入模型、训练参数、数据集等 trainer = RMPeftTrainer( model=model, args=training_args, train_dataset=all_datasets['train'] if training_args.do_train else None, eval_dataset=all_datasets['test'] if training_args.do_eval else None, tokenizer=tokenizer, data_collator=PairDataCollatorWithPadding(tokenizer=tokenizer), compute_metrics=compute_metrics_for_pair, ) # 如果设置为训练模式 if training_args.do_train: # 开始训练 output = trainer.train() # 记录训练指标 trainer.log_metrics("train", output.metrics) # 保存训练指标 trainer.save_metrics("train", output.metrics) # 保存模型和训练器的状态 trainer.save_state() trainer.save_model() # 程序入口 if __name__ == "__main__": main() class RMPeftTrainer(PeftTrainer): ... def compute_loss(self, model, inputs, return_outputs=False): # 进行模型的前向传播,计算接受情况(accepts)的因果语言模型(Causal Language Model, CLM)损失和价值估计(value) _, accepts_clm_loss, accepts_value = model( input_ids=inputs["accepts_input_ids"], attention_mask=inputs["accepts_attention_mask"], labels=inputs["accepts_labels"], return_dict=True ) # 只计算拒绝情况(rejects)的价值估计 _, _, rejects_value = model( input_ids=inputs["rejects_input_ids"], attention_mask=inputs["rejects_attention_mask"], return_dict=True ) # 在训练涉及奖励模型(reward model)的场景中,接受情况(accepts)和拒绝情况(rejects)计算内容的差异主要是因为他们在模型学习目标中扮演不同的角色。 # 在强化学习或任何基于决策的模型训练中,区分“好”的行为和“坏”的行为是至关重要的。接受情况通常涉及模型应当增强或优化的行为,而拒绝情况涉及应当避免的行为。 # 因此,训练中两者的处理方式和计算内容不同,以确保模型能准确学习到哪些行为会带来正面的奖励,哪些会带来负面的结果。 # 接受情况不仅需要计算值(value),也要计算因果语言模型(CLM)的损失,因为接受情况通常是正向样本,模型需要准确预测其结果,并根据这些结果调整其行为。 # 相比之下,拒绝情况主要关注于值的预测,因为我们关心的是模型评估其为不良选择的能力。在某些设计中,这意味着我们可能不需要从CLM损失中学习太多关于拒绝情况的信息,而更关注于如何通过值预测来避免这些情况。 # 通常,接受情况更重要,因为它们直接关联到模型在实际应用中所追求的目标(如生成合适的响应、做出正确的决策等)。拒绝情况虽然重要,但在许多情况下它们的作用是辅助性的,主要用来提供一个对比,帮助模型学习避免不良结果。 # 获取接受和拒绝标签 accepts_labels, rejects_labels = inputs["accepts_labels"], inputs["rejects_labels"] # 生成行动掩码,以区分有效数据点和忽略索引(IGNORE_INDEX) accepts_action_masks = accepts_labels.ne(IGNORE_INDEX).long() rejects_action_masks = rejects_labels.ne(IGNORE_INDEX).long() # 使用行动掩码过滤价值估计,忽略无效的标签位置 accepts_value = accepts_value * accepts_action_masks rejects_value = rejects_value * rejects_action_masks # 计算每个批次的大小 batch_size = accepts_value.shape[0] # 计算有效输入的长度,以排除填充的部分 accepts_seq_lengths = (torch.ne(inputs["accepts_input_ids"], self.tokenizer.pad_token_id).sum(-1) - 1).to(accepts_value.device) rejects_seq_lengths = (torch.ne(inputs["rejects_input_ids"], self.tokenizer.pad_token_id).sum(-1) - 1).to(rejects_value.device) # 提取每个序列最后一个有效token的价值估计 accepts_end_token_value = accepts_value[torch.arange(batch_size, device=accepts_value.device), accepts_seq_lengths] rejects_end_token_value = rejects_value[torch.arange(batch_size, device=rejects_value.device), rejects_seq_lengths] # 根据设置选择使用最后一个奖励还是整体奖励来计算loss1 if self.args.use_last_reward: # 使用最后一个token的价值估计来计算log-sigmoid损失 loss1 = -torch.nn.functional.logsigmoid(accepts_end_token_value - rejects_end_token_value).mean() else: # 使用整个序列的价值估计来计算log-sigmoid损失 loss1 = -torch.nn.functional.logsigmoid(accepts_value - rejects_value).mean() # 计算因果语言模型的损失权重 loss2 = self.args.clm_loss_weight * accepts_clm_loss # 合并两部分损失 loss = loss1 + loss2 # 准备输出,包括每种情况的价值估计 outputs = dict( accepts_end_token_value=accepts_end_token_value, # shape: (batch_size,) rejects_end_token_value=rejects_end_token_value, # shape: (batch_size,) ) # 根据参数选择返回损失值还是损失值和输出 return (loss, outputs) if return_outputs else loss
Preview:
downloadDownload PNG
downloadDownload JPEG
downloadDownload SVG
Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!
Click to optimize width for Twitter