# Imports and Settings
from pathlib import Path
import logging
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.python.keras import layers
from tensorflow.python.keras import models
from nncf import NNCFConfig
from nncf.tensorflow.helpers.model_creation import create_compressed_model
from nncf.tensorflow.initialization import register_default_init_args
from nncf.common.utils.logger import set_log_level
# Only NNCF messages of level ERROR and above are logged.
set_log_level(logging.ERROR)

# Working directories; both are created if they do not exist yet.
MODEL_DIR = Path("model")
OUTPUT_DIR = Path("output")
MODEL_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)

BASE_MODEL_NAME = "ResNet-18"

# Locations of the artifacts that are read or written further down.
fp32_h5_path = MODEL_DIR / f"{BASE_MODEL_NAME}_fp32.h5"
fp32_sm_path = OUTPUT_DIR / f"{BASE_MODEL_NAME}_fp32"
fp32_ir_path = OUTPUT_DIR / "saved_model.xml"
int8_pb_path = OUTPUT_DIR / f"{BASE_MODEL_NAME}_int8.pb"
int8_pb_name = Path(f"{BASE_MODEL_NAME}_int8.pb")
int8_ir_path = int8_pb_path.with_suffix(".xml")

BATCH_SIZE = 128
IMG_SIZE = (64, 64)  # Size every image is resized to during preprocessing
NUM_CLASSES = 10  # For Imagenette dataset
LR = 1e-5

# Per-channel statistics from the Imagenet dataset, scaled by 255.
MEAN_RGB = tuple(channel * 255 for channel in (0.485, 0.456, 0.406))
STDDEV_RGB = tuple(channel * 255 for channel in (0.229, 0.224, 0.225))

# Fetch the pre-trained floating-point checkpoint into fp32_h5_path.
fp32_pth_url = "https://storage.openvinotoolkit.org/repositories/nncf/openvino_notebook_ckpts/305_resnet18_imagenette_fp32_v1.h5"
_ = tf.keras.utils.get_file(fname=fp32_h5_path.resolve(), origin=fp32_pth_url)
print(f'Absolute path where the model weights are saved:\n {fp32_h5_path.resolve()}')
# Dataset Preprocessing
# Load the 'imagenette/160px' dataset together with its metadata; files are
# shuffled with a fixed seed (shuffle_seed=0).
datasets, datasets_info = tfds.load(
    'imagenette/160px',
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
    read_config=tfds.ReadConfig(shuffle_seed=0),
)
train_dataset = datasets['train']
validation_dataset = datasets['validation']

# Show a few training examples.
fig = tfds.show_examples(train_dataset, datasets_info)
def preprocessing(image, label):
    """Resize and normalize an image and one-hot encode its label.

    The image is resized to ``IMG_SIZE``; ``MEAN_RGB`` is then subtracted and
    the result is divided by ``STDDEV_RGB``. The label is converted to a
    one-hot vector of depth ``NUM_CLASSES``.

    Returns:
        The transformed ``(image, label)`` pair.
    """
    resized = tf.image.resize(image, IMG_SIZE)
    normalized = (resized - MEAN_RGB) / STDDEV_RGB
    return normalized, tf.one_hot(label, NUM_CLASSES)
# Input pipelines: preprocess in parallel, batch, then prefetch.
train_dataset = train_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

