分词器大家都很熟悉,这里我们从源码分析下jieba分词器的实现

github地址: https://github.com/fxsjy/jieba

分析源码前我们分析下jieba的分词是怎么实现的

1 基于前缀词典实现的词表扫描,然后生成基于句中汉字所有可能成词的有向无环图(DAG)

2 采用动态规划计算最大概率路径,找出基于词频的最大切分组合

3 对于未登录词和连着多个单个词,采用了HMM模型,使用viterbi算法来计算隐藏序列

4 提前统计HMM的估计参数 PI/A/B 三个参数矩阵 (本文不打算深入HMM 不清楚的请自行搜索资料)

5 最后我们基于jieba实现一个自己的分词器

import jieba
sentence = '我说上海是中国首都'
data = list(jieba.cut(sentence)

1  进入cut 函数

def cut(self, sentence, cut_all=False, HMM=True, use_paddle=False):#is_paddle_installed = check_paddle_install['is_paddle_installed']sentence = strdecode(sentence)# 这里是看是否使用第三方paddle分词# 检测paddle是否安装if use_paddle and is_paddle_installed:# if sentence is null, it will raise core exception in paddle.if sentence is None or len(sentence) == 0:returnimport jieba.lac_small.predict as predictresults = predict.get_sent(sentence)for sent in results:if sent is None:continueyield sentreturnre_han = re_han_defaultre_skip = re_skip_defaultif cut_all:cut_block = self.__cut_allelif HMM:# 我们主要分析的方法 使用 HMMcut_block = self.__cut_DAG

2 进入__cut_DAG

def __cut_DAG(self, sentence):# 得到句子的有向无环图 比如# 得到 有向无环图DAG = self.get_DAG(sentence)route = {}# 计算最大概率路径self.calc(sentence, DAG, route)x = 0# 定义一个 临时 变量存储要处理的词buf = ''N = len(sentence)# 遍历句子while x < N:# 取出该词到后面词的最大路径处的词y = route[x][1] + 1# 取出这个最大概率路径词l_word = sentence[x:y]if y - x == 1:# 如果这个词长度为1 用临时变量buf 存起来 后面判断是否需要用hmm来进一步分词buf += l_wordelse:# 如果该词是个词组 长度大于1if buf:# 如果buf里只存了一个词 直接返回 不需要用hmm来进一步分词if len(buf) == 1:yield buf# 清空bufbuf = ''else:# 如果存储在buf中的多个词不在词库中 用 hmm进行分词"""这里重点分析下 方便理解更清楚eg  我说北京是中国首都前两个词  我 说 都是单个的 所有会进入上面的if  buf = '我说'然后 到 北京是  会进入 else  如果  我说 不在词库中就会进入hmm分词 北京 被后面 的 yield l_word 直接返回  然后清空 buf """if not self.FREQ.get(buf):# HMM分词recognized = finalseg.cut(buf)# 因为这里是个生成器 需要遍历进行返回for t in recognized:yield telse:# 如果buf中的多个词在词库中 直接遍历单个返回 不是当成词组返回,因为它单个成词的概率最大for elem in buf:yield elem# 清空buf  其实 这个 buf可以放到 if else 外面 因为都需要清空buf = ''# 对于本来已经是词组的词直接返回  不需要进一步处理yield l_wordx = y# 如果buf 还有词未处理 继续处理if buf:# 如果只有一个词 直接返回 后面其实就和上面的if else 一样了if len(buf) == 1:yield bufelif not self.FREQ.get(buf):recognized = finalseg.cut(buf)for t in recognized:yield telse:for elem in buf:yield elem

进入 get_DAG

def get_DAG(self, sentence):# 检测是否初始化# 初始化主要加载前缀字典 self.FREQ  self.total 为了后续计算概率 详情见 gen_pfdict 函数self.check_initialized()DAG = {}N = len(sentence)# 遍历句子# 构建有向无环图# 我说北京是中国首都# DAG: {0: [0], 1: [1], 2: [2, 3], 3: [3], 4: [4], 5: [5, 6], 6: [6], 7: [7, 8], 8: [8]}for k in xrange(N):tmplist = []i = kfrag = sentence[k]while i < N and frag in self.FREQ:if self.FREQ[frag]:tmplist.append(i)i += 1frag = sentence[k:i + 1]if not tmplist:tmplist.append(k)DAG[k] = tmplistreturn DAG

进入 get_pfdict

def gen_pfdict(f):# f 其实是 dict.txt 文件 里面存所有词、次数、词性lfreq = {}ltotal = 0f_name = resolve_filename(f)for lineno, line in enumerate(f, 1):try:line = line.strip().decode('utf-8')# 里面存所有词、次数、词性 这里取 词和次数word, freq = line.split(' ')[:2]freq = int(freq)lfreq[word] = freq# 记录总的次数 方便后续计算概率ltotal += freq# 遍历词的所有前缀存储for ch in xrange(len(word)):wfrag = word[:ch + 1]if wfrag not in lfreq:# 设置词的前缀出现的次数是0# 大约会使词库增加40%左右lfreq[wfrag] = 0except ValueError:raise ValueError('invalid dictionary entry in %s at Line %s: %s' % (f_name, lineno, line))f.close()return lfreq, ltotal

进入 calc

def calc(self, sentence, DAG, route):# 采用动态规划 计算最大概率的切分路径# 句子长度N = len(sentence)# 初始化  route 存储当前词到后面词的最大概率路径route[N] = (0, 0)# 很多词的出现次数其实是很小的除以total计算出的概率值太小  所有这里用对数概率来计算# 将除法变成减法logtotal = log(self.total)for idx in xrange(N - 1, -1, -1):# 反向计算路径切分最大概率# 这里为什么要反向计算 是因为 DAG 里面每个词存了该词到后面词的所有路径 route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -logtotal + route[x + 1][0], x) for x in DAG[idx])

最后分析下 HMM分词

def __cut(sentence):global emit_P# 通过veterbi方法计算最佳隐藏序列prob, pos_list = viterbi(sentence, 'BMES', start_P, trans_P, emit_P)begin, nexti = 0, 0# print pos_list, sentence# 处理sentence 通过隐藏序列得到对应的分词for i, char in enumerate(sentence):pos = pos_list[i]if pos == 'B':begin = ielif pos == 'E':yield sentence[begin:i + 1]nexti = i + 1elif pos == 'S':yield charnexti = i + 1if nexti < len(sentence):yield sentence[nexti:]

最后分析 viterbi方法

def viterbi(obs, states, start_p, trans_p, emit_p):# HMM decoding / inference  已知 λ=(A,B,π) 和 观察序列 O1, O2, ..., Ot 这里的观测序列就是出现的词# 计算隐藏序列 隐藏序列就是 BMES 组成的序列V = [{}]  # tabularpath = {}# 初始化 V 和 path# V 里面存储的是 当前时刻处于某一个状态的最大概率# Path 存储的是当前状态下的最大概率路径for y in states:  # init# 这里是对数概率  所有乘法变成 加法V[0][y] = start_p[y] + emit_p[y].get(obs[0], MIN_FLOAT)path[y] = [y]# 计算每个时间点t 某个状态下的最大概率值for t in xrange(1, len(obs)):V.append({})newpath = {}# 遍历所有状态 BMES for y in states:# 得到当前状态下得到该词的概率em_p = emit_p[y].get(obs[t], MIN_FLOAT)# 这里其实也是需要遍历所有前一个状态到当前状态下的所有概率值 取最大 # 因为 BMES 有一定的规律 并不是所有状态都可以到达该状态 所以只需要遍历 PrevStatus# PrevStatus = {#     'B': 'ES',#     'M': 'MB',#     'S': 'SE',#     'E': 'BM'# }(prob, state) = max([(V[t - 1][y0] + trans_p[y0].get(y, MIN_FLOAT) + em_p, y0) for y0 in PrevStatus[y]])V[t][y] = prob# 计算该状态下的最佳路径newpath[y] = path[state] + [y]# 存储最佳路径path = newpath# 因为句子最后一个词一定为 E或者S 所以只需要 遍历 ES 得到最佳路径(prob, state) = max((V[len(obs) - 1][y], y) for y in 'ES')return (prob, path[state])

OK 基本源码已经分析完了, 现在我们回想下 基本jieba我们是不是也能实现一个自己的分词?

接下来我们开始搭建一个自己的分词

第一步: 根据语库资料得到词库

import os
from collections import Counter"""根据语料生成dict 文件
"""class GenerateDict:def __init__(self, data_files):self.data_files = data_filesself._parse_file()def _parse_file(self):if os.path.exists('dict.txt'):returnif len(self.data_files) == 0:returncounter = Counter()for file in self.data_files:if not os.path.exists(file):continuef = open(file, encoding='utf-8')lines = f.readlines()for line in lines:word_arr = line.strip().split()counter.update(word_arr)f.close()if len(counter) == 0:returnsorted_arr = sorted(counter.items(), key=lambda item: item[1], reverse=True)out_file = open('dict.txt', 'w', encoding='utf-8')for word, num in sorted_arr:line = word + ' ' + str(num) + '\n'out_file.write(line)out_file.close()if __name__ == '__main__':data_files = ['data.txt']GenerateDict(data_files)

第二步 由于使用的HMM 我们需要自己统计参数λ=(A,B,π)并存储下来

"""
生成 HMM 参数
(A, B, pi) 发射概率、转移概率、初始概率
"""import os
from math import log
import jsonMIN_FLOAT = -3.14e100class GenerateProb:def __init__(self, data_files):self.data_files = data_filesself.STATUS = 'BMES'# 初始概率self.PI = {}# 发射概率self.A = {}# 转移概率self.B = {}self._parse_file()def _parse_file(self):if len(self.data_files) == 0:returnfor file in self.data_files:if not os.path.exists(file):continuef = open(file, encoding='utf-8')lines = f.readlines()for line in lines:word_arr = line.strip().split()labels = []for index, word in enumerate(word_arr):if len(word) == 1:label = 'S'else:label = 'B' + 'M' * (len(word) - 2) + 'E'# 取第一个字if index == 0:key = label[0]if key in self.PI:self.PI[key] += 1else:self.PI[key] = 1# 对于每一个词统计发射数for i, item in enumerate(label):ch = word[i]if item not in self.A:self.A[item] = {}self.A[item][ch] = 1else:if ch not in self.A[item]:self.A[item][ch] = {}self.A[item][ch] = 1else:self.A[item][ch] += 1labels.extend(label)# 统计Bfor i in range(1, len(labels)):pre_status = labels[i - 1]cur_status = labels[i]if pre_status not in self.B:self.B[pre_status] = {}self.B[pre_status][cur_status] = 1else:if cur_status not in self.B[pre_status]:self.B[pre_status][cur_status] = {}self.B[pre_status][cur_status] = 1else:self.B[pre_status][cur_status] += 1f.close()# 计算 PI A Bself.__calc_params()def __calc_params(self):# 统计PItotal = 0.for key, value in self.PI.items():total += valuefor state in self.STATUS:if state not in self.PI:self.PI[state] = MIN_FLOATelse:self.PI[state] = log(self.PI[state] / total)# 统计 Bfor key, item in self.B.items():total = 0.for inner_key, value in item.items():total += valuefor inner_key, value in item.items():item[inner_key] = log(value / total)# 统计 Afor key, item in self.A.items():total = 0.for inner_key, value in item.items():total += valuefor inner_key, value in item.items():item[inner_key] = log(value / total)if __name__ == '__main__':data_files = ['data.txt']prob = GenerateProb(data_files)data = {'PI': prob.PI,'B': prob.B,'A': prob.A}if not os.path.exists('prob.json'):print('write prob to json')with open('prob.json', 'w') as f:json.dump(data, f)

第三步 然后实现自己的分词

from math import log
import hmmclass Tokenizer:def __init__(self):self.initialized = Falseself.FREQ, self.total = {}, 0self.initialize()def initialize(self):freq_dict = {}total = 0f = open('dict.txt', encoding='utf-8')lines = f.readlines()for line in lines:line = line.strip()word_freq = line.split()if len(word_freq) < 2:continueword, freq = word_freq[0:2]freq = int(freq)freq_dict[word] = freqtotal += freqfor ch in range(len(word)):sub_word = word[:ch + 1]if sub_word not in freq_dict:freq_dict[sub_word] = 0f.close()self.FREQ, self.total = freq_dict, totalself.initialized = Truedef check_initialized(self):if not self.initialized:self.initialize()def get_DAG(self, sentence):self.check_initialized()DAG = {}N = len(sentence)for k in range(N):tmplist = []i = kfrag = sentence[k]while i < N and frag in self.FREQ:if self.FREQ[frag]:tmplist.append(i)i += 1frag = sentence[k:i + 1]if not tmplist:tmplist.append(k)DAG[k] = tmplistreturn DAGdef calc(self, sentence, DAG, route):N = len(sentence)route[N] = (0, 0)logtotal = log(self.total)for idx in range(N - 1, -1, -1):route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -logtotal + route[x + 1][0], x) for x in DAG[idx])def __cut_DAG(self, sentence):DAG = self.get_DAG(sentence)route = {}self.calc(sentence, DAG, route)x = 0N = len(sentence)result = []buf = ''while x < N:y = route[x][1] + 1tmp_word = sentence[x:y]if y - x == 1:buf += tmp_wordelse:if buf:if len(buf) == 1:result.append(buf)else:if buf not in self.FREQ:# HMM 分词recognized = hmm.cut(buf)for elem in recognized:result.append(elem)else:for elem in buf:result.append(elem)buf = ''result.append(tmp_word)x = yif buf:if len(buf) == 1:result.append(buf)elif buf not in self.FREQ:# HMM 分词recognized = hmm.cut(buf)for elem in recognized:result.append(elem)else:for elem in buf:result.append(elem)return resultdef cut(self, sentence, HMM=True):if HMM:cut_block = self.__cut_DAGelse:raise NotImplemented("error not implement")return cut_block(sentence)tk = Tokenizer()
cut = tk.cut

