Snippets Collections
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import matplotlib.pyplot as plt
import torch
from torch.autograd import Variable
from torch.autograd import Function
from torchvision import models
from torchvision import utils
import cv2
import sys
import numpy as np
import argparse

class FeatureExtractor():
    """ Class for extracting activations and 
    registering gradients from targetted intermediate layers """
    def __init__(self, model, target_layers):
        self.model = model
        self.target_layers = target_layers
        self.gradients = []

    def save_gradient(self, grad):
        self.gradients.append(grad)

    def __call__(self, x):
        outputs = []
        self.gradients = []
        for name, module in self.model._modules.items():
            x = module(x)
            if name in self.target_layers:
                x.register_hook(self.save_gradient)
                outputs += [x]
        return outputs, x

class ModelOutputs():
    """ Class for making a forward pass, and getting:
    1. The network output.
    2. Activations from intermediate targeted layers.
    3. Gradients from intermediate targeted layers. """
    def __init__(self, model, target_layers):
        self.model = model
        self.feature_extractor = FeatureExtractor(self.model.features, target_layers)

    def get_gradients(self):
        return self.feature_extractor.gradients

    def __call__(self, x):
        target_activations, output  = self.feature_extractor(x)
        output = output.view(output.size(0), -1)
        output = self.model.classifier(output)
        return target_activations, output

def preprocess_image(img):
    means=[0.485, 0.456, 0.406]
    stds=[0.229, 0.224, 0.225]

    preprocessed_img = img.copy()[:, :, ::-1]  # BGR -> RGB (OpenCV loads images as BGR)
    for i in range(3):
        preprocessed_img[:, :, i] = preprocessed_img[:, :, i] - means[i]
        preprocessed_img[:, :, i] = preprocessed_img[:, :, i] / stds[i]
    preprocessed_img = \
        np.ascontiguousarray(np.transpose(preprocessed_img, (2, 0, 1)))
    preprocessed_img = torch.from_numpy(preprocessed_img)
    preprocessed_img.unsqueeze_(0)
    input = Variable(preprocessed_img, requires_grad = True)
    return input

def show_cam_on_image(img, mask):
    heatmap = cv2.applyColorMap(np.uint8(255*mask), cv2.COLORMAP_JET)
    heatmap = np.float32(heatmap) / 255
    cam = heatmap + np.float32(img)
    cam = cam / np.max(cam)
    cv2.imwrite("cam.jpg", np.uint8(255 * cam))

class GradCam:
    def __init__(self, model, target_layer_names, use_cuda):
        self.model = model
        self.model.eval()
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()

        self.extractor = ModelOutputs(self.model, target_layer_names)

    def forward(self, input):
        return self.model(input) 

    def __call__(self, input, index = None):
        if self.cuda:
            features, output = self.extractor(input.cuda())
        else:
            features, output = self.extractor(input)

        if index is None:
            index = np.argmax(output.cpu().data.numpy())

        one_hot = np.zeros((1, output.size()[-1]), dtype = np.float32)
        one_hot[0][index] = 1
        one_hot = Variable(torch.from_numpy(one_hot), requires_grad = True)
        if self.cuda:
            one_hot = torch.sum(one_hot.cuda() * output)
        else:
            one_hot = torch.sum(one_hot * output)

        self.model.features.zero_grad()
        self.model.classifier.zero_grad()
        one_hot.backward(retain_graph=True)

        grads_val = self.extractor.get_gradients()[-1].cpu().data.numpy()

        target = features[-1]
        target = target.cpu().data.numpy()[0, :]

        weights = np.mean(grads_val, axis = (2, 3))[0, :]
        cam = np.zeros(target.shape[1 : ], dtype = np.float32)

        for i, w in enumerate(weights):
            cam += w * target[i, :, :]

        cam = np.maximum(cam, 0)
        cam = cv2.resize(cam, (224, 224))
        cam = cam - np.min(cam)
        cam = cam / np.max(cam)
        return cam

class GuidedBackpropReLU(Function):

    def forward(self, input):
        positive_mask = (input > 0).type_as(input)
        output = input * positive_mask  # equivalent to the original addcmul: keep only positive inputs
        self.save_for_backward(input, output)
        return output

    def backward(self, grad_output):
        input, output = self.saved_tensors
        grad_input = None

        positive_mask_1 = (input > 0).type_as(grad_output)
        positive_mask_2 = (grad_output > 0).type_as(grad_output)
        # pass the gradient only where both the input and the gradient are positive
        grad_input = grad_output * positive_mask_1 * positive_mask_2

        return grad_input

class GuidedBackpropReLUModel:
    def __init__(self, model, use_cuda):
        self.model = model
        self.model.eval()
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()

        # replace ReLU with GuidedBackpropReLU
        for idx, module in self.model.features._modules.items():
            if module.__class__.__name__ == 'ReLU':
                self.model.features._modules[idx] = GuidedBackpropReLU()

    def forward(self, input):
        return self.model(input)

    def __call__(self, input, index = None):
        if self.cuda:
            output = self.forward(input.cuda())
        else:
            output = self.forward(input)

        if index is None:
            index = np.argmax(output.cpu().data.numpy())

        one_hot = np.zeros((1, output.size()[-1]), dtype = np.float32)
        one_hot[0][index] = 1
        one_hot = Variable(torch.from_numpy(one_hot), requires_grad = True)
        if self.cuda:
            one_hot = torch.sum(one_hot.cuda() * output)
        else:
            one_hot = torch.sum(one_hot * output)

        # self.model.features.zero_grad()
        # self.model.classifier.zero_grad()
        one_hot.backward(retain_graph=True)

        output = input.grad.cpu().data.numpy()
        output = output[0,:,:,:]

        return output

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--use-cuda', action='store_true', default=False,
                        help='Use NVIDIA GPU acceleration')
    parser.add_argument('--image-path', type=str, default='./examples/both.png',
                        help='Input image path')
    args = parser.parse_args()
    args.use_cuda = args.use_cuda and torch.cuda.is_available()
    if args.use_cuda:
        print("Using GPU for acceleration")
    else:
        print("Using CPU for computation")

    return args

if __name__ == '__main__':
    """ python grad_cam.py <path_to_image>
    1. Loads an image with opencv.
    2. Preprocesses it for VGG19 and converts to a pytorch variable.
    3. Makes a forward pass to find the category index with the highest score,
    and computes intermediate activations.
    Makes the visualization. """

    args = get_args()

    # Can work with any model, but it assumes that the model has a
    # features module and a classifier module,
    # as in the VGG models in torchvision.
    grad_cam = GradCam(model = models.vgg19(pretrained=True), \
                    target_layer_names = ["35"], use_cuda=args.use_cuda)

    img = cv2.imread(args.image_path, 1)
    img = np.float32(cv2.resize(img, (224, 224))) / 255
    input = preprocess_image(img)

    # If None, returns the map for the highest scoring category.
    # Otherwise, targets the requested index.
    target_index = None

    mask = grad_cam(input, target_index)

    show_cam_on_image(img, mask)

    gb_model = GuidedBackpropReLUModel(model = models.vgg19(pretrained=True), use_cuda=args.use_cuda)
    gb = gb_model(input, index=target_index)
    utils.save_image(torch.from_numpy(gb), 'gb.jpg')

    cam_mask = np.zeros(gb.shape)
    for i in range(0, gb.shape[0]):
        cam_mask[i, :, :] = mask

    cam_gb = np.multiply(cam_mask, gb)
    utils.save_image(torch.from_numpy(cam_gb), 'cam_gb.jpg')
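# The per-channel loop in GradCam.__call__ can also be written as one vectorized
# expression. A minimal sketch (not part of the original snippet; assumes
# grads_val with shape (1, C, H, W) and target with shape (C, H, W), as above):
def grad_cam_weights_vectorized(grads_val, target):
    weights = grads_val.mean(axis=(2, 3))[0]              # GAP over H, W -> (C,)
    cam = (weights[:, None, None] * target).sum(axis=0)   # weighted sum of activation maps
    return np.maximum(cam, 0)                             # ReLU, as in the loop version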
class LeNet5(nn.Module):
  def __init__(self):
    super(LeNet5, self).__init__()
    # define a 2D convolutional layer
    
    self.hidden_1 = torch.nn.Conv2d(1,6,kernel_size=5,stride=1,padding=2)    
    # define a maxpool layer
    self.hidden_2 = nn.MaxPool2d(2, stride=2)
    # new 2D convolutional layer
    self.hidden_3 = torch.nn.Conv2d(6,16,kernel_size=5,stride=1)
    # another maxpool layer
    self.hidden_4 = nn.MaxPool2d(2, stride=2)
    # first linear layer
    self.hidden_5 = nn.Linear(16*5*5,120, bias=True) 
    # second linear layer
    self.hidden_6 = nn.Linear(120,84, bias=True) 
    
    # final output layer
    self.output = nn.Linear(84, 10, bias=False)
    # activation function
    self.activation = nn.ReLU()  
    

  def forward(self, x):
    
    for i in range(1,7):
      if i == 5:
        x = x.flatten(start_dim=1)  # flatten before the first linear layer
      # pass through layer i, then apply the activation
      f = getattr(self, f"hidden_{i}")
      x = self.activation(f(x))

    return self.output(x)                                         # return output
  
x = torch.randn((1, 1, 28, 28))
model = LeNet5()
y = model(x)
print(y)
print(model)
from torchsummary import summary
summ = summary(model, (1, 28, 28), device="cpu")  # torchsummary expects an input-size tuple; device="cpu" avoids its CUDA default
#data loader
batch_size = 32 # The batch size
num_workers = 0 # Number of subprocesses used for data loading (0 = load in the main process)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, 
                                           num_workers=num_workers, shuffle=True) # shuffle every epoch in training set to avoid training biases

print(train_loader)
print(train_loader.__dict__) # Information held by the dataloader
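# Sanity-check the loader by drawing one batch (a sketch; assumes train_dataset
# yields (image, label) pairs, e.g. a torchvision MNIST dataset):
images, labels = next(iter(train_loader))
print(images.shape, labels.shape)  # e.g. torch.Size([32, 1, 28, 28]) torch.Size([32])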
#random integer generator
torch.randint(low=0, high=len(train_dataset), size=(6,))
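# A typical use of such random indices is previewing a few samples (a sketch;
# assumes train_dataset returns (image, label) pairs of single-channel tensors):
import matplotlib.pyplot as plt

indices = torch.randint(low=0, high=len(train_dataset), size=(6,))
fig, axes = plt.subplots(1, 6, figsize=(12, 2))
for ax, idx in zip(axes, indices.tolist()):
    image, label = train_dataset[idx]
    ax.imshow(image.squeeze(), cmap="gray")
    ax.set_title(str(label))
    ax.axis("off")
plt.show()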
import math
def slerp_theta(z1, z2, theta):
    # trigonometric interpolation between two latent vectors z1 and z2
    return math.cos(theta) * z1 + math.sin(theta) * z2
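# Usage sketch: walking the interpolation from z1 (theta=0) to z2 (theta=pi/2).
# z1 and z2 here are stand-in latent vectors, not from the original snippet.
import torch

z1, z2 = torch.randn(128), torch.randn(128)
interpolations = [slerp_theta(z1, z2, t) for t in
                  [0.0, math.pi / 8, math.pi / 4, 3 * math.pi / 8, math.pi / 2]]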
# Imports and Settings
# On Windows, add the directory that contains cl.exe to the PATH to enable PyTorch to find the
# required C++ tools. This code assumes that Visual Studio 2019 is installed in the default
# directory. If you have a different C++ compiler, please add the correct path to os.environ["PATH"]
# directly. Note that the C++ Redistributable is not enough to run this notebook.

# Adding the path to os.environ["LIB"] is not always required - it depends on the system's configuration

import sys

if sys.platform == "win32":
    import distutils.command.build_ext
    import distutils.core
    import os
    from pathlib import Path

    VS_INSTALL_DIR = r"C:/Program Files (x86)/Microsoft Visual Studio"
    cl_paths = sorted(list(Path(VS_INSTALL_DIR).glob("**/Hostx86/x64/cl.exe")))
    if len(cl_paths) == 0:
        raise ValueError(
            "Cannot find Visual Studio. This notebook requires a C++ compiler. If you installed "
            "a C++ compiler, please add the directory that contains cl.exe to `os.environ['PATH']`."
        )
    else:
        # If multiple versions of MSVC are installed, get the most recent version
        cl_path = cl_paths[-1]
        vs_dir = str(cl_path.parent)
        os.environ["PATH"] += f"{os.pathsep}{vs_dir}"
        # Code for finding the library dirs from
        # https://stackoverflow.com/questions/47423246/get-pythons-lib-path
        d = distutils.core.Distribution()
        b = distutils.command.build_ext.build_ext(d)
        b.finalize_options()
        os.environ["LIB"] = os.pathsep.join(b.library_dirs)
        print(f"Added {vs_dir} to PATH")

import sys
import time
import warnings  # to disable warnings on export to ONNX
import zipfile
from pathlib import Path
import logging

import torch
import nncf  # Important - should be imported directly after torch

import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

from nncf.common.utils.logger import set_log_level
set_log_level(logging.ERROR)  # Disables all NNCF info and warning messages
from nncf import NNCFConfig
from nncf.torch import create_compressed_model, register_default_init_args
from openvino.runtime import Core
from torch.jit import TracerWarning

sys.path.append("../utils")
from notebook_utils import download_file

torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

MODEL_DIR = Path("model")
OUTPUT_DIR = Path("output")
DATA_DIR = Path("data")
BASE_MODEL_NAME = "resnet18"
image_size = 64

OUTPUT_DIR.mkdir(exist_ok=True)
MODEL_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)

# Paths where PyTorch, ONNX and OpenVINO IR models will be stored
fp32_pth_path = Path(MODEL_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".pth")
fp32_onnx_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".onnx")
fp32_ir_path = fp32_onnx_path.with_suffix(".xml")
int8_onnx_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_int8")).with_suffix(".onnx")
int8_ir_path = int8_onnx_path.with_suffix(".xml")

# It is possible to train the FP32 model from scratch, but that might be slow,
# so the pre-trained weights are downloaded by default.
pretrained_on_tiny_imagenet = True
fp32_pth_url = "https://storage.openvinotoolkit.org/repositories/nncf/openvino_notebook_ckpts/302_resnet18_fp32_v1.pth"
download_file(fp32_pth_url, directory=MODEL_DIR, filename=fp32_pth_path.name)

# Download Tiny ImageNet dataset
def download_tiny_imagenet_200(
    data_dir: Path,
    url="http://cs231n.stanford.edu/tiny-imagenet-200.zip",
    tarname="tiny-imagenet-200.zip",
):
    archive_path = data_dir / tarname
    download_file(url, directory=data_dir, filename=tarname)
    with zipfile.ZipFile(archive_path, "r") as zip_ref:
        zip_ref.extractall(path=data_dir)

def prepare_tiny_imagenet_200(dataset_dir: Path):
    # format validation set the same way as train set is formatted
    val_data_dir = dataset_dir / 'val'
    val_annotations_file = val_data_dir / 'val_annotations.txt'
    with open(val_annotations_file, 'r') as f:
        val_annotation_data = map(lambda line: line.split('\t')[:2], f.readlines())
    val_images_dir = val_data_dir / 'images'
    for image_filename, image_label in val_annotation_data:
        from_image_filepath = val_images_dir / image_filename
        to_image_dir = val_data_dir / image_label
        if not to_image_dir.exists():
            to_image_dir.mkdir()
        to_image_filepath = to_image_dir / image_filename
        from_image_filepath.rename(to_image_filepath)
    val_annotations_file.unlink()
    val_images_dir.rmdir()
    

DATASET_DIR = DATA_DIR / "tiny-imagenet-200"
if not DATASET_DIR.exists():
    download_tiny_imagenet_200(DATA_DIR)
    prepare_tiny_imagenet_200(DATASET_DIR)
    print(f"Successfully downloaded and prepared dataset at: {DATASET_DIR}")

# Pre-train Floating-Point Model
# Train Function
def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter("Time", ":3.3f")
    losses = AverageMeter("Loss", ":2.3f")
    top1 = AverageMeter("Acc@1", ":2.2f")
    top5 = AverageMeter("Acc@5", ":2.2f")
    progress = ProgressMeter(
        len(train_loader), [batch_time, losses, top1, top5], prefix="Epoch:[{}]".format(epoch)
    )

    # switch to train mode
    model.train()

    end = time.time()
    for i, (images, target) in enumerate(train_loader):
        images = images.to(device)
        target = target.to(device)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), images.size(0))
        top1.update(acc1[0], images.size(0))
        top5.update(acc5[0], images.size(0))

        # compute gradient and do opt step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        print_frequency = 50
        if i % print_frequency == 0:
            progress.display(i)

# Validate Function
def validate(val_loader, model, criterion):
    batch_time = AverageMeter("Time", ":3.3f")
    losses = AverageMeter("Loss", ":2.3f")
    top1 = AverageMeter("Acc@1", ":2.2f")
    top5 = AverageMeter("Acc@5", ":2.2f")
    progress = ProgressMeter(len(val_loader), [batch_time, losses, top1, top5], prefix="Test: ")

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (images, target) in enumerate(val_loader):
            images = images.to(device)
            target = target.to(device)

            # compute output
            output = model(images)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), images.size(0))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            print_frequency = 10
            if i % print_frequency == 0:
                progress.display(i)

        print(" * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}".format(top1=top1, top5=top5))
    return top1.avg

# Helpers
class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self, name, fmt=":f"):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print("\t".join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches))
        fmt = "{:" + str(num_digits) + "d}"
        return "[" + fmt + "/" + fmt.format(num_batches) + "]"


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res
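# Quick check of the helper on random logits (a sketch; values are illustrative):
logits = torch.randn(8, 200)                  # batch of 8 over 200 Tiny ImageNet classes
target = torch.randint(0, 200, (8,))
top1_acc, top5_acc = accuracy(logits, target, topk=(1, 5))  # percentages as 1-element tensors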

# Get a Pre-trained FP32 Model
num_classes = 200  # 200 is for Tiny ImageNet, default is 1000 for ImageNet
init_lr = 1e-4
batch_size = 128
epochs = 4

model = models.resnet18(pretrained=not pretrained_on_tiny_imagenet)
# update the last FC layer for Tiny ImageNet number of classes
model.fc = nn.Linear(in_features=512, out_features=num_classes, bias=True)
model.to(device)

# Data loading code
train_dir = DATASET_DIR / "train"
val_dir = DATASET_DIR / "val"
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

train_dataset = datasets.ImageFolder(
    train_dir,
    transforms.Compose(
        [
            transforms.Resize(image_size),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]
    ),
)
val_dataset = datasets.ImageFolder(
    val_dir,
    transforms.Compose(
        [
            transforms.Resize(image_size),
            transforms.ToTensor(),
            normalize,
        ]
    ),
)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True, sampler=None
)

val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True
)

# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=init_lr)

if pretrained_on_tiny_imagenet:
    #
    # ** WARNING: torch.load functionality uses Python's pickling module that
    # may be used to perform arbitrary code execution during unpickling. Only load data that you
    # trust.
    #
    checkpoint = torch.load(str(fp32_pth_path), map_location="cpu")
    model.load_state_dict(checkpoint["state_dict"], strict=True)
    acc1_fp32 = checkpoint["acc1"]
else:
    best_acc1 = 0
    # Training loop
    for epoch in range(0, epochs):
        # run a single training epoch
        train(train_loader, model, criterion, optimizer, epoch)

        # evaluate on validation set
        acc1 = validate(val_loader, model, criterion)

        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)

        if is_best:
            checkpoint = {"state_dict": model.state_dict(), "acc1": acc1}
            torch.save(checkpoint, fp32_pth_path)
    acc1_fp32 = best_acc1
    
print(f"Accuracy of FP32 model: {acc1_fp32:.3f}")

dummy_input = torch.randn(1, 3, image_size, image_size).to(device)

torch.onnx.export(model, dummy_input, fp32_onnx_path)
print(f"FP32 ONNX model was exported to {fp32_onnx_path}.")

# Create and Initialize Quantization
nncf_config_dict = {
    "input_info": {"sample_size": [1, 3, image_size, image_size]},
    "log_dir": str(OUTPUT_DIR),  # log directory for NNCF-specific logging outputs
    "compression": {
        "algorithm": "quantization",  # specify the algorithm here
    },
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)

nncf_config = register_default_init_args(nncf_config, train_loader)
compression_ctrl, model = create_compressed_model(model, nncf_config)
acc1 = validate(val_loader, model, criterion)
print(f"Accuracy of initialized INT8 model: {acc1:.3f}")

# Fine-tune the Compressed Model
compression_lr = init_lr / 10
optimizer = torch.optim.Adam(model.parameters(), lr=compression_lr)

# train for one epoch with NNCF
train(train_loader, model, criterion, optimizer, epoch=0)

# evaluate on validation set after Quantization-Aware Training (QAT case)
acc1_int8 = validate(val_loader, model, criterion)

print(f"Accuracy of tuned INT8 model: {acc1_int8:.3f}")
print(f"Accuracy drop of tuned INT8 model over pre-trained FP32 model: {acc1_fp32 - acc1_int8:.3f}")

# Export INT8 Model to ONNX
if not int8_onnx_path.exists():
    warnings.filterwarnings("ignore", category=TracerWarning)
    warnings.filterwarnings("ignore", category=UserWarning)
    # Export INT8 model to ONNX that is supported by the OpenVINO™ toolkit
    compression_ctrl.export_model(int8_onnx_path)
    print(f"INT8 ONNX model exported to {int8_onnx_path}.")

# Convert ONNX models to OpenVINO Intermediate Representation (IR)
if not fp32_ir_path.exists():
    !mo --input_model $fp32_onnx_path --input_shape "[1,3, $image_size, $image_size]" --mean_values "[123.675, 116.28 , 103.53]" --scale_values "[58.395, 57.12 , 57.375]" --data_type FP16 --output_dir $OUTPUT_DIR

if not int8_ir_path.exists():
    !mo --input_model $int8_onnx_path --input_shape "[1,3, $image_size, $image_size]" --mean_values "[123.675, 116.28 , 103.53]" --scale_values "[58.395, 57.12 , 57.375]" --data_type FP16 --output_dir $OUTPUT_DIR

# Benchmark Model Performance by Computing Inference Time
def parse_benchmark_output(benchmark_output):
    parsed_output = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith("  ") or line == "")]
    print(*parsed_output, sep='\n')


print('Benchmark FP32 model (IR)')
benchmark_output = ! benchmark_app -m $fp32_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)

print('Benchmark INT8 model (IR)')
benchmark_output = ! benchmark_app -m $int8_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)

# Show CPU Information for reference
ie = Core()
ie.get_property(device_name="CPU", name="FULL_DEVICE_NAME")
# Imports
import sys
import time
from pathlib import Path

import cv2
import numpy as np
import torch
from IPython.display import Markdown, display
from fastseg import MobileV3Large
from openvino.runtime import Core

sys.path.append("../utils")
from notebook_utils import CityScapesSegmentation, segmentation_map_to_image, viz_result_image

# Settings
IMAGE_WIDTH = 1024  # Suggested values: 2048, 1024 or 512. The minimum width is 512.
# Set IMAGE_HEIGHT manually for custom input sizes. Minimum height is 512
IMAGE_HEIGHT = 1024 if IMAGE_WIDTH == 2048 else 512
DIRECTORY_NAME = "model"
BASE_MODEL_NAME = DIRECTORY_NAME + f"/fastseg{IMAGE_WIDTH}"

# Paths where PyTorch, ONNX and OpenVINO IR models will be stored
model_path = Path(BASE_MODEL_NAME).with_suffix(".pth")
onnx_path = model_path.with_suffix(".onnx")
ir_path = model_path.with_suffix(".xml")

# Download the Fastseg Model
print("Downloading the Fastseg model (if it has not been downloaded before)....")
model = MobileV3Large.from_pretrained().cpu().eval()
print("Loaded PyTorch Fastseg model")

# Save the model
model_path.parent.mkdir(exist_ok=True)
torch.save(model.state_dict(), str(model_path))
print(f"Model saved at {model_path}")

# Convert PyTorch model to ONNX
if not onnx_path.exists():
    dummy_input = torch.randn(1, 3, IMAGE_HEIGHT, IMAGE_WIDTH)

    # For the Fastseg model, setting do_constant_folding to False is required
    # for PyTorch>1.5.1
    torch.onnx.export(
        model,
        dummy_input,
        onnx_path,
        opset_version=11,
        do_constant_folding=False,
    )
    print(f"ONNX model exported to {onnx_path}.")
else:
    print(f"ONNX model {onnx_path} already exists.")

# Convert ONNX Model to OpenVINO IR Format
# Construct the command for Model Optimizer
mo_command = f"""mo
                 --input_model "{onnx_path}"
                 --input_shape "[1,3, {IMAGE_HEIGHT}, {IMAGE_WIDTH}]"
                 --mean_values="[123.675, 116.28 , 103.53]"
                 --scale_values="[58.395, 57.12 , 57.375]"
                 --data_type FP16
                 --output_dir "{model_path.parent}"
                 """
mo_command = " ".join(mo_command.split())
print("Model Optimizer command to convert the ONNX model to OpenVINO:")
display(Markdown(f"`{mo_command}`"))

if not ir_path.exists():
    print("Exporting ONNX model to IR... This may take a few minutes.")
    mo_result = %sx $mo_command
    print("\n".join(mo_result))
else:
    print(f"IR model {ir_path} already exists.")

# Show results: Load and Preprocess an Input Image
def normalize(image: np.ndarray) -> np.ndarray:
    """
    Normalize the image to the given mean and standard deviation
    for CityScapes models.
    """
    image = image.astype(np.float32)
    mean = (0.485, 0.456, 0.406)
    std = (0.229, 0.224, 0.225)
    image /= 255.0
    image -= mean
    image /= std
    return image

image_filename = "data/street.jpg"
image = cv2.cvtColor(cv2.imread(image_filename), cv2.COLOR_BGR2RGB)

resized_image = cv2.resize(image, (IMAGE_WIDTH, IMAGE_HEIGHT))
normalized_image = normalize(resized_image)

# Convert the resized images to network input shape
input_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)
normalized_input_image = np.expand_dims(np.transpose(normalized_image, (2, 0, 1)), 0)

# ONNX Model in Inference Engine
# Load network to Inference Engine
ie = Core()
model_onnx = ie.read_model(model=onnx_path)
compiled_model_onnx = ie.compile_model(model=model_onnx, device_name="CPU")

output_layer_onnx = compiled_model_onnx.output(0)

# Run inference on the input image
res_onnx = compiled_model_onnx([normalized_input_image])[output_layer_onnx]

# Convert network result to segmentation map and display the result
result_mask_onnx = np.squeeze(np.argmax(res_onnx, axis=1)).astype(np.uint8)
viz_result_image(
    image,
    segmentation_map_to_image(result_mask_onnx, CityScapesSegmentation.get_colormap()),
    resize=True,
)

# IR Model in Inference Engine
# Load the network in Inference Engine
ie = Core()
model_ir = ie.read_model(model=ir_path)
compiled_model_ir = ie.compile_model(model=model_ir, device_name="CPU")

# Get input and output layers
output_layer_ir = compiled_model_ir.output(0)

# Run inference on the input image
res_ir = compiled_model_ir([input_image])[output_layer_ir]

result_mask_ir = np.squeeze(np.argmax(res_ir, axis=1)).astype(np.uint8)
viz_result_image(
    image,
    segmentation_map_to_image(result=result_mask_ir, colormap=CityScapesSegmentation.get_colormap()),
    resize=True,
)

# PyTorch Comparison
with torch.no_grad():
    result_torch = model(torch.as_tensor(normalized_input_image).float())

result_mask_torch = torch.argmax(result_torch, dim=1).squeeze(0).numpy().astype(np.uint8)
viz_result_image(
    image,
    segmentation_map_to_image(result=result_mask_torch, colormap=CityScapesSegmentation.get_colormap()),
    resize=True,
)

# Performance Comparison
num_images = 20

start = time.perf_counter()
for _ in range(num_images):
    compiled_model_onnx([normalized_input_image])
end = time.perf_counter()
time_onnx = end - start
print(
    f"ONNX model in Inference Engine/CPU: {time_onnx/num_images:.3f} "
    f"seconds per image, FPS: {num_images/time_onnx:.2f}"
)

start = time.perf_counter()
for _ in range(num_images):
    compiled_model_ir([input_image])
end = time.perf_counter()
time_ir = end - start
print(
    f"IR model in Inference Engine/CPU: {time_ir/num_images:.3f} "
    f"seconds per image, FPS: {num_images/time_ir:.2f}"
)

with torch.no_grad():
    start = time.perf_counter()
    for _ in range(num_images):
        model(torch.as_tensor(input_image).float())
    end = time.perf_counter()
    time_torch = end - start
print(
    f"PyTorch model on CPU: {time_torch/num_images:.3f} seconds per image, "
    f"FPS: {num_images/time_torch:.2f}"
)

if "GPU" in ie.available_devices:
    compiled_model_onnx_gpu = ie.compile_model(model=model_onnx, device_name="GPU")
    start = time.perf_counter()
    for _ in range(num_images):
        compiled_model_onnx_gpu([input_image])
    end = time.perf_counter()
    time_onnx_gpu = end - start
    print(
        f"ONNX model in Inference Engine/GPU: {time_onnx_gpu/num_images:.3f} "
        f"seconds per image, FPS: {num_images/time_onnx_gpu:.2f}"
    )

    compiled_model_ir_gpu = ie.compile_model(model=model_ir, device_name="GPU")
    start = time.perf_counter()
    for _ in range(num_images):
        compiled_model_ir_gpu([input_image])
    end = time.perf_counter()
    time_ir_gpu = end - start
    print(
        f"IR model in Inference Engine/GPU: {time_ir_gpu/num_images:.3f} "
        f"seconds per image, FPS: {num_images/time_ir_gpu:.2f}"
    )

# Show Device Information
devices = ie.available_devices
for device in devices:
    device_name = ie.get_property(device_name=device, name="FULL_DEVICE_NAME")
    print(f"{device}: {device_name}")
import torch
import torch.nn.functional as F

class LabelSmoothingLoss(torch.nn.Module):
    def __init__(self, smoothing: float = 0.1, 
                 reduction="mean", weight=None):
        super(LabelSmoothingLoss, self).__init__()
        self.smoothing = smoothing
        self.reduction = reduction
        self.weight = weight

    def reduce_loss(self, loss):
        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss

    def linear_combination(self, x, y):
        return self.smoothing * x + (1 - self.smoothing) * y

    def forward(self, preds, target):
        assert 0 <= self.smoothing < 1

        if self.weight is not None:
            self.weight = self.weight.to(preds.device)

        n = preds.size(-1)
        log_preds = F.log_softmax(preds, dim=-1)
        loss = self.reduce_loss(-log_preds.sum(dim=-1))
        nll = F.nll_loss(
            log_preds, target, reduction=self.reduction, weight=self.weight
        )
        return self.linear_combination(loss / n, nll)
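# Usage sketch: with smoothing=0.0 the loss reduces to standard cross-entropy,
# which gives a convenient sanity check (preds/target here are random stand-ins):
preds = torch.randn(8, 10)                    # unnormalized logits
target = torch.randint(0, 10, (8,))
loss = LabelSmoothingLoss(smoothing=0.1)(preds, target)
assert torch.isclose(LabelSmoothingLoss(smoothing=0.0)(preds, target),
                     F.cross_entropy(preds, target))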
import torch.nn as nn
from torch.nn.utils import spectral_norm

def conv2d(*args, **kwargs):
    return spectral_norm(nn.Conv2d(*args, **kwargs))

class SeparableConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, bias=False):
        super(SeparableConv2d, self).__init__()
        self.depthwise = conv2d(in_channels, in_channels, kernel_size=kernel_size,
            groups=in_channels, bias=bias, padding=1)  # padding=1 is "same" only for kernel_size=3
        self.pointwise = conv2d(in_channels, out_channels,
            kernel_size=1, bias=bias)

    def forward(self, x):
        out = self.depthwise(x)
        out = self.pointwise(out)
        return out
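# Shape check (a sketch): padding=1 preserves spatial size for kernel_size=3.
import torch

block = SeparableConv2d(16, 32, kernel_size=3)
x = torch.randn(1, 16, 64, 64)
print(block(x).shape)  # torch.Size([1, 32, 64, 64])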
import torch
import torch.nn as nn
import torch.nn.functional as F
class STN(nn.Module):
    def __init__(self):
        super(STN, self).__init__()
        # simple convnet classifier
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        # spatial transformer localization network
        self.localization = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(64, 128, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )
        # transformation regressor for theta
        self.fc_loc = nn.Sequential(
            nn.Linear(128*4*4, 256),
            nn.ReLU(True),
            nn.Linear(256, 3 * 2)
        )
        # initializing the weights and biases with identity transformations
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], 
                                                    dtype=torch.float))
    def stn(self, x):
        xs = self.localization(x)
        xs = xs.view(-1, xs.size(1)*xs.size(2)*xs.size(3))
        # calculate the transformation parameters theta
        theta = self.fc_loc(xs)
        # resize theta
        theta = theta.view(-1, 2, 3) 
        # grid generator => transformation on parameters theta
        grid = F.affine_grid(theta, x.size())
        # grid sampling => applying the spatial transformations
        x = F.grid_sample(x, grid)
        return x
    def forward(self, x):
        # transform the input
        x = self.stn(x)
        
        # forward pass through the classifier 
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16*5*5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.log_softmax(x, dim=1)
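# Forward-pass sketch: the localization and fc_loc sizes above imply 32x32 RGB
# inputs (CIFAR-10-sized images); the batch here is a random stand-in.
model = STN()
x = torch.randn(4, 3, 32, 32)
out = model(x)
print(out.shape)  # torch.Size([4, 10]) -- per-class log-probabilities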
images, boxes = torch.rand(2, 3, 400, 400).to(device), torch.rand(2, 11, 4).to(device)
boxes[:, :, 2:4] = boxes[:, :, 0:2] + boxes[:, :, 2:4]  # convert [x, y, w, h] -> [x1, y1, x2, y2]
labels = torch.randint(0, 2, (2, 11)).to(device)  # one label per box for each of the 2 images
images = list(image for image in images)
targets = []
for i in range(len(images)):
    d = {}
    d['boxes'] = boxes[i]
    d['labels'] = labels[i]
    targets.append(d)

model = model.to(device)
output = model(images, targets)
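# The snippet above assumes `model` and `device` were defined earlier in the
# source notebook. A minimal stand-in (the detector choice and num_classes are
# assumptions, not from the original):
import torchvision

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False, num_classes=2)
model.train()  # in train mode the call returns a dict of losses rather than detections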
# Load entire dataset
X, y = torch.load('some_training_set_with_labels.pt')
 
# Train model
for epoch in range(max_epochs):
    for i in range(n_batches):
        # Local batches and labels
        local_X, local_y = X[i*batch_size:(i+1)*batch_size], y[i*batch_size:(i+1)*batch_size]
 
        # Your model
        [...]
         
         
# Unoptimized generator
training_generator = SomeSingleCoreGenerator('some_training_set_with_labels.pt')

# Train model
for epoch in range(max_epochs):
    for local_X, local_y in training_generator:
        # Your model
        [...]
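# The optimized counterpart of the two "unoptimized" patterns above wraps the
# data in a torch.utils.data.Dataset and lets DataLoader parallelize loading.
# A minimal sketch (TensorFileDataset is a hypothetical helper, not from the original):
import torch
from torch.utils.data import Dataset, DataLoader

class TensorFileDataset(Dataset):
    def __init__(self, path):
        self.X, self.y = torch.load(path)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

training_generator = DataLoader(
    TensorFileDataset('some_training_set_with_labels.pt'),
    batch_size=64, shuffle=True, num_workers=4,  # worker subprocesses load batches in parallel
)

for epoch in range(max_epochs):
    for local_X, local_y in training_generator:
        # Your model
        [...]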