import pickle

import numpy as np
import torch

from NER_MODEL import *
from transformers import BertTokenizer, RobertaForTokenClassification

tokenizer = BertTokenizer.from_pretrained("RoBERTa_zh_Large_PyTorch")
max_seq_len = 40
train_corpus = pickle.load(open('./data/corpus_train.pkl', 'rb'))
train_questions = [train_corpus[i]['question'] for i in range(len(train_corpus))]
train_entitys = [train_corpus[i]['gold_entitys'] for i in range(len(train_corpus))]
train_entitys = [[entity[1:-1].split('_')[0] for entity in line] for line in train_entitys]
test_corpus = pickle.load(open('./data/corpus_test.pkl', 'rb'))
test_questions = [test_corpus[i]['question'] for i in range(len(test_corpus))]
test_entitys = [test_corpus[i]['gold_entitys'] for i in range(len(test_corpus))]
test_entitys = [[entity[1:-1].split('_')[0] for entity in line] for line in test_entitys]
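# Assumed corpus format (illustrative, not verified against the pickle files): each entry
# looks roughly like {'question': '...', 'gold_entitys': ['<实体名_类型>', ...]}, so
# entity[1:-1].split('_')[0] strips the enclosing brackets and keeps only the mention string.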
def find_lcsubstr(s1, s2):
    # DP table of zeros; one extra row/column beyond the string lengths simplifies the recurrence
    m = [[0 for i in range(len(s2) + 1)] for j in range(len(s1) + 1)]
    mmax = 0  # length of the longest match
    p = 0     # position in s1 just past the end of the longest match
    for i in range(len(s1)):
        for j in range(len(s2)):
            if s1[i] == s2[j]:
                m[i + 1][j + 1] = m[i][j] + 1
                if m[i + 1][j + 1] > mmax:
                    mmax = m[i + 1][j + 1]
                    p = i + 1
    return s1[p - mmax:p]
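# Illustrative example (not part of the original pipeline): find_lcsubstr returns the
# longest contiguous common substring of its two arguments, e.g.
#   find_lcsubstr("apple pie", "maple") == "ple"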
def GetXY(questions, entitys):
    X1, X2, X3, Y = [], [], [], []
    for i in range(len(questions)):
        q = questions[i]
        # x1, x2 = tokenizer.encode(first=q, max_len=max_seq_len)  # token id sequence and segment id sequence
        # pad / truncate every question to exactly max_seq_len tokens
        encoded_dict = tokenizer(q, max_length=max_seq_len, padding='max_length',
                                 truncation=True, return_tensors='pt')
        x1, x2, x3 = encoded_dict["input_ids"][0], encoded_dict["token_type_ids"][0], encoded_dict["attention_mask"][0]
        y = [[0] for j in range(max_seq_len)]
        assert len(x1) == len(y)
        for e in entitys[i]:
            # longest contiguous common substring between the entity name and the question
            e = find_lcsubstr(e, q)
            if e in q:
                # character positions map to token positions because BertTokenizer splits
                # Chinese text character by character; +1 skips the leading [CLS] token
                begin = q.index(e) + 1
                end = begin + len(e)
                if end < max_seq_len - 1:
                    for pos in range(begin, end):
                        y[pos] = [1]
        X1.append(x1.tolist())
        X2.append(x2.tolist())
        X3.append(x3.tolist())
        Y.append(y)
    X1 = torch.tensor(X1).long()
    X2 = torch.tensor(X2).long()
    X3 = torch.tensor(X3).long()
    Y = torch.tensor(np.array(Y)).squeeze().long()
    return X1, X2, X3, Y
trainx1, trainx2, trainx3,trainy = GetXY(train_questions, train_entitys) # (num_sample,max_len)
testx1, testx2,testx3, testy = GetXY(test_questions, test_entitys)
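# Quick sanity check (illustrative, not part of the original script): after GetXY every
# tensor should have shape (num_samples, max_seq_len).
print('train shapes:', trainx1.shape, trainx2.shape, trainx3.shape, trainy.shape)
print('test shapes: ', testx1.shape, testx2.shape, testx3.shape, testy.shape)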
import torch
from torch.utils.data import TensorDataset, random_split
# Wrap the inputs in TensorDatasets.
train_dataset = TensorDataset(trainx1, trainx2, trainx3, trainy)
test_dataset = TensorDataset(testx1, testx2, testx3, testy)
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
# A batch_size of 16 or 32 is usually recommended
batch_size = 8
# Build DataLoaders for the training and validation sets.
train_dataloader = DataLoader(
    train_dataset,  # training data
    sampler = RandomSampler(train_dataset),  # shuffle the training samples
    batch_size = batch_size
)
validation_dataloader = DataLoader(
    test_dataset,  # validation data
    sampler = SequentialSampler(test_dataset),  # keep the original order so predictions line up with test_questions
    batch_size = batch_size
)
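# Illustrative sanity check (the variable name sample_batch is not part of the original code):
# each batch is a list of 4 tensors, each of shape (batch_size, max_seq_len).
sample_batch = next(iter(train_dataloader))
print('batch tensor shapes:', [t.shape for t in sample_batch])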
# enity_identifing and bert_model are presumably provided by NER_MODEL
model = enity_identifing(tokenizer.vocab_size, 1000, bert_model)
# AdamW is a class from the huggingface library; the 'W' stands for 'Weight Decay fix'.
from transformers import AdamW
optimizer = AdamW(model.parameters(),
                  lr = 2e-5,  # args.learning_rate - default is 5e-5
                  eps = 1e-8  # args.adam_epsilon - default is 1e-8, keeps the denominator away from zero
                  )
from transformers import get_linear_schedule_with_warmup
# For BERT fine-tuning, 2 to 4 epochs are usually recommended.
epochs = 50
# Total number of training steps: [number of batches] x [number of epochs].
total_steps = len(train_dataloader) * epochs
# Create the learning rate scheduler.
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0,  # Default value in run_glue.py
                                            num_training_steps = total_steps)
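# With num_warmup_steps=0 this schedule simply decays the learning rate linearly
# from the initial 2e-5 down to 0 over total_steps optimizer steps.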
def flat_accuracy(preds, labels, attention):
    # A sequence counts as correct only if every position matches; padded positions
    # are zeroed on both sides by the attention mask, so they always match.
    scores = (preds * attention == labels * attention)
    rights = 0
    for score in scores:
        if sum(score) == len(labels[0]):
            rights += 1
    # return np.sum((pred_flat == labels_flat)*atten)/ np.sum(atten)
    return rights / len(labels)
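# Illustrative example (hypothetical tensors): with
#   preds = [[0,1,1,0]], labels = [[0,1,1,0]], attention = [[1,1,1,1]]
# every position matches, so flat_accuracy returns 1.0; a single mismatching
# position makes the whole sequence count as wrong.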
import time
import datetime
def format_time(elapsed):
    elapsed_rounded = int(round(elapsed))
    # Return the elapsed time in hh:mm:ss form
    return str(datetime.timedelta(seconds=elapsed_rounded))
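# e.g. format_time(3725.4) -> '1:02:05'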
import os
import random
import numpy as np
from transformers import WEIGHTS_NAME, CONFIG_NAME
output_dir = './ner1/'
output_model_file = os.path.join(output_dir, WEIGHTS_NAME)
# Adapted from https://github.com/huggingface/transformers/blob/5bfcd0485ece086ebcbed2d008813037968a9e58/examples/run_glue.py#L128
# Set the random seed.
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)
# Track training / validation loss, validation accuracy and timings.
maxf = 0.0
def computeF(gold_entity, pre_entity):
    '''
    Compute precision / recall / F1 from the gold and predicted entity positions, using exact match.
    Input: nested Python lists, one list of entities per question.
    Output: float
    '''
    truenum = 0
    prenum = 0
    goldnum = 0
    for i in range(len(gold_entity)):
        goldnum += len(gold_entity[i])
        prenum += len(pre_entity[i])
        truenum += len(set(gold_entity[i]).intersection(set(pre_entity[i])))
    try:
        precise = float(truenum) / float(prenum)
        recall = float(truenum) / float(goldnum)
        f = float(2 * precise * recall / (precise + recall))
    except ZeroDivisionError:
        precise = recall = f = 0.0
    print('Entity F1 for this round: %f' % f)
    return precise, recall, f
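# Illustrative example: computeF([['张三']], [['张三', '李四']]) gives
# truenum=1, prenum=2, goldnum=1 -> precision 0.5, recall 1.0, F1 ≈ 0.667.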
def restore_entity_from_labels_on_corpus(predicty, questions):
    def restore_entity_from_labels(labels, question):
        entitys = []
        str = ''
        labels = labels[1:-1]  # drop the [CLS] and [SEP] positions
        for i in range(min(len(labels), len(question))):
            if labels[i] == 1:
                str += question[i]
            else:
                if len(str):
                    entitys.append(str)
                    str = ''
        if len(str):
            entitys.append(str)
        return entitys
    all_entitys = []
    for i in range(len(predicty)):
        all_entitys.append(restore_entity_from_labels(predicty[i], questions[i]))
    return all_entitys
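# Illustrative example (hypothetical label sequence):
#   restore_entity_from_labels_on_corpus([[0, 1, 1, 0, 0, 0, 0, 0, 0, 0]], ['姚明的身高是多少'])
#   -> [['姚明']]   (the characters at positions labelled 1, after dropping the [CLS]/[SEP] slots)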
def train():
    training_stats = []
    # Track the total training time.
    total_t0 = time.time()
    best_val_accuracy = 0
    for epoch_i in range(0, epochs):
        print('Epoch {:} / {:}'.format(epoch_i + 1, epochs))
        # Track the time taken by each epoch
        t0 = time.time()
        total_train_loss = 0
        total_train_accuracy = 0
        model.train()
        for step, batch in enumerate(train_dataloader):
            # Print the elapsed time every 100 batches.
            if step % 100 == 0 and not step == 0:
                elapsed = format_time(time.time() - t0)
                print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))
            # `batch` contains 4 tensors:
            #   [0]: input ids
            #   [1]: token type ids
            #   [2]: attention masks
            #   [3]: labels
            b_input_ids