validation_dataset = validation_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
validation_dataset = validation_dataset.batch(BATCH_SIZE)
validation_dataset = validation_dataset.prefetch(tf.data.experimental.AUTOTUNE)
# Define a Floating-Point Model
def residual_conv_block(filters, stage, block, strides=(1, 1), cut='pre'):
    """Create a residual block and return it as a function of its input tensor.

    The block applies batch normalization and ReLU, then two zero-padded 3x3
    convolutions (with another batch normalization + ReLU in between) and
    finally adds the shortcut to the result.

    Args:
        filters: Number of filters of every convolution in the block.
        stage: Stage index of the block. Not used by this implementation.
        block: Index of the block inside its stage. Not used by this
            implementation.
        strides: Strides of the first 3x3 convolution and, when
            ``cut='post'``, of the 1x1 shortcut convolution.
        cut: Where the shortcut is taken from. ``'pre'`` uses the block input
            as is; ``'post'`` applies a 1x1 convolution to the tensor obtained
            after the first batch normalization + ReLU.

    Returns:
        A callable mapping an input tensor to the output tensor of the block.

    Raises:
        ValueError: If ``cut`` is neither ``'pre'`` nor ``'post'``.
    """
    # Fail fast: without this check an unknown value would leave ``shortcut``
    # unassigned and only surface later as an UnboundLocalError.
    if cut not in ('pre', 'post'):
        raise ValueError(f"Cut type not in ['pre', 'post']: {cut!r}")

    def layer(input_tensor):
        x = layers.BatchNormalization(epsilon=2e-5)(input_tensor)
        x = layers.Activation('relu')(x)

        # defining shortcut connection
        if cut == 'pre':
            shortcut = input_tensor
        else:  # cut == 'post'
            shortcut = layers.Conv2D(filters, (1, 1), strides=strides, kernel_initializer='he_uniform',
                                     use_bias=False)(x)

        # continue with convolution layers
        x = layers.ZeroPadding2D(padding=(1, 1))(x)
        x = layers.Conv2D(filters, (3, 3), strides=strides, kernel_initializer='he_uniform', use_bias=False)(x)

        x = layers.BatchNormalization(epsilon=2e-5)(x)
        x = layers.Activation('relu')(x)
        x = layers.ZeroPadding2D(padding=(1, 1))(x)
        x = layers.Conv2D(filters, (3, 3), kernel_initializer='he_uniform', use_bias=False)(x)

        # add residual connection
        x = layers.Add()([x, shortcut])
        return x

    return layer
def ResNet18(input_shape=None):
    """Build the ResNet18 network as a Keras functional model.

    Args:
        input_shape: Shape passed to ``layers.Input`` for the input named
            ``'data'``.

    Returns:
        A model mapping the input to softmax outputs of size ``NUM_CLASSES``.
    """
    inputs = layers.Input(shape=input_shape, name='data')

    # ResNet18 bottom: normalization, 7x7 strided convolution and max pooling
    stem = [
        layers.BatchNormalization(epsilon=2e-5, scale=False),
        layers.ZeroPadding2D(padding=(3, 3)),
        layers.Conv2D(64, (7, 7), strides=(2, 2), kernel_initializer='he_uniform', use_bias=False),
        layers.BatchNormalization(epsilon=2e-5),
        layers.Activation('relu'),
        layers.ZeroPadding2D(padding=(1, 1)),
        layers.MaxPooling2D((3, 3), strides=(2, 2), padding='valid'),
    ]
    net = inputs
    for stem_layer in stem:
        net = stem_layer(net)

    # ResNet18 body: four stages of two residual blocks each
    blocks_per_stage = (2, 2, 2, 2)
    for stage, num_blocks in enumerate(blocks_per_stage):
        filters = 64 * (2 ** stage)
        for block in range(num_blocks):
            opens_stage = block == 0
            # The first block of a stage uses a convolutional shortcut; it
            # also downsamples, except in the very first stage.
            cut = 'post' if opens_stage else 'pre'
            strides = (2, 2) if opens_stage and stage != 0 else (1, 1)
            net = residual_conv_block(filters, stage, block, strides=strides, cut=cut)(net)

    # ResNet18 top: final normalization, pooling and the softmax classifier
    head = [
        layers.BatchNormalization(epsilon=2e-5),
        layers.Activation('relu'),
        layers.GlobalAveragePooling2D(),
        layers.Dense(NUM_CLASSES),
        layers.Activation('softmax'),
    ]
    for head_layer in head:
        net = head_layer(net)

    return models.Model(inputs, net)
# Model input shape: IMG_SIZE followed by a trailing dimension of 3.
IMG_SHAPE = (*IMG_SIZE, 3)
model = ResNet18(input_shape=IMG_SHAPE)
# Pre-train Floating-Point Model
# Restore the floating-point weights from the checkpoint file
model.load_weights(fp32_h5_path)

# Compile with the loss and the metric used for evaluation
model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
    metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')],
)

# Evaluate the floating-point model on the validation data
test_loss, acc_fp32 = model.evaluate(
    validation_dataset,
    callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']),
)
print(f"\nAccuracy of FP32 model: {acc_fp32:.3f}")

# Save the floating-point model to fp32_sm_path
model.save(fp32_sm_path)
print(f'Absolute path where the model is saved:\n {fp32_sm_path.resolve()}')
# Create and Initialize Quantization
nncf_config_dict = {
    # Describe the input in the layout the model is actually built with
    # (batch, IMG_SIZE, 3), not as a channels-first [1, 3, H, W] shape.
    "input_info": {"sample_size": [1, *IMG_SIZE, 3]},
    "log_dir": str(OUTPUT_DIR),  # log directory for NNCF-specific logging outputs
    "compression": {
        "algorithm": "quantization",  # specify the algorithm here
    },
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)

# Register the training data (and its batch size) for the initialization of
# the compression algorithm.
nncf_config = register_default_init_args(nncf_config=nncf_config,
                                         data_loader=train_dataset,
                                         batch_size=BATCH_SIZE)

# Wrap the floating-point model; `model` now refers to the compressed model.
compression_ctrl, model = create_compressed_model(model, nncf_config)
# Compile the int8 model
# `learning_rate` is the supported argument name; the `lr` alias is deprecated
# and not accepted by every Keras optimizer implementation.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LR),
              loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
              metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')])
# Validate the int8 model before any fine-tuning
test_loss, test_acc = model.evaluate(
    validation_dataset,
    callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']),
)
print(f"\nAccuracy of INT8 model after initialization: {test_acc:.3f}")

# Fine-tune the Compressed Model
# Train the int8 model for two epochs
model.fit(train_dataset, epochs=2)

# Validate the int8 model again and compare it with the FP32 accuracy
test_loss, acc_int8 = model.evaluate(
    validation_dataset,
    callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']),
)
print(f"\nAccuracy of INT8 model after fine-tuning: {acc_int8:.3f}")
print(f"\nAccuracy drop of tuned INT8 model over pre-trained FP32 model: {acc_fp32 - acc_int8:.3f}")

# Export the int8 model in the 'frozen_graph' format to int8_pb_path
compression_ctrl.export_model(int8_pb_path, 'frozen_graph')
print(f'Absolute path where the int8 model is saved:\n {int8_pb_path.resolve()}')
# Export Frozen Graph Models to OpenVINO Intermediate Representation (IR)
# NOTE: the lines starting with `!` are IPython shell escapes, so this section
# only runs inside IPython/Jupyter. `$name` is replaced with the value of the
# Python variable `name`. Both conversions write into OUTPUT_DIR.
# FP32 model: converted from the directory written by model.save(); the input
# is the layer named 'data'.
!mo --framework=tf --input_shape=[1,64,64,3] --input=data --saved_model_dir=$fp32_sm_path --output_dir=$OUTPUT_DIR
# INT8 model: converted from the frozen graph exported through compression_ctrl;
# the input node is addressed as 'Placeholder'.
!mo --framework=tf --input_shape=[1,64,64,3] --input=Placeholder --input_model=$int8_pb_path --output_dir=$OUTPUT_DIR
# Benchmark Model Performance by Computing Inference Time
def parse_benchmark_output(benchmark_output):
    """Print the relevant lines of a benchmark run, one per line.

    A line is dropped when it starts with ``[`` or with a space, or when it is
    empty; every other line of ``benchmark_output`` is printed.
    """
    def is_result_line(line):
        return not (line.startswith(("[", " ")) or line == "")

    print(*filter(is_result_line, benchmark_output), sep='\n')
# Run benchmark_app on both IR files with identical options
# (-d CPU -api async -t 15) and print the filtered output of each run.
# NOTE: `name = ! command` is IPython syntax that captures the command's
# output lines; it only works inside IPython/Jupyter.
# NOTE(review): -t is presumably the run duration in seconds - confirm
# against the benchmark_app documentation.
print('Benchmark FP32 model (IR)')
benchmark_output = ! benchmark_app -m $fp32_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
print('\nBenchmark INT8 model (IR)')
benchmark_output = ! benchmark_app -m $int8_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
# Show CPU Information for reference
from openvino.runtime import Core

ie = Core()
# Query the "FULL_DEVICE_NAME" property of the CPU device.
ie.get_property(name="FULL_DEVICE_NAME", device_name='CPU')