Facial Expression Recognition - TensorFlow CNN

Kaggle - Challenges in Representation Learning: Facial Expression Recognition Challenge

https://www.kaggle.com/c/challenges-in-representation-learning-facial-expression-recognition-challenge/data

The data consists of 48x48 pixel grayscale images of faces. The faces have been automatically registered so that the face is more or less centered and occupies about the same amount of space in each image. The task is to categorize each face based on the emotion shown in the facial expression into one of seven categories (0=Angry, 1=Disgust, 2=Fear, 3=Happy, 4=Sad, 5=Surprise, 6=Neutral).

train.csv contains two columns, "emotion" and "pixels". The "emotion" column contains a numeric code ranging from 0 to 6, inclusive, for the emotion that is present in the image. The "pixels" column contains a quoted string for each image, whose contents are space-separated pixel values in row-major order. test.csv contains only the "pixels" column and your task is to predict the emotion column.
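
The "pixels" string is straightforward to turn back into an image array. A minimal sketch (pandas and the pixels_to_image helper are used here only for illustration; they are not part of the pipeline below, which works from pre-extracted JPEG files):

In [ ]:
import numpy as np
import pandas as pd

def pixels_to_image(pixel_str):
    # Hypothetical helper: one space-separated "pixels" string -> 48x48 uint8 image
    return np.array([int(p) for p in pixel_str.split()], dtype=np.uint8).reshape(48, 48)

df = pd.read_csv('train.csv')              # columns: "emotion", "pixels"
first_img = pixels_to_image(df['pixels'][0])
first_label = df['emotion'][0]             # integer label in [0, 6]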

The training set consists of 28,709 examples. The public test set used for the leaderboard consists of 3,589 examples. The final test set, which was used to determine the winner of the competition, consists of another 3,589 examples.

This dataset was prepared by Pierre-Luc Carrier and Aaron Courville, as part of an ongoing research project. They have graciously provided the workshop organizers with a preliminary version of their dataset to use for this contest.

In [1]:
%matplotlib inline
In [2]:
import sys

# To import cv2, which datagenerator uses
sys.path.append('/Users/rahulsridhar/anaconda2/lib/python2.7/site-packages') 
In [3]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from os import listdir
from os.path import isfile, join
import os
import datagenerator
import cv2
from timeit import default_timer as timer
In [4]:
np.random.seed(15)

Create datasets

In [7]:
# Training dataset
with open('train.txt','w') as f:
    for i in range(7):
        path = 'Training/'+str(i)
        onlyfiles = [fl for fl in listdir(path) if isfile(join(path, fl))]
        print len(onlyfiles), onlyfiles[0]
        for curr_file in onlyfiles:
            if curr_file.find(".DS_Store") != -1:
                continue
            fname = path + '/' + curr_file + ' ' + str(i) + '\n'
            f.write(fname)
3995 Training_10002922.jpg
436 Training_10191968.jpg
4097 Training_10069035.jpg
7214 Training_10001243.jpg
4830 Training_10023337.jpg
3171 Training_10081105.jpg
4964 Training_10002382.jpg
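
The per-class counts printed above are noticeably unbalanced - Disgust (label 1) has only 436 training images versus 7,214 for Happy (label 3) - and they sum to 28,707, two fewer than the 28,709 examples quoted in the competition description. For reference:

In [ ]:
# Class counts printed above, in label order 0..6 (Angry, Disgust, Fear, Happy, Sad, Surprise, Neutral)
train_counts = [3995, 436, 4097, 7214, 4830, 3171, 4964]
print sum(train_counts)  # 28707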
In [8]:
# Public test dataset
with open('test.txt','w') as f:
    for i in range(7):
        path = 'PublicTest/'+str(i)
        onlyfiles = [fl for fl in listdir(path) if isfile(join(path, fl))]
        print len(onlyfiles), onlyfiles[0]
        for curr_file in onlyfiles:
            if curr_file.find(".DS_Store") != -1:
                continue
            fname = path + '/' + curr_file + ' ' + str(i) + '\n'
            f.write(fname)
467 PublicTest_10113400.jpg
56 PublicTest_1123625.jpg
496 PublicTest_1023506.jpg
895 PublicTest_100405.jpg
653 PublicTest_10308692.jpg
415 PublicTest_100155.jpg
607 PublicTest_10110281.jpg
In [9]:
# Verify that all files have been written to the text files
with open("train.txt", 'r') as f1, open("test.txt", 'r') as f2:
    train_lines = f1.readlines()
    test_lines = f2.readlines()
    print len(train_lines), len(test_lines)    
28707 3589
In [10]:
# Delete unnecessary objects
del train_lines, test_lines, onlyfiles, f, f1, f2, fl, i, path, curr_file, fname

Tensorflow Model

In [6]:
# A few hyperparameters
n_input = 2304 # 48 x 48 = 2,304 pixels per image
n_classes = 7 # Seven emotion classes
In [7]:
# Placeholders for data
x = tf.placeholder(tf.float32, [None, 48, 48], name = "Input")
y = tf.placeholder(tf.float32, [None, n_classes], name = "Output")
In [8]:
keep_prob = tf.placeholder(tf.float32) # Placeholder for dropout rate
In [9]:
def xavier_init(size):
    '''
    Function to perform Xavier initialization
    '''
    in_dim = 1
    for i in range(len(size)-1):
        in_dim *= size[i]
    xavier_stddev = 1./tf.sqrt(in_dim/2.)
    return tf.random_normal(shape=size, stddev=xavier_stddev)
In [10]:
def he_init(size):
    '''
    Function to perform He initialization
    '''
    in_dim = 1
    for i in range(len(size)-1):
        in_dim *= size[i]
    he_stddev = tf.sqrt(2./in_dim)
    return tf.random_normal(shape=size, stddev=he_stddev)
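
Both initializers treat the product of every dimension except the last as the fan-in. For the first conv filter used below, shape [4, 4, 1, 64], the fan-in is 16, so the He standard deviation works out to sqrt(2/16) ≈ 0.354. A quick check of that arithmetic (the shape is just the one used for 'wc1' later):

In [ ]:
import numpy as np

# Fan-in and stddev that he_init would compute for the [4, 4, 1, 64] 'wc1' filter
fan_in = 4 * 4 * 1           # product of all dims except the last (number of filters)
print np.sqrt(2. / fan_in)   # ~0.3536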
In [11]:
# Convolution and pooling functions
def conv2d(x, W, b, strides = 1, pad = 'SAME'):
    # conv 2d wrapper with relu activation
    x = tf.nn.conv2d(x, W, strides =[1, strides, strides, 1], padding=pad)
    x = tf.nn.bias_add(x, b)
    return tf.nn.relu(x)

def maxpool2d(x, k=2, pad = 'SAME'):
    # maxpool 2d wrapper
    return tf.nn.max_pool(x, ksize = [1, k, k, 1], strides=[1, k, k, 1], \
                          padding=pad)

# Note on the [1, x, x, 1] stride/ksize convention:
# First dim = stride (or pool size) along the batch dimension - how many examples to step over at a time, almost always 1
# Second dim = height
# Third dim = width
# Fourth dim = depth / number of channels
# Filter dimension - filter height x width x number of input channels x number of filters
# padding='SAME' with stride 1 keeps the output feature map the same size as the input feature map
# Input to conv/pooling - 4D tensor (NHWC: num samples x height x width x channels)
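
To make the shape bookkeeping above concrete, a small sketch (the batch size of 8 is arbitrary, chosen only for illustration) showing that 'SAME' padding with stride 1 preserves the 48x48 spatial size while a 2x2 max-pool with stride 2 halves it:

In [ ]:
# Shape propagation for one conv + pool on a toy NHWC batch
toy = tf.zeros([8, 48, 48, 1])   # 8 grayscale 48x48 images
w = tf.zeros([4, 4, 1, 64])      # 4x4 filters, 1 input channel, 64 filters
conv_out = tf.nn.conv2d(toy, w, strides=[1, 1, 1, 1], padding='SAME')
pool_out = tf.nn.max_pool(conv_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
print conv_out.shape  # (8, 48, 48, 64) - 'SAME' + stride 1 keeps the spatial dims
print pool_out.shape  # (8, 24, 24, 64) - 2x2 pool with stride 2 halves them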
In [12]:
# The network
def conv_net(x, weights, biases, scales, offsets, dropout):
    # Reshape input to NHWC; -1 lets TF infer the batch dimension, 1 = grayscale channel
    x = tf.reshape(x, shape = [-1, 48, 48, 1])

    # Conv layers
    conv1 = conv2d(x, weights['wc1'], biases['bc1'], strides=1, pad='SAME') # Conv layer
    m, v = tf.nn.moments(conv1, [0])
    conv1 = tf.nn.batch_normalization(conv1, m, v, offsets['oc1'], scales['sc1'], 1e-5)
    conv1 = tf.nn.relu(conv1)
    #conv1 = maxpool2d(conv1, k=2) # Down sampling
    print conv1.shape
    
    conv2 = conv2d(conv1, weights['wc2'], biases['bc2'], strides=1, pad='SAME') # Conv layer 2
    m, v = tf.nn.moments(conv2, [0])
    conv2 = tf.nn.batch_normalization(conv2, m, v, offsets['oc2'], scales['sc2'], 1e-5)
    conv2 = tf.nn.relu(conv2)
    #conv2 = maxpool2d(conv2, k=2, pad='SAME') # Down sampling
    print conv2.shape

    conv3 = conv2d(conv2, weights['wc3'], biases['bc3'], strides=1, pad='SAME') # Conv layer 3
    m, v = tf.nn.moments(conv3, [0])
    conv3 = tf.nn.batch_normalization(conv3, m, v, offsets['oc3'], scales['sc3'], 1e-5)
    conv3 = tf.nn.relu(conv3)
    conv3 = maxpool2d(conv3, k=2, pad='SAME') # Down sampling
    conv3 = tf.nn.dropout(conv3, keep_prob=dropout)
    print conv3.shape

# Additional conv layers
#     conv4 = conv2d(conv3, weights['wc4'], biases['bc4'], strides=1, pad='SAME') # Conv layer 2
#     m, v = tf.nn.moments(conv4, [0])
#     conv4 = tf.nn.batch_normalization(conv4, m, v, offsets['oc4'], scales['sc4'], 1e-5)
#     conv4 = tf.nn.relu(conv4)
#     #conv4 = maxpool2d(conv4, k=2, pad='SAME') # Down sampling
#     print conv4.shape

#     conv5 = conv2d(conv4, weights['wc5'], biases['bc5'], strides=1, pad='SAME') # Conv layer 2
#     m, v = tf.nn.moments(conv5, [0])
#     conv5 = tf.nn.batch_normalization(conv5, m, v, offsets['oc5'], scales['sc5'], 1e-5)
#     conv5 = tf.nn.relu(conv5)
#     conv5 = maxpool2d(conv5, k=2, pad='SAME') # Down sampling
#     conv5 = tf.nn.dropout(conv5, keep_prob=dropout)
#     print conv5.shape

    # Fully connected layers
    fc1 = tf.reshape(conv3, [-1, weights['wf1'].get_shape().as_list()[0]]) # Flatten to [batch, 24*24*128]
    fc1 = tf.add(tf.matmul(fc1, weights['wf1']), biases['bf1'])
    m, v = tf.nn.moments(fc1, [0])
    fc1 = tf.nn.batch_normalization(fc1, m, v, offsets['of1'], scales['sf1'], 1e-5)
    fc1 = tf.nn.relu(fc1)
    fc1 = tf.nn.dropout(fc1, keep_prob=dropout)
    print fc1.shape
                      
    fc2 = tf.add(tf.matmul(fc1, weights['wf2']), biases['bf2'])
    m, v = tf.nn.moments(fc2, [0])
    fc2 = tf.nn.batch_normalization(fc2, m, v, offsets['of2'], scales['sf2'], 1e-5)
    fc2 = tf.nn.relu(fc2)
    fc2 = tf.nn.dropout(fc2, keep_prob=dropout)
    print fc2.shape
    
# Additional fully connected layer    
#     fc3 = tf.add(tf.matmul(fc2, weights['wf3']), biases['bf3'])
#     m, v = tf.nn.moments(fc3, [0])
#     fc3 = tf.nn.batch_normalization(fc3, m, v, offsets['of3'], scales['sf3'], 1e-5)
#     fc3 = tf.nn.sigmoid(fc3)
#     fc3 = tf.nn.dropout(fc3, keep_prob=dropout)
#     print fc3.shape
                          
    out = tf.add(tf.matmul(fc2, weights['out']), biases['out'])
    print out.shape

    return out
In [19]:
# Weights, biases, scales and offsets (for batch norm)
weights = {
    'wc1': tf.Variable(he_init([4, 4, 1, 64])),  # 4x4 conv, 1 input channel, 64 filters
    'wc2': tf.Variable(he_init([4, 4, 64, 64])), # 4x4 conv, 64 input channels, 64 filters
    'wc3': tf.Variable(he_init([3, 3, 64, 128])),# 3x3 conv, 64 input channels, 128 filters
    'wc4': tf.Variable(he_init([3, 3, 64, 128])),# 3x3 conv, 64 in, 128 out (extra conv layer, commented out above)
    'wc5': tf.Variable(he_init([3, 3, 128, 128])),# 3x3 conv, 128 in, 128 out (extra conv layer, commented out above)
    
    # Fully connected weights - the flattened conv output is mapped down to n_classes logits
    #'wf1': tf.Variable(he_init([7*7*64, 1024])),
    'wf1': tf.Variable(he_init([24*24*128, 1024])),
    'wf2': tf.Variable(he_init([1024, 512])),
    'wf3': tf.Variable(he_init([1024, 128])),     # extra FC layer, commented out above
    'out': tf.Variable(he_init([512, n_classes])) # 512 inputs, 7 output classes
}

biases = {
    'bc1': tf.Variable(tf.random_normal([64])),
    'bc2': tf.Variable(tf.random_normal([64])),
    'bc3': tf.Variable(tf.random_normal([128])),
    'bc4': tf.Variable(tf.random_normal([128])),
    'bc5': tf.Variable(tf.random_normal([128])),
    'bf1': tf.Variable(tf.random_normal([1024])),
    'bf2': tf.Variable(tf.random_normal([512])),
    'bf3': tf.Variable(tf.random_normal([128])),
    'out': tf.Variable(tf.random_normal([n_classes]))
}

scales = {
    'sc1': tf.Variable(tf.random_normal([64])),
    'sc2': tf.Variable(tf.random_normal([64])),
    'sc3': tf.Variable(tf.random_normal([128])),
    'sc4': tf.Variable(tf.random_normal([128])),
    'sc5': tf.Variable(tf.random_normal([128])),
    'sf1': tf.Variable(tf.random_normal([1024])),
    'sf2': tf.Variable(tf.random_normal([512])),
    'sf3': tf.Variable(tf.random_normal([128])),
    'out': tf.Variable(tf.random_normal([n_classes]))

}

offsets = {
    'oc1': tf.Variable(tf.random_normal([64])),
    'oc2': tf.Variable(tf.random_normal([64])),
    'oc3': tf.Variable(tf.random_normal([128])),
    'oc4': tf.Variable(tf.random_normal([128])),
    'oc5': tf.Variable(tf.random_normal([128])),
    'of1': tf.Variable(tf.random_normal([1024])),
    'of2': tf.Variable(tf.random_normal([512])),
    'of3': tf.Variable(tf.random_normal([128])),
    'out': tf.Variable(tf.random_normal([n_classes]))
}
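
A note on the fully connected input size: the three stride-1 'SAME' convolutions keep the 48x48 spatial size, the single 2x2 max-pool after conv3 halves it to 24x24, and conv3 has 128 filters, so the flattened feature vector feeding 'wf1' has 24*24*128 = 73,728 entries. The 'wc4'/'wc5' and the *f3 entries belong to the commented-out extra layers and are unused in the current network.

In [ ]:
# Sanity check of the 'wf1' input dimension: 48/2 = 24 after one 2x2 max-pool, with 128 feature maps
print 24 * 24 * 128  # 73728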
In [20]:
# Build model and verify shapes
pred = conv_net(x, weights, biases, scales, offsets, keep_prob)
(?, 48, 48, 64)
(?, 48, 48, 64)
(?, 24, 24, 128)
(?, 1024)
(?, 512)
(?, 7)
In [32]:
# Define cost function and optimizer

# L2 regularization
beta = 0.01
learning_rate = 0.0007

regularizer = tf.nn.l2_loss(weights['wc1']) + tf.nn.l2_loss(weights['wc2']) + tf.nn.l2_loss(weights['wc3']) + \
              tf.nn.l2_loss(weights['wc4']) + tf.nn.l2_loss(weights['wc5']) + tf.nn.l2_loss(weights['wf1']) + \
              tf.nn.l2_loss(weights['wf2']) + tf.nn.l2_loss(weights['wf3']) + tf.nn.l2_loss(weights['out'])
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y)) + \
       beta*regularizer

optimizer = tf.train.AdamOptimizer(learning_rate).minimize(cost)
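
tf.nn.l2_loss(w) returns sum(w**2)/2, so the term added to the cross-entropy is (beta/2) times the summed squared L2 norm of all nine weight tensors, including the 'wc4'/'wc5'/'wf3' weights that the current network never uses. A more compact but equivalent way to build the same term (a sketch only, not wired into the cost above):

In [ ]:
# Equivalent form of the hand-written regularizer: sum tf.nn.l2_loss over every weight tensor
regularizer_alt = tf.add_n([tf.nn.l2_loss(w) for w in weights.values()])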
In [33]:
# Evaluate model
correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

init = tf.global_variables_initializer()
In [34]:
# Load training data image mean
training_im_mean = np.load("Training_Img_Mean.npy")
In [35]:
# Verify consistency of image mean
print training_im_mean
print training_im_mean.shape
print "Mean min max", np.min(training_im_mean), np.max(training_im_mean)
[[ 0.48654914  0.47327606  0.46036798 ...,  0.45633206  0.4688883
   0.48128119]
 [ 0.48142738  0.46768493  0.45549841 ...,  0.4518278   0.46461     0.47703289]
 [ 0.47529901  0.4623244   0.45051726 ...,  0.44730555  0.45989347
   0.47240412]
 ..., 
 [ 0.47569951  0.47143612  0.46740514 ...,  0.45558879  0.45928385
   0.46306767]
 [ 0.47593024  0.47159593  0.46722158 ...,  0.45585475  0.45931873
   0.46306987]
 [ 0.47595055  0.47129647  0.46623525 ...,  0.45572917  0.45900836
   0.46238783]]
(48, 48)
Mean min max 0.419400353 0.693280081443
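
The values above fall in roughly [0.42, 0.69], consistent with a per-pixel mean of the training images after scaling pixel intensities to [0, 1]. A sketch of how such a file could be produced (this assumes the Training/<label>/*.jpg layout and the train.txt file built above, and is not necessarily the exact script that generated Training_Img_Mean.npy):

In [ ]:
# Per-pixel mean over the training images, with intensities scaled to [0, 1]
mean_acc = np.zeros((48, 48), dtype=np.float64)
count = 0
with open('train.txt') as f:
    for line in f:
        img_path = line.split()[0]
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE) / 255.
        mean_acc += img
        count += 1
print np.min(mean_acc / count), np.max(mean_acc / count)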
In [36]:
reload(datagenerator)
Out[36]:
<module 'datagenerator' from 'datagenerator.pyc'>
In [ ]:
# Train the network

start = timer()
training_iters = 30000
batch_size = 256
display_step = 100
dropout_inp = 0.5  # Probability to keep units
train_file = 'train.txt'
test_file = 'test.txt'


train_generator = datagenerator.ImageDataGenerator(train_file, horizontal_flip=True, shuffle=True, \
                                                   mean=training_im_mean, scale_size=(48, 48), nb_classes=7)
test_generator = datagenerator.ImageDataGenerator(test_file, horizontal_flip=False, shuffle=False, \
                                                   mean=training_im_mean, scale_size=(48, 48), nb_classes=7)
# test_generator = datagenerator.ImageDataGenerator(test_file, horizontal_flip=False, shuffle=False, \
#                                                    mean=np.zeros((48, 48)), scale_size=(48, 48), nb_classes=7)

# Number of batches per epoch for each dataset
train_batches_per_epoch = np.floor(train_generator.data_size/batch_size).astype(np.int16)
test_batches_per_epoch = np.floor(test_generator.data_size/batch_size).astype(np.int16)

with tf.Session() as sess:
    sess.run(init)
    step = 1
    
    while step*batch_size < training_iters:
        batch_x, batch_y = train_generator.next_batch(batch_size)
        sess.run(optimizer, feed_dict = {x:batch_x, y:batch_y, keep_prob: dropout_inp})
        
        if step%display_step == 0:
            loss, acc = sess.run([cost, accuracy], feed_dict={x:batch_x, y:batch_y,
                                                             keep_prob: 1.})
            print "\nIter " + str(step*batch_size) + " Loss=", loss, "Accuracy=", acc, "\n"
        print step, 
        step += 1
    print "-----Optimization Done-----"
    
    # Test accuracy - average the accuracy over all full test batches
    test_acc = 0.
    test_count = 0
    for _ in range(test_batches_per_epoch):
        batch_tx, batch_ty = test_generator.next_batch(batch_size)
        acc = sess.run(accuracy, feed_dict={x: batch_tx,
                                            y: batch_ty,
                                            keep_prob: 1.})
        test_acc += acc
        test_count += 1
    test_acc /= test_count
    print("Test Accuracy = {:.7f}".format(test_acc))    
    
os.system('say "Your program has finished"')
end = timer()
print "\nTime elapsed = ", end-start, " seconds"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
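
With batch_size = 256, the stopping rule step*batch_size < training_iters allows roughly 30,000/256 ≈ 117 gradient updates, which is only about one pass over the 28,707 training images, so training_iters here effectively counts examples seen rather than epochs. A quick check of that arithmetic:

In [ ]:
# How far the stopping criterion lets training run
steps = 30000 // 256               # ~117 optimizer updates
print steps, steps * 256 / 28707.  # ~1.04 passes over the training set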
In [ ]: