

Implementing an LSTM Model for Text Generation

We show how to implement an LSTM (Long Short-Term Memory) RNN for Shakespeare language generation, using a word-level vocabulary.

本文将展示如何实现用于莎士比亚语言生成的LSTM(长短期记忆)RNN。(词级词汇表)


# Implementing an LSTM RNN Model
#  Here we implement an LSTM model on a data set of all of Shakespeare's works.
We start by loading the necessary libraries and resetting the default computational graph.
import os
import re
import string
import requests
import numpy as np
import collections
import random
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.python.framework import ops
# Reset the default computational graph before building the model.
ops.reset_default_graph()

# We start a computational graph session.
sess = tf.Session()
Next, it is important to set the algorithm and data processing parameters.
接下来,设置算法和数据处理参数很重要。
Parameter : Descriptions
min_word_freq: Only attempt to model words that appear more than 5 times. 仅尝试对出现超过5次的单词进行建模。
rnn_size: size of our RNN (equal to the embedding size) RNN大小(等于嵌入大小)
epochs: Number of epochs to cycle through the data
batch_size: How many examples to train on at once
learning_rate: The learning rate or the convergence parameter 学习率或收敛参数
training_seq_len: The length of the surrounding word group (e.g. 10 = 5 on each side) 周围单词组的长度(例如 10 = 每侧5个)
embedding_size: Must be equal to the rnn_size
save_every: How often to save the model
eval_every: How often to evaluate the model
prime_texts: List of test sentences
# Set RNN Parameters
min_word_freq = 5  # Trim the less frequent words off
rnn_size = 128  # RNN Model size
epochs = 10  # Number of epochs to cycle through data
batch_size = 100  # Train on this many examples at once
learning_rate = 0.001  # Learning rate
training_seq_len = 50  # how long of a word group to consider
embedding_size = rnn_size  # Word embedding size
save_every = 500  # How often to save model checkpoints
eval_every = 50  # How often to evaluate the test sentences
prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou']

# Download/store Shakespeare data
data_dir = 'temp'
data_file = 'shakespeare.txt'
model_path = 'shakespeare_model'
full_model_dir = os.path.join(data_dir, model_path)

# Declare punctuation to remove, everything except hyphens and apostrophes
punctuation = string.punctuation
punctuation = ''.join([x for x in punctuation if x not in ['-', "'"]])

# Make Model Directory (os.makedirs creates intermediate directories
# recursively). exist_ok=True replaces the separate existence check, so a
# directory that already exists is simply left alone.
os.makedirs(full_model_dir, exist_ok=True)

# Make data directory
os.makedirs(data_dir, exist_ok=True)
Download the data if we don't have it saved already. The data comes from the Gutenberg Project
print('Loading Shakespeare Data')
shakespeare_path = os.path.join(data_dir, data_file)
# Check if file is downloaded.
if not os.path.isfile(shakespeare_path):
    print('Not found, downloading Shakespeare texts from www.gutenberg.org')
    shakespeare_url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt'
    # Get Shakespeare text. The timeout keeps the script from hanging on a
    # stalled connection, and raise_for_status() stops an HTTP error page
    # from being cached as if it were the corpus.
    response = requests.get(shakespeare_url, timeout=60)
    response.raise_for_status()
    shakespeare_file = response.content
    # Decode binary into string
    s_text = shakespeare_file.decode('utf-8')
    # Drop first few descriptive paragraphs.
    s_text = s_text[7675:]
    # Replace newlines with a space rather than nothing; otherwise the last
    # word of one line is glued to the first word of the next line. Runs of
    # whitespace are collapsed during cleaning below.
    s_text = s_text.replace('\r\n', ' ')
    s_text = s_text.replace('\n', ' ')
    # Write to file (explicit encoding so the result does not depend on the
    # platform's default encoding)
    with open(shakespeare_path, 'w', encoding='utf-8') as out_conn:
        out_conn.write(s_text)
else:
    # If file has been saved, load from that file
    with open(shakespeare_path, 'r', encoding='utf-8') as file_conn:
        s_text = file_conn.read().replace('\n', ' ')

