"""Train a pairwise reward model (chosen vs. rejected responses) with LoRA.

Pipeline: load a causal LM (optionally 4-bit quantized), wrap it with a
PEFT/LoRA adapter and a value head, tokenize (prompt, chosen, rejected)
pairs into concatenated accept/reject sequences, then fine-tune with
``RMPeftTrainer``.
"""
import os
import sys
import torch
import logging
import math
import numpy as np
from typing import Dict
import transformers
from transformers import (
    AutoConfig,
    AutoTokenizer,
    LlamaForCausalLM,
    LlamaTokenizer,
    Trainer,
    DataCollatorWithPadding,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
)

# Make the parent directory importable so the local `utils` package resolves.
sys.path.append("..")

from peft import LoraConfig, PeftModel, TaskType, get_peft_model
from pathlib import Path
from datasets import load_dataset, concatenate_datasets
from itertools import chain
from utils.parser_args import parser_arguments
from utils.metrics import compute_metrics_for_pair
from utils.trainer import PeftTrainer, RMPeftTrainer
from trl import AutoModelForCausalLMWithValueHead
from utils.data_collator import PairDataCollatorWithPadding
from utils.utils import PROMPT_TEMPLATE

logger = logging.getLogger(__name__)

# Label value ignored by the cross-entropy loss (standard HF convention).
IGNORE_INDEX = -100

# model_type -> (config class, tokenizer class, model class)
MODEL_CLASSES = {
    "llama": (AutoConfig, LlamaTokenizer, LlamaForCausalLM),
    "auto": (AutoConfig, AutoTokenizer, AutoModelForCausalLM),
}


def print_trainable_params(model: torch.nn.Module) -> None:
    """Print the trainable / total parameter counts of *model*.

    Adapted from LLaMA-Efficient-Tuning (src/utils/other.py). Handles
    DeepSpeed ZeRO-3 partitioned weights: their ``numel()`` is 0 while the
    true size lives on ``param.ds_numel``.
    """
    trainable_params, all_param = 0, 0
    for param in model.parameters():
        num_params = param.numel()
        # DeepSpeed ZeRO-3: weight is partitioned, read the real size.
        if num_params == 0 and hasattr(param, "ds_numel"):
            num_params = param.ds_numel
        all_param += num_params
        if param.requires_grad:
            trainable_params += num_params
    print(f"可训练参数数量: {trainable_params} || 总参数数量: {all_param} || 可训练参数百分比: {100 * trainable_params / all_param:.4f}")


def create_model(model_args, data_args, training_args):
    """Build the tokenizer and the LoRA-wrapped reward model.

    Returns:
        (model, tokenizer): an ``AutoModelForCausalLMWithValueHead`` with a
        (new or resumed) LoRA adapter, and the matching tokenizer.
    """
    config_class, tokenizer_class, model_class = MODEL_CLASSES[model_args.model_type]

    # Load the tokenizer from its own path if given, else from the model path.
    if model_args.tokenizer_name_or_path is None:
        tokenizer = tokenizer_class.from_pretrained(
            model_args.model_name_or_path, use_fast=model_args.use_fast_tokenizer
        )
    else:
        tokenizer = tokenizer_class.from_pretrained(
            model_args.tokenizer_name_or_path, use_fast=model_args.use_fast_tokenizer
        )
    # Fall back to id 0 when the tokenizer defines no pad token.
    tokenizer.pad_token_id = 0 if tokenizer.pad_token_id is None else tokenizer.pad_token_id

    config_kwargs = {
        "trust_remote_code": True,
        "torch_dtype": model_args.torch_dtype
        if model_args.torch_dtype in ["auto", None]
        else getattr(torch, model_args.torch_dtype),
        "low_cpu_mem_usage": True,
    }
    # Optional 4-bit (NF4, double-quant) loading via bitsandbytes.
    if model_args.load_in_4bit:
        config_kwargs["load_in_4bit"] = True
        config_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )

    model = model_class.from_pretrained(
        pretrained_model_name_or_path=model_args.model_name_or_path,
        from_tf=bool(".ckpt" in model_args.model_name_or_path),
        **config_kwargs,
    )

    if model_args.peft_path is not None:
        # Resume training from an existing adapter.
        logger.info(f"加载预训练模型: {model_args.peft_path}")
        model = PeftModel.from_pretrained(model, model_args.peft_path, is_trainable=True)
    else:
        # Initialize a fresh LoRA adapter.
        logger.info("初始化新的PEFT模型")
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            inference_mode=False,
            target_modules=training_args.lora_target.split(','),
            r=training_args.lora_rank,
            lora_alpha=training_args.lora_alpha,
            lora_dropout=training_args.lora_dropout,
        )
        model = get_peft_model(model, peft_config=lora_config)

    # Attach the scalar value head used for reward scoring.
    model = AutoModelForCausalLMWithValueHead.from_pretrained(model)

    # When resuming, also restore the value-head weights saved next to the adapter.
    if model_args.peft_path is not None:
        lora_state_dict = torch.load(os.path.join(model_args.peft_path, 'adapter_model.bin'))
        model.v_head.load_state_dict({
            "summary.weight": lora_state_dict["v_head.summary.weight"],
            "summary.bias": lora_state_dict["v_head.summary.bias"],
        })

    print('*********************模型*******************')
    print_trainable_params(model)

    # Gradient checkpointing saves memory; use_cache must be off for it.
    model.gradient_checkpointing_enable()
    model.config.use_cache = False

    return model, tokenizer