第四步 实现HMM分词

import os
import jsonMIN_FLOAT = -3.14e100# load start_P, trans_P, emit_P
if os.path.exists('../prob.json'):f = open('../prob.json')data = json.load(f)start_P = data['PI']trans_P = data['B']emit_P = data['A']
else:from .prob_start import P as start_Pfrom .prob_trans import P as trans_Pfrom .prob_emit import P as emit_P# B - begin
# M - Mid
# S - Single
# E - End
STATES = 'BMES'
# 每个状态的前一个状态可能出现的状态  比如  E -> B  S -> B  B不能转到B M不能转到B
PrevStatus = {'B': 'ES','M': 'MB','S': 'SE','E': 'BM'
}def viterbi(obs, states, start_p, trans_p, emit_p):V = [{}]  # tabularpath = {}for y in states:  # initV[0][y] = start_p[y] + emit_p[y].get(obs[0], MIN_FLOAT)path[y] = [y]for t in range(1, len(obs)):V.append({})newpath = {}for y in states:em_p = emit_p[y].get(obs[t], MIN_FLOAT)(prob, state) = max([(V[t - 1][y0] + trans_p[y0].get(y, MIN_FLOAT) + em_p, y0) for y0 in PrevStatus[y]])V[t][y] = probnewpath[y] = path[state] + [y]path = newpath(prob, state) = max((V[len(obs) - 1][y], y) for y in 'ES')return prob, path[state]def cut(sentence):global emit_Pprob, pos_list = viterbi(sentence, STATES, start_P, trans_P, emit_P)begin, nexti = 0, 0for i, char in enumerate(sentence):pos = pos_list[i]if pos == 'B':begin = ielif pos == 'E':yield sentence[begin:i + 1]nexti = i + 1elif pos == 'S':yield charnexti = i + 1if nexti < len(sentence):yield sentence[nexti:]

