import itertools

import numpy as np

from nlp import chunker, cleaner


def undo_sequential(output):
    """
    Transforms a 3D one-hot array of the type (documents, token, category) into a 2D array
    of the type (documents, token_category).

    :param output: a one-hot 3D array
    :return: a 2D array
    """
    return np.argmax(output, axis=2)


def get_words(docs, selections):
    """
    Gets the selected words in the provided documents.

    :param docs: the documents to analyze
    :param selections: the words selected in the documents
    :return: a dictionary with the documents and, for each, a list of the selected words
    """
    i = 0
    obtained_words = {}
    for doc, words in docs.items():
        k = 0
        obtained_words_doc = []
        in_word = False
        for token in selections[i]:
            if token == 1 and k < len(words):
                obtained_words_doc.append([words[k]])
                in_word = True
            elif token == 2 and k < len(words) and in_word:
                obtained_words_doc[-1].append(words[k])
            else:
                in_word = False
            k += 1

        # remove duplicate selections
        obtained_words_doc.sort()
        obtained_words_doc = list(w for w, _ in itertools.groupby(obtained_words_doc))

        obtained_words[doc] = obtained_words_doc
        i += 1

    return obtained_words


def get_top_words(docs, output, words_limit):
    """
    Gets the selected words in the provided documents.

    :param docs: the documents to analyze
    :param output: the output of the network
    :param words_limit: how many words to extract
    :return: a dictionary with the documents and, for each, a list of the selected words
    """
    selections = undo_sequential(output)

    i = 0
    obtained_words = {}
    for doc, words in docs.items():
        k = 0
        obtained_words_doc = []
        obtained_words_weights = []
        in_word = False
        for token in selections[i]:
            if token == 1 and k < len(words):
                obtained_words_doc.append([words[k]])
                obtained_words_weights.append(output[i, k, 1])
                in_word = True
            elif token == 2 and k < len(words) and in_word:
                obtained_words_doc[-1].append(words[k])
                # Update the running average of the scores of the current keyphrase.
                # We calculate the average at the nth step this way:
                # if A_i is the average at the ith step and x_i is the ith item of the sequence, then
                # A_k = A_{k-1} + ((x_k - A_{k-1}) / k)
                obtained_words_weights[-1] = obtained_words_weights[-1] + \
                    ((output[i, k, 2] - obtained_words_weights[-1]) / len(obtained_words_doc[-1]))
            else:
                in_word = False
            k += 1

        if words_limit < len(obtained_words_doc):
            # there are more selections than the limit: cut them
            obtained_words_and_scores = {}
            for index, _ in enumerate(obtained_words_doc):
                obtained_words_and_scores[index] = obtained_words_weights[index]

            sorted_words = sorted(obtained_words_and_scores,
                                  key=obtained_words_and_scores.__getitem__, reverse=True)

            ok_obtained_words = []
            cur_word = 0
            while len(ok_obtained_words) < words_limit and cur_word < len(sorted_words):
                if obtained_words_doc[sorted_words[cur_word]] not in ok_obtained_words:
                    ok_obtained_words.append(obtained_words_doc[sorted_words[cur_word]])
                cur_word += 1

            obtained_words_doc = ok_obtained_words
        else:
            # just remove duplicate selections
            obtained_words_doc.sort()
            obtained_words_doc = list(w for w, _ in itertools.groupby(obtained_words_doc))

        obtained_words[doc] = obtained_words_doc
        i += 1

    return obtained_words
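
# Illustrative sketch (assumption: this helper is not part of the original module and is
# not used by it; it only clarifies the incremental-average update used in get_top_words
# above). Starting from 0 and applying A_k = A_{k-1} + ((x_k - A_{k-1}) / k) for
# k = 1..n yields the plain arithmetic mean of the sequence without storing it.
def _running_mean_sketch(values):
    """Return the arithmetic mean of `values` via the incremental update above."""
    average = 0.0
    for k, value in enumerate(values, start=1):
        average += (value - average) / k
    return average
# e.g. _running_mean_sketch([0.2, 0.4, 0.9]) == 0.5 == sum([0.2, 0.4, 0.9]) / 3
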
def get_valid_patterns(answer_set):
    """
    Removes from an answer set the keyphrases that do NOT match the keyphrase part-of-speech patterns.

    :param answer_set: a dictionary of documents and tokenized keyphrases
    :return: a dictionary of documents and the tokenized keyphrases that match the part-of-speech patterns
    """
    doc_filtered = {}
    for doc, kps in answer_set.items():
        filtered_keyphrases = []
        for kp in kps:
            for valid_kp in chunker.extract_valid_tokens(kp):
                filtered_keyphrases.append(valid_kp)

        # remove duplicates
        filtered_keyphrases.sort()
        filtered_keyphrases = list(w for w, _ in itertools.groupby(filtered_keyphrases))

        doc_filtered[doc] = filtered_keyphrases

    return doc_filtered


def clean_answers(answer_set):
    """
    Cleans the keyphrases by removing the tokens that are not PoS tagged with the allowed tags.

    :param answer_set: a dictionary of documents and tokenized keyphrases
    :return: a dictionary of documents and their cleaned tokenized keyphrases
    """
    doc_filtered = {}
    for doc, kps in answer_set.items():
        filtered_keyphrases = []
        for kp in kps:
            clean_kp = cleaner.clean_tokens(kp)
            if clean_kp:
                filtered_keyphrases.append(clean_kp)

        # remove duplicates
        filtered_keyphrases.sort()
        filtered_keyphrases = list(w for w, _ in itertools.groupby(filtered_keyphrases))

        doc_filtered[doc] = filtered_keyphrases

    return doc_filtered


def get_answers(candidate_tokens, predict_set, predict_result, dictionary):
    """
    Builds the dictionary of the selected answers for a QA-based network.

    :param candidate_tokens: the dictionary of the documents and their candidate KPs
    :param predict_set: the input of the network
    :param predict_result: the output of the network
    :param dictionary: the previously-fit word index
    :return: the dictionary of the selected KPs
    """
    # The idea here is: we go through the dictionary of the candidates, we find the corresponding
    # model input, and we add the candidate to the answer set if the model predicted class 1
    # (i.e. that the candidate was a correct KP).

    # First, get the actual predictions:
    if np.shape(predict_result)[1] == 1:
        # If we have just 1 output neuron, reshape and round the output to 0/1 values
        predictions_flattened = np.round(np.reshape(predict_result, np.shape(predict_result)[0]))
    else:
        # If we're working with categorical output, flatten the (num_samples, 2) array to a (num_samples,) one.
        # This way we transform a 2D array, e.g. [[0.6, 0.4] ... [0.2, 0.8]], into a 1D array, e.g. [0 ... 1]
        predictions_flattened = np.argmax(predict_result, axis=1)

    i = 0
    answers = {}
    for doc_id, candidate_list in candidate_tokens.items():
        answers[doc_id] = []
        for candidate in candidate_list:
            # Sanity check: was the order preserved?
            assert candidate == dictionary.tokens_to_words(predict_set[1][i])
            if predictions_flattened[i] == 1:
                answers[doc_id].append(candidate)
            i += 1

    return answers
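
# Illustrative sketch (assumption: a standalone example added for clarity, not used by
# the rest of the module). It mirrors the prediction-flattening step shared by
# get_answers() above and get_top_answers() below, showing how both output layouts
# collapse to one 0/1 label per candidate:
#   single sigmoid neuron, shape (n, 1): [[0.2], [0.8]]           -> [0., 1.]
#   two-way softmax,       shape (n, 2): [[0.6, 0.4], [0.2, 0.8]] -> [0, 1]
def _flatten_predictions_sketch(predict_result):
    """Collapse raw network output to one 0/1 prediction per candidate."""
    predict_result = np.asarray(predict_result)
    if predict_result.shape[1] == 1:
        # one output neuron: round the probability of the positive class
        return np.round(predict_result.reshape(predict_result.shape[0]))
    # two output neurons: take the index of the most probable class
    return np.argmax(predict_result, axis=1)
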
def get_top_answers(candidate_tokens, predict_set, predict_result, dictionary, limit):
    """
    Builds the dictionary of the selected answers for a QA-based network.

    :param candidate_tokens: the dictionary of the documents and their candidate KPs
    :param predict_set: the input of the network
    :param predict_result: the output of the network
    :param dictionary: the previously-fit word index
    :param limit: the maximum number of KPs to keep for each document
    :return: the dictionary of the selected KPs
    """
    # The idea here is: we go through the dictionary of the candidates, we find the corresponding
    # model input, and we add the candidate to the answer set if the model predicted class 1
    # (i.e. that the candidate was a correct KP).

    # First, get the actual predictions:
    if np.shape(predict_result)[1] == 1:
        # If we have just 1 output neuron, reshape and round the output to 0/1 values
        predictions_flattened = np.round(np.reshape(predict_result, np.shape(predict_result)[0]))
    else:
        # If we're working with categorical output, flatten the (num_samples, 2) array to a (num_samples,) one.
        # This way we transform a 2D array, e.g. [[0.6, 0.4] ... [0.2, 0.8]], into a 1D array, e.g. [0 ... 1]
        predictions_flattened = np.argmax(predict_result, axis=1)

    i = 0
    answers = {}
    scores = {}
    for doc_id, candidate_list in candidate_tokens.items():
        answers[doc_id] = []
        scores[doc_id] = []
        for candidate in candidate_list:
            # Sanity check: was the order preserved?
            assert candidate == dictionary.tokens_to_words(predict_set[1][i])
            if predictions_flattened[i] == 1:
                answers[doc_id].append(candidate)
                if np.shape(predict_result)[1] == 1:
                    scores[doc_id].append(predict_result[i][0])
                else:
                    scores[doc_id].append(predict_result[i][1])
            i += 1

        if len(answers[doc_id]) > limit:
            # keep only the `limit` highest-scoring candidates
            answers[doc_id] = [x for _, x in sorted(zip(scores[doc_id], answers[doc_id]), reverse=True)][:limit]

    return answers
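
# Minimal usage sketch (assumption: the document name, token sequence, and probabilities
# below are made up for illustration; the class order 0 = outside, 1 = keyphrase start,
# 2 = keyphrase continuation follows the conventions used by the functions above).
if __name__ == "__main__":
    docs = {"doc1": ["deep", "learning", "for", "keyphrase", "extraction"]}
    # fake network output: one document, five tokens, three classes
    output = np.array([[[0.10, 0.80, 0.10],    # "deep"       -> class 1 (start)
                        [0.10, 0.10, 0.80],    # "learning"   -> class 2 (continuation)
                        [0.90, 0.05, 0.05],    # "for"        -> class 0 (outside)
                        [0.20, 0.70, 0.10],    # "keyphrase"  -> class 1 (start)
                        [0.10, 0.20, 0.70]]])  # "extraction" -> class 2 (continuation)

    print(get_words(docs, undo_sequential(output)))
    # {'doc1': [['deep', 'learning'], ['keyphrase', 'extraction']]}

    print(get_top_words(docs, output, words_limit=1))
    # {'doc1': [['deep', 'learning']]}  (the selection with the highest average score)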