知己知彼,百战不殆。我们要打造一个能胜过人类的机器人,就必须要让机器人掌握人类的围棋思维模式,因此我们就需要使用人类棋手留下的棋盘数据训练机器人,让它从数据中掌握人类围棋思维存在的模式和套路。

幸运的是,我们能够通过围棋服务器拿到很多由人落子后产生的棋盘数据。很多围棋服务器公开了这些数据,这些围棋数据以一种叫Smart Game Format的方式存储,我们可以将其下载下来进行预处理后用于训练我们的神经网络,如此得到的网络,它的落子能力将远远超过上一节我们训练的网络机器人。

我们从当下最流行的围棋服务器下载棋盘数据,这个服务器叫KGS(Kiseido Go Server).在下载数据前,我们先了解具体的数据格式。它是一种文本格式数据,它通常用两个大写字母来表示棋盘属性,例如表示棋盘规格时使用的字母是SZ,然后在后面用大括号来容纳属性对应的数值,对于一个9*9的棋盘而言,对应的描述属性为SZ[9]。

它用W来表示白子,如果白子落在第三行第三列,对应的记录就是W[cc],也就是它使用字符次序来表示数字,因此c就表示数字3,同时B表示黑子,如果描述黑子落在第7行,第3列,对应的属性描述就是B[gc],字母g表示7。如果在某一步白棋或黑子pass,对应的描述就是B[],W[],也就是中括号内没有内容。

由此我们看下面一段数据对棋盘的描述:

(;FF[4] GM[1] SZ[9] HA[0] KM[6.5] RU[Japanese] RE[W+9.5] ;B[gc];W[cc];B[cg];W[gg];B[hf];W[gf];B[hg];W[hh];B[ge];W[df];B[dg] ;W[eh];B[cf];W[be];B[eg];W[fh];B[de];W[ec];B[fb];W[eb];B[ea];W[da] ;B[fa];W[cb];B[bf];W[fc];B[gb];W[fe];B[gd];W[ig];B[bd];W[he];B[ff] ;W[fg];B[ef];W[hd];B[fd];W[bi];B[bh];W[bc];B[cd];W[dc];B[ac];W[ab] ;B[ad];W[hc];B[ci];W[ed];B[ee];W[dh];B[ch];W[di];B[hb];W[ib];B[ha] ;W[ic];B[dd];W[ia];B[];
TW[aa][ba][bb][ca][db][ei][fi][gh][gi][hf][hg][hi][id][ie][if] [ih][ii]
TB[ae][af][ag][ah][ai][be][bg][bi][ce][df][fe][ga] W[])

其中FF[4]表示数据格式的版本号,有点类似于操作系统版本。GM[1]表示比赛第一盘,HA表示让子,HA[0]表示没有让子。RU[Japanese]表示围棋遵循日本规则,RE[W+9.5]表示白子以9.5分优势获胜,KM[6.5]表示第二落子的人获得6.5分补偿。接下来 以分好分割的就是双方落子方式。最后TW表示的是白子地盘,TB表示黑子占据的地盘。

理解了数据格式后,我们可以通过网址 https://www.u-go.net/gamerecords/
下载棋盘数据:

这上面都存储了六段以上高手对弈的棋盘数据。我们接下来将会创建一个爬虫机器人,爬去网页,分析里面链接后自动将数据下载到本地并解压,在后面我们会具体给出爬虫的实现代码,当爬虫运行后,它会解析页面,找出下载链接,依次把文件下载到指定文件夹中,其运行信息如下:

>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2006-19-10388-.tar.gz
worker is running
>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2005-19-13941-.tar.gz
worker is running
>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2004-19-12106-.tar.gz
worker is running
>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2003-19-7582-.tar.gz
worker is running
>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2002-19-3646-.tar.gz
worker is running
>>>Downloading content/gdrive/My Drive/GO_RECORD/KGS-2001-19-2298-.tar.gz

下载完数据后,我们会用代码解读棋盘数据,并将数据所表示的棋盘落子过程重放一遍,棋盘数据的解读烦琐耗时,为了将精力集中到网络训练上,我们将直接使用一个已经完成的数据解读类来帮我们解读棋盘数据。

首先我们先构造一段虚拟棋盘数据:

 "(;GM[1] FF[4] SZ[9];B[ee];W[ef];B[ff])" + \
";W[df];B[fe];W[fc];B[ef];W[gd];B[fb]"

然后使用棋盘数据读取工具类Sgf_game读取上面信息,将其转换成白棋和黑棋的落子信息,然后启动一个虚拟棋盘,将上面的落子步骤显示出来,当我们正确读取上面棋盘信息后,我们可以输出以下模拟棋盘:

19  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
18  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
17  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
16  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
15  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
14  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
13  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
12  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
11  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
10  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 9  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 8  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 7  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 6  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 5  .  .  .  . x .  .  .  .  .  .  .  .  .  .  .  .  .  . 4  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 3  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 2  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 1  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . A B C D E F G H J K L M N O P Q R S T
19  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
18  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
17  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
16  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
15  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
14  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
13  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
12  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
11  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
10  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 9  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 8  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 7  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 6  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 5  .  .  .  . x .  .  .  .  .  .  .  .  .  .  .  .  .  . 4  .  .  .  . o .  .  .  .  .  .  .  .  .  .  .  .  .  . 3  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 2  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 1  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . A B C D E F G H J K L M N O P Q R S T
19  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
18  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
17  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
16  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
15  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
14  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
13  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
12  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
11  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .
10  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 9  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 8  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 7  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 6  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 5  .  .  .  . x .  .  .  .  .  .  .  .  .  .  .  .  .  . 4  .  .  .  . ox .  .  .  .  .  .  .  .  .  .  .  .  . 3  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 2  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 1  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  .  . A B C D E F G H J K L M N O P Q R S T

完成了数据的解析后,我们就得创建数据处理器,将下载的棋盘数据转换成网络可以识别的向量格式,然后喂给网络,滋养网络的发育。它首先从下载的棋盘描述文件中选取出一部分进行解压,然后读取解压后的数据文件,将它描述的棋盘转换为上一节对应的棋盘编码,同时将当前棋盘与下一步落子对应起来。

我们要把数据分割成两部分,其中时间在2014年前的数据作为测试数据,之后的数据作为训练数据。我们把数据读入内存,按照上面描描述解析数据后,将解析后的数据存储起来以便以后使用,因为数据解析是非常耗时耗力的”脏活累活“,我们尽量做一次即可。

首先我们完成下载数据的代码:

import os
import sys
import multiprocessing
import six
from urllib.request import urlopen, urlretrieve#创建下载线程函数
def  worker(url_and_target):try:(url, target_path) = url_and_targetprint('>>>Downloading ' + target_path)urlretrieve(url, target_path)except (KeyboardInterrupt, SystemExit):print('Exiting download worker')class KGSDownloader:def __init__(self, kgs_url = 'https://www.u-go.net/gamerecords/', download_page = 'kgs_index.html',data_directory = '/content/gdrive/My Drive/GO_RECORD/'):self.kgs_url = kgs_urlself.download_page = download_pageself.data_directory = data_directory#下载文件信息self.file_info = []#下载数据对应的urlself.urls = []#启动下载页面解析流程self.loading()def  download_files(self):print('begin download')'''根据CPU核数创建下载线程同时下载棋盘数据'''if not os.path.isdir(self.data_directory):os.makedirs(self.data_directory)urls_to_download = []print('file_info: ', self.file_info)for file_info in self.file_info:url = file_info['url']file_name = file_info['filename']#如果文件没有下载过就进行下载print('filename is : ', file_name)if not os.path.isfile(self.data_directory + '/' + file_name):urls_to_download.append((url, self.data_directory + '/' + file_name))cores = multiprocessing.cpu_count()#根据CPU核数创建线程池pool = multiprocessing.Pool(processes = cores)print('cores: ', cores)try:#将要下载的文件URL分发给每个下载线程print('pool imap: ', urls_to_download)it = pool.imap(worker, urls_to_download)print('it: ', it)for _ in it:pass#关闭线程池防止资源泄露pool.close()pool.join()print('pool imap end')except KeyboardInterrupt:print('>>>Caught KeyboardInterrupt, terminating works')pool.terminate()pool.join()sys.exit(-1)def  create_download_page(self):print('create_download_page: ', )if os.path.isfile(self.download_page):print('>>> Reading download page: ', self.download_page)download_file = open(self.download_page, 'r')download_contents = download_file.read()print('contents: ', download_contents)download_file.close()else:print('>>> Downloading download page')fp = urlopen(self.kgs_url)data = six.text_type(fp.read())fp.close()download_contents = datadownload_file = open(self.download_page, 'w')download_file.write(download_contents)download_file.close()return download_contentsdef  loading(self):'''从html页面中将下载数据的文件名以及对应的url抽取出来'''download_contents = self.create_download_page()print('download contents: ', download_contents)split_page = [item for item in download_contents.split('<a href="') if item.startswith("https://")]for item in split_page:#在html页面源码中,数据下载链接在"Download"字符串前面download_url = item.split('">Download')[0]if download_url.endswith('.tar.gz'):self.urls.append(download_url)'''下载文件名格式如下:KGS-2019_01-19-2095-.tar.gz2019是年份,2095是盘数2015年之前的文件名在年份之后跟着的是'-'而不是'_'这点要注意'''for url in self.urls:filename = os.path.basename(url)split_file_name = filename.split('-')num_games = int(split_file_name[len(split_file_name) - 2])print(filename + ' ' + str(num_games))self.file_info.append({'url': url, 'filename':filename,'num_games' : num_games})downloader = KGSDownloader()
downloader.download_files()

