# ai_platform_nlu/nlp/text_classification.py
# -*- coding: utf-8 -*-
import os
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" #保证程序cuda序号与实际cuda序号对应
# os.environ['CUDA_VISIBLE_DEVICES'] = "0, 1" #代表仅使用第01号GPU
os.environ['TF_KERAS'] = '1'
import numpy as np
import pandas as pd
import tensorflow as tf
from logzero import logger
from bert4keras.backend import keras, set_gelu
from bert4keras.tokenizers import Tokenizer
from bert4keras.models import build_transformer_model
from bert4keras.optimizers import Adam, extend_with_piecewise_linear_lr
from bert4keras.snippets import sequence_padding, DataGenerator
from bert4keras.snippets import open
from tensorflow.keras.layers import Lambda, Dense  # tf.keras, to match TF_KERAS=1 above
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True  # allocate GPU memory on demand
tf_session = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=config)
tf.compat.v1.keras.backend.set_session(tf_session)
from sklearn.model_selection import train_test_split
set_gelu('tanh')  # use the tanh approximation of GELU
MAX_LEN = 128
BATCH_SIZE = 32
AdamLR = extend_with_piecewise_linear_lr(Adam, name='AdamLR')
# config_path = '/home/zhaojh/pretrain_models/roberta_base/bert_config.json'
# checkpoint_path = '/home/zhaojh/pretrain_models/roberta_base/bert_model.ckpt'
# dict_path = '/home/zhaojh/pretrain_models/roberta_base/vocab.txt'
# Build the tokenizer
# tokenizer = Tokenizer(dict_path, do_lower_case=True)
# def label_to_digit(data_df: pd.DataFrame, labels):
#     data = list()
#     for i in range(data_df.shape[0]):
#         data.append((data_df.iloc[i]['text'], int(labels.index(data_df.iloc[i]['label']))))
#     return data
# def load_data(all_data: pd.DataFrame):
#     """Load the dataset.
#     One sample per tuple: (text, label id)
#     """
#     logger.info(f"Dataset size: {all_data.shape}")
#     assert ('text' in all_data.columns and 'label' in all_data.columns)
#     use_data = all_data[['text', 'label']].copy()
#     labels = use_data.label.unique().tolist()
#     train, valid = train_test_split(use_data, test_size=0.3, random_state=42, shuffle=True)
#     valid, test = train_test_split(valid, test_size=0.7, random_state=42, shuffle=True)
#     # Convert the splits into (text, label id) tuples
#     train_data = label_to_digit(train, labels)
#     valid_data = label_to_digit(valid, labels)
#     test_data = label_to_digit(test, labels)
#     train_generator = MyDataGenerator(train_data, BATCH_SIZE)
#     valid_generator = MyDataGenerator(valid_data, BATCH_SIZE)
#     test_generator = MyDataGenerator(test_data, BATCH_SIZE)
#     return train_generator, valid_generator, test_generator, labels
# class MyDataGenerator(DataGenerator):
#     """Batch data generator."""
#     def __iter__(self, random=False):
#         batch_token_ids, batch_segment_ids, batch_labels = [], [], []
#         for is_end, (text, label) in self.sample(random):
#             token_ids, segment_ids = tokenizer.encode(text, maxlen=MAX_LEN)
#             batch_token_ids.append(token_ids)
#             batch_segment_ids.append(segment_ids)
#             batch_labels.append([label])
#             if len(batch_token_ids) == self.batch_size or is_end:
#                 batch_token_ids = sequence_padding(batch_token_ids)
#                 batch_segment_ids = sequence_padding(batch_segment_ids)
#                 batch_labels = sequence_padding(batch_labels)
#                 yield [batch_token_ids, batch_segment_ids], batch_labels
#                 batch_token_ids, batch_segment_ids, batch_labels = [], [], []
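# A minimal usage sketch for the commented-out pipeline above, assuming a
# DataFrame with 'text' and 'label' columns (the columns load_data asserts);
# the two example rows are hypothetical:
# df = pd.DataFrame({'text': ['great movie', 'terrible plot'],
#                    'label': ['positive', 'negative']})
# train_gen, valid_gen, test_gen, labels = load_data(df)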
def build_model(config_path, ckpt_path, num_classes, lr):
    # Load the pretrained BERT model
    bert = build_transformer_model(
        config_path=config_path,
        checkpoint_path=ckpt_path,
        model='bert',
        return_keras_model=False,
    )
    # Use the [CLS] token representation as the sentence embedding
    output = Lambda(lambda x: x[:, 0], name='CLS-token')(bert.model.output)
    if num_classes == 2:
        activation = 'sigmoid'  # sigmoid for binary classification
    else:
        activation = 'softmax'  # softmax for multi-class
    output = Dense(
        units=num_classes,
        activation=activation,
        kernel_initializer=bert.initializer
    )(output)
    model = keras.models.Model(bert.model.input, output)
    model.compile(
        loss='sparse_categorical_crossentropy',
        # AdamLR (derived at module level) applies a piecewise linear schedule:
        # warm up from 0 to the base lr over the first 1000 steps, then decay
        # linearly to 0.1x lr by step 2000 and hold.
        optimizer=AdamLR(learning_rate=lr, lr_schedule={
            1000: 1,
            2000: 0.1
        }),
        metrics=['accuracy'],
    )
    return model
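# A minimal sketch of building a binary classifier; the checkpoint paths are
# hypothetical placeholders for a local pretrained model such as the RoBERTa
# paths commented out near the top of this file:
# model = build_model(
#     config_path='/path/to/bert_config.json',  # hypothetical path
#     ckpt_path='/path/to/bert_model.ckpt',     # hypothetical path
#     num_classes=2,
#     lr=1e-5,
# )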
def evaluate(data, model):
    """Compute accuracy over a batch generator yielding (inputs, labels)."""
    total, right = 0., 0.
    for x_true, y_true in data:
        y_pred = model.predict(x_true).argmax(axis=1)
        y_true = y_true[:, 0]
        total += len(y_true)
        right += (y_true == y_pred).sum()
    return right / total
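# Typical call, assuming valid_generator is a MyDataGenerator instance from the
# commented-out pipeline above (iterating it directly yields one pass of padded
# batches):
# val_acc = evaluate(valid_generator, model)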
# class Evaluator(keras.callbacks.Callback):
#     """Evaluate after each epoch and keep the best weights."""
#     def __init__(self, model, valid_generator, test_generator):
#         super().__init__()
#         self.best_val_acc = 0.
#         self.model = model
#         self.valid = valid_generator
#         self.test = test_generator
#     def on_epoch_end(self, epoch, logs=None):
#         val_acc = evaluate(self.valid, self.model)
#         if val_acc > self.best_val_acc:
#             self.best_val_acc = val_acc
#             self.model.save_weights('../models/text_classifier/best_model.weights')
#         test_acc = evaluate(self.test, self.model)
#         logger.info(f"val_acc: {val_acc}, best_val_acc: {self.best_val_acc}, test_acc: {test_acc}")
# def train_model(train_generator, valid_generator, test_generator, num_classes, epochs, lr):
#     # NOTE: build_model now also takes config_path and ckpt_path as its first arguments
#     model = build_model(num_classes, lr)
#     evaluator = Evaluator(model, valid_generator, test_generator)
#     model.fit(
#         train_generator.forfit(),
#         steps_per_epoch=len(train_generator),
#         epochs=epochs,
#         callbacks=[evaluator]
#     )
#     model.load_weights('../models/text_classifier/best_model.weights')
#     return model
def run_cls(model, tokenizer, test_text: str, labels=["negative", "positive"]):
    """Classify a single text and return its label string."""
    token_ids, segment_ids = tokenizer.encode(test_text, maxlen=MAX_LEN)
    tok_ids = sequence_padding([token_ids])
    seg_ids = sequence_padding([segment_ids])
    predicted_data = model.predict([tok_ids, seg_ids])[0]
    return labels[np.argmax(predicted_data)]
# def train(df: pd.DataFrame, epochs: int = 20, lr: float = 1e-3):
#     train, valid, test, labels = load_data(df)
#     model = train_model(train, valid, test, len(labels), epochs, lr)
#     # score = evaluate(test.forfit(), model)
#     model.save('../models/text_classifier/best_model.h5')
#     return model, labels
def load_cls_model(config_path: str, ckpt_path: str, weight_path: str):
    """Load a local classification model.

    Args:
        config_path (str): path to the pretrained config.json
        ckpt_path (str): path to the pretrained checkpoint
        weight_path (str): path to the best fine-tuned weights

    Returns:
        keras.Model: the restored classifier
    """
    model = build_model(config_path, ckpt_path, 2, 0.001)
    model.load_weights(weight_path)
    return model
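# A minimal end-to-end inference sketch; all paths here are hypothetical
# stand-ins for your own vocab, config, checkpoint, and fine-tuned weights:
# tokenizer = Tokenizer('/path/to/vocab.txt', do_lower_case=True)
# model = load_cls_model('/path/to/bert_config.json',
#                        '/path/to/bert_model.ckpt',
#                        '/path/to/best_model.weights')
# print(run_cls(model, tokenizer, "what a great film"))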
if __name__ == '__main__':
    from tensorflow.keras.models import load_model  # tf.keras, to match TF_KERAS=1
    dict_path = '../models/tokenizer/vocab.txt'
    tokenizer = Tokenizer(dict_path, do_lower_case=True)
    model = load_model('../models/text_classifier/best_model.h5')
    rst = run_cls(model, tokenizer, "这部电影太棒了")  # "This movie is great"
    print(rst)