ai_platform_nlu/nlp/text_classification.py


#! -*- coding:utf-8 -*-
import os
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" #保证程序cuda序号与实际cuda序号对应
# os.environ['CUDA_VISIBLE_DEVICES'] = "0, 1" #代表仅使用第01号GPU
os.environ['TF_KERAS'] = '1'  # make bert4keras use tf.keras instead of standalone Keras
import numpy as np
import pandas as pd
import tensorflow as tf
from logzero import logger
from bert4keras.backend import keras, set_gelu
from bert4keras.tokenizers import Tokenizer
from bert4keras.models import build_transformer_model
from bert4keras.optimizers import Adam, extend_with_piecewise_linear_lr
from bert4keras.snippets import sequence_padding, DataGenerator
from bert4keras.snippets import open
from tensorflow.keras.layers import Lambda, Dense  # tf.keras layers, consistent with TF_KERAS=1

config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True  # allocate GPU memory on demand instead of grabbing it all upfront
tf_session = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=config)
tf.compat.v1.keras.backend.set_session(tf_session)
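# Note (hedged): the tf.compat.v1 session setup above is the TF1-style way to
# control GPU memory; on a pure TF2 setup, tf.config.experimental.set_memory_growth
# achieves the same per-GPU effect.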
from sklearn.model_selection import train_test_split
set_gelu('tanh')  # use the tanh approximation of GELU

MAX_LEN = 128     # maximum token length fed to the model
BATCH_SIZE = 32
# Derive an Adam variant with a piecewise-linear learning-rate schedule
AdamLR = extend_with_piecewise_linear_lr(Adam, name='AdamLR')
# config_path = '/home/zhaojh/pretrain_models/roberta_base/bert_config.json'
# checkpoint_path = '/home/zhaojh/pretrain_models/roberta_base/bert_model.ckpt'
# dict_path = '/home/zhaojh/pretrain_models/roberta_base/vocab.txt'
# Build the tokenizer
# tokenizer = Tokenizer(dict_path, do_lower_case=True)
# def label_to_digit(data_df: pd.DataFrame, labels):
#     """Map each row's text label to its integer index in `labels`."""
#     data = list()
#     for i in range(data_df.shape[0]):
#         data.append((data_df.iloc[i]['text'], int(labels.index(data_df.iloc[i]['label']))))
#     return data


# def load_data(all_data: pd.DataFrame):
#     """Load the data.
#     Single-sample format: (text, label id)
#     """
#     logger.info(f"dataset size: {all_data.shape}")
#     assert ('text' in all_data.columns and 'label' in all_data.columns)
#     use_data = all_data[['text', 'label']].copy()
#     labels = use_data.label.unique().tolist()
#     train, valid = train_test_split(use_data, test_size=0.3, random_state=42, shuffle=True)
#     valid, test = train_test_split(valid, test_size=0.7, random_state=42, shuffle=True)
#     # Convert the splits into (text, label id) pairs
#     train_data = label_to_digit(train, labels)
#     valid_data = label_to_digit(valid, labels)
#     test_data = label_to_digit(test, labels)
#     train_generator = MyDataGenerator(train_data, BATCH_SIZE)
#     valid_generator = MyDataGenerator(valid_data, BATCH_SIZE)
#     test_generator = MyDataGenerator(test_data, BATCH_SIZE)
#     return train_generator, valid_generator, test_generator, labels


# class MyDataGenerator(DataGenerator):
#     """Data generator: packs (text, label) samples into padded batches."""
#     def __iter__(self, random=False):
#         batch_token_ids, batch_segment_ids, batch_labels = [], [], []
#         for is_end, (text, label) in self.sample(random):
#             token_ids, segment_ids = tokenizer.encode(text, maxlen=MAX_LEN)
#             batch_token_ids.append(token_ids)
#             batch_segment_ids.append(segment_ids)
#             batch_labels.append([label])
#             if len(batch_token_ids) == self.batch_size or is_end:
#                 batch_token_ids = sequence_padding(batch_token_ids)
#                 batch_segment_ids = sequence_padding(batch_segment_ids)
#                 batch_labels = sequence_padding(batch_labels)
#                 yield [batch_token_ids, batch_segment_ids], batch_labels
#                 batch_token_ids, batch_segment_ids, batch_labels = [], [], []
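
# Hedged sketch of how the commented-out pipeline above fits together,
# assuming a DataFrame `df` with 'text' and 'label' columns:
#   train_gen, valid_gen, test_gen, labels = load_data(df)
#   # each iteration yields ([token_ids, segment_ids], labels), already padded
#   model.fit(train_gen.forfit(), steps_per_epoch=len(train_gen), epochs=5)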


def build_model(config_path, ckpt_path, num_classes, lr):
    """Build a BERT-based classifier: [CLS] vector -> Dense classification head."""
    # Load the pretrained model
    bert = build_transformer_model(
        config_path=config_path,
        checkpoint_path=ckpt_path,
        model='bert',
        return_keras_model=False,
    )
    # Use the hidden state of the [CLS] token as the sentence representation
    output = Lambda(lambda x: x[:, 0], name='CLS-token')(bert.model.output)
    # Binary classification keeps a two-unit sigmoid head; multi-class uses softmax
    activation = 'sigmoid' if num_classes == 2 else 'softmax'
    output = Dense(
        units=num_classes,
        activation=activation,
        kernel_initializer=bert.initializer
    )(output)
    model = keras.models.Model(bert.model.input, output)
    # AdamLR (defined at module level) applies a piecewise-linear schedule: the
    # learning rate warms up linearly to 1.0 * lr over the first 1000 steps,
    # then decays linearly to 0.1 * lr by step 2000 and stays there.
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=AdamLR(learning_rate=lr, lr_schedule={
            1000: 1,
            2000: 0.1
        }),
        metrics=['accuracy'],
    )
    return model
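
# A minimal usage sketch for build_model (the paths below are illustrative
# placeholders, not files shipped with this repo):
#   model = build_model('bert_config.json', 'bert_model.ckpt', num_classes=2, lr=1e-5)
#   model.summary()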


def evaluate(data, model):
    """Return accuracy over batches of ([token_ids, segment_ids], labels)."""
    total, right = 0., 0.
    for x_true, y_true in data:
        y_pred = model.predict(x_true).argmax(axis=1)
        y_true = y_true[:, 0]  # labels arrive padded as shape (batch, 1)
        total += len(y_true)
        right += (y_true == y_pred).sum()
    return right / total
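
# Hedged example: bert4keras DataGenerator objects are iterable, so with a
# MyDataGenerator instance (see the commented-out class above) one could run
#   acc = evaluate(valid_generator, model)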

# class Evaluator(keras.callbacks.Callback):
#     """Evaluate after each epoch and keep the best weights."""
#     def __init__(self, model, valid_generator, test_generator):
#         super().__init__()
#         self.best_val_acc = 0.
#         self.model = model
#         self.valid = valid_generator
#         self.test = test_generator
#
#     def on_epoch_end(self, epoch, logs=None):
#         val_acc = evaluate(self.valid, self.model)
#         if val_acc > self.best_val_acc:
#             self.best_val_acc = val_acc
#             self.model.save_weights('../models/text_classifier/best_model.weights')
#         test_acc = evaluate(self.test, self.model)
#         logger.info(f"val_acc: {val_acc}, best_val_acc: {self.best_val_acc}, test_acc: {test_acc}")


# (config_path/ckpt_path threaded through so build_model receives its checkpoint args)
# def train_model(train_generator, valid_generator, test_generator, num_classes,
#                 epochs, lr, config_path=None, ckpt_path=None):
#     model = build_model(config_path, ckpt_path, num_classes, lr)
#     evaluator = Evaluator(model, valid_generator, test_generator)
#     model.fit(
#         train_generator.forfit(),
#         steps_per_epoch=len(train_generator),
#         epochs=epochs,
#         callbacks=[evaluator]
#     )
#     model.load_weights('../models/text_classifier/best_model.weights')
#     return model


def run_cls(model, tokenizer, test_text: str, labels=["negative", "positive"]):
    """Classify a single text.

    `labels` must list the class names in the same order as the label ids
    used during training.
    """
    token_ids, segment_ids = tokenizer.encode(test_text, maxlen=MAX_LEN)
    tok_ids = sequence_padding([token_ids])
    seg_ids = sequence_padding([segment_ids])
    predicted_data = model.predict([tok_ids, seg_ids])[0]
    return labels[np.argmax(predicted_data)]

# def train(df: pd.DataFrame, epochs: int = 20, lr: float = 1e-3,
#           config_path: str = None, ckpt_path: str = None):
#     train, valid, test, labels = load_data(df)
#     model = train_model(train, valid, test, len(labels), epochs, lr, config_path, ckpt_path)
#     # score = evaluate(test.forfit(), model)
#     model.save('../models/text_classifier/best_model.h5')
#     return model, labels


def load_cls_model(config_path: str, ckpt_path: str, weight_path: str):
    """Load a local classification model.

    Args:
        config_path (str): path to the pretrained config.json
        ckpt_path (str): path to the pretrained checkpoint
        weight_path (str): path to the best fine-tuned weights

    Returns:
        keras.Model: the restored classifier
    """
    model = build_model(config_path, ckpt_path, 2, 0.001)
    model.load_weights(weight_path)
    return model
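
# A hedged usage sketch (paths are illustrative placeholders):
#   model = load_cls_model('bert_config.json', 'bert_model.ckpt',
#                          '../models/text_classifier/best_model.weights')
#   print(run_cls(model, tokenizer, "..."))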


if __name__ == '__main__':
    from tensorflow.keras.models import load_model  # tf.keras, matching TF_KERAS=1

    dict_path = '../models/tokenizer/vocab.txt'
    tokenizer = Tokenizer(dict_path, do_lower_case=True)
    model = load_model('../models/text_classifier/best_model.h5')
    rst = run_cls(model, tokenizer, "这部电影太棒了")  # "This movie is great"
    print(rst)