在上面代码中,我们启动一个线程池,你的电脑有几核,它就能生成几个线程同时下载数据。首先代码先从解析下载页面的html代码,从中解析出下载链接,最后再将下载链接依次分发给下载线程进行下载。

当把数据下载完毕后,我们需要从下载的数据中选取需要的数据。下载数据总共有17000盘棋局作用,我们使用下面代码从下载数据中选取需要的数据量:

import random
import os'''
把数据分成两部分,一部分是测试数据,一部分是训练数据,为了保持数据集稳定,我们只采用不晚于2014年12月的数据。
下面代码先将下载数据中选定一定的棋盘数作为测试数据集,剩下的全部作为训练数据集
'''class  Sampler:def  __init__(self, data_dir = '/content/gdrive/My Drive/GO_RECORD/',num_test_games = 100,cap_year = 2015, seed = 1337):self.data_dir = data_dirself.num_test_games = num_test_gamesself.test_games = []self.train_games = []self.test_folder = 'test_samples.py'self.cap_year = cap_yearrandom.seed(seed)self.compute_test_samples()def  draw_data(self, data_type, num_samples):'''data_type 表明要抽取的数据是训练数据还是测试数据'''if  data_type == 'test':return  self.test_gameselif  data_type == 'train' and num_samples is not None:return self.draw_training_sampels(num_samples)elif  data_type == 'train' and num_samples is None:return self.draw_all_training()raise  ValueError(data_type + ' is not a valid data type')def  draw_samples(self, num_sample_games):available_games = []loader = KGSDownloader(data_directory = self.data_dir)for fileinfo in loader.file_info:filename = fileinfo['filename']year = int(filename.split('-')[1].split('_')[0])if year > self.cap_year:continuenum_games = fileinfo['num_games']for i in range(num_games):available_games.append((filename, i))print('>>>Total number of games used: ' + str(len(available_games)))sample_set = set()while len(sample_set) < num_sample_games:sample = random.choice(available_games)if sample not in sample_set:sample_set.add(sample)print('Drawn ' + str(num_sample_games) + ' samples:' )return list(sample_set)def  draw_training_games(self):'''从下载数据中抽取训练数据,这些数据对应于2014年之后的棋盘记录同时cap_year以后的数据暂时不考虑以便维持训练数据的稳定性'''loader = KGSDownloader(data_directory = self.data_dir)for file_info in loader.file_info:filename = file_info['filename']year = int(filename.split('-')[1].split('_')[0])if  year > self.cap_year:continuenum_games = file_info['num_games']for i in range(num_games):sample = (filename, i)if sample not in self.test_games:self.train_games.append(sample)print('total num training games: ' + str(len(self.train_games)))def  compute_test_samples(self):'''将2014年之前的棋盘数据作为测试数据放置到test_folder文件夹中'''if not os.path.isfile(self.test_folder):test_games = self.draw_samples(self.num_test_games)test_sample_file = open(self.test_folder, 'w')for sample in test_games:test_sample_file.write(str(sample) + '\n')test_sample_file.close()test_sample_file = open(self.test_folder, 'r')sample_contents = test_sample_file.read()test_sample_file.close()for line in sample_contents.split('\n'):if line != "":(filename, index) = eval(line)self.test_games.append((filename, index))def  draw_training_sampels(self, num_sample_games):available_games = []loader = KGSDownloader(data_directory = self.data_dir)for fileinfo in loader.file_info:filename = fileinfo['filename']year = int(filename.split('-')[1].split('_')[0])if year > self.cap_year:continuenum_games = fileinfo['num_games']for i in range(num_games):available_games.append((filename, i))print('total num games: ' + str(len(available_games)))sample_set = set()while len(sample_set) < num_sample_games:#从所有数据中随机选取一个sample = random.choice(available_games)#由于测试数据集都是在2014年之前,因此不属于测试数据集的数据都可以作为训练数据if sample not in self.test_games:sample_set.add(sample)print('Drawn ' + str(num_sample_games) + ' samples:')return list(sample_set)def  draw_all_training(self):available_games = []loader = KGSDownloader(data_directory = self.data_dir)for fileinfo in loader.file_info:filename = fileinfo['filename']year = int(filename.split('-')[1].split('_')[0])if year > self.cap_year:continueif 'num_games' in fileinfo.keys():num_games = fileinfo['num_games']else:continuefor i in range(num_games):available_games.append((filename, i))print('total num games: ' + str(len(available_games)))sample_set = set()for sample in available_games:if sample not in self.test_games:sample_set.add(sample)print('Drawn all samples, ie ' + str(len(sample_set)) + ' samples:')return list(sample_set)

