# 引入必要的库
import os
import sys
import torch
import logging
import math
import numpy as np
from typing import Dict
import transformers
from transformers import (
AutoConfig,
AutoTokenizer,
LlamaForCausalLM,
LlamaTokenizer,
Trainer,
DataCollatorWithPadding,
AutoModelForCausalLM,
BitsAndBytesConfig,
)
# 将上级目录添加到系统路径中,这样可以引用上级目录中的模块
sys.path.append("..")
# 引入自定义模块,包括模型配置、任务类型定义等
from peft import LoraConfig, PeftModel, TaskType, get_peft_model
from pathlib import Path
from datasets import load_dataset, concatenate_datasets
from itertools import chain
from utils.parser_args import parser_arguments
from utils.metrics import compute_metrics_for_pair
from utils.trainer import PeftTrainer, RMPeftTrainer
from trl import AutoModelForCausalLMWithValueHead
from utils.data_collator import PairDataCollatorWithPadding
from utils.utils import PROMPT_TEMPLATE
# 设置日志记录器
logger = logging.getLogger(__name__)
# 定义一个忽略索引常量,通常用于计算交叉熵时忽略某些特定的标签
IGNORE_INDEX = -100
# 定义模型类别的字典,便于后续根据类型创建模型和分词器
MODEL_CLASSES = {
"llama": (AutoConfig, LlamaTokenizer, LlamaForCausalLM),
"auto": (AutoConfig, AutoTokenizer, AutoModelForCausalLM),
}
# 打印模型的可训练参数数量的函数
def print_trainable_params(model: torch.nn.Module) -> None:
# 引用自:https://github.com/LLaMA-Efficient-Tuning-main/src/utils/other.py
# 用于计算和打印模型可训练参数和总参数的数量
trainable_params, all_param = 0, 0
for param in model.parameters():
num_params = param.numel()
# 如果使用了DS Zero 3并且权重初始化为空
if num_params == 0 and hasattr(param, "ds_numel"):
num_params = param.ds_numel
all_param += num_params
if param.requires_grad:
trainable_params += num_params
print(f"可训练参数数量: {trainable_params} || 总参数数量: {all_param} || 可训练参数百分比: {100 * trainable_params / all_param:.4f}")
# 创建模型的函数
def create_model(model_args, data_args, training_args):
# 加载模型配置、分词器、模型类
config_class, tokenizer_class, model_class = MODEL_CLASSES[model_args.model_type]
# 如果没有提供分词器的路径,则从预训练模型路径加载分词器
if model_args.tokenizer_name_or_path is None:
tokenizer = tokenizer_class.from_pretrained(model_args.model_name_or_path, use_fast=model_args.use_fast_tokenizer)
else:
tokenizer = tokenizer_class.from_pretrained(model_args.tokenizer_name_or_path, use_fast=model_args.use_fast_tokenizer)
# 设置pad的token id,如果分词器中没有设置pad_token_id,则默认为0
tokenizer.pad_token_id = 0 if tokenizer.pad_token_id is None else tokenizer.pad_token_id
# 定义模型配置参数
config_kwargs = {
"trust_remote_code": True,
"torch_dtype": model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype),
"low_cpu_mem_usage": True,
}
# 如果需要以4bit加载模型,设置相关配置
if model_args.load_in_4bit:
config_kwargs["load_in_4bit"] = True
config_kwargs["quantization_config"] = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
)
# 从预训练模型加载
model = model_class.from_pretrained(
pretrained_model_name_or_path=model_args.model_name_or_path,
from_tf=bool(".ckpt" in model_args.model_name_or_path),
**config_kwargs
)
# 如果提供了预训练模型路径,加载预训练模型
if model_args.peft_path is not None:
logger.info(f"加载预训练模型: {model_args.peft_path}")
model = PeftModel.from_pretrained(model, model_args.peft_path, is_trainable=True)
else:
logger.info("初始化新的PEFT模型")
# 配置LoRA(Low-Rank Adaptation)的参数
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
inference_mode=False, # 推理模式,这里设置为False,意味着在训练模式
target_modules=training_args.lora_target.split(','),
r=training_args.lora_rank, # LoRA的秩(rank),影响参数数量和模型容量
lora_alpha=training_args.lora_alpha, # LoRA的alpha参数,用于调节LoRA适配器的学习速率
lora_dropout=training_args.lora_dropout, # LoRA适配器中使用的dropout比率
)
# 使用LoRA配置获取PEFT模型
# PEFT模型在保持预训练模型性能的同时,通过只更新LoRA适配器中的参数,减少了整体的参数量,这使得模型更适合于特定任务的微调,同时计算效率更高。
model = get_peft_model(model, peft_config=lora_config)
# 从预训练模型加载含有值头(value head)的因果语言模型
# AutoModelForCausalLMWithValueHead是一个特殊的模型类型,用于强化学习或任务中需要同时预测动作和估计值(如价值函数)的情况。
# 这类模型结合了因果语言模型(Causal Language Model)的功能和额外的“值头”(Value Head),不仅生成下一个可能的词或序列(标准的语言模型功能),
# 还输出一个额外的值,这个值通常用来估计某个状态或动作的预期回报。这种结构在强化学习中特别有用,其中模型需要评估其行动的长期效益。
# 这种模型的使用,特别是在结合了PEFT和LoRA适配器的情况下,可能是为了提高模型在特定任务上的性能,同时确保模型的决策是在考虑长期回报的基础上进行的。
# 这种方法在处理复杂决策问题时非常有效,尤其是在需要考虑未来回报的策略学习中。
# 加权组合损失: Loss=λ×LM Loss+(1−λ)×Value Head Loss
model = AutoModelForCausalLMWithValueHead.from_pretrained(model)
# 如果提供了预训练模型路径,并且需要加载adapter模型
if model_args.peft_path is not None:
lora_state_dict = torch.load(os.path.join(model_args.peft_path, 'adapter_model.bin'))
model.v_head.load_state_dict({
"summary.weight": lora_state_dict["v_head.summary.weight"],
"summary.bias": lora_state_dict["v_head.summary.bias"],
})
# 打印模型信息
print('*********************模型*******************')
print_trainable_params(model)
# 启用梯度检查点(可节省内存)
model.gradient_checkpointing_enable()
# 设置config.use_cache为False,这通常用于禁用transformers库中的某些缓存机制
model.config.use_cache = False
return model, tokenizer
# 处理数据的函数
def process_data(model_args, data_args, training_args, tokenizer):
# 使用分词器对数据进行预处理和分词
def process_tokenize(examples):
# 初始化模型输入
model_inputs = {"input_ids": [], "label_ids": []}
# 获取数据列的名称
columns = list(examples.keys())
# logger.info(f"列名称: {columns}")
# 根据数据参数获取提示模板
template = PROMPT_TEMPLATE[data_args.template]
# 遍历每一个示例
for index in range(len(examples[columns[0]])):
# 检查数据中是否包含特定的列
if 'chosen' not in columns or 'rejected' not in columns:
# 断言数据中必须包含instruction、input和output这三个列
assert 'instruction' in columns and 'input' in columns and 'output' in columns
# 获取instruction、input和output的内容
instruction, input, output = examples['instruction'][index], examples['input'][index], examples['output'][index]
# 如果input非空,则将其添加到instruction中
if input is not None and input != "":
instruction = instruction + '\n' + input
# 确保output的长度大于1
assert len(output) > 1
# 分别获取prompt、chosen和rejected的内容
prompt, chosen, rejected = instruction, output[0], output[1]
else:
# 确保数据中包含prompt、rejected和chosen这三个列
assert 'prompt' in columns and 'rejected' in columns and 'chosen' in columns
prompt, chosen, rejected = examples['prompt'][index], examples['chosen'][index], examples['rejected'][index]
# 使用模板格式化prompt
source = template.format_map({'instruction': prompt})
# 使用分词器编码source、chosen和rejected
source_ids = tokenizer.encode(text=source, add_special_tokens=False)
accepts_ids = tokenizer.encode(text=chosen, add_special_tokens=False)
rejects_ids = tokenizer.encode(text=rejected, add_special_tokens=False)
# 如果编码后的长度超过最大长度,进行截断
if len(source_ids) > training_args.max_prompt_length - 1:
source_ids = source_ids[:training_args.max_prompt_length - 1]
if len(accepts_ids) > training_args.max_response_length - 1:
accepts_ids = accepts_ids[:training_args.max_response_length - 1]
if len(rejects_ids) > training_args.max_response_length - 1:
rejects_ids = rejects_ids[:training_args.max_response_length - 1]
# 构造接受和拒绝的序列及其对应的标签
source_accepts_ids = source_ids + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id]
source_accepts_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + accepts_ids + [tokenizer.eos_token_id]
source_rejects_ids = source_ids + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id]
source_rejects_labels = [IGNORE_INDEX] * len(source_ids) + [tokenizer.bos_token_id] + rejects_ids + [tokenizer.eos_token_id]
# 计算接受和拒绝序列的长度,并找到最大长度
source_accepts_length, source_rejects_length = len(source_accepts_ids), len(source_rejects_ids)
max_length = max(source_accepts_length, source_rejects_length)
# 填充序列到最大长度
source_accepts_ids = source_accepts_ids + [tokenizer.pad_token_id] * (max_length - source_accepts_length)
source_accepts_labels = source_accepts_labels + [IGNORE_INDEX] * (max_length - source_accepts_length)
source_rejects_ids = source_rejects_ids + [tokenizer.pad_token_id] * (max_length - source_rejects_length)
source_rejects_labels = source_rejects_labels + [IGNORE_INDEX] * (max_length - source_rejects_length)
# 合并接受和拒绝的序列以及标签
inputs_ids = source_accepts_ids + source_rejects_ids
labels = source_accepts_labels + source_rejects_labels
# 将处理后的序列和标签添加到模型输入中
model_inputs["input_ids"].append(inputs_ids)
model_inputs["label_ids"].append(labels)
return model_inputs
# 处理数据集
logger.info("处理数据集")
with training_args.main_process_first(desc="处理数据集"):
# 如果指定了数据集目录
if data_args.dataset_dir is not None:
all_datasets = []
path = Path(data_args.dataset_dir)
files = [file.name for file in path.glob("*.json")]
for file in files:
data_path = os.path.join(path, file)
# 从json文件加载数据集
raw_dataset = load_dataset(
"json",
data_files=data_path,
)
columns = list(raw_dataset.column_names.values())[0]
# 使用分词函数处理数据集
tokenized_data = raw_dataset.map(
process_tokenize,
batched=True,
num_proc=training_args.dataloader_num_workers,
remove_columns=columns,
load_from_cache_file=True
)
# 将处理后的数据集添加到列表中
all_datasets.append(tokenized_data['train'])
# 如果只有一个数据集,则直接使用,否则将多个数据集合并
if len(all_datasets) == 1:
all_datasets = all_datasets[0]
else:
all_datasets = concatenate_datasets(all_datasets)
# 将数据集分割为训练集和测试集
all_datasets = all_datasets.train_test_split(test_size=data_args.split_ratio)
# 如果指定了训练文件和验证文件的路径
elif data_args.train_file is not None and data_args.validation_file is not None:
all_datasets = {}
# 从json文件加载训练数据集
raw_train_datasets = load_dataset(
"json",
data_files=data_args.train_file,
cache_dir=data_args.data_cache_dir
)
columns = list(raw_train_datasets.column_names.values())[0]
# 使用分词函数处理训练数据集
all_datasets['train'] = raw_train_datasets.map(
process_tokenize,
batched=True,
num_proc=training_args.dataloader_num_workers,
remove_columns=columns,
load_from_cache_file=True
)['train']
# 从json文件加载验证数据集
raw_valid_datasets = load_dataset(
"json",
data_files=data_args.validation_file,
cache_dir=data_args.data_cache_dir
)
# 使用分词函数处理验证数据集
all_datasets['test'] = raw_valid_datasets.map(
process_tokenize,
batched=True,
num_proc=training_args.dataloader_num_workers,
remove_columns=columns,
load_from_cache_file=True
)['train']
else:
# 如果数据集文件路径不正确,则抛出错误
raise ValueError(
"数据集文件路径不正确。 "
"您可以提供 --dataset_dir 或提供两个文件 --train_file 和 --validation_file。 "
)
return all_datasets
def main():
# 主函数入口,解析参数,创建模型,处理数据,进行训练等
# 解析命令行参数
model_args, data_args, training_args = parser_arguments(logger)
# 设置随机种子以保证实验的可重复性
transformers.set_seed(training_args.seed)
# 创建模型和分词器
model, tokenizer = create_model(model_args, data_args, training_args)
# 处理数据
all_datasets = process_data(model_args, data_args, training_args, tokenizer)
# 创建训练器,并传入模型、训练参数、数据集等
trainer = RMPeftTrainer(
model=model,
args=training_args,
train_dataset=all_datasets['train'] if training_args.do_train else None,
eval_dataset=all_datasets['test'] if training_args.do_eval else None,
tokenizer=tokenizer,
data_collator=PairDataCollatorWithPadding(tokenizer=tokenizer),
compute_metrics=compute_metrics_for_pair,
)
# 如果设置为训练模式
if training_args.do_train:
# 开始训练
output = trainer.train()
# 记录训练指标
trainer.log_metrics("train", output.metrics)
# 保存训练指标
trainer.save_metrics("train", output.metrics)
# 保存模型和训练器的状态
trainer.save_state()
trainer.save_model()
# 程序入口
if __name__ == "__main__":
main()
class RMPeftTrainer(PeftTrainer):
...
def compute_loss(self, model, inputs, return_outputs=False):
# 进行模型的前向传播,计算接受情况(accepts)的因果语言模型(Causal Language Model, CLM)损失和价值估计(value)
_, accepts_clm_loss, accepts_value = model(
input_ids=inputs["accepts_input_ids"],
attention_mask=inputs["accepts_attention_mask"],
labels=inputs["accepts_labels"],
return_dict=True
)
# 只计算拒绝情况(rejects)的价值估计
_, _, rejects_value = model(
input_ids=inputs["rejects_input_ids"],
attention_mask=inputs["rejects_attention_mask"],
return_dict=True
)
# 在训练涉及奖励模型(reward model)的场景中,接受情况(accepts)和拒绝情况(rejects)计算内容的差异主要是因为他们在模型学习目标中扮演不同的角色。
# 在强化学习或任何基于决策的模型训练中,区分“好”的行为和“坏”的行为是至关重要的。接受情况通常涉及模型应当增强或优化的行为,而拒绝情况涉及应当避免的行为。
# 因此,训练中两者的处理方式和计算内容不同,以确保模型能准确学习到哪些行为会带来正面的奖励,哪些会带来负面的结果。
# 接受情况不仅需要计算值(value),也要计算因果语言模型(CLM)的损失,因为接受情况通常是正向样本,模型需要准确预测其结果,并根据这些结果调整其行为。
# 相比之下,拒绝情况主要关注于值的预测,因为我们关心的是模型评估其为不良选择的能力。在某些设计中,这意味着我们可能不需要从CLM损失中学习太多关于拒绝情况的信息,而更关注于如何通过值预测来避免这些情况。
# 通常,接受情况更重要,因为它们直接关联到模型在实际应用中所追求的目标(如生成合适的响应、做出正确的决策等)。拒绝情况虽然重要,但在许多情况下它们的作用是辅助性的,主要用来提供一个对比,帮助模型学习避免不良结果。
# 获取接受和拒绝标签
accepts_labels, rejects_labels = inputs["accepts_labels"], inputs["rejects_labels"]
# 生成行动掩码,以区分有效数据点和忽略索引(IGNORE_INDEX)
accepts_action_masks = accepts_labels.ne(IGNORE_INDEX).long()
rejects_action_masks = rejects_labels.ne(IGNORE_INDEX).long()
# 使用行动掩码过滤价值估计,忽略无效的标签位置
accepts_value = accepts_value * accepts_action_masks
rejects_value = rejects_value * rejects_action_masks
# 计算每个批次的大小
batch_size = accepts_value.shape[0]
# 计算有效输入的长度,以排除填充的部分
accepts_seq_lengths = (torch.ne(inputs["accepts_input_ids"], self.tokenizer.pad_token_id).sum(-1) - 1).to(accepts_value.device)
rejects_seq_lengths = (torch.ne(inputs["rejects_input_ids"], self.tokenizer.pad_token_id).sum(-1) - 1).to(rejects_value.device)
# 提取每个序列最后一个有效token的价值估计
accepts_end_token_value = accepts_value[torch.arange(batch_size, device=accepts_value.device), accepts_seq_lengths]
rejects_end_token_value = rejects_value[torch.arange(batch_size, device=rejects_value.device), rejects_seq_lengths]
# 根据设置选择使用最后一个奖励还是整体奖励来计算loss1
if self.args.use_last_reward:
# 使用最后一个token的价值估计来计算log-sigmoid损失
loss1 = -torch.nn.functional.logsigmoid(accepts_end_token_value - rejects_end_token_value).mean()
else:
# 使用整个序列的价值估计来计算log-sigmoid损失
loss1 = -torch.nn.functional.logsigmoid(accepts_value - rejects_value).mean()
# 计算因果语言模型的损失权重
loss2 = self.args.clm_loss_weight * accepts_clm_loss
# 合并两部分损失
loss = loss1 + loss2
# 准备输出,包括每种情况的价值估计
outputs = dict(
accepts_end_token_value=accepts_end_token_value, # shape: (batch_size,)
rejects_end_token_value=rejects_end_token_value, # shape: (batch_size,)
)
# 根据参数选择返回损失值还是损失值和输出
return (loss, outputs) if return_outputs else loss