import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import concatenate
import os
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
%matplotlib inline
# Root directory of the dataset; empty means the data paths are relative to
# the current working directory.
path = ''
image_path = os.path.join(path, './data/CameraRGB/')
mask_path = os.path.join(path, './data/CameraMask/')

# os.listdir() returns entries in arbitrary order, so both listings are sorted
# to keep image_list[i] and mask_list[i] pointing at the same frame
# (assumes an image and its mask share the same file name -- TODO confirm).
image_list = [os.path.join(image_path, name)
              for name in sorted(os.listdir(image_path))]
mask_list = [os.path.join(mask_path, name)
             for name in sorted(os.listdir(mask_path))]
import imageio
# Index of the sample to preview; must be smaller than the number of files.
N = 1010
img = imageio.imread(image_list[N])
mask = imageio.imread(mask_list[N])

# Show the RGB frame next to its mask. Previously the image was loaded but
# never drawn, which left the first of the two axes empty.
fig, arr = plt.subplots(1, 2, figsize=(14, 10))
arr[0].imshow(img)
arr[0].set_title('Image')
# Only the first channel of the mask file is displayed.
arr[1].imshow(mask[:, :, 0])
arr[1].set_title('Segmentation')
# Unshuffled listings so that images and masks are yielded in a deterministic,
# matching order.
image_list_ds = tf.data.Dataset.list_files(image_list, shuffle=False)
mask_list_ds = tf.data.Dataset.list_files(mask_list, shuffle=False)

# Peek at the first few (image path, mask path) pairs. The loop variable is
# deliberately not called `path`, which would clobber the module-level
# dataset root defined earlier.
for sample_paths in zip(image_list_ds.take(3), mask_list_ds.take(3)):
    print(sample_paths)

image_filenames = tf.constant(image_list)
masks_filenames = tf.constant(mask_list)

# Each element of `dataset` is an (image path, mask path) pair.
dataset = tf.data.Dataset.from_tensor_slices((image_filenames, masks_filenames))

for image, mask in dataset.take(1):
    print(image)
    print(mask)
def process_path(image_path, mask_path):
    """
    Load one image/mask pair from disk

    Arguments:
        image_path -- Path of the PNG image file
        mask_path -- Path of the PNG mask file
    Returns:
        img, mask -- Image decoded with 3 channels and converted to float32,
                     and mask reduced to a single channel holding the
                     per-pixel maximum over its 3 decoded channels
    """
    # decode_png expects the raw file contents, so the file is read first.
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)

    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)
    # Collapse the 3 channels into one; presumably the class id is stored in
    # a single channel and the others are zero -- TODO confirm against data.
    mask = tf.math.reduce_max(mask, axis=-1, keepdims=True)
    return img, mask
def preprocess(image, mask):
    """
    Resize an image and its mask to the network input resolution

    Arguments:
        image -- Image tensor
        mask -- Mask tensor
    Returns:
        input_image, input_mask -- Both tensors resized to 96 x 128 with
                                   nearest-neighbour sampling
    """
    target_size = (96, 128)
    # The same resize is applied to both tensors so they stay pixel-aligned.
    input_image, input_mask = (
        tf.image.resize(tensor, target_size, method='nearest')
        for tensor in (image, mask)
    )
    return input_image, input_mask
