[python] view
plain copy


import timeit


import PIL.Image as Image

except ImportError:

import Image

import numpy

import theano

import theano.tensor as T

import os

from theano.tensor.shared_randomstreams import RandomStreams

from utils import tile_raster_images

from logistic_sgd import load_data


class RBM(object):

def __init__(self,input=None,n_visible=784,n_hidden=500,W=None,hbias=None,vbias=None,numpy_rng=None,theano_rng=None):

self.n_visible = n_visible

self.n_hidden = n_hidden

if numpy_rng is None:

numpy_rng = numpy.random.RandomState(1234)

if theano_rng is None:

theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

#参数初始化公式 -4*sqrt(6./(n_visible+n_hidden))

if W is None:

initial_W = numpy.asarray(


low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),

high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),

size=(n_visible, n_hidden)





W = theano.shared(value=initial_W, name='W', borrow=True)


if hbias is None:

hbias = theano.shared(value=numpy.zeros(n_hidden,dtype=theano.config.floatX),name='hbias',borrow=True)


if vbias is None:

vbias = theano.shared(value=numpy.zeros(n_visible,dtype=theano.config.floatX),name='vbias',borrow=True)


self.input = input

if not input:

self.input = T.matrix('input')

self.W = W

self.hbias = hbias

self.vbias = vbias

self.theano_rng = theano_rng


self.params = [self.W, self.hbias, self.vbias]


def free_energy(self, v_sample):

''''' Function to compute the free energy '''

wx_b = T.dot(v_sample, self.W) + self.hbias

vbias_term = T.dot(v_sample, self.vbias)

hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)

return -hidden_term - vbias_term

#前向传导 从可视层到隐藏层,计算p(h=1|v)

def propup(self, vis):

pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias

return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

#根据可视层状态 计算隐藏层状态

def sample_h_given_v(self, v0_sample):

#计算隐藏层每个神经元为状态1的概率,即 p(h=1|v)

pre_sigmoid_h1, h1_mean = self.propup(v0_sample)

#根据给定的概率,进行采样,就像抛硬币一样。n表示抛硬币的次数 ,每个神经元随机采样一次,就可以得到每个神经元的状态了


h1_sample = self.theano_rng.binomial(size=h1_mean.shape,n=1,p=h1_mean,dtype=theano.config.floatX)

return [pre_sigmoid_h1, h1_mean, h1_sample]


def propdown(self, hid):

pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias

return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]


def sample_v_given_h(self, h0_sample):

pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)

v1_sample = self.theano_rng.binomial(size=v1_mean.shape,

n=1, p=v1_mean,


return [pre_sigmoid_v1, v1_mean, v1_sample]

#计算从隐藏层-》可视层-》隐藏层的一个状态转移过程,相当于一次的Gibbs sampling采样

def gibbs_hvh(self, h0_sample):

pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)

pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)

return [pre_sigmoid_v1, v1_mean, v1_sample,

pre_sigmoid_h1, h1_mean, h1_sample]

#计算从可视层-》隐藏层-》可视层的状态转移过程,相当于一次的Gibbs sampling采样

def gibbs_vhv(self, v0_sample):

pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)

pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)

return [pre_sigmoid_h1, h1_mean, h1_sample,

pre_sigmoid_v1, v1_mean, v1_sample]

# k用于设置Gibbs sampling采样次数,也就是相当于来回跑了多少次(来回一趟算一次)

def get_cost_updates(self, lr=0.1, persistent=None, k=1):

# 当我们输入数据的时候,首先根据输入x,计算隐藏层的概率分布,概率分布的采样结果

pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)

# decide how to initialize persistent chain:

# for CD, we use the newly generate hidden sample

# for PCD, we initialize from the old state of the chain

if persistent is None:

chain_start = ph_sample


chain_start = persistent












) = theano.scan(self.gibbs_hvh,outputs_info=[None, None, None, None, None, chain_start],n_steps=k)


chain_end = nv_samples[-1]


cost = T.mean(self.free_energy(self.input)) - T.mean(



gparams = T.grad(cost, self.params, consider_constant=[chain_end])

for gparam, param in zip(gparams, self.params):

updates[param] = param - gparam * T.cast(lr,dtype=theano.config.floatX)

if persistent:

# Note that this works only if persistent is a shared variable

updates[persistent] = nh_samples[-1]

# pseudo-likelihood is a better proxy for PCD

monitoring_cost = self.get_pseudo_likelihood_cost(updates)


# reconstruction cross-entropy is a better proxy for CD

monitoring_cost = self.get_reconstruction_cost(updates,pre_sigmoid_nvs[-1])

return monitoring_cost, updates

# end-snippet-4

def get_pseudo_likelihood_cost(self, updates):

"""Stochastic approximation to the pseudo-likelihood"""

# index of bit i in expression p(x_i | x_{\i})

bit_i_idx = theano.shared(value=0, name='bit_i_idx')

# binarize the input image by rounding to nearest integer

xi = T.round(self.input)

# calculate free energy for the given bit configuration

fe_xi = self.free_energy(xi)

# flip bit x_i of matrix xi and preserve all other bits x_{\i}

# Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns

# the result to xi_flip, instead of working in place on xi.

xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])

# calculate free energy with bit flipped

fe_xi_flip = self.free_energy(xi_flip)

# equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))

cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -


# increment bit_i_idx % number as part of updates

updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible

return cost

def get_reconstruction_cost(self, updates, pre_sigmoid_nv):

"""Approximation to the reconstruction error

Note that this function requires the pre-sigmoid activation as

input. To understand why this is so you need to understand a

bit about how Theano works. Whenever you compile a Theano

function, the computational graph that you pass as input gets

optimized for speed and stability. This is done by changing

several parts of the subgraphs with others. One such

optimization expresses terms of the form log(sigmoid(x)) in

terms of softplus. We need this optimization for the

cross-entropy since sigmoid of numbers larger than 30. (or

even less then that) turn to 1. and numbers smaller than

-30. turn to 0 which in terms will force theano to compute

log(0) and therefore we will get either -inf or NaN as

cost. If the value is expressed in terms of softplus we do not

get this undesirable behaviour. This optimization usually

works fine, but here we have a special case. The sigmoid is

applied inside the scan op, while the log is

outside. Therefore Theano will only see log(scan(..)) instead

of log(sigmoid(..)) and will not apply the wanted

optimization. We can not go and replace the sigmoid in scan

with something else also, because this only needs to be done

on the last step. Therefore the easiest and more efficient way

is to get also the pre-sigmoid activation as an output of

scan, and apply both the log and sigmoid outside scan such

that Theano can catch and optimize the expression.


cross_entropy = T.mean(


self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +

(1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),




return cross_entropy


def test_rbm(learning_rate=0.1, training_epochs=15,

dataset='mnist.pkl.gz', batch_size=20,

n_chains=20, n_samples=10, output_folder='rbm_plots',


datasets = load_data(dataset)

train_set_x, train_set_y = datasets[0]

test_set_x, test_set_y = datasets[2]


n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size

index = T.lscalar() # index to a [mini]batch

x = T.matrix('x') # the data is presented as rasterized images

rng = numpy.random.RandomState(123)

theano_rng = RandomStreams(rng.randint(2 ** 30))

persistent_chain = theano.shared(numpy.zeros((batch_size, n_hidden),dtype=theano.config.floatX),borrow=True)

#网络构建 ,可视层神经元个数为28*28,隐藏层神经元为500

rbm = RBM(input=x, n_visible=28 * 28,n_hidden=n_hidden, numpy_rng=rng, theano_rng=theano_rng)

# get the cost and the gradient corresponding to one step of CD-15

cost, updates = rbm.get_cost_updates(lr=learning_rate,persistent=persistent_chain, k=15)


# Training the RBM #


if not os.path.isdir(output_folder):



# start-snippet-5

# it is ok for a theano function to have no output

# the purpose of train_rbm is solely to update the RBM parameters

train_rbm = theano.function([index],cost,updates=updates,givens={x: train_set_x[index * batch_size: (index + 1) * batch_size]},name='train_rbm')

plotting_time = 0.

start_time = timeit.default_timer()

# go through training epochs

for epoch in xrange(training_epochs):

# go through the training set

mean_cost = []

for batch_index in xrange(n_train_batches):

mean_cost += [train_rbm(batch_index)]

print 'Training epoch %d, cost is ' % epoch, numpy.mean(mean_cost)

# Plot filters after each training epoch

plotting_start = timeit.default_timer()

# Construct image from the weight matrix

image = Image.fromarray(



img_shape=(28, 28),

tile_shape=(10, 10),

tile_spacing=(1, 1)



image.save('filters_at_epoch_%i.png' % epoch)

plotting_stop = timeit.default_timer()

plotting_time += (plotting_stop - plotting_start)

end_time = timeit.default_timer()

pretraining_time = (end_time - start_time) - plotting_time

print ('Training took %f minutes' % (pretraining_time / 60.))

# end-snippet-5 start-snippet-6


# Sampling from the RBM #


# find out the number of test samples

number_of_test_samples = test_set_x.get_value(borrow=True).shape[0]

# pick random test examples, with which to initialize the persistent chain

test_idx = rng.randint(number_of_test_samples - n_chains)

persistent_vis_chain = theano.shared(


test_set_x.get_value(borrow=True)[test_idx:test_idx + n_chains],




# end-snippet-6 start-snippet-7

plot_every = 1000

# define one step of Gibbs sampling (mf = mean-field) define a

# function that does `plot_every` steps before returning the

# sample for plotting











) = theano.scan(


outputs_info=[None, None, None, None, None, persistent_vis_chain],



# add to updates the shared variable that takes care of our persistent

# chain :.

updates.update({persistent_vis_chain: vis_samples[-1]})

# construct the function that implements our persistent chain.

# we generate the "mean field" activations for plotting and the actual

# samples for reinitializing the state of our persistent chain

sample_fn = theano.function(









# create a space to store the image for plotting ( we need to leave

# room for the tile_spacing as well)

image_data = numpy.zeros(

(29 * n_samples + 1, 29 * n_chains - 1),



for idx in xrange(n_samples):

# generate `plot_every` intermediate samples that we discard,

# because successive samples in the chain are too correlated

vis_mf, vis_sample = sample_fn()

print ' ... plotting sample ', idx

image_data[29 * idx:29 * idx + 28, :] = tile_raster_images(


img_shape=(28, 28),

tile_shape=(1, n_chains),

tile_spacing=(1, 1)


# construct image

image = Image.fromarray(image_data)


# end-snippet-7


if __name__ == '__main__':