上面代码将数据分成两部分,一部分作为测试数据,一部分作为训练数据。同时提供了灵活性,例如支持我们从17000盘数据中抽样出100盘数据等等。最后我们把数据读入内存,然后转换成前面我们讲过的棋盘编码:

import tarfile
import gzip
import glob
import shutil
import os.pathclass GoDataProcessor:def  __init__(self, encoder = 'oneplane', data_directory = '/content/gdrive/My Drive/GO_RECORD'):if encoder == 'oneplane':self.encoder = OnePlaneEncoder(19) self.data_dir = data_directorydef  load_go_data(self, data_type = 'train', num_samples = 1000):#从下载数据中抽取出给定数量的训练数据sampler = Sampler(data_dir = self.data_dir)data = sampler.draw_data(data_type, num_samples)#将选中的文件数据进行解压,index 表示第几盘zip_names = set()indices_by_zip_name = {}for filename, index in data:zip_names.add(filename)if filename not in indices_by_zip_name:indices_by_zip_name[filename] = []indices_by_zip_name[filename].append(index)for zip_name in zip_names:#创建解压后的文件名base_name = zip_name.replace('.tar.gz', '')data_file_name = base_name + data_typeprint('process zip file with name: ', self.data_dir + '/' + data_file_name)print('is file check: ', os.path.isfile(self.data_dir + '/' + data_file_name))#if not os.path.isfile(self.data_dir + '/' + data_file_name):self.process_zip(zip_name, data_file_name, indices_by_zip_name[zip_name])#将数据描述的棋盘转为为上一节描述的编码和对应的落子features_and_labels = self.consolidate_games(data_type, data)return  features_and_labelsdef  process_zip(self, zip_file_name, data_file_name, game_list):#先把数据文件解压出来tar_file = self.unzip_data(zip_file_name)zip_file = tarfile.open(self.data_dir + '/' + tar_file)#获得.tar.gz解压后文件夹中所有文件的名字集合name_list = zip_file.getnames()#这里得到当前棋盘总共下了多少步棋,game_list对应的是第几盘比赛total_examples = self.num_total_examples(zip_file, game_list, name_list)#shape = [19. 19]shape = self.encoder.shape()#feature_shape 是数组,每个元素是[19,19]二维向量feature_shape = np.insert(shape, 0, np.asarray([total_examples]))features = np.zeros(feature_shape)#lables是一维数组,每个元素对应落子位置labels = np.zeros((total_examples, ))print('process_zip with features len: ', len(features))features_len = len(features)counter = 0for index in game_list:name = name_list[index + 1]if not name.endswith('.sgf'):raise ValueError(name + ' is not a valid sgf')sgf_content = zip_file.extractfile(name).read()sgf = Sgf_game.from_string(sgf_content)'''水平高的一方可能会让子,于是另一方可直接连续落子,我们先处理这种情况'''game_state, first_move_done = self.get_handicap(sgf)for item in sgf.main_sequence_iter():#依次将落子步骤读取出来color, move_tuple = item.get_move()point = Noneif color is not None:if move_tuple is not None:row, col = move_tuplepoint = Point(row + 1, col + 1)move = Move.play(point)else:move = Move.pass_turn()if  first_move_done and point is not None:#如果有让子,那么把对方落子后的棋盘当做训练数据,然后另一方落子方式当做训练标签features[counter] = self.encoder.encode(game_state)labels[counter] = self.encoder.encode_point(point)counter += 1#先按照落子步骤形成棋盘,下一次读取落子时它就会变成训练数据game_state = game_state.apply_move(move)first_move_done = Truefeature_file_base = self.data_dir + '/' + data_file_name + '_features_%d'label_file_base = self.data_dir + '/' + data_file_name + '_label_%d'#我们将加工好的数据存储成文件chunk = 0chunksize = 1024#每1024条记录当做一个chunk,每一个chunk单独存储while features.shape[0] >= chunksize:feature_file = feature_file_base % chunklabel_file = label_file_base % chunkchunk += 1current_features, features = features[:chunksize], features[chunksize:]current_labels, labels = labels[:chunksize], labels[chunksize:]np.save(feature_file, current_features)np.save(label_file, current_labels)def  unzip_data(self, zip_file_name):#.tar.gz文件经过了两层压缩,首先解压gz压缩this_gz = gzip.open(self.data_dir + '/' + zip_file_name)#去掉尾部的.gz后缀tar_file = zip_file_name[0:-3]#创建.tar文件,将解压后gz压缩后的内容拷贝到该文件this_tar = open(self.data_dir + '/' + tar_file, 'wb')shutil.copyfileobj(this_gz, this_tar)return tar_filedef num_total_examples(self, zip_file, game_list, name_list):'''#根据棋盘描述文件中的落子次数推算出训练数据的长度,每一次落子前的棋盘会成为训练数据,落子则对应训练标签,一旦落子后形成的棋盘就会成为新的训练数据'''total_examples = 0for index in game_list:name = name_list[index + 1]if name.endswith('.sgf'):#zip_file对应解压后的tar文件,其中包含很多.sgf文件,这里把指定的sgf文件内容读取出来sfg_content = zip_file.extractfile(name).read()sgf = Sgf_game.from_string(sfg_content)game_state, first_move_done = self.get_handicap(sgf)num_moves = 0for item in sgf.main_sequence_iter():color, move = item.get_move()if color is not None:if first_move_done:num_moves += 1first_move_done = Truetotal_examples = total_examples + num_moveselse:raise ValueError(name + ' is not a valid sgf')return total_examples@staticmethoddef  get_handicap(sgf):#将让子时对应的落子摆到棋盘上go_board = Board(19, 19)first_move_done = Falsemove = Nonegame_state = GameState.new_game(19)if sgf.get_handicap() is not None and sgf.get_handicap() != 0:for setup in sgf.get_root().get_setup_stones():for move in setup:row, col = movego_board.place_stone(Player.black, Point(row + 1, col + 1))first_move_done = Truegame_state = GameState(go_board, Player.white, None, move)return game_state, first_move_done#前面我们把数据存储成多个小段,这里我们把多个小段读入内存合作一个整体def  consolidate_games(self, data_type, samples):files_needed = set(file_name for file_name , index in samples)file_names = []for zip_file_name in files_needed:file_name = zip_file_name.replace('.tar.gz', '') + data_typefile_names.append(file_name)feature_list = []label_list = []for file_name in file_names:file_prefix = file_name.replace('.tar.gz', '')base = self.data_dir + '/' + file_prefix + '_features_*.npy'print('consolidate with file: ', base)for feature_file in glob.glob(base):label_file = feature_file.replace('features', 'labels')x = np.load(feature_file)y = np.load(label_file)x = x.astype('float32')y = to_categorical(y.astype(int), 19 * 19)feature_list.append(x)label_list.append(y)features = np.concatenate(feature_list, axis = 0)labels = np.concatenate(label_list, axis = 0)np.save('{}/features_{}.npy'.format(self.data_dir, data_type), features)np.save('{}/labels_{}.npy'.format(self.data_dir, data_type), labels)return features, labels