# Clean text
print('Cleaning Text')
# re.escape() is required here: unescaped, the backslash in `punctuation`
# escapes the following ']' inside the character class, so backslashes
# themselves were never removed.
s_text = re.sub(r'[{}]'.format(re.escape(punctuation)), ' ', s_text)
s_text = re.sub(r'\s+', ' ', s_text).strip().lower()
Loading Shakespeare Data
Cleaning Text
Done loading/cleaning.
Define a function to build a word processing dictionary (word -> ix)
定义一个函数来构建一个文字处理字典(word - > ix)
# Build word vocabulary function
def build_vocab(text, min_freq):
    """Build index<->word dictionaries from space-separated text.

    Args:
        text: String of words separated by single spaces.
        min_freq: Frequency cutoff. Only words occurring strictly more than
            ``min_freq`` times are kept.

    Returns:
        A tuple ``(ix_to_vocab_dict, vocab_to_ix_dict)``. Kept words are
        numbered from 1 in order of first appearance; index 0 is reserved
        for the ``'unknown'`` token.
    """
    word_counts = collections.Counter(text.split(' '))
    # Limit the vocabulary to words more frequent than the cutoff. The
    # literal word 'unknown' is skipped here because it shares the reserved
    # index 0: enumerating it and then overwriting its entry would leave a
    # hole in the index -> word mapping.
    frequent_words = [word for word, count in word_counts.items()
                      if count > min_freq and word != 'unknown']
    # Create vocab --> index mapping
    vocab_to_ix_dict = {word: ix
                        for ix, word in enumerate(frequent_words, start=1)}
    # Add unknown key --> 0 index
    vocab_to_ix_dict['unknown'] = 0
    # Create index --> vocab mapping
    ix_to_vocab_dict = {ix: word for word, ix in vocab_to_ix_dict.items()}
    return ix_to_vocab_dict, vocab_to_ix_dict
Now we can build the index-vocabulary from the Shakespeare data.
# Build Shakespeare vocabulary
print('Building Shakespeare Vocab')
ix2vocab, vocab2ix = build_vocab(s_text, min_word_freq)
# NOTE(review): the + 1 makes vocab_size one larger than the number of
# dictionary entries -- confirm whether the spare index is intended.
vocab_size = len(ix2vocab) + 1
print('Vocabulary Length = {}'.format(vocab_size))
# Sanity Check
assert(len(ix2vocab) == len(vocab2ix))

