#! -*- coding:utf-8 -*-
import os

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   # make the CUDA device order match the physical bus order
# os.environ['CUDA_VISIBLE_DEVICES'] = "0, 1"      # use only GPU 0 and GPU 1
os.environ['TF_KERAS'] = '1'

import numpy as np
import pandas as pd
import tensorflow as tf
from logzero import logger
from sklearn.model_selection import train_test_split
from bert4keras.backend import keras, set_gelu
from bert4keras.tokenizers import Tokenizer
from bert4keras.models import build_transformer_model
from bert4keras.optimizers import Adam, extend_with_piecewise_linear_lr
from bert4keras.snippets import sequence_padding, DataGenerator
from bert4keras.snippets import open
from tensorflow.keras.layers import Lambda, Dense  # TF_KERAS=1, so use tf.keras layers

config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True  # allocate GPU memory on demand
tf_session = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=config)
tf.compat.v1.keras.backend.set_session(tf_session)

set_gelu('tanh')  # switch gelu version

MAX_LEN = 128
BATCH_SIZE = 32

# config_path = '/home/zhaojh/pretrain_models/roberta_base/bert_config.json'
# checkpoint_path = '/home/zhaojh/pretrain_models/roberta_base/bert_model.ckpt'
# dict_path = '/home/zhaojh/pretrain_models/roberta_base/vocab.txt'

# Build the tokenizer
# tokenizer = Tokenizer(dict_path, do_lower_case=True)


# def label_to_digit(data_df: pd.DataFrame, labels):
#     data = list()
#     for i in range(data_df.shape[0]):
#         data.append((data_df.iloc[i]['text'], int(labels.index(data_df.iloc[i]['label']))))
#     return data


# def load_data(all_data: pd.DataFrame):
#     """Load the data.
#     Single sample format: (text, label id)
#     """
#     logger.info(f"dataset size: {all_data.shape}")
#     assert ('text' in all_data.columns and 'label' in all_data.columns)
#     use_data = all_data[['text', 'label']].copy()
#     labels = use_data.label.unique().tolist()
#     train, valid = train_test_split(use_data, test_size=0.3, random_state=42, shuffle=True)
#     valid, test = train_test_split(valid, test_size=0.7, random_state=42, shuffle=True)
#     # Convert the splits into (text, label id) tuples and wrap them in generators
#     train_data = label_to_digit(train, labels)
#     valid_data = label_to_digit(valid, labels)
#     test_data = label_to_digit(test, labels)
#     train_generator = MyDataGenerator(train_data, BATCH_SIZE)
#     valid_generator = MyDataGenerator(valid_data, BATCH_SIZE)
#     test_generator = MyDataGenerator(test_data, BATCH_SIZE)
#     return train_generator, valid_generator, test_generator, labels


# class MyDataGenerator(DataGenerator):
#     """Data generator
#     """
#     def __iter__(self, random=False):
#         batch_token_ids, batch_segment_ids, batch_labels = [], [], []
#         for is_end, (text, label) in self.sample(random):
#             token_ids, segment_ids = tokenizer.encode(text, maxlen=MAX_LEN)
#             batch_token_ids.append(token_ids)
#             batch_segment_ids.append(segment_ids)
#             batch_labels.append([label])
#             if len(batch_token_ids) == self.batch_size or is_end:
#                 batch_token_ids = sequence_padding(batch_token_ids)
#                 batch_segment_ids = sequence_padding(batch_segment_ids)
#                 batch_labels = sequence_padding(batch_labels)
#                 yield [batch_token_ids, batch_segment_ids], batch_labels
#                 batch_token_ids, batch_segment_ids, batch_labels = [], [], []
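
# Illustrative sketch (not part of the original pipeline): the commented-out training code
# above expects a DataFrame with 'text' and 'label' columns. The rows and label names
# below are made-up placeholders.
#
# df = pd.DataFrame({
#     'text': ["这部电影太棒了", "这部电影太无聊了"],
#     'label': ["positive", "negative"],
# })
# train_gen, valid_gen, test_gen, labels = load_data(df)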


def build_model(config_path, ckpt_path, num_classes, lr):
    """Build a BERT-based classifier on top of the [CLS] vector."""
    # Load the pretrained model
    bert = build_transformer_model(
        config_path=config_path,
        checkpoint_path=ckpt_path,
        model='bert',
        return_keras_model=False,
    )

    # Use the hidden state of the [CLS] token as the sentence representation
    output = Lambda(lambda x: x[:, 0], name='CLS-token')(bert.model.output)
    if num_classes == 2:
        activation = 'sigmoid'
    else:
        activation = 'softmax'
    output = Dense(
        units=num_classes,
        activation=activation,
        kernel_initializer=bert.initializer
    )(output)
    model = keras.models.Model(bert.model.input, output)

    # Derive an optimizer with a piecewise linear learning-rate schedule
    AdamLR = extend_with_piecewise_linear_lr(Adam, name='AdamLR')
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=AdamLR(learning_rate=lr, lr_schedule={
            1000: 1,
            2000: 0.1
        }),
        metrics=['accuracy'],
    )
    return model


def evaluate(data, model):
    total, right = 0., 0.
    for x_true, y_true in data:
        y_pred = model.predict(x_true).argmax(axis=1)
        y_true = y_true[:, 0]
        total += len(y_true)
        right += (y_true == y_pred).sum()
    return right / total


# class Evaluator(keras.callbacks.Callback):
#     """Evaluate after every epoch and keep the best weights
#     """
#     def __init__(self, model, valid_generator, test_generator):
#         super().__init__()
#         self.best_val_acc = 0.
#         self.model = model
#         self.valid = valid_generator
#         self.test = test_generator

#     def on_epoch_end(self, epoch, logs=None):
#         val_acc = evaluate(self.valid, self.model)
#         if val_acc > self.best_val_acc:
#             self.best_val_acc = val_acc
#             self.model.save_weights('../models/text_classifier/best_model.weights')
#         test_acc = evaluate(self.test, self.model)
#         logger.info(f"val_acc: {val_acc}, best_val_acc: {self.best_val_acc}, test_acc: {test_acc}")


# def train_model(train_generator, valid_generator, test_generator, num_classes, epochs, lr):
#     model = build_model(config_path, checkpoint_path, num_classes, lr)
#     evaluator = Evaluator(model, valid_generator, test_generator)
#     model.fit(
#         train_generator.forfit(),
#         steps_per_epoch=len(train_generator),
#         epochs=epochs,
#         callbacks=[evaluator]
#     )
#     model.load_weights('../models/text_classifier/best_model.weights')
#     return model


def run_cls(model, tokenizer, test_text: str, labels=["negative", "positive"]):
    token_ids, segment_ids = tokenizer.encode(test_text, maxlen=MAX_LEN)
    tok_ids = sequence_padding([token_ids])
    seg_ids = sequence_padding([segment_ids])
    predicted_data = model.predict([tok_ids, seg_ids])[0]
    return labels[np.argmax(predicted_data)]


# def train(df: pd.DataFrame, epochs: int = 20, lr: float = 1e-3):
#     train, valid, test, labels = load_data(df)
#     model = train_model(train, valid, test, len(labels), epochs, lr)
#     # score = evaluate(test.forfit(), model)
#     model.save('../models/text_classifier/best_model.h5')
#     return model, labels


def load_cls_model(config_path: str, ckpt_path: str, weight_path: str):
    """Load a local classification model.

    Args:
        config_path (str): path to the BERT config.json
        ckpt_path (str): path to the pretrained checkpoint
        weight_path (str): path to the fine-tuned (best) weights

    Returns:
        keras.Model: classifier with the fine-tuned weights loaded
    """
    model = build_model(config_path, ckpt_path, 2, 0.001)
    model.load_weights(weight_path)
    return model


if __name__ == '__main__':
    from tensorflow.keras.models import load_model  # TF_KERAS=1, so load with tf.keras

    dict_path = '../models/tokenizer/vocab.txt'
    tokenizer = Tokenizer(dict_path, do_lower_case=True)
    model = load_model('../models/text_classifier/best_model.h5')
    rst = run_cls(model, tokenizer, "这部电影太棒了")  # "This movie is great"
    print(rst)
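

# Alternative entry point (illustrative sketch, not part of the original script): instead of
# restoring the full ``best_model.h5``, the classifier can be rebuilt from the pretrained
# BERT config/checkpoint plus the fine-tuned weights via load_cls_model. The config,
# checkpoint and weight paths below are placeholders, not files known to exist in this repo.
#
# if __name__ == '__main__':
#     config_path = '../models/pretrain/bert_config.json'            # placeholder
#     ckpt_path = '../models/pretrain/bert_model.ckpt'               # placeholder
#     weight_path = '../models/text_classifier/best_model.weights'   # weights saved by Evaluator
#     dict_path = '../models/tokenizer/vocab.txt'
#     tokenizer = Tokenizer(dict_path, do_lower_case=True)
#     model = load_cls_model(config_path, ckpt_path, weight_path)
#     print(run_cls(model, tokenizer, "这部电影太棒了"))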