上面的代码将下载后的棋盘数据解压,然后读取sfg格式文件,并将它们编码转换成前面我们说过的棋盘编码,由此我们就可以获得用于训练网络的数据。但运行上面的代码将非常缓慢耗时,因此我们要使用多进程机制加载数据以便提升速度和效率,首先我们将创建一个DataGenerator,它将像水泵一样将数据抽取出来传递给网络:

class DataGenerator:'''创建一个数据抽取水泵,按照网络需要每次从数据池中抽取一小部分数据用于网络训练'''def  __init__(self, data_directory, samples):self.data_directory = data_directory#samples表示要抽取的数据量self.samples = samplesself.files = set(file_name for file_name, index in samples)def  get_num_samples(self, batch_size = 128, num_classes = 19 * 19):'''为了加快数据读取速度,我们’按需‘抽取数据而不是一下子读取大量数据'''if  self.num_samples is not None:return  self.num_sampleselse:self.num_samples = 0for X, y in self._generate(batch_size = batch_size, num_classes = num_classes):self.num_samples += X.shape[0]return  self.num_samplesdef  _generate(self, batch_size, num_classes):for zip_file_name in self.files:file_name = zip_file_name.replace('.tar.gz', '') + 'train'base = self.data_director + '/' + file_name + '_features_*.npy'for feature_file in glob.glob(base):label_file = feature_file.replace('features', 'labels')x = np.load(feature_file)y = np.load(label_file)x = x.astype('float32')y = to_categorical(y.astype(int), num_classes)while x.shape[0] >= batch_size:x_batch, x = x[:batch_size], x[batch_size:]y_batch, y = y[:batch_size], y[batch_size:]yield x_batch, y_batchdef  generate(self, batch_size = 128, num_classes = 19 * 19):while  True:for item in self._generate(batch_size, num_classes):yield  item