最后测试

import jieba
import tokenizersentence = '我说北京是中国首都'data = list(jieba.cut(sentence))
print('jieba:', data)
data = tokenizer.cut(sentence)
print('tokenizer:', data)

输出结果:

从结果上可以看出来: 效果还算不错

当然这只是个大概功能,大家也可以按照自己的想法进行功能添加和修改。

最后附上代码的github地址:

https://github.com/under-the-moon/myjiebahttps://github.com/under-the-moon/myjieba

 以上均为原创,转载请添加来源!

jieba分词的源码解析,并从零实现自己的分词器相关推荐

  1. spring boot 源码解析15-spring mvc零配置

    前言 spring boot 是基于spring 4 的基础上的一个框架,spring 4 有一个新特效–>基于java config 实现零配置.而在企业的实际工作中,spring 都是和sp ...

  2. 【NLP】Words Normalization+PorterStemmer源码解析

    Words Normalization 目录 Words Normalization Stemming(词干提取) Lemmatisation(词形还原) PorterStemmer源码解析 1.de ...

  3. StarRocks Analyzer 源码解析

    导读:欢迎来到 StarRocks 源码解析系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你逐步了解这款明星开源数据库产品.本期将主要介绍 StarRocks Par ...

  4. 《Attention is all you need》源码解析+算法详解

    Attention is all you need 源码解析 最近学习Transformer模型的时候,并且好好读了一下Google的<Attention is all you need> ...

  5. 谷歌BERT预训练源码解析(一):训练数据生成

    目录 预训练源码结构简介 输入输出 源码解析 参数 主函数 创建训练实例 下一句预测&实例生成 随机遮蔽 输出 结果一览 预训练源码结构简介 关于BERT,简单来说,它是一个基于Transfo ...

  6. ElasticSearch源码解析(五):排序(评分公式)

    ElasticSearch源码解析(五):排序(评分公式) 转载自:http://blog.csdn.net/molong1208/article/details/50623948   一.目的 一个 ...

  7. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

  8. Attention is all you need pytorch实现 源码解析01 - 数据预处理、词表的构建

    我们今天开始分析著名的attention is all you need 论文的pytorch实现的源码解析. 由于项目很大,所以我们会分开几讲来进行讲解. 先上源码:https://github.c ...

  9. openGauss数据库源码解析系列文章—— AI技术之“自调优”

    上一篇介绍了第七章执行器解析中"7.6 向量化引擎"及"7.7 小结"的相关内容,本篇我们开启第八章 AI技术中"8.1 概述"及" ...

