# Imports and Settings

Import NNCF and TensorFlow, set up the model and output directories, and download the pre-trained FP32 weights.

```python
from pathlib import Path
import logging

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras import layers
from tensorflow.keras import models

from nncf import NNCFConfig
from nncf.tensorflow.helpers.model_creation import create_compressed_model
from nncf.tensorflow.initialization import register_default_init_args
from nncf.common.utils.logger import set_log_level

set_log_level(logging.ERROR)

MODEL_DIR = Path("model")
OUTPUT_DIR = Path("output")
MODEL_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)

BASE_MODEL_NAME = "ResNet-18"

fp32_h5_path = Path(MODEL_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".h5")
fp32_sm_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_fp32"))
fp32_ir_path = Path(OUTPUT_DIR / "saved_model").with_suffix(".xml")
int8_pb_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_int8")).with_suffix(".pb")
int8_pb_name = Path(BASE_MODEL_NAME + "_int8").with_suffix(".pb")
int8_ir_path = int8_pb_path.with_suffix(".xml")

BATCH_SIZE = 128
IMG_SIZE = (64, 64)  # Input image size used in this example
NUM_CLASSES = 10  # Imagenette has 10 classes
LR = 1e-5

MEAN_RGB = (0.485 * 255, 0.456 * 255, 0.406 * 255)  # Per-channel mean of the ImageNet dataset
STDDEV_RGB = (0.229 * 255, 0.224 * 255, 0.225 * 255)  # Per-channel standard deviation of the ImageNet dataset

fp32_pth_url = "https://storage.openvinotoolkit.org/repositories/nncf/openvino_notebook_ckpts/305_resnet18_imagenette_fp32_v1.h5"
_ = tf.keras.utils.get_file(fp32_h5_path.resolve(), fp32_pth_url)
print(f'Absolute path where the model weights are saved:\n {fp32_h5_path.resolve()}')
```

# Dataset Preprocessing

Download the Imagenette 160px dataset with TensorFlow Datasets, resize the images to the training resolution, normalize them with ImageNet statistics, and build batched training and validation pipelines.

```python
datasets, datasets_info = tfds.load('imagenette/160px', shuffle_files=True, as_supervised=True,
                                    with_info=True, read_config=tfds.ReadConfig(shuffle_seed=0))
train_dataset, validation_dataset = datasets['train'], datasets['validation']

fig = tfds.show_examples(train_dataset, datasets_info)


def preprocessing(image, label):
    # Resize to the training resolution and normalize with ImageNet statistics.
    image = tf.image.resize(image, IMG_SIZE)
    image = image - MEAN_RGB
    image = image / STDDEV_RGB
    label = tf.one_hot(label, NUM_CLASSES)
    return image, label


train_dataset = (train_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                              .batch(BATCH_SIZE)
                              .prefetch(tf.data.experimental.AUTOTUNE))

validation_dataset = (validation_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                                        .batch(BATCH_SIZE)
                                        .prefetch(tf.data.experimental.AUTOTUNE))
```
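Before building the model, it can be useful to confirm that the input pipeline yields tensors of the expected shape and type. The following optional check is a minimal sketch that relies only on the `train_dataset` pipeline defined above: it pulls a single batch and prints its shapes and dtypes.

```python
# Optional sanity check (assumes train_dataset from the cell above):
# pull one batch and confirm shapes and dtypes before training.
images, labels = next(iter(train_dataset))
print("Image batch:", images.shape, images.dtype)   # expected: (128, 64, 64, 3) float32
print("Label batch:", labels.shape, labels.dtype)   # expected: (128, 10) float32
```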
# Define a Floating-Point Model

ResNet-18 is assembled from pre-activation residual blocks: batch normalization and ReLU are applied before each convolution, and a shortcut connection is added around every block.

```python
def residual_conv_block(filters, stage, block, strides=(1, 1), cut='pre'):
    def layer(input_tensor):
        x = layers.BatchNormalization(epsilon=2e-5)(input_tensor)
        x = layers.Activation('relu')(x)

        # defining shortcut connection
        if cut == 'pre':
            shortcut = input_tensor
        elif cut == 'post':
            shortcut = layers.Conv2D(filters, (1, 1), strides=strides,
                                     kernel_initializer='he_uniform', use_bias=False)(x)

        # continue with convolution layers
        x = layers.ZeroPadding2D(padding=(1, 1))(x)
        x = layers.Conv2D(filters, (3, 3), strides=strides, kernel_initializer='he_uniform', use_bias=False)(x)

        x = layers.BatchNormalization(epsilon=2e-5)(x)
        x = layers.Activation('relu')(x)
        x = layers.ZeroPadding2D(padding=(1, 1))(x)
        x = layers.Conv2D(filters, (3, 3), kernel_initializer='he_uniform', use_bias=False)(x)

        # add residual connection
        x = layers.Add()([x, shortcut])
        return x

    return layer


def ResNet18(input_shape=None):
    """Instantiates the ResNet18 architecture."""
    img_input = layers.Input(shape=input_shape, name='data')

    # ResNet18 bottom
    x = layers.BatchNormalization(epsilon=2e-5, scale=False)(img_input)
    x = layers.ZeroPadding2D(padding=(3, 3))(x)
    x = layers.Conv2D(64, (7, 7), strides=(2, 2), kernel_initializer='he_uniform', use_bias=False)(x)
    x = layers.BatchNormalization(epsilon=2e-5)(x)
    x = layers.Activation('relu')(x)
    x = layers.ZeroPadding2D(padding=(1, 1))(x)
    x = layers.MaxPooling2D((3, 3), strides=(2, 2), padding='valid')(x)

    # ResNet18 body
    repetitions = (2, 2, 2, 2)
    for stage, rep in enumerate(repetitions):
        for block in range(rep):
            filters = 64 * (2 ** stage)
            if block == 0 and stage == 0:
                x = residual_conv_block(filters, stage, block, strides=(1, 1), cut='post')(x)
            elif block == 0:
                x = residual_conv_block(filters, stage, block, strides=(2, 2), cut='post')(x)
            else:
                x = residual_conv_block(filters, stage, block, strides=(1, 1), cut='pre')(x)
    x = layers.BatchNormalization(epsilon=2e-5)(x)
    x = layers.Activation('relu')(x)

    # ResNet18 top
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(NUM_CLASSES)(x)
    x = layers.Activation('softmax')(x)

    # Create model
    model = models.Model(img_input, x)

    return model


IMG_SHAPE = IMG_SIZE + (3,)
model = ResNet18(input_shape=IMG_SHAPE)
```

# Pre-train the Floating-Point Model

Load the downloaded FP32 weights, evaluate the model on the validation set to establish a baseline accuracy, and save it in the SavedModel format for later conversion.

```python
# Load the floating-point weights.
model.load_weights(fp32_h5_path)

# Compile the floating-point model.
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
              metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')])

# Validate the floating-point model.
test_loss, acc_fp32 = model.evaluate(validation_dataset,
                                     callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of FP32 model: {acc_fp32:.3f}")

model.save(fp32_sm_path)
print(f'Absolute path where the model is saved:\n {fp32_sm_path.resolve()}')
```

# Create and Initialize Quantization

NNCF wraps the Keras model and inserts operations that simulate INT8 quantization during training; `register_default_init_args` registers the training data used to initialize the quantization ranges.

```python
nncf_config_dict = {
    "input_info": {"sample_size": [1] + list(IMG_SIZE) + [3]},  # NHWC input shape: [1, 64, 64, 3]
    "log_dir": str(OUTPUT_DIR),  # log directory for NNCF-specific logging outputs
    "compression": {
        "algorithm": "quantization",  # specify the algorithm here
    },
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)

nncf_config = register_default_init_args(nncf_config=nncf_config,
                                         data_loader=train_dataset,
                                         batch_size=BATCH_SIZE)

compression_ctrl, model = create_compressed_model(model, nncf_config)

# Compile the int8 model.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LR),
              loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
              metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')])

# Validate the int8 model.
test_loss, test_acc = model.evaluate(validation_dataset,
                                     callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of INT8 model after initialization: {test_acc:.3f}")
```

# Fine-tune the Compressed Model

Fine-tuning the quantized model for a couple of epochs helps recover accuracy lost at initialization. After training, export the compressed model as a frozen graph.

```python
# Train the int8 model.
model.fit(train_dataset, epochs=2)

# Validate the int8 model.
test_loss, acc_int8 = model.evaluate(validation_dataset,
                                     callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of INT8 model after fine-tuning: {acc_int8:.3f}")
print(f"\nAccuracy drop of tuned INT8 model over pre-trained FP32 model: {acc_fp32 - acc_int8:.3f}")

compression_ctrl.export_model(int8_pb_path, 'frozen_graph')
print(f'Absolute path where the int8 model is saved:\n {int8_pb_path.resolve()}')
```

# Export Frozen Graph Models to OpenVINO Intermediate Representation (IR)

Use Model Optimizer (`mo`) to convert the FP32 SavedModel and the INT8 frozen graph to OpenVINO IR.

```python
!mo --framework=tf --input_shape=[1,64,64,3] --input=data --saved_model_dir=$fp32_sm_path --output_dir=$OUTPUT_DIR
!mo --framework=tf --input_shape=[1,64,64,3] --input=Placeholder --input_model=$int8_pb_path --output_dir=$OUTPUT_DIR
```
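Before benchmarking, it can be worth confirming that both IR files were produced and that their input shapes match expectations. The snippet below is an optional, minimal sketch; it assumes the two conversions above completed successfully and that the `openvino` Python package (also used for benchmarking below) is installed.

```python
# Optional check (assumes both mo conversions above succeeded):
# read the generated IR files and print their input/output shapes.
from openvino.runtime import Core

core = Core()
for name, ir_path in [("FP32", fp32_ir_path), ("INT8", int8_ir_path)]:
    ir_model = core.read_model(model=ir_path)
    print(f"{name} IR: input {ir_model.input(0).shape}, output {ir_model.output(0).shape}")
```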
# Benchmark Model Performance by Computing Inference Time

Measure inference performance of the FP32 and INT8 IR models with `benchmark_app`, keeping only the summary lines of its report.

```python
def parse_benchmark_output(benchmark_output):
    # Keep only the summary lines of the benchmark_app report.
    parsed_output = [line for line in benchmark_output
                     if not (line.startswith(r"[") or line.startswith(" ") or line == "")]
    print(*parsed_output, sep='\n')


print('Benchmark FP32 model (IR)')
benchmark_output = ! benchmark_app -m $fp32_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)

print('\nBenchmark INT8 model (IR)')
benchmark_output = ! benchmark_app -m $int8_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
```

# Show CPU Information for Reference

```python
from openvino.runtime import Core

ie = Core()
ie.get_property(device_name='CPU', name="FULL_DEVICE_NAME")
```
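As a final, optional sanity check, the INT8 IR can be run directly with the OpenVINO Runtime on a single validation image. This is a minimal sketch, assuming the `validation_dataset` pipeline and `int8_ir_path` defined earlier are still in scope; the names `compiled_int8` and `output_layer` are illustrative.

```python
import numpy as np
from openvino.runtime import Core

core = Core()
# Load and compile the INT8 IR for CPU inference (assumes int8_ir_path from the steps above).
compiled_int8 = core.compile_model(core.read_model(model=int8_ir_path), device_name="CPU")
output_layer = compiled_int8.output(0)

# Take one preprocessed validation image; the pipeline already applies resize and normalization.
images, labels = next(iter(validation_dataset))
image = images[:1].numpy()  # shape (1, 64, 64, 3), matching the IR input

result = compiled_int8([image])[output_layer]
print("Predicted class:", int(np.argmax(result)), "| reference class:", int(np.argmax(labels[0])))
```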