接下来我们改进GoDataProcessor,使用多线程去实现文件的解压,读取并编码成训练数据:

#将前面的GoDataProcessor改进为多线程版本
import tarfile
import gzip
import glob
import shutil
import os.path
import numpy as npdef  worker(jobinfo):#工作线程try:'''实例化GoDataProcessor,调用它的process_zip解压给定压缩文件,同时解析sgf文件,将它们转换为棋盘编码,这个过程可以使用多线程加速'''clazz, encoder, zip_file, data_file_name, game_list = jobinfoclazz(encoder=encoder).process_zip(zip_file, data_file_name, game_list)except (KeyboardInterrupt, SystemExit):raise  Exception('>>> Exiting child process.')class GoDataProcessor:def  __init__(self, encoder = 'oneplane', data_directory = '/content/gdrive/My Drive/GO_RECORD'):if encoder == 'oneplane':self.encoder = OnePlaneEncoder(19) self.encoder_string = encoderself.data_dir = data_directorydef  load_go_data(self, data_type = 'train', num_samples = 1000,use_generator = False):#从下载数据中抽取出给定数量的训练数据sampler = Sampler(data_dir = self.data_dir)data = sampler.draw_data(data_type, num_samples)#启动线程池self.map_to_workers(data_type, data)if use_generator:#将解析后的数据分批次喂给网络generator = DataGenerator(self.data_dir, data)return generatorelse:#按照老方式一下子将所有数据推给网络features_and_labels = self.consolidate_games(data_type, data)return  features_and_labelsdef  process_zip(self, zip_file_name, data_file_name, game_list):#先把数据文件解压出来tar_file = self.unzip_data(zip_file_name)zip_file = tarfile.open(self.data_dir + '/' + tar_file)#获得.tar.gz解压后文件夹中所有文件的名字集合name_list = zip_file.getnames()#这里得到当前棋盘总共下了多少步棋,game_list对应的是第几盘比赛total_examples = self.num_total_examples(zip_file, game_list, name_list)#shape = [19. 19]shape = self.encoder.shape()#feature_shape 是数组,每个元素是[19,19]二维向量feature_shape = np.insert(shape, 0, np.asarray([total_examples]))features = np.zeros(feature_shape)#lables是一维数组,每个元素对应落子位置labels = np.zeros((total_examples, ))print('process_zip with features len: ', len(features))features_len = len(features)counter = 0for index in game_list:name = name_list[index + 1]if not name.endswith('.sgf'):raise ValueError(name + ' is not a valid sgf')sgf_content = zip_file.extractfile(name).read()sgf = Sgf_game.from_string(sgf_content)'''水平高的一方可能会让子,于是另一方可直接连续落子,我们先处理这种情况'''game_state, first_move_done = self.get_handicap(sgf)for item in sgf.main_sequence_iter():#依次将落子步骤读取出来color, move_tuple = item.get_move()point = Noneif color is not None:if move_tuple is not None:row, col = move_tuplepoint = Point(row + 1, col + 1)move = Move.play(point)else:move = Move.pass_turn()if  first_move_done and point is not None:#如果有让子,那么把对方落子后的棋盘当做训练数据,然后另一方落子方式当做训练标签features[counter] = self.encoder.encode(game_state)labels[counter] = self.encoder.encode_point(point)counter += 1#先按照落子步骤形成棋盘,下一次读取落子时它就会变成训练数据game_state = game_state.apply_move(move)first_move_done = Truefeature_file_base = self.data_dir + '/' + data_file_name + '_features_%d'label_file_base = self.data_dir + '/' + data_file_name + '_label_%d'#我们将加工好的数据存储成文件chunk = 0chunksize = 1024#每1024条记录当做一个chunk,每一个chunk单独存储while features.shape[0] >= chunksize:feature_file = feature_file_base % chunklabel_file = label_file_base % chunkchunk += 1current_features, features = features[:chunksize], features[chunksize:]current_labels, labels = labels[:chunksize], labels[chunksize:]np.save(feature_file, current_features)np.save(label_file, current_labels)def  unzip_data(self, zip_file_name):#.tar.gz文件经过了两层压缩,首先解压gz压缩this_gz = gzip.open(self.data_dir + '/' + zip_file_name)#去掉尾部的.gz后缀tar_file = zip_file_name[0:-3]#创建.tar文件,将解压后gz压缩后的内容拷贝到该文件this_tar = open(self.data_dir + '/' + tar_file, 'wb')shutil.copyfileobj(this_gz, this_tar)return tar_filedef num_total_examples(self, zip_file, game_list, name_list):'''#根据棋盘描述文件中的落子次数推算出训练数据的长度,每一次落子前的棋盘会成为训练数据,落子则对应训练标签,一旦落子后形成的棋盘就会成为新的训练数据'''total_examples = 0for index in game_list:name = name_list[index + 1]if name.endswith('.sgf'):#zip_file对应解压后的tar文件,其中包含很多.sgf文件,这里把指定的sgf文件内容读取出来sfg_content = zip_file.extractfile(name).read()sgf = Sgf_game.from_string(sfg_content)game_state, first_move_done = self.get_handicap(sgf)num_moves = 0for item in sgf.main_sequence_iter():color, move = item.get_move()if color is not None:if first_move_done:num_moves += 1first_move_done = Truetotal_examples = total_examples + num_moveselse:raise ValueError(name + ' is not a valid sgf')return total_examples@staticmethoddef  get_handicap(sgf):#将让子时对应的落子摆到棋盘上go_board = Board(19, 19)first_move_done = Falsemove = Nonegame_state = GameState.new_game(19)if sgf.get_handicap() is not None and sgf.get_handicap() != 0:for setup in sgf.get_root().get_setup_stones():for move in setup:row, col = movego_board.place_stone(Player.black, Point(row + 1, col + 1))first_move_done = Truegame_state = GameState(go_board, Player.white, None, move)return game_state, first_move_done#前面我们把数据存储成多个小段,这里我们把多个小段读入内存合作一个整体def  consolidate_games(self, data_type, samples):files_needed = set(file_name for file_name , index in samples)file_names = []for zip_file_name in files_needed:file_name = zip_file_name.replace('.tar.gz', '') + data_typefile_names.append(file_name)feature_list = []label_list = []for file_name in file_names:file_prefix = file_name.replace('.tar.gz', '')base = self.data_dir + '/' + file_prefix + '_features_*.npy'print('consolidate with file: ', base)for feature_file in glob.glob(base):label_file = feature_file.replace('features', 'labels')x = np.load(feature_file)y = np.load(label_file)x = x.astype('float32')y = to_categorical(y.astype(int), 19 * 19)feature_list.append(x)label_list.append(y)features = np.concatenate(feature_list, axis = 0)labels = np.concatenate(label_list, axis = 0)np.save('{}/features_{}.npy'.format(self.data_dir, data_type), features)np.save('{}/labels_{}.npy'.format(self.data_dir, data_type), labels)return features, labelsdef  map_to_workers(self, data_type, samples):#将选中的文件数据进行解压,index 表示第几盘zip_names = set()indices_by_zip_name = {}for filename, index in samples:zip_names.add(filename)if filename not in indices_by_zip_name:indices_by_zip_name[filename] = []indices_by_zip_name[filename].append(index)zips_to_process = []for zip_name in zip_names:#创建解压后的文件名base_name = zip_name.replace('.tar.gz', '')data_file_name = base_name + data_typezips_to_process.append((self.__class__, self.encoder_string, zip_name,data_file_name, indices_by_zip_name[zip_name]))cores = multiprocessing.cpu_count()pool = multiprocessing.Pool(processes = cores)p = pool.map_async(worker, zips_to_process)try:_ = p.get()except KeyboardInterrupt:pool.terminate()pool.join()sys.exit(-1)