最新文章

  1. [Struts2应用开发] JSON的应用
  2. 从流感预测到智能决策,深度学习能帮企业做哪些事?
  3. linux下mq的mc.sh在哪?,RocketMQ的安装与使用
  4. python基础期末考试_python基础试题(4)
  5. JS partial-application
  6. html加入购物车的动画,vue实现加入购物车动画
  7. python的cubes怎么使用_如何使用python中的opengl?
  8. 马斯克:全力支持狗狗币主要持有者出售货币 持仓太集中是问题
  9. Jquery插件的编写和使用
  10. QObject::startTimer: Timers cannot be started from another thread
  11. 知道cve编号怎么搜poc_想在海南买房不知道该怎么选?快来看看这些热搜楼盘吧!...
  12. mysql判断表字段或索引是否存在,然后修改
  13. IDEA如何执行maven命令进行打包编译及常用命令
  14. 简化异常处理的Throwables类
  15. 6.苹果官方鼠标移动速度慢问题解决(Magic Mouse)
  16. Objective-C 与 C++ 的区别
  17. 常用编程语言介绍及特点
  18. ppt文件太大怎么压缩
  19. 表格thead设置border无效的原因之一
  20. 网站都变成灰色,一行代码就搞定了!

热门文章

  1. 05快速排序(不稳定)
  2. 【transformer】航空发动机寿命预测
  3. Word 2016 撰写论文(5): MathType 矩阵中的每个元素居中对齐
  4. 学生成绩查询系统(Java实现),你第一个Java小项目
  5. 009 PFA: Privacy-preserving Federated Adaptation for Effective Model Personalization(联邦个性化 自适应)
  6. python爬虫和数据可视化论文_Python爬虫之小说信息爬取与数据可视化分析
  7. 危险!80% 用户正在考虑放弃 Oracle JDK…
  8. Arduino开发之GP2Y0A21 Distance Sensor
  9. python数据结构算法_数据结构与算法(Python)
  10. CRC校验查表法原理及实现(CRC-16)