
这是2023年的第85篇文章
( 本文阅读时间:15分钟 )
01
Intro
02
样本处理
核心思路:函数式,流式,组合式,batch 做多路融合,对 datasets 兼容
MaxCompute(ODPS) :无法通过行号快速读取数据,但是有 Tunnel 接口支持从某个下标开始顺序读取数据。
文件系统:包括本地文件,HDFS,以及OSS等对象存储。本地文件虽然能用 lseek() 等函数快速跳转到某个位置(且该操作通常为 O(1)),但是如果每条样本字节数不一样,封装为随机读取还是非常复杂,但是做成流式读取就很容易。其他云上的存储介质更是如此。
消息队列:例如 MetaQ,天然流式的数据,可以主动拉取,也可以以订阅的方式封装流式读取的接口。
positive = Threaded(Map(func, ODPS(access_id, access_key, project, positive_sample_table_name, read_once=False)))negative = Threaded(Map(func, ODPS(access_id, access_key, project, negative_sample_table_name, read_once=False)))combined = Combine([positive, negative], sample_weight=[1.0, 1.0])
# 直接读取数据for data in combined:print(data)# 使用 huggingface datasets 模块# 之后可以直接用在 transformers.Trainer 类中参与训练import datasetstrain_dataset = datasets.IterableDataset.from_generator(combined,gen_kwargs={"ranks": [0,1,2,3], "world_size": 4} # 支持分布式训练)
2.2 技术问题:对分布式训练的支持
def _ODPS(access_id, access_key, project, table_name, partition_spec, read_once, retry,endpoint, ranks=None, world_size=None):# 加载 ranks + world_size 对应分片数据,实现略(计算读取 range 后,使用 PyODPS 加载数据)def ODPS(access_id, access_key, project, table_name, partition_spec=None, read_once=True, retry=True, endpoint="http://xxxxxx"):return partial(_ODPS, access_id, access_key, project, table_name, partition_spec, read_once, retry, endpoint)
03
模型开发
核心思路:
继承 PreTrainedModel,PreTrainedCofig,PreTrainedTokenizer 基类,与 transformers 体系打通。 通过 mixin / monkey patching 方式,扩充现有框架功能,例如对 OSS 模型加载/存储的支持。
# 加载我们项目组开发的分类模型(多任务层次分类)model = BertForMultiTaskHierarchicalClassification.from_pretrained("./local_dir")# or 从 OSS 直接加载model = BertForMultiTaskHierarchicalClassification.from_pretrained("oss://model_remote_dir")# 保存模型model.save_pretrained("./local_dir")model.save_pretrained("oss://model_remote_dir")# 加载我们使用 C++ 开发的 tokenizertokenizer = ShieldTokenizer.from_pretrained("oss://model_remote_dir")# 使用 AutoClass 实现相同功能,不需要指定特定的模型类名,由框架自动推断model = AutoModel.from_pretrained("oss://model_remote_dir")tokenizer = AutoTokenizer.from_pretrained("oss://model_remote_dir")# 扩展 transformers 默认的 pipeline# 增加 multitask-hierarchical-classification 任务pipe = pipeline("multitask-hierarchical-classification", model=model, tokenizer=tokenizer)print(pipe("测试文本"))
3.1 如何支持集团内的存储介质
class OSSRemoteModelMixin(object):"""支持用户在 from_pretrained 和 save_pretrained 时按照 oss://path 的格式指定路径 (bucket, ak, sk 需要在环境变量中指定, 见 util.oss_util 类)可以用于所有包含 from_pretrained 和 save_pretrained 方法的类 (config or tokenizer or model)"""def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):pretrained_model_name_or_path = convert_oss_to_local_path(cls, pretrained_model_name_or_path,kwargs.get('cache_dir', risk_shield.CACHE_DIR))return super(OSSRemoteModelMixin, cls).from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)def save_pretrained(self,save_directory: Union[str, os.PathLike],*args,**kwargs):prefix = "oss://"oss_path = save_directoryif save_directory.startswith(prefix):# save to a temp dir# .......# 将文件拷贝到 OSS,实现略return reselse:res = super(OSSRemoteModelMixin, self).save_pretrained(save_directory, *args, **kwargs)# 让模型继承自 OSSRemoteModelMixin 就会自动获得 OSS 存取的能力class BertForMultiTaskHierarchicalClassification(OSSRemoteModelMixin, BertPreTrainedModel):config_class = BertForMultiTaskHierarchicalClassificationConfigdef __init__(self, config:BertForMultiTaskHierarchicalClassificationConfig):# .....# 对于 AutoModel,则直接覆盖他们的 from_pretrained 方法。def patch_auto_class(cls):"""让 AutoClass 支持 OSS 路径"""old_from_pretrained = cls.from_pretraineddef new_from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):pretrained_model_name_or_path = \convert_oss_to_local_path(cls, pretrained_model_name_or_path,kwargs.get('cache_dir', risk_shield.CACHE_DIR))return old_from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)cls.from_pretrained = classmethod(new_from_pretrained)
类变量 vocab_files_names__getstate__, __setstate__,解决 C++ 对象的 pickle 问题_convert_token_to_id_convert_id_to_tokenconvert_ids_to_tokens__setattr__vocab_sizeget_vocabtokenizesave_vocabulary_pad 对 tokenize 后的结果进行 pad,主要用在 trainer 里的 collator_encode_plus 处理单条文本_batch_encode_plus 处理 batch 文本
3.3 训练代码
def compute_metrics(model, eval_pred):logits, labels = eval_pred# 针对层次分类设计的评估指标metrics = evaluate_multitask_hierarchical_classifier(model, labels)return metrics# 定义模型tokenizer = ShieldTokenizer.from_pretrained("oss://backbone")config = BertForMultiTaskHierarchicalClassificationConfig.from_pretrained("oss://backbone")config.multi_task_config = {# 这里的分类树仅是例子"main": {"hierarchical_tree":["父类别1", "父类别2",["父类别3",["父类别3-子类别1", "父类别3-子类别2", "父类别3-子类别3", "父类别3-子类别4"]]]}}model = BertForMultiTaskHierarchicalClassification.from_pretrained("./backbone")# 定义训练数据加载策略positive = Threaded(Map(func, ODPS(access_id, access_key, project,positive_sample_table_name,read_once=False)))negative = Threaded(Map(func, ODPS(access_id, access_key, project,negative_sample_table_name,read_once=False)))combined = Combine([positive, negative], sample_weight=[1.0, 1.0])train_ds = datasets.IterableDataset.from_generator(combined)training_arg = TrainingArguments(output_dir="./output",overwrite_output_dir=True,num_train_epochs=4,# ...# 其他训练参数# ...dataloader_num_workers=2,)trainer = Trainer(model=model,args=training_arg,train_dataset=train_ds,tokenizer=tokenizer,eval_dataset=val_ds,compute_metrics=partial(compute_metrics, model),# 针对层次分类开发的 collatordata_collator=MultiTaskHierarchicalClassifierCollator(tokenizer=tokenizer, model=model, max_length=max_length,task_label_smooth=task_label_smooth))# 将实验指标写入 tensorboard 并上传到 OSStrainer.add_callback(OSSTensorboardWriterCallback("experiment/v1/"))trainer.train()
第一个元素是 loss,trainer 自动优化该值。 后续的元素只能是 python dict / list / tuple / tensor,tensor 第一维的大小必须和 batch size 一致。最理想的情况就是一个二维 logits 矩阵。
3.4 模型部署
import risk_shieldfrom transformers import AutoTokenizer, AutoModelmodel = AutoModel.from_pretrained("oss://model.tar.gz")tokenizer = AutoTokenizer.from_pretrained("oss://model.tar.gz")# 导出 ONNXmodel.export_onnx(output_dir)# 或导出 TensorRTmodel.export_tensorrt(output_dir)# 或导出 Tensorflow(ODPS 部署)model.export_tf_saved_model(output_dir)# 导出切词器到同一目录tokenizer.save_pretrained(output_dir)########################## 部署时加载对应 pipelinefrom risk_shield import ONNXHierarchicalClassifierPipelinefrom risk_shield import TensorRTHierarchicalClassifierPipelinefrom risk_shield import TFSavedModelHierarchicalClassifierPipelinepipe = TensorRTHierarchicalClassifierPipeline(output_dir)result= pipe("测试文本")
3.5 最小化依赖
04
实验管理


05
工具链及可视化



06
“软件2.0”


-- 团队 A 同学发布模型到 OSSshield_publish ~/checkpoint_dir https://xxxx.aliyuncs.com/../xxxx.tar.gzWARNING:root:从本地目录上传:~/checkpoint_dirxxxx.tar.gz: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:50<00:00, 1.11s/it]WARNING:root:已发布OSS(url):https://xxxx.aliyuncs.com/../xxxx.tar.gz-- 团队 B 同学使用 OSS 上“内部开源”的模型AutoModel.from_pretrained("https://xxxx.aliyuncs.com/../xxxx.tar.gz")
相关资料
[02] https://wandb.ai/site
[03] https://karpathy.medium.com/software-2-0-a64152b37c35
[04] https://huggingface.co/docs/hub/index
[05] https://huggingface.co/spaces
[06] https://modelscope.cn/models
[07] https://huggingface.co/docs/hub/paddlenlp
[08] https://arxiv.org/abs/2307.07924