上面代码跟以前代码差别不大,唯一差别在于使用多线程执行process_zip,也就是将文件的解压,读取,以及编码成训练数据的过程线程化,从而依赖多线程成倍提升效率。

本节代码比较繁琐,请参考视频加深理解。

更详细的讲解和代码调试演示过程,请点击链接

更多技术信息,包括操作系统,编译器,面试算法,机器学习,人工智能,请关照我的公众号:

使用人类棋手棋盘数据训练围棋机器人,实现数据预处理相关推荐

  1. 增强式学习:如何使用Q-Learning算法训练围棋机器人

    上一节我们构造出如下结构的神经网络: 本节我们看看如何使用该网络训练围棋机器人.我们在标题中提到Q-Learning,它实际上是一种使用上面网络进行训练的算法流程.首先我们先定义执行Q-Learnin ...

  2. 人工智能训练师:用数据“喂养”AI,教它们“更懂”人类

    近日,上海市人工智能行业协会发布了<人工智能训练师职业技能等级直接认定通过名单>,悠络客有四位小伙伴也在名单之列,顺利通过人工智能训练师技师(二级)认定. 图片来源:上海市人工智能行业协会 ...

  3. 年薪90万因上班无聊起诉公司/ 马斯克不让OpenAI用推特数据训练/ 旧金山警方获准使用致命机器人…今日更多新鲜事在此...

    日报君 发自 凹非寺 量子位 | 公众号 QbitAI 大家好! 今天是12月5日星期一,开工日! 科技圈都发生了哪些新鲜事? 一起来看看吧! 神舟十四号载人飞船返回舱成功着陆 据中国载人航天工程办公 ...

  4. 无借鉴棋手知识而精通围棋

    简单智能  AIChangeLife AIChangeLife 微信号 AIChangeLife 功能介绍 通过对信息进行量化分析(经济,各行业动态等), 进行预测. 原文标题:Mastering t ...

  5. 神经网络如何训练数据,训练神经网络的算法

    谷歌发布的人工智能服务工具AutoML如何使用? 在加入谷歌一年后,1月18日凌晨,谷歌云负责人.首席科学家李飞飞通过自己的推特账号和博客宣布了谷歌云取得的里程碑进展:可自动设计.建立机器学习模型的服 ...

  6. 打爆李世石第一步:使用神经网络设计人工智能围棋机器人

    上一节,我们使用基于蒙特卡洛树搜索的机器人来自我对弈,同时我们把机器人落子方式和落子时的棋盘编码记录下来,本节我们就使用上一节数据来训练神经网络,让网络学会如何在给定棋盘下进行精确落子. 神经网络的运 ...

  7. 4千元AI围棋机器人来了!人机大战在家随时开打,职业九段水平,聂卫平点赞...

    杨净 发自 凹非寺 量子位 | 公众号 QbitAI 下围棋,现在也可以跟AI机器人battle一番了. 商汤元萝卜继象棋之后,再推出围棋版AI机器人. 外观上看,这个机器人延续了此前的外观:小小宇航 ...

  8. 从 AI 级别到人类棋手级别

    为增加游戏的趣味性,需要判断人类棋手级别. 1. 问题 1: 竞技型对弈 1.1 问题描述 下棋是一个双方 (有时是多方, 如弹珠跳棋) 交替落子, 进行对抗的过程. 该问题可以描述如下: 问题 1: ...

  9. 如何使用深度学习训练聊天机器人

    原文地址 译者观点:目前AI整体处于研究热点,很多领域离产业化还很远,比如本文中的主题如何制作聊天机器人,虽然各大厂都有不同涉足,但是涉及的领域有限,其实在各个细分领域都可以训练专用的聊天机器人.那么 ...