# Convert text to word vectors. Words missing from the vocabulary map to
# index 0 (the 'unknown' token); dict.get avoids raising and catching a
# KeyError for every out-of-vocabulary word.
s_text_words = s_text.split(' ')
s_text_ix = np.array([vocab2ix.get(word, 0) for word in s_text_words])
Building Shakespeare Vocab
Vocabulary Length = 8009
''''''We define the LSTM model. The methods of interest are the __init__() method,
which defines all the model variables and operations,
and the sample() method which takes in a sample word and loops through to generate text.
我们定义LSTM模型。需要关注的方法是__init__()方法[*1],它定义所有模型变量和操作,以及sample()方法,它接收一个样本词并循环生成文本。
# Define LSTM RNN Model
class LSTM_Model():
    """Word-level LSTM language model built on the TensorFlow 1.x graph API.

    ``__init__`` creates the placeholders, variables, loss and training op.
    ``sample`` feeds a prime text through the network one word at a time and
    then generates further words greedily.
    """

    def __init__(self, embedding_size, rnn_size, batch_size, learning_rate,
                 training_seq_len, vocab_size, infer_sample=False):
        """Build the model graph.

        Args:
            embedding_size: Width of the word-embedding vectors.
            rnn_size: Number of units in the LSTM cell.
            batch_size: Examples per batch (forced to 1 when inferring).
            learning_rate: Learning rate handed to the Adam optimizer.
            training_seq_len: Words per sequence (forced to 1 when inferring).
            vocab_size: Number of rows in the embedding matrix and number of
                output logits.
            infer_sample: When True, build the single-word, single-example
                graph used for text generation.
        """
        self.embedding_size = embedding_size
        self.rnn_size = rnn_size
        self.vocab_size = vocab_size
        self.infer_sample = infer_sample
        self.learning_rate = learning_rate

        if infer_sample:
            self.batch_size = 1
            self.training_seq_len = 1
        else:
            self.batch_size = batch_size
            self.training_seq_len = training_seq_len

        self.lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.rnn_size)
        self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32)

        self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])

        with tf.variable_scope('lstm_vars'):
            # Softmax Output Weights
            W = tf.get_variable('W', [self.rnn_size, self.vocab_size],
                                tf.float32, tf.random_normal_initializer())
            b = tf.get_variable('b', [self.vocab_size],
                                tf.float32, tf.constant_initializer(0.0))

            # Define Embedding
            embedding_mat = tf.get_variable('embedding_mat',
                                            [self.vocab_size, self.embedding_size],
                                            tf.float32, tf.random_normal_initializer())

            embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data)
            # One tensor per time step, with the time axis squeezed away.
            rnn_inputs = tf.split(axis=1, num_or_size_splits=self.training_seq_len,
                                  value=embedding_output)
            rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs]

        # If we are inferring (generating text), we add a 'loop' function.
        # Define how to get the i+1 th input from the i th output.
        # rnn_decoder documents the callback as loop_function(prev, i), so
        # the step index must be accepted even though it is not used.
        def inferred_loop(prev, _):
            # Apply hidden layer
            prev_transformed = tf.matmul(prev, W) + b
            # Get the index of the output (also don't run the gradient)
            prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1))
            # Get embedded vector
            out = tf.nn.embedding_lookup(embedding_mat, prev_symbol)
            return out

        decoder = tf.contrib.legacy_seq2seq.rnn_decoder
        outputs, last_state = decoder(rnn_inputs_trimmed,
                                      self.initial_state,
                                      self.lstm_cell,
                                      loop_function=inferred_loop if infer_sample else None)
        # Non inferred outputs
        output = tf.reshape(tf.concat(axis=1, values=outputs), [-1, self.rnn_size])
        # Logits and output
        self.logit_output = tf.matmul(output, W) + b
        self.model_output = tf.nn.softmax(self.logit_output)

        loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example
        loss = loss_fun([self.logit_output], [tf.reshape(self.y_output, [-1])],
                        [tf.ones([self.batch_size * self.training_seq_len])])
        self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len)
        self.final_state = last_state
        # Clip the global gradient norm to 4.5 before applying the update.
        gradients, _ = tf.clip_by_global_norm(
            tf.gradients(self.cost, tf.trainable_variables()), 4.5)
        optimizer = tf.train.AdamOptimizer(self.learning_rate)
        self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables()))

    def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'):
        """Generate up to ``num`` words continuing ``prime_text``.

        Args:
            sess: TensorFlow session used to run the graph.
            words: Mapping from word index to word.
            vocab: Mapping from word to word index.
            num: Maximum number of words to append.
            prime_text: Whitespace-separated seed text.

        Returns:
            ``prime_text`` followed by the generated words, joined by single
            spaces. Generation stops early when index 0 (the unknown token)
            or an index absent from ``words`` is predicted.
        """
        word_list = prime_text.split()
        if not word_list:
            # Nothing to prime the network with; avoid indexing an empty list.
            return prime_text

        state = sess.run(self.lstm_cell.zero_state(1, tf.float32))
        # Warm the LSTM state up on every prime word except the last one.
        for word in word_list[:-1]:
            x = np.zeros((1, 1))
            # Out-of-vocabulary prime words fall back to index 0 instead of
            # raising KeyError.
            x[0, 0] = vocab.get(word, 0)
            feed_dict = {self.x_data: x, self.initial_state: state}
            [state] = sess.run([self.final_state], feed_dict=feed_dict)

        out_sentence = prime_text
        word = word_list[-1]
        for n in range(num):
            x = np.zeros((1, 1))
            x[0, 0] = vocab.get(word, 0)
            feed_dict = {self.x_data: x, self.initial_state: state}
            [model_output, state] = sess.run([self.model_output, self.final_state],
                                             feed_dict=feed_dict)
            # Greedy choice: take the most probable next word.
            sample = np.argmax(model_output[0])
            next_word = words.get(sample)
            if sample == 0 or next_word is None:
                break
            word = next_word
            out_sentence = out_sentence + ' ' + word
        return (out_sentence)