# Decode every (image path, mask path) pair into (image, mask) tensors ...
image_ds = dataset.map(process_path)
# ... then resize each pair to the resolution expected by the network.
processed_image_ds = image_ds.map(preprocess)
def conv_block(inputs=None, n_filters=32, dropout_prob=0, max_pooling=True):
    """
    Convolutional downsampling block

    Arguments:
        inputs -- Input tensor
        n_filters -- Number of filters for the convolutional layers
        dropout_prob -- Dropout probability
        max_pooling -- Use MaxPooling2D to reduce the spatial dimensions of the output volume
    Returns:
        next_layer, skip_connection -- Next layer and skip connection outputs
    """
    # 'same' padding keeps the spatial size unchanged, so the skip connection
    # can later be concatenated with the matching upsampled tensor.
    conv = Conv2D(n_filters,  # Number of filters
                  3,  # Kernel size
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(inputs)
    conv = Conv2D(n_filters,  # Number of filters
                  3,  # Kernel size
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(conv)

    # if dropout_prob > 0 add a dropout layer, with the variable dropout_prob as parameter
    if dropout_prob > 0:
        conv = Dropout(dropout_prob)(conv)

    # if max_pooling is True add a MaxPooling2D with 2x2 pool_size;
    # otherwise the un-pooled convolution output is passed on.
    if max_pooling:
        next_layer = MaxPooling2D(2, strides=2)(conv)
    else:
        next_layer = conv

    # The skip connection is always taken before pooling.
    skip_connection = conv

    return next_layer, skip_connection
def upsampling_block(expansive_input, contractive_input, n_filters=32):
    """
    Convolutional upsampling block

    Arguments:
        expansive_input -- Input tensor from previous layer
        contractive_input -- Input tensor from previous skip layer
        n_filters -- Number of filters for the convolutional layers
    Returns:
        conv -- Tensor output
    """
    # A stride of 2 with 'same' padding doubles height and width, undoing the
    # 2x2 max pooling of the matching contracting block so the two tensors
    # can be concatenated.
    up = Conv2DTranspose(
        n_filters,  # number of filters
        3,  # Kernel size
        strides=(2, 2),
        padding='same')(expansive_input)

    # Merge the previous output and the contractive_input
    merge = concatenate([up, contractive_input], axis=3)

    conv = Conv2D(n_filters,  # Number of filters
                  3,  # Kernel size
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(merge)
    conv = Conv2D(n_filters,  # Number of filters
                  3,  # Kernel size
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(conv)

    return conv
# This is where everything is put together by chaining the encoder, the
# bottleneck, and the decoder. The number of output channels must be
# specified; for this dataset it is 23, because each pixel in this
# self-driving car dataset can take one of 23 possible labels.
def unet_model(input_size=(96, 128, 3), n_filters=32, n_classes=23):
    """
    Unet model

    Arguments:
        input_size -- Input shape
        n_filters -- Number of filters for the convolutional layers
        n_classes -- Number of output classes
    Returns:
        model -- tf.keras.Model
    """
    inputs = Input(input_size)

    # Contracting Path (encoding)
    # Each conv_block returns (next_layer, skip_connection); element [0] feeds
    # the next block and the number of filters doubles at every step.
    cblock1 = conv_block(inputs=inputs, n_filters=n_filters * 1)
    cblock2 = conv_block(inputs=cblock1[0], n_filters=n_filters * 2)
    cblock3 = conv_block(inputs=cblock2[0], n_filters=n_filters * 4)
    # Include a dropout of 0.3 for this layer
    cblock4 = conv_block(inputs=cblock3[0], n_filters=n_filters * 8,
                         dropout_prob=0.3)
    # Include a dropout of 0.3 for this layer, and avoid the max_pooling layer
    cblock5 = conv_block(inputs=cblock4[0], n_filters=n_filters * 16,
                         dropout_prob=0.3, max_pooling=False)

    # Expanding Path (decoding)
    # Each step halves the number of filters and merges with element [1]
    # (the skip connection, taken before max pooling) of the matching
    # contracting block.
    ublock6 = upsampling_block(cblock5[0], cblock4[1], n_filters * 8)
    ublock7 = upsampling_block(ublock6, cblock3[1], n_filters * 4)
    ublock8 = upsampling_block(ublock7, cblock2[1], n_filters * 2)
    ublock9 = upsampling_block(ublock8, cblock1[1], n_filters * 1)

    conv9 = Conv2D(n_filters,
                   3,
                   activation='relu',
                   padding='same',
                   kernel_initializer='he_normal')(ublock9)

    # Add a Conv2D layer with n_classes filter, kernel size of 1 and a 'same' padding.
    # No activation is applied, so the model outputs one raw score per class and pixel.
    conv10 = Conv2D(n_classes, 1, padding='same')(conv9)

    model = tf.keras.Model(inputs=inputs, outputs=conv10)

    return model
# Input geometry of the network: height, width and number of channels.
img_height, img_width, num_channels = 96, 128, 3

# Build the U-Net with the default number of filters and classes.
unet = unet_model(input_size=(img_height, img_width, num_channels))
# Below, define a function that displays an input image alongside its ground
# truth, the true mask. The true mask is the target that the trained model's
# output should match as closely as possible.
def display(display_list):
    """
    Show the given tensors side by side in a single row

    Arguments:
        display_list -- Sequence of up to three image-like tensors, in the
                        order input image, true mask, predicted mask
    """
    plt.figure(figsize=(15, 15))

    title = ['Input Image', 'True Mask', 'Predicted Mask']

    for i, item in enumerate(display_list):
        plt.subplot(1, len(display_list), i + 1)
        plt.title(title[i])
        # array_to_img also accepts single-channel (H, W, 1) arrays, which
        # plt.imshow rejects when given directly.
        plt.imshow(tf.keras.preprocessing.image.array_to_img(item))
        plt.axis('off')
    plt.show()
# Take one raw (un-resized) pair and show it.
for image, mask in image_ds.take(1):
    sample_image = image
    sample_mask = mask
display([sample_image, sample_mask])

# Take one resized pair and show it; sample_image / sample_mask keep this
# resized pair afterwards.
for image, mask in processed_image_ds.take(1):
    sample_image = image
    sample_mask = mask
display([sample_image, sample_mask])
# Training hyper-parameters. These names were used without being defined
# anywhere in the file; the values below are placeholders -- tune as needed.
EPOCHS = 5
BUFFER_SIZE = 500
BATCH_SIZE = 32

train_dataset = processed_image_ds.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# The model must be compiled before fit(). The network's last layer has no
# activation (raw per-class scores) and create_mask() reads predictions with
# argmax, so a sparse categorical cross-entropy on logits is used
# (assumes masks hold integer class ids -- TODO confirm).
unet.compile(optimizer='adam',
             loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
             metrics=['accuracy'])

model_history = unet.fit(train_dataset, epochs=EPOCHS)
def create_mask(pred_mask):
    """
    Turn per-class scores into a class-index mask for the first batch element

    Arguments:
        pred_mask -- Batch of predictions with the class scores on the last axis
    Returns:
        First element of the batch, holding the index of the highest-scoring
        class, with a trailing axis of size 1
    """
    class_ids = tf.argmax(pred_mask, axis=-1)
    # Re-append a trailing axis of size 1 in place of the removed class axis.
    class_ids = tf.expand_dims(class_ids, axis=-1)
    return class_ids[0]
def show_predictions(dataset=None, num=1):
    """
    Displays the first image of each of the num batches

    Arguments:
        dataset -- Batched dataset yielding (image, mask) pairs; when None,
                   the module-level sample_image / sample_mask are shown instead
        num -- Number of batches to take from dataset
    """
    # Explicit None check: the fallback is meant only for "no dataset given",
    # not for whatever truthiness a dataset object happens to have.
    if dataset is not None:
        for image, mask in dataset.take(num):
            pred_mask = unet.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)])
    else:
        # sample_image has no batch axis, so one is added before predicting.
        display([sample_image, sample_mask,
                 create_mask(unet.predict(sample_image[tf.newaxis, ...]))])
# Show the prediction for the first image of six training batches.
show_predictions(dataset=train_dataset, num=6)
