import json import ast import astunparse from transformers import PreTrainedTokenizer from torch.utils.data import Dataset from copy import deepcopy from typing importDict, List
# text constants FUNCTION_CALL_NAME = 'tool_call' FUNCTION_CALL_PREFIX = '```python\n' FUNCTION_CALL_POSTFIX = '\n```' TOOL_DEFINITION_PREFIX = 'Answer the following questions as best as you can. You have access to the following tools:\n' CONVERSATOIN_KEY = 'conversations' TOOL_DESC_KEY = 'tools'
# Note: `loss_mask` here means whether *the prediction* of the token should take loss tokens, loss_masks = [tokenizer.get_command("[gMASK]"), tokenizer.get_command("sop")], [0, 0]
def_update(_tokens: List[int], value: int = 1): value = int(value) tokens.extend(_tokens) loss_masks.extend([value] * len(_tokens))
# insert system prompt for tools if tool_key in item: conversations.insert(0, { "role": "system", "content": TOOL_DEFINITION_PREFIX + json.dumps(item[tool_key], indent=4, ensure_ascii=False) } )
for idx, conv inenumerate(conversations): loss = conv.get("loss", True) if conv['role'] in {'system', 'user'}: loss = False if conv['role'] == 'tool': # function call python code value = FUNCTION_CALL_PREFIX + format_function_call(FUNCTION_CALL_NAME, conv["parameters"]) + FUNCTION_CALL_POSTFIX text = tokenizer.build_single_message("assistant", conv["name"], value) _update(text, loss)
# function call result value = conv.get('observation', None) ifnotisinstance(value, str): value = json.dumps(value, ensure_ascii=False) text = tokenizer.build_single_message("observation", "", value) _update(text, False) else: text = tokenizer.build_single_message(conv['role'], "", conv["content"]) _update(text, loss)
# labels are used inside the model target_based_loss_mask = [False] + loss_masks[:-1] labels = [(t if m else -100) for t, m inzip(tokens, target_based_loss_mask)]
""" 文件中定义了模型定义和训练过程中的命令行参数 """ from dataclasses import dataclass, field from typing importOptional
@dataclass classModelArguments: """ Arguments pertaining to which model/config/tokenizer we are going to fine-tune from. """
model_name_or_path: str = field( metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"} ) checkpoint_path: Optional[str] = field( default=None, metadata={"help": "Path to pt2 or lora finetuned checkpoint dir"} ) ptuning_checkpoint: str = field( default=None, metadata={"help": "Path to p-tuning v2 checkpoints"} ) config_name: Optional[str] = field( default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"} ) tokenizer_name: Optional[str] = field( default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"} ) cache_dir: Optional[str] = field( default=None, metadata={"help": "Where to store the pretrained models downloaded from huggingface.co"}, ) use_fast_tokenizer: bool = field( default=True, metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."}, ) model_revision: str = field( default="main", metadata={"help": "The specific model version to use (can be a branch name, tag name or commit id)."}, ) use_auth_token: bool = field( default=False, metadata={ "help": ( "Will use the token generated when running `huggingface-cli login` (necessary to use this script " "with private models)." ) }, ) resize_position_embeddings: Optional[bool] = field( default=None, metadata={ "help": ( "Whether to automatically resize the position embeddings if `max_source_length` exceeds " "the model's position embeddings." ) }, ) quantization_bit: Optional[int] = field( default=None ) pre_seq_len: Optional[int] = field( default=None ) prefix_projection: bool = field( default=False )
@dataclass classDataTrainingArguments: """ Arguments pertaining to what data we are going to input our model for training and eval. """ train_file: Optional[str] = field( default=None, metadata={"help": "The input training data file (a jsonlines or csv file)."} ) validation_file: Optional[str] = field( default=None, metadata={"help": "The input validation data file (a jsonlines or csv file)."} ) test_file: Optional[str] = field( default=None, metadata={"help": "The input test data file (a jsonlines or csv file)."} )
max_seq_length: Optional[int] = field( default=2048, metadata={ "help": ( "The maximum total input sequence length after tokenization. Sequences longer " "than this will be truncated." ) }, )
max_source_length: Optional[int] = field( default=1024, metadata={ "help": ( "The maximum total input sequence length after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." ) }, ) max_target_length: Optional[int] = field( default=128, metadata={ "help": ( "The maximum total sequence length for target text after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." ) }, )
overwrite_cache: bool = field( default=False, metadata={"help": "Overwrite the cached training and evaluation sets"} )
preprocessing_num_workers: Optional[int] = field( default=None, metadata={"help": "The number of processes to use for the preprocessing."}, )
max_seq_length: Optional[int] = field( default=1024, metadata={ "help": ( "The maximum total input sequence length after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." ) }, )
pad_to_max_length: bool = field( default=False, metadata={ "help": ( "Whether to pad all samples to model maximum sentence length. " "If False, will pad the samples dynamically when batching to the maximum length in the batch. More " "efficient on GPU but very bad for TPU." ) }, )
max_train_samples: Optional[int] = field( default=None, metadata={ "help": ( "For debugging purposes or quicker training, truncate the number of training examples to this " "value if set." ) }, )
""" The Trainer class, to easily train a 🤗 Transformers from scratch or finetune it on a new task. """ import os from typing importOptional from transformers import Trainer
import torch from transformers.modeling_utils import PreTrainedModel, unwrap_model from transformers.utils import logging
def_save(self, output_dir: Optional[str] = None, state_dict=None): # If we are executing this function, we are the process zero, so we don't check for that. output_dir = output_dir if output_dir isnotNoneelseself.args.output_dir os.makedirs(output_dir, exist_ok=True) logger.info(f"Saving model checkpoint to {output_dir}") # Save a trained model and configuration using `save_pretrained()`. # They can then be reloaded using `from_pretrained()` ifnotisinstance(self.model, PreTrainedModel): ifisinstance(unwrap_model(self.model), PreTrainedModel): if state_dict isNone: state_dict = self.model.state_dict() unwrap_model(self.model).save_pretrained(output_dir, safe_serialization=False, state_dict=state_dict) else: logger.info("Trainer.model is not a `PreTrainedModel`, only saving its state dict.") if state_dict isNone: state_dict = self.model.state_dict() torch.save(state_dict, os.path.join(output_dir, WEIGHTS_NAME)) else: ifself.save_changed: print("Saving PrefixEncoder") state_dict = self.model.state_dict() filtered_state_dict = {} for k, v inself.model.named_parameters(): if v.requires_grad: filtered_state_dict[k] = state_dict[k] self.model.save_pretrained(output_dir, safe_serialization=False, state_dict=filtered_state_dict) else: print("Saving the whole model") self.model.save_pretrained(output_dir, safe_serialization=False, state_dict=state_dict) ifself.tokenizer isnotNone: self.tokenizer.save_pretrained(output_dir, safe_serialization=False)
# Good practice: save your training arguments together with the trained model torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
#if training_args.local_rank < 1: # sanity_check(train_dataset[0]['input_ids'], train_dataset[0]['labels'], tokenizer) if training_args.do_eval: withopen(data_args.validation_file, "r", encoding="utf-8") as f: eval_data = [json.loads(line) for line in f]
from dataclasses import dataclass, field from typing importOptional
@dataclass classModelArguments: """ Arguments pertaining to which model/config/tokenizer we are going to fine-tune from. """ model_name_or_path: str = field( metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"} )
@dataclass classDataTrainingArguments: """ Arguments pertaining to what data we are going to input our model for training and eval. """ prompt_column: Optional[str] = field( default=None, metadata={"help": "The name of the column in the datasets containing the full texts (for summarization)."}, ) response_column: Optional[str] = field( default=None, metadata={"help": "The name of the column in the datasets containing the summaries (for summarization)."}, ) history_column: Optional[str] = field( default=None, metadata={"help": "The name of the column in the datasets containing the history of chat."}, ) train_file: Optional[str] = field( default=None, metadata={"help": "The input training data file (a jsonlines or csv file)."} ) validation_file: Optional[str] = field( default=None, metadata={ "help": ( "An optional input evaluation data file to evaluate the metrics (rouge) on (a jsonlines or csv file)." ) }, ) test_file: Optional[str] = field( default=None, metadata={ "help": "An optional input test data file to evaluate the metrics (rouge) on (a jsonlines or csv file)." }, ) overwrite_cache: bool = field( default=False, metadata={"help": "Overwrite the cached training and evaluation sets"} ) preprocessing_num_workers: Optional[int] = field( default=None, metadata={"help": "The number of processes to use for the preprocessing."}, ) max_source_length: Optional[int] = field( default=1024, metadata={ "help": ( "The maximum total input sequence length after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." ) }, ) max_target_length: Optional[int] = field( default=256, metadata={ "help": ( "The maximum total sequence length for target text after tokenization. Sequences longer " "than this will be truncated, sequences shorter will be padded." ) }, ) ignore_pad_token_for_loss: bool = field( default=True, metadata={ "help": "Whether to ignore the tokens corresponding to padded labels in the loss computation or not." }, )
defbuild_prompt(context): ifisinstance(context,str): context = json.loads(context) prompt = '' for turn in context: if turn["role"] in ["user","assistant"]: prompt += f'<|start_header_id|>{turn["role"]}<|end_header_id|>\n{turn["content"]}<|eot_id|>\n' else: if turn["role"] == "search": obj = turn["arguments"] filtered_obj = {k: v for k, v in obj.items() if v isnotNone} prompt += '<|start_header_id|>search<|end_header_id|>\n' prompt += json.dumps(filtered_obj,indent=4,ensure_ascii=False) else: obj = turn["records"] prompt += '<|start_header_id|>return<|end_header_id|>\n' prompt += json.dumps(obj,indent=4,ensure_ascii=False) prompt += '<|eot_id|>\n' return prompt
defbuild_response(response): ifisinstance(response,str): response = json.loads(response) if response["role"] == "assistant": return'<|start_header_id|>assistant<|end_header_id|>\n' + response["content"] + '<|eot_id|>' else: obj = response["arguments"] filtered_obj = {k: v for k, v in obj.items() if v isnotNone} return'<|start_header_id|>search<|end_header_id|>\n' + json.dumps(filtered_obj,indent=4,ensure_ascii=False) + '<|eot_id|>'
if training_args.do_train: withopen(data_args.train_file, "r", encoding="utf-8") as f: train_data = [json.loads(line) for line in f] train_dataset = InputOutputDataset(train_data, tokenizer, data_args) if training_args.do_eval: withopen(data_args.validation_file, "r", encoding="utf-8") as f: eval_data = [json.loads(line) for line in f] eval_dataset = InputOutputDataset(eval_data, tokenizer, data_args)
trainer = Trainer( model=model, tokenizer=tokenizer, data_collator=data_collator, args=training_args, train_dataset=train_dataset if training_args.do_train elseNone, eval_dataset=eval_dataset if training_args.do_eval elseNone, )
if training_args.do_train: model.gradient_checkpointing_enable() model.enable_input_require_grads() trainer.train() if training_args.do_eval: trainer.evaluate()