禁用wandb功能

1
2
import os
os.environ["WANDB_DISABLED"] = "true"

wandb:深度学习模型的训练过程的参数和网络的在线可视化记录,暂不需要。禁用,使得在模型被训练前不需要选择登录还是不登陆及其凭证。

获取模型

1
2
3
4
5
6
from datasets import load_dataset, load_metric

timit=load_dataset('timit_asr',data_dir='/home/tellw/datasets/data/lisa/data/timit/raw/TIMIT/')
print(timit)

timit=timit.remove_columns(['phonetic_detail','word_detail','dialect_region','id','sentence_type','speaker_id'])

无法通过huggingface的load_dataset接口下载TIMIT数据集,TIMIT的官方下载网站也不好找到下载链接,从The DARPA TIMIT Acoustic-Phonetic Continuous Speech Corpus下载TIMIT数据集,解压到/home/tellw/datasets/目录下,去掉TIMIT没用的列

导入一些库

1
2
3
4
5
6
7
from datasets import ClassLabel
import random
import pandas as pd
import numpy as np

import subprocess
import torch

支持数据集的可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def exec_shell(cmd,ignore_err=False):
process=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
output,err=process.communicate()
retcode=process.poll()
if retcode==0 or ignore_err:
return output,err
else:
return -1000,f'execute "{cmd}" failed'

def show_random_elements(dataset,num_examples=10,beforeTrain=False):
assert num_examples<=len(dataset),'Can\'t pick more elements'
picks=[]
for _ in range(num_examples):
pick=random.randint(0,len(dataset)-1)
while pick in picks:
pick=random.randint(0,len(dataset)-1)
picks.append(pick)
df=pd.DataFrame(dataset[picks])
print(df)
if beforeTrain:
for f in df['file']:
exec_shell(f'vlc {f}')

# show_random_elements(timit['train'],beforeTrain=True)

预处理数据

1
2
3
4
5
6
7
8
9
10
import re
chars_to_ignore_regex='[\,\?\.\!\-\;\:\"]'

def remove_special_characters(batch):
batch['text']=re.sub(chars_to_ignore_regex, '',batch['text']).lower()
return batch

timit=timit.map(remove_special_characters)

# show_random_elements(timit['train'],beforeTrain=True)

获取词汇表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def extract_all_chars(batch):
all_text=" ".join(batch['text'])
vocab=list(set(all_text))
return {'vocab':[vocab],'all_text':[all_text]}

vocabs=timit.map(extract_all_chars,batched=True,batch_size=-1,keep_in_memory=True,remove_columns=timit.column_names['train'])

vocab_list=list(set(vocabs['train']['vocab'][0])|set(vocabs['test']['vocab'][0]))

vocab_dict={v: k for k,v in enumerate(vocab_list)}
print(vocab_dict)
vocab_dict['|']=vocab_dict[' ']
del vocab_dict[' ']

vocab_dict['[UNK]']=len(vocab_dict)
vocab_dict['[PAD]']=len(vocab_dict)

import json
with open('vocab.json','w') as vocab_file:
json.dump(vocab_dict,vocab_file)

只需要执行一次,则模型和标签都采用这样的文本到数字的映射逻辑,不同的词汇表进来操作会得到不一样的结果

初次初始化模型和处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# mp='/home/tellw/test/all/checkpoint-200/'
mp=None

from transformers import Wav2Vec2Processor
from transformers import Wav2Vec2ForCTC

def init(mp=None):
if mp is None:

from transformers import Wav2Vec2CTCTokenizer

tokenizer=Wav2Vec2CTCTokenizer('./vocab.json',unk_token='[UNK]',pad_token='[PAD]',word_delimiter_token='|')

from transformers import Wav2Vec2FeatureExtractor

feature_extractor=Wav2Vec2FeatureExtractor(feature_size=1,sampling_rate=16000,padding_value=0.0,do_normalize=True,return_attention_mask=False)

processor=Wav2Vec2Processor(feature_extractor=feature_extractor,tokenizer=tokenizer)

model=Wav2Vec2ForCTC.from_pretrained('facebook/wav2vec2-base',ctc_loss_reduction='mean',pad_token_id=processor.tokenizer.pad_token_id)
else:
processor=Wav2Vec2Processor.from_pretrained(mp)
model=Wav2Vec2ForCTC.from_pretrained(mp)
return processor,model

processor,model=init(mp)

第一次执行时还没有检查点,从huggingface下载模型文件,经过训练后,将训练后的数据存储为检查点,那之后从已有的检查点加载模型

加载指标

1
wer_metric=load_metric('wer')

词错率

提取语音特征,编码标签

1
2
3
4
5
6
7
8
def prepare_dataset(batch):
audio=batch['audio']
batch['input_ids']=processor(audio['array'],sampling_rate=audio['sampling_rate']).input_values[0]
with processor.as_target_processor():
batch['labels']=processor(batch['text']).input_ids
return batch