# In order to use the same model (with the same trained variables), we need to
# share the variable scope between the trained model and the test model.
# Define LSTM Model
lstm_model = LSTM_Model(
    embedding_size=embedding_size,
    rnn_size=rnn_size,
    batch_size=batch_size,
    learning_rate=learning_rate,
    training_seq_len=training_seq_len,
    vocab_size=vocab_size,
)

# Tell TensorFlow we are reusing the scope for the testing
current_scope = tf.get_variable_scope()
with tf.variable_scope(current_scope, reuse=True):
    test_lstm_model = LSTM_Model(
        embedding_size=embedding_size,
        rnn_size=rnn_size,
        batch_size=batch_size,
        learning_rate=learning_rate,
        training_seq_len=training_seq_len,
        vocab_size=vocab_size,
        infer_sample=True,
    )
We need to save the model, so we create a model saving operation.
# Create model saver covering every global variable in the graph
saver = tf.train.Saver(var_list=tf.global_variables())
Let's calculate how many batches are needed for each epoch and split up the data accordingly.
# Create batches for each epoch
num_batches = len(s_text_ix) // (batch_size * training_seq_len) + 1
# Split up text indices into subarrays, of equal size
batches = np.array_split(s_text_ix, num_batches)
# Reshape each split into [batch_size, training_seq_len]
batches = [np.resize(chunk, [batch_size, training_seq_len]) for chunk in batches]

# Initialize all variables
init = tf.global_variables_initializer()
sess.run(init)

# Training the model!
# Train model
train_loss = []
iteration_count = 1
for epoch in range(epochs):
    # Shuffle word indices
    random.shuffle(batches)
    # Create targets from shuffled batches: each target row is its input row
    # rolled one position to the left.
    targets = [np.roll(x, -1, axis=1) for x in batches]
    # Run a through one epoch
    print('Starting Epoch #{} of {}.'.format(epoch+1, epochs))
    # Reset initial LSTM state every epoch
    state = sess.run(lstm_model.initial_state)
    for ix, batch in enumerate(batches):
        training_dict = {lstm_model.x_data: batch, lstm_model.y_output: targets[ix]}
        # Carry the LSTM state over from the previous batch.
        c, h = lstm_model.initial_state
        training_dict[c] = state.c
        training_dict[h] = state.h

        temp_loss, state, _ = sess.run(
            [lstm_model.cost, lstm_model.final_state, lstm_model.train_op],
            feed_dict=training_dict)
        train_loss.append(temp_loss)

        # Print status every 10 gens
        if iteration_count % 10 == 0:
            # Report against the real number of batches; the earlier
            # "num_batches+1" overstated the total by one.
            summary_nums = (iteration_count, epoch+1, ix+1, len(batches), temp_loss)
            print('Iteration: {}, Epoch: {}, Batch: {} out of {}, Loss: {:.2f}'.format(*summary_nums))

        # Save the model and the vocab
        if iteration_count % save_every == 0:
            # Save model
            model_file_name = os.path.join(full_model_dir, 'model')
            saver.save(sess, model_file_name, global_step=iteration_count)
            print('Model Saved To: {}'.format(model_file_name))
            # Save vocabulary
            dictionary_file = os.path.join(full_model_dir, 'vocab.pkl')
            with open(dictionary_file, 'wb') as dict_file_conn:
                pickle.dump([vocab2ix, ix2vocab], dict_file_conn)

        # Generate text from each prime sentence with the inference model
        if iteration_count % eval_every == 0:
            for sample in prime_texts:
                print(test_lstm_model.sample(sess, ix2vocab, vocab2ix, num=10, prime_text=sample))

        iteration_count += 1

# Plot loss over time
plt.plot(train_loss, 'k-')
plt.title('Sequence to Sequence Loss')
Starting Epoch #1 of 10.
Iteration: 10, Epoch: 1, Batch: 10 out of 182, Loss: 9.90






