NLP Tools: A Homemade Zero-Shot Event Extractor

  • 0. Introduction
  • 1. Extracting all potential events
  • 2. Extracting events of a specific type
  • 3. Conclusion

0. Introduction

In event extraction tasks, acquiring data is a crucial step. Because annotation is expensive and high-value data are hard to obtain, few-shot and zero-shot settings have long been a research focus in the event extraction field.

The tool introduced today is something I wrote on top of Stanza's dependency parsing, and it has been around for quite a while. My point in presenting it is not that it is yet another piece of zero-shot or few-shot research; it is simply a practical, purely rule-based utility with no technical novelty of its own, and whatever effectiveness it has comes entirely from Stanza's dependency parser.

In practical applications, however, we can use these syntactic structures in a weakly supervised fashion to generate a batch of silver data, then invest human annotation on top of that silver data to produce gold data, which effectively reduces the annotation cost.

If you are not yet familiar with the Stanza toolkit, you can take a look at an earlier post of mine (the tool described here was in fact implemented around the same time):
NLP工具——Stanza依存关系含义详解 (a detailed explanation of Stanza's dependency relations)

1. Extracting all potential events

This extraction step essentially pieces together the sentence's full subject-verb-object structure along with its modifiers (attributives, adverbials, and complements). Without further ado, here is the code:

class DepEventExtractor:
    """[Dependency-based event extraction]
    Naming conventions:
    dep_res: the result of self.get_dep_res(), with the structure:
        [{'sent': xxx, 'sent_id': 1, 'dep': []}, {'sent': xxx, 'sent_id': 2, 'dep': []}]
    Inside the _extract* methods, dep is referred to as dep_tree, and the id of an element
    (i.e. the index of the token) is referred to as cur_root_id.
    ---------------
    Upgrade (v2):
    1. handle passive voice
    2. handle light verbs acting as placeholders with the content verb moved back
    3. handle clausal structures
    ---------------
    Upgrade (v3):
    1. add POS checks
    2. add tense words and negation words
    3. add coordination without a conjunction
    ---------------
    Upgrade (v4):
    1. further split adverbials into time and location
    ---------------
    ver: 2021-12-07
    by: changhongyu
    """
    def __init__(self, stanza_model):
        """
        :param stanza_model: stanza.Pipeline: an instantiated stanza model
        """
        self.stanza_model = stanza_model
        self.debug_pat_name = 'xxx'

    @staticmethod
    def get_depparse_res(doc_res):
        final_res = []
        for i, item in enumerate(doc_res.sentences):
            sent = item.text
            dep_array = []
            for word in item.dependencies:
                # print(word)
                if not word[2].head:
                    head_id = -1
                else:
                    head_id = word[2].head - 1
                dep_array.append({"head": word[0].text, "dep_relation": word[1], "words": word[2].text,
                                  "id": word[2].id - 1, "head_id": head_id, "upos": word[2].upos,
                                  "lemma": word[2].lemma, "ner": item.tokens[word[2].id - 1].ner})
            final_res.append({"sent": sent, "sent_id": i + 1, "dep": dep_array})
        return final_res

    def get_dep_res(self, text):
        """Get the dependency parsing result."""
        doc_res = self.stanza_model(text)
        dep_res = self.get_depparse_res(doc_res)
        return dep_res

    @staticmethod
    def _extract_trigger(dep_tree):
        """Extract ROOT -root-> trigger."""
        for i, dep in enumerate(dep_tree):
            if dep['head'] == 'ROOT':
                dep['token_start'] = i  # index of the token in the original sentence
                dep['token_end'] = i
                return i, dep
        return None, None

    def _extract_pattern(self, dep_tree, pattern_name, cur_root_id, reverse=False):
        """Extract a given structure.
        If there are several subjects, the later ones are attached to nsubj via a conj dependency.
        :param reverse: bool: if True, extract with subject and object reversed
        """
        patterns = []
        if not reverse:
            id_to_extract = 'head_id'
        else:
            id_to_extract = 'id'
        # root_head = dep_tree[cur_root_id]['dep'].split()[0]  # dep content can grow, so take index 0
        for i, dep in enumerate(dep_tree):
            if pattern_name.endswith('*'):
                case = dep['dep_relation'].startswith(pattern_name[:-1]) and dep[id_to_extract] == cur_root_id
            else:
                case = dep['dep_relation'] == pattern_name and dep[id_to_extract] == cur_root_id
            if pattern_name == self.debug_pat_name:
                # for debugging
                print(dep['dep_relation'], pattern_name)
                print(dep['head_id'], cur_root_id)
                print('---')
            # if dep['dep_relation'] == pattern_name and dep['head'] == root_head:
            if case:
                dep['token_start'] = i
                dep['token_end'] = i
                patterns.append([i, dep])
        return patterns

    def _fill_trigger(self, dep_tree, trigger_id, trigger):
        """Complete the trigger."""
        trigger_x_patterns = self._extract_pattern(dep_tree, 'compound*', trigger_id)
        if len(trigger_x_patterns) > 1:
            print("Warning: More than 1 compound pattern occurred at trigger: ", trigger)
        for trigger_x_id, trigger_x in trigger_x_patterns:
            trigger['token_start'] = min(trigger['token_start'], trigger_x['token_start'])
            trigger['token_end'] = max(trigger['token_end'], trigger_x['token_end'])
            trigger['words'] = ''.join(tok + ' ' for tok in self.tokens[trigger['token_start']: trigger['token_end']+1])[: -1]
        # complete auxiliary words
        trigger = self._fill_with_aux(dep_tree, trigger_id, trigger)
        return trigger

    def _fill_with_flat(self, dep_tree, node_id, node):
        """Complete flat multi-word structures.
        Calls: self._extract_pattern()
        Example: Donald --> Donald J. Trump
        """
        flat_patterns = self._extract_pattern(dep_tree, 'flat', node_id)
        for flat_pat_id, flat_pat in flat_patterns:
            node['words'] += ' '
            node['words'] += flat_pat['words']
            node['token_end'] = flat_pat_id
        return node

    def _fill_with_compound(self, dep_tree, node_id, node):
        """Complete noun compounds; only adjacent compounds are kept.
        A compound can be a single head-modifier pair or a chain of compounds.
        Calls: self._extract_pattern()
        Example: cream --> ice cream
        """
        compound_patterns = self._extract_pattern(dep_tree, 'compound', node_id, reverse=False)
        for compound_pat_id, compound_pat in compound_patterns:
            # recursively complete the compounds of the compound
            compound_pat = self._fill_with_compound(dep_tree, compound_pat_id, compound_pat)
            node['token_start'] = min(node['token_start'], compound_pat['token_start'])
            node['token_end'] = max(node['token_end'], compound_pat['token_end'])
            node['words'] = ''.join(tok + ' ' for tok in self.tokens[node['token_start']: node['token_end']+1])[: -1]
        return node

    def _fill_with_amod(self, dep_tree, node_id, node):
        """Complete adjectival modifiers.
        Calls: self._extract_pattern()
        Example: apple --> big red apple
        """
        amod_patterns = self._extract_pattern(dep_tree, 'amod', node_id)
        for amod_pat_id, amod_pat in amod_patterns:
            # complete the modifier itself
            amod_pat = self._fill_a_node(dep_tree, amod_pat_id, amod_pat)
            node['token_start'] = min(node['token_start'], amod_pat['token_start'])
            node['token_end'] = max(node['token_end'], amod_pat['token_end'])
            node['words'] = ''.join(tok + ' ' for tok in self.tokens[node['token_start']: node['token_end']+1])[: -1]
        return node

    def _fill_with_nummod(self, dep_tree, node_id, node):
        """Complete numeric modifiers.
        A numeric modifier is adjacent to the node, is a single token, and does not co-occur with flat structures.
        Calls: self._extract_pattern()
        Example: dollars --> forty dollars
        """
        nummod_patterns = self._extract_pattern(dep_tree, 'nummod', node_id)
        if len(nummod_patterns) > 1:
            print("Warning: More than 1 nummod pattern occurred at node: ", node)
        for nummod_pat_id, nummod_pat in nummod_patterns:
            node['token_start'] = min(node['token_start'], nummod_pat['token_start'])
            node['token_end'] = max(node['token_end'], nummod_pat['token_end'])
            node['words'] = ''.join(tok + ' ' for tok in self.tokens[node['token_start']: node['token_end']+1])[: -1]
        return node

    def _fill_with_det(self, dep_tree, node_id, node):
        """Complete determiners.
        Determiners include articles and interrogative pronouns; they do not co-occur with nummod or flat structures.
        Calls: self._extract_pattern()
        Example: apple --> an apple
        """
        det_patterns = self._extract_pattern(dep_tree, 'det', node_id)
        if len(det_patterns) > 1:
            print("Warning: More than 1 det pattern occurred at node: ", node)
        for det_pat_id, det_pat in det_patterns:
            node['words'] = det_pat['words'] + ' ' + node['words']
            node['token_start'] = det_pat_id
        return node

    def _fill_with_nmod(self, dep_tree, node_id, node):
        """Complete nominal modifiers.
        Extends the span directly to the end of the modifier.
        Calls: self._extract_pattern()
               self._fill_a_node()
        Example: apple --> a couple of apples
        """
        nmod_patterns = self._extract_pattern(dep_tree, 'nmod*', node_id)
        if len(nmod_patterns) > 1:
            print("Warning: More than 1 nmod pattern occurred at node: ", node)
        for nmod_pat_id, nmod_pat in nmod_patterns:
            # complete the modifier itself
            nmod_pat = self._fill_a_node(dep_tree, nmod_pat_id, nmod_pat)
            node['token_start'] = min(node['token_start'], nmod_pat['token_start'])
            node['token_end'] = max(node['token_end'], nmod_pat['token_end'])
            node['words'] = ''.join(tok + ' ' for tok in self.tokens[node['token_start']: node['token_end']+1])[: -1]
        return node

    def _fill_with_case(self, dep_tree, node_id, node):
        """Complete the preposition of an adverbial.
        Example: last week --> during last week
        """
        case_patterns = self._extract_pattern(dep_tree, 'case', node_id)
        if len(case_patterns) > 1:
            print("Warning: More than 1 case pattern occurred at node: ", node)
        for case_pat_id, case_pat in case_patterns:
            node['words'] = case_pat['words'] + ' ' + node['words']
            node['token_start'] = case_pat_id
        return node

    def _fill_with_aux(self, dep_tree, node_id, node):
        """Complete the tense auxiliaries of a verb.
        Example: go --> will go
        """
        aux_patterns = self._extract_pattern(dep_tree, 'aux', node_id)
        if len(aux_patterns) > 1:
            print("Warning: More than 1 aux pattern occurred at node: ", node)
        for aux_pat_id, aux_pat in aux_patterns:
            node['words'] = aux_pat['words'] + ' ' + node['words']
            node['token_start'] = aux_pat_id
        return node

    def _fill_a_node(self, dep_tree, node_id, node, is_obl=False):
        """Complete a node (usually a noun node).
        Calls: self._fill_with_flat()
               self._fill_with_nummod()
               self._fill_with_det()
        """
        # complete flat multi-word structures
        node = self._fill_with_flat(dep_tree, node_id, node)
        # complete noun compounds
        node = self._fill_with_compound(dep_tree, node_id, node)
        # complete adjectival modifiers
        node = self._fill_with_amod(dep_tree, node_id, node)
        # complete numeric modifiers
        node = self._fill_with_nummod(dep_tree, node_id, node)
        # complete determiners
        node = self._fill_with_det(dep_tree, node_id, node)
        # complete nominal modifiers
        node = self._fill_with_nmod(dep_tree, node_id, node)
        if is_obl:
            node = self._fill_with_case(dep_tree, node_id, node)
        return node

    def _get_conj_patterns(self, dep_tree, node_id):
        """Get the coordinated structures of a node.
        Calls: self._extract_pattern()
               self._fill_a_node()
        """
        conj_patterns = self._extract_pattern(dep_tree, 'conj', node_id)
        for conj_pat_id, conj_pat in conj_patterns:
            # complete the node
            conj_pat = self._fill_a_node(dep_tree, conj_pat_id, conj_pat)
        return conj_patterns

    def _adjust_obl_type_with_ner(self, node):
        """Adjust the adverbial type with the NER result, e.g. to a temporal or locative adverbial.
        :param node: dict: {'head', 'dep_relation', 'words', 'id', 'head_id', 'upos', 'lemma', 'ner'}
        :return type_: str: 'obl', 'obl:tmod', 'obl:lmod'
        """
        if 'DATE' in node['ner']:
            return 'obl:tmod'
        elif 'GPE' in node['ner']:
            return 'obl:lmod'
        elif 'PERSON' in node['ner']:
            return 'obl:pmod'
        else:
            return 'obl'

    def _get_core_arguments(self, dep_tree, argument_type, trigger_id):
        """Get the core arguments of an event.
        Calls: self._extract_pattern()
               self._fill_a_node()
               self._get_conj_patterns()
        Notes: a role may have several core arguments, but only one of them is directly attached
        to the upper node in the dependency tree.
        ---------------
        :param dep_tree:
        :param argument_type: nsubj, obj, or iobj
        :param trigger_id: position of the trigger of this event
        :return argument: the core argument directly attached to the upper node
        :return conj_arguments: list of the other core arguments coordinated with argument
        """
        assert argument_type in ['nsubj', 'obj', 'iobj', 'nsubj:pass']
        arguments = self._extract_pattern(dep_tree, argument_type, trigger_id)
        if len(arguments):
            argument_id, argument = arguments[0]
            # complete the node
            argument = self._fill_a_node(dep_tree, argument_id, argument)
            # get coordinated structures
            conj_arguments = self._get_conj_patterns(dep_tree, argument_id)
            return argument, conj_arguments
        return None, None

    def _get_none_core_arguments(self, dep_tree, argument_type, trigger_id):
        """Get non-core arguments.
        Several non-core arguments may exist at the same time.
        """
        assert argument_type in ['obl', 'obl:tmod']
        arguments = self._extract_pattern(dep_tree, argument_type, trigger_id)
        all_arguments = []
        for argument_id, argument in arguments:
            # complete the node
            argument = self._fill_a_node(dep_tree, argument_id, argument, is_obl=True)
            # get coordinated structures
            conj_arguments = [item[1] for item in self._get_conj_patterns(dep_tree, argument_id)]
            all_arguments.append(argument)
            all_arguments.extend(conj_arguments)
        return all_arguments

    def _extract_event(self, dep_tree, trigger, trigger_id):
        """Extract one event given the trigger position."""
        event = dict()
        # nominal subject
        nsubj, conj_patterns_nsubj = self._get_core_arguments(dep_tree, 'nsubj', trigger_id)
        # object
        obj, conj_patterns_obj = self._get_core_arguments(dep_tree, 'obj', trigger_id)
        # indirect object
        iobj, conj_patterns_iobj = self._get_core_arguments(dep_tree, 'iobj', trigger_id)
        # general adverbials; temporal adverbials may also appear here
        obls = self._get_none_core_arguments(dep_tree, 'obl', trigger_id)
        # temporal adverbials
        oblts = self._get_none_core_arguments(dep_tree, 'obl:tmod', trigger_id)
        if not nsubj:
            # passive-voice subject
            nsubj_pass, conj_patterns_nsubj_pass = self._get_core_arguments(dep_tree, 'nsubj:pass', trigger_id)
        # assemble the event
        event['trigger'] = trigger
        if nsubj:
            event['nsubj'] = [nsubj] + [pat[1] for pat in conj_patterns_nsubj]
        elif nsubj_pass:
            event['nsubj:pass'] = [nsubj_pass] + [pat[1] for pat in conj_patterns_nsubj_pass]
        if obj:
            event['obj'] = [obj] + [pat[1] for pat in conj_patterns_obj]
        if iobj:
            event['iobj'] = [iobj] + [pat[1] for pat in conj_patterns_iobj]
        if obls:
            for obl in obls:
                type_ = self._adjust_obl_type_with_ner(obl)
                if type_ not in event:
                    event[type_] = [obl]
                else:
                    event[type_].append(obl)
        if oblts:
            if 'obl:tmod' in event:
                event['obl:tmod'].extend(oblts)
            else:
                event['obl:tmod'] = oblts
        return event

    def _extract_clausal_event(self, dep_tree, trigger, trigger_id, sent_id, clausal_type='ccomp'):
        """Extract clausal events."""
        clausal_events = []
        assert clausal_type in ['ccomp', 'xcomp']
        comp_patterns = self._extract_pattern(dep_tree, clausal_type, trigger_id)
        for comp_trigger_id, comp_trigger in comp_patterns:
            comp_trigger = self._fill_trigger(dep_tree, comp_trigger_id, comp_trigger)
            clausal_event = self._extract_event(dep_tree, comp_trigger, comp_trigger_id)
            clausal_event['head_trigger'] = trigger
            clausal_event['sent_id'] = sent_id
            clausal_events.append(clausal_event)
        return clausal_events

    def __call__(self, text, debug=None, return_dict=False, del_useless=True):
        """Get the list of events.
        :param text: str
        :param debug: str: dependency relation type to debug
        :param return_dict: bool: whether to return a dict
        :param del_useless: bool: whether to delete useless keys
        :return event_list: list of dict
        """
        if debug:
            self.debug_pat_name = debug
        event_list = []
        dep_res = self.get_dep_res(text)
        sents = [dep_tree['sent'] for dep_tree in dep_res]
        sent_tokens = []
        # extract from every sentence; ignoring clauses, each sentence has one trigger and yields one event
        for sent_id, dep_tree in enumerate(dep_res):
            dep_tree = dep_tree['dep']
            self.tokens = [node['words'] for node in dep_tree]
            sent_tokens.append(self.tokens)
            # take the main verb as the trigger
            trigger_id, trigger = self._extract_trigger(dep_tree)
            if not trigger:
                continue
            trigger['tri_dep_id'] = trigger_id
            trigger = self._fill_trigger(dep_tree, trigger_id, trigger)
            event = self._extract_event(dep_tree, trigger, trigger_id)
            event['sent_id'] = sent_id
            # clausal events
            clausal_comp = self._extract_clausal_event(dep_tree, trigger, trigger_id, sent_id, 'ccomp')
            open_clausal_comp = self._extract_clausal_event(dep_tree, trigger, trigger_id, sent_id, 'xcomp')
            if len(clausal_comp):
                event['clausal_comp'] = clausal_comp
            if len(open_clausal_comp):
                event['open_clausal_comp'] = open_clausal_comp
            event_list.append(event)
            # coordinated events of the trigger
            conj_patterns = self._extract_pattern(dep_tree, 'conj', trigger_id)
            # coordinated events without a conjunction
            parataxis_patterns = self._extract_pattern(dep_tree, 'parataxis', trigger_id)
            conj_patterns.extend(parataxis_patterns)
            for conj_trigger_id, conj_trigger in conj_patterns:
                event = self._extract_event(dep_tree, conj_trigger, conj_trigger_id)
                event['sent_id'] = sent_id
                # clausal events of the coordinated event
                clausal_comp_conj = self._extract_clausal_event(dep_tree, conj_trigger, conj_trigger_id, sent_id, 'ccomp')
                open_clausal_comp_conj = self._extract_clausal_event(dep_tree, conj_trigger, conj_trigger_id, sent_id, 'xcomp')
                if len(clausal_comp_conj):
                    event['clausal_comp'] = clausal_comp_conj
                if len(open_clausal_comp_conj):
                    event['open_clausal_comp'] = open_clausal_comp_conj
                event_list.append(event)
        if del_useless:
            event_list = self.del_useless_key(event_list)
            event_list = [event for event in event_list if len(event) > 2]
        if return_dict:
            return sents, sent_tokens, event_list
        else:
            return EventResult(sents, sent_tokens, event_list, dep_res)

    def del_useless_key(self, event_list):
        """Remove irrelevant key-value pairs."""
        for event in event_list:
            event['trigger'].pop('head')
            event['trigger'].pop('dep_relation')
            event['trigger'].pop('id')
            for k in event:
                if k == 'clausal_comp':
                    event[k] = self.del_useless_key(event[k])
                if k == 'sent_id':
                    continue
                elif k == 'trigger':
                    event[k].pop('head_id')
                    event[k].pop('ner')
                    continue
                elif k == 'head_trigger':
                    continue
                for node in event[k]:
                    for useless_key in ['head', 'dep_relation', 'id', 'lemma', 'upos', 'ner']:  # 'head_id'
                        if useless_key in node:
                            node.pop(useless_key)
        return event_list
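Before moving on, it may help to see the intermediate structure the class works with. Below is a small, hand-written illustration (not actual Stanza output) of the dep_res format returned by get_dep_res(), following the structure described in the class docstring; the real labels, ids, and NER tags are whatever Stanza's depparse and NER modules produce:

# A hand-written illustration of the dep_res structure (one entry per sentence);
# the actual values come from Stanza's depparse/NER output.
example_dep_res = [
    {'sent': 'He said it.',
     'sent_id': 1,
     'dep': [
         {'head': 'said', 'dep_relation': 'nsubj', 'words': 'He', 'id': 0, 'head_id': 1,
          'upos': 'PRON', 'lemma': 'he', 'ner': 'O'},
         {'head': 'ROOT', 'dep_relation': 'root', 'words': 'said', 'id': 1, 'head_id': -1,
          'upos': 'VERB', 'lemma': 'say', 'ner': 'O'},
         {'head': 'said', 'dep_relation': 'obj', 'words': 'it', 'id': 2, 'head_id': 1,
          'upos': 'PRON', 'lemma': 'it', 'ner': 'O'},
         {'head': 'said', 'dep_relation': 'punct', 'words': '.', 'id': 3, 'head_id': 1,
          'upos': 'PUNCT', 'lemma': '.', 'ner': 'O'},
     ]},
]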

Before instantiating this class, we first need an instantiated Stanza model. If you have not installed Stanza yet, install it first; this post will not go into those details.

import stanza
nlp = stanza.Pipeline(lang='en', dir='Stanza_En_v1/Stanza_En_model/models', use_gpu=True)
# `dir` is the path where the downloaded Stanza models are stored
# `use_gpu` controls whether to use the GPU; Stanza's own wrapper does not let you choose which card, so ask me directly if you need to pin a specific one
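Since the Pipeline only exposes a use_gpu switch, a common workaround for pinning the process to one particular card is to restrict the visible CUDA devices before the pipeline (and PyTorch) initializes; the device index below is only an example:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'  # example: expose only GPU 1 to this process

import stanza
nlp = stanza.Pipeline(lang='en', dir='Stanza_En_v1/Stanza_En_model/models', use_gpu=True)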

To display the event extraction results more clearly, I defined a result class:

class EventResult:
    """[Event extraction result display]"""
    def __init__(self, sents, sent_tokens, event_list, dep_res):
        self.sents = sents
        self.sent_tokens = sent_tokens
        self.dict = event_list
        self.dep_res = dep_res
        self.label2color = {
            'nsubj': '\033[1;34m',
            'nsubj:pass': '\033[1;34m',
            'trigger': '\033[1;35m',
            'obj': '\033[1;33m',
            'iobj': '\033[1;33m',
            'obl': '\033[1;32m',
            'obl:tmod': '\033[1;32m',
        }
        self.label2color4 = {
            'nsubj': '\033[4;34m',
            'nsubj:pass': '\033[4;34m',
            'trigger': '\033[4;35m',
            'obj': '\033[4;33m',
            'iobj': '\033[4;33m',
            'obl': '\033[4;32m',
            'obl:tmod': '\033[4;32m',
        }

    def to_dict(self):
        return self.dict

    def __len__(self):
        return len(self.dict)

    def _to_triples(self, event, is_clausal=False):
        argument_triples = []
        for k in event:
            if k == 'sent_id':
                continue
            elif k == 'head_trigger':
                continue
            elif k == 'clausal_comp' or k == 'open_clausal_comp':
                for clausal in event[k]:
                    argument_triples += self._to_triples(clausal, is_clausal=True)
            elif k == 'trigger':
                argument_triples.append([k, event[k]['token_start'], event[k]['token_end'], event['sent_id'], is_clausal])
            else:
                try:
                    for arg in event[k]:
                        if type(arg) == str:
                            continue
                        try:
                            argument_triples.append([k, arg['token_start'], arg['token_end'], event['sent_id'], is_clausal])
                        except:
                            print(arg)
                except:
                    print(event)
                    continue
        return sorted(argument_triples, key=lambda x: x[1])

    def __repr__(self):
        res = ''
        all_triples = []
        for event in self.dict:
            triples = self._to_triples(event)
            all_triples += triples
        pointer = 0
        if len(all_triples):
            prev_sent_id = all_triples[0][3]
        for triple in all_triples:
            if triple[0] not in self.label2color:
                continue
            if not triple[4]:
                # main clause
                color = self.label2color[triple[0]]
            else:
                color = self.label2color4[triple[0]]
            # flush the rest of the previous sentence
            cur_sent_id = triple[3]
            if cur_sent_id != prev_sent_id:
                # sentences skipped in between
                # for i in range(prev_sent_id+1, cur_sent_id):
                #     res += self.sents[i]
                #     res += '\n\n'
                for i in range(pointer, len(self.sent_tokens[prev_sent_id])):
                    res += self.sent_tokens[prev_sent_id][i]
                    res += ' '
                res += '\n\n'
            # from the end of the previous argument to the start of this one
            for i in range(pointer, triple[1]):
                res += (self.sent_tokens[triple[3]][i])
                res += ' '
            res += (color)
            # the current argument
            for i in range(triple[1], triple[2]+1):
                res += (self.sent_tokens[triple[3]][i])
                res += ' '
            res += ('\033[0m')
            pointer = triple[2]+1
            prev_sent_id = cur_sent_id
        # complete the text after the last event
        for i in range(pointer, len(self.sent_tokens[-1])):
            res += (self.sent_tokens[-1][i])
            res += ' '
        return res
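The escape sequences in label2color are plain ANSI codes: '\033[1;34m' switches to bold blue, '\033[4;34m' to underlined blue, and '\033[0m' resets the style, so main-clause arguments are rendered bold while clausal-complement arguments are underlined. A quick way to preview the palette in a terminal:

# Preview the ANSI styles used by EventResult (bold for main-clause roles, underline for clausal ones).
for name, code in [('nsubj', '\033[1;34m'), ('trigger', '\033[1;35m'), ('obj', '\033[1;33m'),
                   ('obl', '\033[1;32m'), ('clausal nsubj', '\033[4;34m')]:
    print(code + name + '\033[0m')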

(This class seems to have a small bug when rendering the results; I never got around to fixing it.)

We can then use this Stanza model to instantiate our event extraction tool:

dee = DepEventExtractor(nlp)

Now we can extract events:

text = '"I will make America great again!", he said.'
dee_res = dee(text, del_useless=False)
print(dee(text))

Take a look at the rendered result (the arguments are color-highlighted in the console):

If you want structured data instead, you can simply call to_dict() on the result:

dee_res.to_dict()
'''
[{'trigger': {'head': 'ROOT', 'dep_relation': 'root', 'words': 'said', 'id': 11, 'head_id': -1,
              'upos': 'VERB', 'lemma': 'say', 'ner': 'O', 'token_start': 11, 'token_end': 11,
              'tri_dep_id': 11},
  'nsubj': [{'head': 'said', 'dep_relation': 'nsubj', 'words': 'he', 'id': 10, 'head_id': 11,
             'upos': 'PRON', 'lemma': 'he', 'ner': 'O', 'token_start': 10, 'token_end': 10}],
  'sent_id': 0,
  'clausal_comp': [{'trigger': {'head': 'said', 'dep_relation': 'ccomp', 'words': 'will make', 'id': 3,
                                'head_id': 11, 'upos': 'VERB', 'lemma': 'make', 'ner': 'O',
                                'token_start': 2, 'token_end': 3},
                    'nsubj': [{'head': 'make', 'dep_relation': 'nsubj', 'words': 'I', 'id': 1, 'head_id': 3,
                               'upos': 'PRON', 'lemma': 'I', 'ner': 'O', 'token_start': 1, 'token_end': 1}],
                    'obj': [{'head': 'make', 'dep_relation': 'obj', 'words': 'America', 'id': 4, 'head_id': 3,
                             'upos': 'PROPN', 'lemma': 'America', 'ner': 'S-GPE', 'token_start': 4, 'token_end': 4}],
                    'head_trigger': {'head': 'ROOT', 'dep_relation': 'root', 'words': 'said', 'id': 11,
                                     'head_id': -1, 'upos': 'VERB', 'lemma': 'say', 'ner': 'O',
                                     'token_start': 11, 'token_end': 11, 'tri_dep_id': 11},
                    'sent_id': 0,
                    'words': 'I will make America great again ! " ,',
                    'token_start': 1,
                    'token_end': 9}],
  'grown_pattern': [{'trigger': {'head': 'said', 'dep_relation': 'ccomp', 'words': 'will make', 'id': 3,
                                 'head_id': 11, 'upos': 'VERB', 'lemma': 'make', 'ner': 'O',
                                 'token_start': 2, 'token_end': 3},
                     'nsubj': [{'head': 'make', 'dep_relation': 'nsubj', 'words': 'I', 'id': 1, 'head_id': 3,
                                'upos': 'PRON', 'lemma': 'I', 'ner': 'O', 'token_start': 1, 'token_end': 1}],
                     'obj': [{'head': 'make', 'dep_relation': 'obj', 'words': 'America', 'id': 4, 'head_id': 3,
                              'upos': 'PROPN', 'lemma': 'America', 'ner': 'S-GPE', 'token_start': 4, 'token_end': 4}],
                     'head_trigger': {'head': 'ROOT', 'dep_relation': 'root', 'words': 'said', 'id': 11,
                                      'head_id': -1, 'upos': 'VERB', 'lemma': 'say', 'ner': 'O',
                                      'token_start': 11, 'token_end': 11, 'tri_dep_id': 11},
                     'sent_id': 0,
                     'words': 'I will make America great again ! " ,',
                     'token_start': 1,
                     'token_end': 9}]}]
'''
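If you want to turn this kind of output into silver training data (the weak-supervision use case mentioned in the introduction), one simple option is to flatten every event into per-argument records. The helper below is only a sketch I am adding here for illustration; it relies solely on the keys visible in the output above:

def event_to_silver_records(event):
    """Hypothetical helper: flatten one event dict (as returned by to_dict())
    into a list of {role, words, token_start, token_end, sent_id} records."""
    records = []
    for role, value in event.items():
        if role in ('sent_id', 'head_trigger', 'words', 'token_start', 'token_end'):
            continue
        if role in ('clausal_comp', 'open_clausal_comp', 'grown_pattern'):
            for sub_event in value:
                records.extend(event_to_silver_records(sub_event))
            continue
        nodes = value if isinstance(value, list) else [value]  # the trigger is a single dict
        for node in nodes:
            records.append({'role': role,
                            'words': node['words'],
                            'token_start': node['token_start'],
                            'token_end': node['token_end'],
                            'sent_id': event['sent_id']})
    return records

silver = [rec for event in dee_res.to_dict() for rec in event_to_silver_records(event)]

These span-level records can then be handed to annotators for correction, which is much cheaper than labeling from scratch.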

2. Extracting events of a specific type

If you only want events of certain types, you can achieve this by specifying trigger words.

First, write an event type class:

class EventType:
    """[Event type]"""
    def __init__(self, event_name, domain_name, trigger_words):
        """
        :param event_name: str: name of the event type
        :param domain_name: str: name of the event domain
        :param trigger_words: list: list of event trigger words
        """
        self.event_name = event_name
        self.domain_name = domain_name
        self.trigger_words = trigger_words

    def add_trigger(self, trigger):
        self.trigger_words.append(trigger)

    def remove_trigger(self, trigger):
        if trigger not in self.trigger_words:
            print('Trigger {} not in `trigger_words`.'.format(trigger))
        else:
            self.trigger_words.remove(trigger)

    def get_trigger_lemmas(self):
        pass

Taking speech events as an example, instantiate this class with a few trigger words:

speech_words = ['say', 'speak', 'talk', 'stress', 'report', 'tell', 'ask', 'warn', 'note', 'state', 'deny', 'explain', 'insist', 'suggest', 'acknowledge', 'believe', 'confirm', 'think', 'vow']
speech_event = EventType(event_name='speech', domain_name='common', trigger_words=speech_words)
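The trigger list can be edited at runtime through the helper methods of EventType. Note that the extractor in the next step matches triggers against the lemma field of the trigger node, so the entries should be lemmas ('say' rather than 'said'). For example:

speech_event.add_trigger('claim')       # register an additional trigger lemma
speech_event.remove_trigger('believe')  # drop one we no longer want
print(len(speech_event.trigger_words))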

Then write an extractor class for the specified event type:

class SpecificEventExtractor:
    """[Extraction of events of a specific type]
    Extract specific events from the dependency-based event extraction result.
    Used after the dep extraction has finished.
    Example:
        # (1) Create a dep model and get the dep result
        dee = DepEventExtractor(nlp)
        text = "China's defense minister says resolving 'Taiwan question' is national priority"
        dee_res = dee(text, del_useless=False)
        # (2) Define a specific event type, for example, speech
        speech_event = EventType(event_name='speech', domain_name='common', trigger_words=speech_words)
        # (3) Create a specific extractor and get all events of this type
        see = SpecificEventExtractor(speech_event, dee_res)
        spcf_events = see.get_spcf_events()
        spcf_events_exp = see.get_expanded_events()
        # (4) Show the result
        print(spcf_events_exp.to_dict()[0]['grown_pattern'][0]['words'])
    ---------------
    ver: 2021-11-16
    by: changhongyu
    """
    def __init__(self, event_type, dee_result):
        """
        :param event_type: EventType: the event type object
        :param dee_result: EventResult: the event extraction result
        """
        self.event_type = event_type
        self.dee_result = dee_result

    def get_spcf_events(self):
        """Get the events of the given type.
        :return spcf_event_list: list
        """
        spcf_event_list = []
        for event in self.dee_result.to_dict():
            if event['trigger']['lemma'] in self.event_type.trigger_words:
                spcf_event_list.append(event)
        return EventResult(self.dee_result.sents, self.dee_result.sent_tokens, spcf_event_list, self.dee_result.dep_res)

    def get_expanded_events(self):
        """Expand the object of the event, taking the event trigger as the root node."""
        spcf_events = self.get_spcf_events()
        for event in spcf_events.to_dict():
            if 'clausal_comp' not in event:
                continue
            event['grown_pattern'] = [self._node_grow(ccomp, ccomp['trigger']['id'], event['sent_id'])
                                      for ccomp in event['clausal_comp']]
            # print(event['trigger'])
        return spcf_events

    def _node_grow(self, node, node_id, sent_id):
        """Expand a node so that it covers all of its child nodes."""
        if 'words' not in node:
            node['words'] = node['trigger']['words']
        if 'token_start' not in node:
            node['token_start'] = node['trigger']['token_start']
        if 'token_end' not in node:
            node['token_end'] = node['trigger']['token_end']
        cur_sent = self.dee_result.sents[sent_id]
        for child_node in self.dee_result.dep_res[sent_id]['dep']:
            if child_node['head_id'] == node_id and child_node['words'] != node['words']:
                if 'token_start' not in child_node:
                    child_node['token_start'] = child_node['id']
                if 'token_end' not in child_node:
                    child_node['token_end'] = child_node['id']
                child_node = self._node_grow(child_node, child_node['id'], sent_id)
                node['token_start'] = min(node['token_start'], child_node['token_start'])
                node['token_end'] = max(node['token_end'], child_node['token_end'])
                node['words'] = ''.join(tok + ' ' for tok in self.dee_result.sent_tokens[sent_id][node['token_start']: node['token_end']+1])[: -1]
        return node

When using this class, pass in the extraction result produced by the class from Section 1; this class then filters that result:

text = "China's defense minister says resolving 'Taiwan question' is national priority"
dee_res = dee(text, del_useless=False)
print(dee(text))   # the colored display may be off here; trust the output you get from to_dict()
see = SpecificEventExtractor(speech_event, dee_res)
spcf_events = see.get_expanded_events()
# inspect the content of the speech:
spcf_events.to_dict()[0]['grown_pattern'][0]['words']
# "resolving ' Taiwan question ' is national priority"

3. Conclusion

This post introduced a weakly supervised, syntax-rule-based event extraction tool that I implemented myself. It can be used to generate part of your training data, but I would not recommend using it directly as a mature extraction model. If you have any questions about how to use it, feel free to leave me a comment, and if you want to take it further, you are welcome to modify it.

That is all for this post. I have many more original pieces written and organized for future posts; if you are looking forward to them, please give me your support.