最新文章

  1. 一种注册表沙箱的思路、实现——Hook Nt函数
  2. 2017《面向对象程序设计》寒假作业一
  3. python的执行过程_在交互式环境中执行Python程序过程详解
  4. 你是AI王者吗?2018人工智能专业期末考试,66666奖学金等你来
  5. 人生最浪费生命的四件事,2017年别再做了!
  6. 如何使用plantUML生成go项目的UML图?(mac)
  7. hdu-1074 Doing Homework
  8. 『设计模式』就因为多收了我2块5,我追着收银员问是不是不懂设计模式--策略模式
  9. Python MySQL 插入表
  10. 如何查看node的版本及安装的位置?
  11. mysql查询时间段内的数据
  12. c++实现双向链表操作
  13. html checked属性值,HTML复选框的checked属性的值是多少?
  14. O2O、C2C、B2B、B2C的区别
  15. Quote Form OnLoad Implement Add Leftnav, count Activities
  16. Atlassian 域名被曝一次点击账户接管漏洞 可导致供应链攻击
  17. python执行源程序的方式是_python源程序执行的方式
  18. 程控电源测试使用小记
  19. 钢笔墨水能否代替打印机墨水_喷墨打印机该用染料墨水还是颜料墨水?
  20. 统计学 假设检验 总体均值的检验

热门文章

  1. 群晖上NVMe实测兼容机型
  2. 799元起魅族V8为何成黄章眼中国际爆款?网友:买贵的版本更值
  3. ASCII 和 Base64
  4. 一个大三学生的学习生活之感
  5. iOS 屏幕旋转问题总结
  6. 办公室合租的话需要注意什么问题?合租办公室有什么优缺点吗?
  7. 前端页面实现倒计时效果的几种方法
  8. 淘淘商城-之上传图片
  9. 学习感悟:态度决定一切
  10. 7彩苹果 天天好心情