def process_data(model_args, data_args, training_args, tokenizer):
    """Load and tokenize pairwise preference data.

    Accepts either ``--dataset_dir`` (all ``*.json`` files, then split by
    ``split_ratio``) or explicit ``--train_file``/``--validation_file``.
    Each example is encoded as ``prompt + <bos> + response + <eos>`` for both
    the chosen and the rejected response; the two sequences are padded to a
    common length and concatenated (accept first, reject second), with prompt
    and padding positions masked to IGNORE_INDEX in the labels.

    Returns:
        A dict-like with ``'train'`` and ``'test'`` tokenized datasets.

    Raises:
        ValueError: when neither input option is provided.
    """

    def process_tokenize(examples):
        model_inputs = {"input_ids": [], "label_ids": []}
        columns = list(examples.keys())
        template = PROMPT_TEMPLATE[data_args.template]

        for index in range(len(examples[columns[0]])):
            # Two accepted schemas:
            #   (a) instruction/input/output with output = [chosen, rejected]
            #   (b) explicit prompt/chosen/rejected columns
            if 'chosen' not in columns or 'rejected' not in columns:
                assert 'instruction' in columns and 'input' in columns and 'output' in columns
                instruction, input_text, output = (
                    examples['instruction'][index],
                    examples['input'][index],
                    examples['output'][index],
                )
                # Non-empty `input` is appended to the instruction.
                if input_text is not None and input_text != "":
                    instruction = instruction + '\n' + input_text
                # output must hold both responses: [chosen, rejected].
                assert len(output) > 1
                prompt, chosen, rejected = instruction, output[0], output[1]
            else:
                assert 'prompt' in columns and 'rejected' in columns and 'chosen' in columns
                prompt, chosen, rejected = (
                    examples['prompt'][index],
                    examples['chosen'][index],
                    examples['rejected'][index],
                )

            source = template.format_map({'instruction': prompt})
            source_ids = tokenizer.encode(text=source, add_special_tokens=False)
            accepts_ids = tokenizer.encode(text=chosen, add_special_tokens=False)
            rejects_ids = tokenizer.encode(text=rejected, add_special_tokens=False)

            # Truncate, reserving one position for the bos/eos added below.
            if len(source_ids) > training_args.max_prompt_length - 1:
                source_ids = source_ids[:training_args.max_prompt_length - 1]
            if len(accepts_ids) > training_args.max_response_length - 1:
                accepts_ids = accepts_ids[:training_args.max_response_length - 1]
            if len(rejects_ids) > training_args.max_response_length - 1:
                rejects_ids = rejects_ids[:training_args.max_response_length - 1]

            # Build the accept/reject sequences; the prompt part of the
            # labels is masked so loss is only computed on the response.
            source_accepts_ids = source_ids + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id]
            source_accepts_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id]
            source_rejects_ids = source_ids + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id]
            source_rejects_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id]

            # Pad both sequences to the longer of the two.
            source_accepts_length, source_rejects_length = len(source_accepts_ids), len(source_rejects_ids)
            max_length = max(source_accepts_length, source_rejects_length)
            source_accepts_ids = source_accepts_ids + [tokenizer.pad_token_id] * (max_length - source_accepts_length)
            source_accepts_labels = source_accepts_labels + [IGNORE_INDEX] * (max_length - source_accepts_length)
            source_rejects_ids = source_rejects_ids + [tokenizer.pad_token_id] * (max_length - source_rejects_length)
            source_rejects_labels = source_rejects_labels + [IGNORE_INDEX] * (max_length - source_rejects_length)

            # Concatenate accept + reject into one training example; the
            # collator/trainer split them back apart by the midpoint.
            inputs_ids = source_accepts_ids + source_rejects_ids
            labels = source_accepts_labels + source_rejects_labels

            model_inputs["input_ids"].append(inputs_ids)
            model_inputs["label_ids"].append(labels)

        return model_inputs

    logger.info("处理数据集")
    with training_args.main_process_first(desc="处理数据集"):
        if data_args.dataset_dir is not None:
            # Tokenize every *.json file in the directory, then merge.
            all_datasets = []
            path = Path(data_args.dataset_dir)
            files = [file.name for file in path.glob("*.json")]
            for file in files:
                data_path = os.path.join(path, file)
                raw_dataset = load_dataset(
                    "json",
                    data_files=data_path,
                )
                columns = list(raw_dataset.column_names.values())[0]
                tokenized_data = raw_dataset.map(
                    process_tokenize,
                    batched=True,
                    num_proc=training_args.dataloader_num_workers,
                    remove_columns=columns,
                    load_from_cache_file=True,
                )
                all_datasets.append(tokenized_data['train'])
            if len(all_datasets) == 1:
                all_datasets = all_datasets[0]
            else:
                all_datasets = concatenate_datasets(all_datasets)
            all_datasets = all_datasets.train_test_split(test_size=data_args.split_ratio)
        elif data_args.train_file is not None and data_args.validation_file is not None:
            all_datasets = {}
            raw_train_datasets = load_dataset(
                "json",
                data_files=data_args.train_file,
                cache_dir=data_args.data_cache_dir,
            )
            columns = list(raw_train_datasets.column_names.values())[0]
            all_datasets['train'] = raw_train_datasets.map(
                process_tokenize,
                batched=True,
                num_proc=training_args.dataloader_num_workers,
                remove_columns=columns,
                load_from_cache_file=True,
            )['train']
            raw_valid_datasets = load_dataset(
                "json",
                data_files=data_args.validation_file,
                cache_dir=data_args.data_cache_dir,
            )
            # Fix: drop the VALIDATION file's own columns — the original
            # reused the train-file column list, which breaks when the two
            # files have different schemas.
            valid_columns = list(raw_valid_datasets.column_names.values())[0]
            all_datasets['test'] = raw_valid_datasets.map(
                process_tokenize,
                batched=True,
                num_proc=training_args.dataloader_num_workers,
                remove_columns=valid_columns,
                load_from_cache_file=True,
            )['train']
        else:
            raise ValueError(
                "数据集文件路径不正确。 "
                "您可以提供 --dataset_dir 或提供两个文件 --train_file 和 --validation_file。 "
            )

    return all_datasets


def main():
    """Entry point: parse args, build model/data, and run training."""
    model_args, data_args, training_args = parser_arguments(logger)
    # Fix the seed for reproducibility.
    transformers.set_seed(training_args.seed)

    model, tokenizer = create_model(model_args, data_args, training_args)
    all_datasets = process_data(model_args, data_args, training_args, tokenizer)

    trainer = RMPeftTrainer(
        model=model,
        args=training_args,
        train_dataset=all_datasets['train'] if training_args.do_train else None,
        eval_dataset=all_datasets['test'] if training_args.do_eval else None,
        tokenizer=tokenizer,
        data_collator=PairDataCollatorWithPadding(tokenizer=tokenizer),
        compute_metrics=compute_metrics_for_pair,
    )

    if training_args.do_train:
        output = trainer.train()
        trainer.log_metrics("train", output.metrics)
        trainer.save_metrics("train", output.metrics)
        trainer.save_state()
        trainer.save_model()


if __name__ == "__main__":
    main()