timit=timit.map(prepare_dataset,remove_columns=timit.column_names['train'],num_proc=4)

训练函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def train():

global timit, wer_metric, processor, model

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
"""
Data collator that will dynamically pad the inputs received.
Args:
processor (:class:`~transformers.Wav2Vec2Processor`)
The processor used for proccessing the data.
padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
among:
* :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
sequence if provided).
* :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
maximum acceptable input length for the model if that argument is not provided.
* :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
different lengths).
max_length (:obj:`int`, `optional`):
Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
max_length_labels (:obj:`int`, `optional`):
Maximum length of the ``labels`` returned list and optionally padding length (see above).
pad_to_multiple_of (:obj:`int`, `optional`):
If set will pad the sequence to a multiple of the provided value.
This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
7.5 (Volta).
"""

processor: Wav2Vec2Processor
padding: Union[bool, str] = True
max_length: Optional[int] = None
max_length_labels: Optional[int] = None
pad_to_multiple_of: Optional[int] = None
pad_to_multiple_of_labels: Optional[int] = None

def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
# split inputs and labels since they have to be of different lengths and need
# different padding methods

input_features = [{"input_values": feature["input_ids"]} for feature in features]
label_features = [{"input_ids": feature["labels"]} for feature in features]

batch = self.processor.pad(
input_features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors="pt",
)
with self.processor.as_target_processor():
labels_batch = self.processor.pad(
label_features,
padding=self.padding,
max_length=self.max_length_labels,
pad_to_multiple_of=self.pad_to_multiple_of_labels,
return_tensors="pt",
)

# replace padding with -100 to ignore loss correctly
labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

batch["labels"] = labels

return batch

data_collator=DataCollatorCTCWithPadding(processor=processor,padding=True)

def compute_metrics(pred):
pred_logits = pred.predictions
pred_ids = np.argmax(pred_logits, axis=-1)

pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

pred_str = processor.batch_decode(pred_ids)
# we do not want to group tokens when computing the metrics
label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

wer = wer_metric.compute(predictions=pred_str, references=label_str)

return {"wer": wer}

model.freeze_feature_extractor()

from transformers import TrainingArguments

training_args = TrainingArguments(
output_dir='0215',
group_by_length=True,
per_device_train_batch_size=16,
evaluation_strategy="steps",
num_train_epochs=100,
fp16=True,
gradient_checkpointing=True,
save_steps=200,
eval_steps=200,
logging_steps=100,
learning_rate=1e-4,
weight_decay=0.005,
warmup_steps=10,
save_total_limit=2,
)

from transformers import Trainer

trainer = Trainer(
model=model,
data_collator=data_collator,
args=training_args,
compute_metrics=compute_metrics,
train_dataset=timit["train"],
eval_dataset=timit["test"],
tokenizer=processor.tokenizer,
)
trainer.train()

推理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def inference():
global model, timit, processor

model.to('cuda')

def map_to_result(batch):
with torch.no_grad():
input_values=torch.tensor(batch['input_ids'],device='cuda').unsqueeze(0) # unsqueeze 增加一个维度
logits=model(input_values).logits
pred_ids=torch.argmax(logits,dim=-1)
batch['logits']=logits
batch['pred_str']=processor.batch_decode(pred_ids)[0]
batch['text']=processor.decode(batch['labels'],group_tokens=False)

return batch

results=timit['test'].map(map_to_result,remove_columns=timit['test'].column_names)

print('Test WER: {:.3f}'.format(wer_metric.compute(predictions=results['pred_str'],references=results['text'])))

show_random_elements(results)

with torch.no_grad():
logits=model(torch.tensor(timit['test'][:1]['input_ids'],device='cuda')).logits

pred_ids=torch.argmax(logits,dim=-1)

print(' '.join(processor.tokenizer.convert_ids_to_tokens(pred_ids[0].tolist())))
print(processor.batch_decode(pred_ids)[0])
print(processor.decode(timit['test'][0]['labels'],group_tokens=False))

主函数

1
2
3
if __name__=='__main__':
train()
# inference()

注意事项

~/.local/lib/python3.10/site-packages/transformers/trainer.py第611行改成self._signature_columns += list(set(["label", "label_ids", "input_ids"] + self.label_names)),这样做,避免了在模型的训练过程中删去数据集的input_ids列,从而无法加载输入数据。

源码文件

下载

训练HubertForCTC

模型声明部分改成model=HubertForCTC.from_pretrained('facebook/hubert-base-ls960',ctc_loss_reduction='mean',pad_token_id=processor.tokenizer.pad_token_id)即可

参考链接

Fine-Tune Wav2Vec2 for English ASR with 🤗 Transformers

facebook/hubert-base-ls960

facebook/wav2vec2-base

本文创建于2023.2.17/23.33,修改于2023.2.17/23.33