#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #entity-recognition #bert
204-named-entity-recognition: Named Entity Recognition with OpenVINO
# Imports
import time
import json
import numpy as np
import tokens_bert as tokens
from openvino.runtime import Core
from openvino.runtime import Dimension
# Download the model
# directory where model will be downloaded
base_model_dir = "model"
# desired precision
precision = "FP16-INT8"
# model name as named in Open Model Zoo
model_name = "bert-small-uncased-whole-word-masking-squad-int8-0002"
model_path = f"model/intel/{model_name}/{precision}/{model_name}.xml"
model_weights_path = f"model/intel/{model_name}/{precision}/{model_name}.bin"
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir} " \
f"--cache_dir {base_model_dir}"
! $download_command
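# (Illustrative check, not part of the original notebook.) After omz_downloader
# finishes, the IR files should exist at the paths built above; this confirms that
# before the model is read.
from pathlib import Path
for downloaded_file in (model_path, model_weights_path):
    print(downloaded_file, "exists" if Path(downloaded_file).exists() else "MISSING")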
# Load the model for Entity Extraction with Dynamic Shape
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=model_path, weights=model_weights_path)
# assign dynamic shapes to every input layer on the last dimension
for input_layer in model.inputs:
input_shape = input_layer.partial_shape
input_shape[1] = Dimension(1, 384)
model.reshape({input_layer: input_shape})
# compile the model for the CPU
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get the list of model input nodes (their names are checked later when building inputs)
input_keys = list(compiled_model.inputs)
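# (Illustrative check, not part of the original notebook.) Printing the partial
# shapes confirms that the sequence dimension of every input is now dynamic in the
# range 1..384 while the batch dimension stays fixed.
for input_layer in model.inputs:
    print(input_layer.any_name, input_layer.partial_shape)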
# Processing
# path to vocabulary file
vocab_file_path = "data/vocab.txt"
# create dictionary with words and their indices
vocab = tokens.load_vocab_file(vocab_file_path)
# define special tokens
cls_token = vocab["[CLS]"]
sep_token = vocab["[SEP]"]
# set a confidence score threshold
confidence_threshold = 0.4
# Preprocessing
# build the dictionary of model inputs from entity and context token ids
def prepare_input(entity_tokens, context_tokens):
input_ids = [cls_token] + entity_tokens + [sep_token] + \
context_tokens + [sep_token]
# attention mask: 1 for every token (no padding is used)
attention_mask = [1] * len(input_ids)
# 0 for entity tokens, 1 for context part
token_type_ids = [0] * (len(entity_tokens) + 2) + \
[1] * (len(context_tokens) + 1)
# create input to feed the model
input_dict = {
"input_ids": np.array([input_ids], dtype=np.int32),
"attention_mask": np.array([attention_mask], dtype=np.int32),
"token_type_ids": np.array([token_type_ids], dtype=np.int32),
}
# some models require additional position_ids
if "position_ids" in [i_key.any_name for i_key in input_keys]:
position_ids = np.arange(len(input_ids))
input_dict["position_ids"] = np.array([position_ids], dtype=np.int32)
return input_dict
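# (Illustrative shape check, not part of the original notebook; the token ids below
# are arbitrary placeholders.) With 2 entity tokens and 5 context tokens each input
# tensor has shape (1, 2 + 5 + 3) = (1, 10): [CLS] entity [SEP] context [SEP].
toy_input = prepare_input(entity_tokens=[1037, 2742],
                          context_tokens=[2023, 2003, 1037, 7099, 6251])
print({name: array.shape for name, array in toy_input.items()})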
# Postprocessing
def postprocess(output_start, output_end, entity_tokens,
context_tokens_start_end, input_size):
def get_score(logits):
out = np.exp(logits)
return out / out.sum(axis=-1)
# get start-end scores for context
score_start = get_score(output_start)
score_end = get_score(output_end)
# index of first context token in tensor
context_start_idx = len(entity_tokens) + 2
# index of last+1 context token in tensor
context_end_idx = input_size - 1
# find product of all start-end combinations to find the best one
max_score, max_start, max_end = find_best_entity_window(
start_score=score_start, end_score=score_end,
context_start_idx=context_start_idx, context_end_idx=context_end_idx
)
# convert to context text start-end index
max_start = context_tokens_start_end[max_start][0]
max_end = context_tokens_start_end[max_end][1]
return max_score, max_start, max_end
def find_best_entity_window(start_score, end_score,
context_start_idx, context_end_idx):
context_len = context_end_idx - context_start_idx
score_mat = np.matmul(
start_score[context_start_idx:context_end_idx].reshape(
(context_len, 1)),
end_score[context_start_idx:context_end_idx].reshape(
(1, context_len)),
)
# reset candidates with end before start
score_mat = np.triu(score_mat)
# reset candidates longer than 16 tokens
score_mat = np.tril(score_mat, 16)
# find the best start-end pair
max_s, max_e = divmod(score_mat.flatten().argmax(), score_mat.shape[1])
max_score = score_mat[max_s, max_e]
return max_score, max_s, max_e
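# (Worked toy example, not part of the original notebook.) With 4 context tokens
# starting at tensor index 3, the outer product of start and end scores is masked so
# the end cannot precede the start; the best window here spans context tokens 1..2.
toy_start = np.array([0.0, 0.0, 0.0, 0.1, 0.7, 0.1, 0.1, 0.0])
toy_end = np.array([0.0, 0.0, 0.0, 0.1, 0.2, 0.6, 0.1, 0.0])
print(find_best_entity_window(start_score=toy_start, end_score=toy_end,
                              context_start_idx=3, context_end_idx=7))
# expected output: approximately (0.42, 1, 2)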
def get_best_entity(entity, context, vocab):
# convert context string to tokens
context_tokens, context_tokens_end = tokens.text_to_tokens(
text=context.lower(), vocab=vocab)
# convert entity string to tokens
entity_tokens, _ = tokens.text_to_tokens(text=entity.lower(), vocab=vocab)
network_input = prepare_input(entity_tokens, context_tokens)
input_size = len(context_tokens) + len(entity_tokens) + 3
# openvino inference
output_start_key = compiled_model.output("output_s")
output_end_key = compiled_model.output("output_e")
result = compiled_model(network_input)
# postprocess the result to get the score and the character range of the answer in the context
score_start_end = postprocess(output_start=result[output_start_key][0],
output_end=result[output_end_key][0],
entity_tokens=entity_tokens,
context_tokens_start_end=context_tokens_end,
input_size=input_size)
# return the slice of the context that contains the answer, together with its score
return context[score_start_end[1]:score_start_end[2]], score_start_end[0]
# Set the Entity Recognition Template
template = ["building", "company", "persons", "city",
"state", "height", "floor", "address"]
def run_analyze_entities(context):
print(f"Context: {context}\n", flush=True)
if len(context) == 0:
print("Error: Empty context or outside paragraphs")
return
if len(context) > 380:
print("Error: The context is too long for this particular model. "
"Try with context shorter than 380 words.")
return
# measure processing time
start_time = time.perf_counter()
extract = []
for field in template:
entity_to_find = field + "?"
entity, score = get_best_entity(entity=entity_to_find,
context=context,
vocab=vocab)
if score >= confidence_threshold:
extract.append({"Entity": entity, "Type": field,
"Score": f"{score:.2f}"})
end_time = time.perf_counter()
res = {"Extraction": extract, "Time": f"{end_time - start_time:.2f}s"}
print("\nJSON Output:")
print(json.dumps(res, sort_keys=False, indent=4))
# Run on Simple Text
# Sample 1
source_text = "Intel Corporation is an American multinational and technology" \
" company headquartered in Santa Clara, California."
run_analyze_entities(source_text)
# Sample 2
source_text = "Intel was founded in Mountain View, California, " \
"in 1968 by Gordon E. Moore, a chemist, and Robert Noyce, " \
"a physicist and co-inventor of the integrated circuit."
run_analyze_entities(source_text)
# Sample 3
source_text = "The Robert Noyce Building in Santa Clara, California, " \
"is the headquarters for Intel Corporation. It was constructed in 1992 " \
"and is located at 2200 Mission College Boulevard - 95054. It has an " \
"estimated height of 22.20 meters and 6 floors above ground."
run_analyze_entities(source_text)
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #optimization #tensorflow
301-tensorflow-training-openvino: From Training to Deployment with TensorFlow and OpenVINO
# Import TensorFlow and Other Libraries
import os
import sys
from pathlib import Path
import PIL
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from PIL import Image
from openvino.runtime import Core
from openvino.tools.mo import mo_tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
sys.path.append("../utils")
from notebook_utils import download_file
# Download and Explore the Dataset
import pathlib
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
data_dir = tf.keras.utils.get_file('flower_photos', origin=dataset_url, untar=True)
data_dir = pathlib.Path(data_dir)
image_count = len(list(data_dir.glob('*/*.jpg')))
print(image_count)
roses = list(data_dir.glob('roses/*'))
PIL.Image.open(str(roses[0]))
PIL.Image.open(str(roses[1]))
tulips = list(data_dir.glob('tulips/*'))
PIL.Image.open(str(tulips[0]))
PIL.Image.open(str(tulips[1]))
# Create a Dataset
batch_size = 32
img_height = 180
img_width = 180
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="training",
seed=123,
image_size=(img_height, img_width),
batch_size=batch_size)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="validation",
seed=123,
image_size=(img_height, img_width),
batch_size=batch_size)
class_names = train_ds.class_names
print(class_names)
# Visualize the Data
plt.figure(figsize=(10, 10))
for images, labels in train_ds.take(1):
for i in range(9):
ax = plt.subplot(3, 3, i + 1)
plt.imshow(images[i].numpy().astype("uint8"))
plt.title(class_names[labels[i]])
plt.axis("off")
for image_batch, labels_batch in train_ds:
print(image_batch.shape)
print(labels_batch.shape)
break
# Configure the Dataset for Performance
# AUTOTUNE = tf.data.AUTOTUNE
AUTOTUNE = tf.data.experimental.AUTOTUNE
train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
# Standardize the Data
normalization_layer = layers.experimental.preprocessing.Rescaling(1./255)
normalized_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
image_batch, labels_batch = next(iter(normalized_ds))
first_image = image_batch[0]
# Notice the pixels values are now in `[0,1]`.
print(np.min(first_image), np.max(first_image))
# Create the Model
num_classes = 5
model = Sequential([
layers.experimental.preprocessing.Rescaling(1./255, input_shape=(img_height, img_width, 3)),
layers.Conv2D(16, 3, padding='same', activation='relu'),
layers.MaxPooling2D(),
layers.Conv2D(32, 3, padding='same', activation='relu'),
layers.MaxPooling2D(),
layers.Conv2D(64, 3, padding='same', activation='relu'),
layers.MaxPooling2D(),
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dense(num_classes)
])
# Compile the Model
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
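# (Sketch of how the notebook typically continues; this excerpt stops after
# compiling. The epoch count and the save path below are illustrative assumptions,
# not values taken from the original.)
epochs = 10
history = model.fit(train_ds, validation_data=val_ds, epochs=epochs)
model.save("model/flower/saved_model")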
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #optimization #tensorflow
301-tensorflow-training-openvino: Post-Training Quantization with TensorFlow Classification Model
# Preparation
from pathlib import Path
import tensorflow as tf
model_xml = Path("model/flower/flower_ir.xml")
dataset_url = (
"https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
)
data_dir = Path(tf.keras.utils.get_file("flower_photos", origin=dataset_url, untar=True))
if not model_xml.exists():
print("Executing training notebook. This will take a while...")
%run 301-tensorflow-training-openvino.ipynb
# Imports
import copy
import os
import sys
import cv2
import matplotlib.pyplot as plt
import numpy as np
from addict import Dict
from openvino.tools.pot.api import Metric, DataLoader
from openvino.tools.pot.graph import load_model, save_model
from openvino.tools.pot.graph.model_utils import compress_model_weights
from openvino.tools.pot.engines.ie_engine import IEEngine
from openvino.tools.pot.pipeline.initializer import create_pipeline
from openvino.runtime import Core
from PIL import Image
sys.path.append("../utils")
from notebook_utils import benchmark_model, download_file
# Settings
model_config = Dict(
{
"model_name": "flower",
"model": "model/flower/flower_ir.xml",
"weights": "model/flower/flower_ir.bin",
}
)
engine_config = Dict({"device": "CPU", "stat_requests_number": 2, "eval_requests_number": 2})
algorithms = [
{
"name": "DefaultQuantization",
"params": {
"target_device": "CPU",
"preset": "performance",
"stat_subset_size": 1000,
},
}
]
# Create DataLoader Class
class ClassificationDataLoader(DataLoader):
"""
DataLoader for image data that is stored in a directory per category. For example, for
categories _rose_ and _daisy_, rose images are expected in data_source/rose, daisy images
in data_source/daisy.
"""
def __init__(self, data_source):
"""
:param data_source: path to data directory
"""
self.data_source = Path(data_source)
self.dataset = [p for p in self.data_source.glob("**/*") if p.suffix in (".png", ".jpg")]
self.class_names = sorted([item.name for item in self.data_source.iterdir() if item.is_dir()])
def __len__(self):
"""
Returns the number of elements in the dataset
"""
return len(self.dataset)
def __getitem__(self, index):
"""
Get item from self.dataset at the specified index.
Returns (annotation, image), where annotation is a tuple (index, class_index)
and image a preprocessed image in network shape
"""
if index >= len(self):
raise IndexError
filepath = self.dataset[index]
annotation = (index, self.class_names.index(filepath.parent.name))
image = self._read_image(filepath)
return annotation, image
def _read_image(self, index):
"""
Read the image at the given path to memory, resize it to the network input size and
convert the OpenCV BGR channel order to RGB
:param index: path to the image file to read
:return ndarray representation of the image
"""
image = cv2.imread(os.path.join(self.data_source, index))[:, :, (2, 1, 0)]
image = cv2.resize(image, (180, 180)).astype(np.float32)
return image
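# (Illustrative sanity check, not part of the original notebook.) POT only needs
# __len__ and __getitem__; indexing one sample shows the (annotation, image)
# structure that IEEngine consumes.
sample_loader = ClassificationDataLoader(data_source=data_dir)
(sample_index, sample_class), sample_image = sample_loader[0]
print(len(sample_loader), sample_loader.class_names[sample_class], sample_image.shape)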
# Create Accuracy Metric Class
class Accuracy(Metric):
def __init__(self):
super().__init__()
self._name = "accuracy"
self._matches = []
@property
def value(self):
"""Returns accuracy metric value for the last model output."""
return {self._name: self._matches[-1]}
@property
def avg_value(self):
"""
Returns accuracy metric value for all model outputs. Results per image are stored in
self._matches, where True means a correct prediction and False a wrong prediction.
Accuracy is computed as the number of correct predictions divided by the total
number of predictions.
"""
num_correct = np.count_nonzero(self._matches)
return {self._name: num_correct / len(self._matches)}
def update(self, output, target):
"""Updates prediction matches.
:param output: model output
:param target: annotations
"""
predict = np.argmax(output[0], axis=1)
match = predict == target
self._matches.append(match)
def reset(self):
"""
Resets the Accuracy metric. This is a required method that should initialize all
attributes to their initial value.
"""
self._matches = []
def get_attributes(self):
"""
Returns a dictionary of metric attributes {metric_name: {attribute_name: value}}.
Required attributes: 'direction': 'higher-better' or 'higher-worse'
'type': metric type
"""
return {self._name: {"direction": "higher-better", "type": "accuracy"}}
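# (Toy check, not part of the original notebook.) One batch with a correct top-1
# prediction yields an average accuracy of 1.0.
toy_metric = Accuracy()
toy_metric.update(output=[np.array([[0.1, 0.9]])], target=[1])
print(toy_metric.avg_value)  # {'accuracy': 1.0}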
# POT Optimization
# Step 1: Load the model
model = load_model(model_config=model_config)
original_model = copy.deepcopy(model)
# Step 2: Initialize the data loader
data_loader = ClassificationDataLoader(data_source=data_dir)
# Step 3 (Optional. Required for AccuracyAwareQuantization): Initialize the metric
# Compute metric results on original model
metric = Accuracy()
# Step 4: Initialize the engine for metric calculation and statistics collection
engine = IEEngine(config=engine_config, data_loader=data_loader, metric=metric)
# Step 5: Create a pipeline of compression algorithms
pipeline = create_pipeline(algo_config=algorithms, engine=engine)
# Step 6: Execute the pipeline
compressed_model = pipeline.run(model=model)
# Step 7 (Optional): Compress model weights quantized precision
# in order to reduce the size of final .bin file
compress_model_weights(model=compressed_model)
# Step 8: Save the compressed model and get the path to the model
compressed_model_paths = save_model(
model=compressed_model, save_path=os.path.join(os.path.curdir, "model/optimized")
)
compressed_model_xml = Path(compressed_model_paths[0]["model"])
print(f"The quantized model is stored in {compressed_model_xml}")
# Step 9 (Optional): Evaluate the original and compressed model. Print the results
original_metric_results = pipeline.evaluate(original_model)
if original_metric_results:
print(f"Accuracy of the original model: {next(iter(original_metric_results.values())):.5f}")
quantized_metric_results = pipeline.evaluate(compressed_model)
if quantized_metric_results:
print(f"Accuracy of the quantized model: {next(iter(quantized_metric_results.values())):.5f}")
# Run Inference on Quantized Model
def pre_process_image(imagePath, img_height=180):
# Model input format
n, c, h, w = [1, 3, img_height, img_height]
image = Image.open(imagePath)
image = image.resize((h, w), resample=Image.BILINEAR)
# Convert to array and add a batch dimension; the model expects NHWC input
image = np.array(image)
input_image = image.reshape((n, h, w, c))
return input_image
# Load the optimized model and get the names of the input and output layer
ie = Core()
model_pot = ie.read_model(model="model/optimized/flower_ir.xml")
compiled_model_pot = ie.compile_model(model=model_pot, device_name="CPU")
input_layer = compiled_model_pot.input(0)
output_layer = compiled_model_pot.output(0)
# Get the class names: a list of directory names in alphabetical order
class_names = sorted([item.name for item in Path(data_dir).iterdir() if item.is_dir()])
# Run inference on an input image...
inp_img_url = (
"https://upload.wikimedia.org/wikipedia/commons/4/48/A_Close_Up_Photo_of_a_Dandelion.jpg"
)
directory = "output"
inp_file_name = "A_Close_Up_Photo_of_a_Dandelion.jpg"
file_path = Path(directory)/Path(inp_file_name)
# Download the image if it does not exist yet
if not Path(inp_file_name).exists():
download_file(inp_img_url, inp_file_name, directory=directory)
# Pre-process the image and get it ready for inference.
input_image = pre_process_image(imagePath=file_path)
print(f'input image shape: {input_image.shape}')
print(f'input layer shape: {input_layer.shape}')
res = compiled_model_pot([input_image])[output_layer]
score = tf.nn.softmax(res[0])
# Show the results
image = Image.open(file_path)
plt.imshow(image)
print(
"This image most likely belongs to {} with a {:.2f} percent confidence.".format(
class_names[np.argmax(score)], 100 * np.max(score)
)
)
# Compare Inference Speed
# print the available devices on this system
ie = Core()
print("Device information:")
print(ie.get_property("CPU", "FULL_DEVICE_NAME"))
if "GPU" in ie.available_devices:
print(ie.get_property("GPU", "FULL_DEVICE_NAME"))
# Original model - CPU
benchmark_model(model_path=model_xml, device="CPU", seconds=15, api='async')
# Quantized model - CPU
benchmark_model(model_path=compressed_model_xml, device="CPU", seconds=15, api='async')
# Original model - MULTI:CPU,GPU
if "GPU" in ie.available_devices:
benchmark_model(model_path=model_xml, device="MULTI:CPU,GPU", seconds=15, api='async')
else:
print("A supported integrated GPU is not available on this system.")
# Quantized model - MULTI:CPU,GPU
if "GPU" in ie.available_devices:
benchmark_model(model_path=compressed_model_xml, device="MULTI:CPU,GPU", seconds=15, api='async')
else:
print("A supported integrated GPU is not available on this system.")
# print the available devices on this system
print("Device information:")
print(ie.get_property("CPU", "FULL_DEVICE_NAME"))
if "GPU" in ie.available_devices:
print(ie.get_property("GPU", "FULL_DEVICE_NAME"))
# Original IR model - CPU
benchmark_output = %sx benchmark_app -m $model_xml -t 15 -api async
# Remove logging info from benchmark_app output and show only the results
benchmark_result = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line=="")]
print("\n".join(benchmark_result))
# Quantized IR model - CPU
benchmark_output = %sx benchmark_app -m $compressed_model_xml -t 15 -api async
# Remove logging info from benchmark_app output and show only the results
benchmark_result = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line=="")]
print("\n".join(benchmark_result))
# Original IR model - MULTI:CPU,GPU
ie = Core()
if "GPU" in ie.available_devices:
benchmark_output = %sx benchmark_app -m $model_xml -d MULTI:CPU,GPU -t 15 -api async
# Remove logging info from benchmark_app output and show only the results
benchmark_result = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line=="")]
print("\n".join(benchmark_result))
else:
print("An integrated GPU is not available on this system.")
# Quantized IR model - MULTI:CPU,GPU
ie = Core()
if "GPU" in ie.available_devices:
benchmark_output = %sx benchmark_app -m $compressed_model_xml -d MULTI:CPU,GPU -t 15 -api async
# Remove logging info from benchmark_app output and show only the results
benchmark_result = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line=="")]
print("\n".join(benchmark_result))
else:
print("An integrated GPU is not available on this system.")
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #quantization #nncf #optimization #pytorch
302-pytorch-quantization-aware-training: Optimizing PyTorch models with Neural Network Compression Framework of OpenVINO by 8-bit quantization
# Imports and Settings
# On Windows, add the directory that contains cl.exe to the PATH to enable PyTorch to find the
# required C++ tools. This code assumes that Visual Studio 2019 is installed in the default
# directory. If you have a different C++ compiler, please add the correct path to os.environ["PATH"]
# directly. Note that the C++ Redistributable is not enough to run this notebook.
# Adding the path to os.environ["LIB"] is not always required - it depends on the system's configuration
import sys
if sys.platform == "win32":
import distutils.command.build_ext
import os
from pathlib import Path
VS_INSTALL_DIR = r"C:/Program Files (x86)/Microsoft Visual Studio"
cl_paths = sorted(list(Path(VS_INSTALL_DIR).glob("**/Hostx86/x64/cl.exe")))
if len(cl_paths) == 0:
raise ValueError(
"Cannot find Visual Studio. This notebook requires a C++ compiler. If you installed "
"a C++ compiler, please add the directory that contains cl.exe to `os.environ['PATH']`."
)
else:
# If multiple versions of MSVC are installed, get the most recent version
cl_path = cl_paths[-1]
vs_dir = str(cl_path.parent)
os.environ["PATH"] += f"{os.pathsep}{vs_dir}"
# Code for finding the library dirs from
# https://stackoverflow.com/questions/47423246/get-pythons-lib-path
d = distutils.core.Distribution()
b = distutils.command.build_ext.build_ext(d)
b.finalize_options()
os.environ["LIB"] = os.pathsep.join(b.library_dirs)
print(f"Added {vs_dir} to PATH")
import sys
import time
import warnings # to disable warnings on export to ONNX
import zipfile
from pathlib import Path
import logging
import torch
import nncf # Important - should be imported directly after torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from nncf.common.utils.logger import set_log_level
set_log_level(logging.ERROR) # Disables all NNCF info and warning messages
from nncf import NNCFConfig
from nncf.torch import create_compressed_model, register_default_init_args
from openvino.runtime import Core
from torch.jit import TracerWarning
sys.path.append("../utils")
from notebook_utils import download_file
torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")
MODEL_DIR = Path("model")
OUTPUT_DIR = Path("output")
DATA_DIR = Path("data")
BASE_MODEL_NAME = "resnet18"
image_size = 64
OUTPUT_DIR.mkdir(exist_ok=True)
MODEL_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
# Paths where PyTorch, ONNX and OpenVINO IR models will be stored
fp32_pth_path = Path(MODEL_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".pth")
fp32_onnx_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".onnx")
fp32_ir_path = fp32_onnx_path.with_suffix(".xml")
int8_onnx_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_int8")).with_suffix(".onnx")
int8_ir_path = int8_onnx_path.with_suffix(".xml")
# It is possible to train the FP32 model from scratch, but that may be slow, so the pre-trained weights are downloaded by default.
pretrained_on_tiny_imagenet = True
fp32_pth_url = "https://storage.openvinotoolkit.org/repositories/nncf/openvino_notebook_ckpts/302_resnet18_fp32_v1.pth"
download_file(fp32_pth_url, directory=MODEL_DIR, filename=fp32_pth_path.name)
# Download Tiny ImageNet dataset
def download_tiny_imagenet_200(
data_dir: Path,
url="http://cs231n.stanford.edu/tiny-imagenet-200.zip",
tarname="tiny-imagenet-200.zip",
):
archive_path = data_dir / tarname
download_file(url, directory=data_dir, filename=tarname)
zip_ref = zipfile.ZipFile(archive_path, "r")
zip_ref.extractall(path=data_dir)
zip_ref.close()
def prepare_tiny_imagenet_200(dataset_dir: Path):
# format the validation set the same way as the training set
val_data_dir = dataset_dir / 'val'
val_annotations_file = val_data_dir / 'val_annotations.txt'
with open(val_annotations_file, 'r') as f:
val_annotation_data = map(lambda line: line.split('\t')[:2], f.readlines())
val_images_dir = val_data_dir / 'images'
for image_filename, image_label in val_annotation_data:
from_image_filepath = val_images_dir / image_filename
to_image_dir = val_data_dir / image_label
if not to_image_dir.exists():
to_image_dir.mkdir()
to_image_filepath = to_image_dir / image_filename
from_image_filepath.rename(to_image_filepath)
val_annotations_file.unlink()
val_images_dir.rmdir()
DATASET_DIR = DATA_DIR / "tiny-imagenet-200"
if not DATASET_DIR.exists():
download_tiny_imagenet_200(DATA_DIR)
prepare_tiny_imagenet_200(DATASET_DIR)
print(f"Successfully downloaded and prepared dataset at: {DATASET_DIR}")
# Pre-train Floating-Point Model
# Train Function
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter("Time", ":3.3f")
losses = AverageMeter("Loss", ":2.3f")
top1 = AverageMeter("Acc@1", ":2.2f")
top5 = AverageMeter("Acc@5", ":2.2f")
progress = ProgressMeter(
len(train_loader), [batch_time, losses, top1, top5], prefix="Epoch:[{}]".format(epoch)
)
# switch to train mode
model.train()
end = time.time()
for i, (images, target) in enumerate(train_loader):
images = images.to(device)
target = target.to(device)
# compute output
output = model(images)
loss = criterion(output, target)
# measure accuracy and record loss
acc1, acc5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), images.size(0))
top1.update(acc1[0], images.size(0))
top5.update(acc5[0], images.size(0))
# compute gradient and do opt step
optimizer.zero_grad()
loss.backward()
optimizer.step()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
print_frequency = 50
if i % print_frequency == 0:
progress.display(i)
# Validate Function
def validate(val_loader, model, criterion):
batch_time = AverageMeter("Time", ":3.3f")
losses = AverageMeter("Loss", ":2.3f")
top1 = AverageMeter("Acc@1", ":2.2f")
top5 = AverageMeter("Acc@5", ":2.2f")
progress = ProgressMeter(len(val_loader), [batch_time, losses, top1, top5], prefix="Test: ")
# switch to evaluate mode
model.eval()
with torch.no_grad():
end = time.time()
for i, (images, target) in enumerate(val_loader):
images = images.to(device)
target = target.to(device)
# compute output
output = model(images)
loss = criterion(output, target)
# measure accuracy and record loss
acc1, acc5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), images.size(0))
top1.update(acc1[0], images.size(0))
top5.update(acc5[0], images.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
print_frequency = 10
if i % print_frequency == 0:
progress.display(i)
print(" * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}".format(top1=top1, top5=top5))
return top1.avg
# Helpers
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self, name, fmt=":f"):
self.name = name
self.fmt = fmt
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def __str__(self):
fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
return fmtstr.format(**self.__dict__)
class ProgressMeter(object):
def __init__(self, num_batches, meters, prefix=""):
self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
self.meters = meters
self.prefix = prefix
def display(self, batch):
entries = [self.prefix + self.batch_fmtstr.format(batch)]
entries += [str(meter) for meter in self.meters]
print("\t".join(entries))
def _get_batch_fmtstr(self, num_batches):
num_digits = len(str(num_batches // 1))
fmt = "{:" + str(num_digits) + "d}"
return "[" + fmt + "/" + fmt.format(num_batches) + "]"
def accuracy(output, target, topk=(1,)):
"""Computes the accuracy over the k top predictions for the specified values of k"""
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
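# (Toy check, not part of the original notebook.) On random logits for 8 samples and
# 10 classes, top-1 accuracy is around 10% and top-5 around 50% on average.
toy_logits = torch.randn(8, 10)
toy_targets = torch.randint(0, 10, (8,))
toy_top1, toy_top5 = accuracy(toy_logits, toy_targets, topk=(1, 5))
print(f"toy top1: {toy_top1.item():.1f}%, top5: {toy_top5.item():.1f}%")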
# Get a Pre-trained FP32 Model
num_classes = 200 # 200 is for Tiny ImageNet, default is 1000 for ImageNet
init_lr = 1e-4
batch_size = 128
epochs = 4
model = models.resnet18(pretrained=not pretrained_on_tiny_imagenet)
# update the last FC layer for Tiny ImageNet number of classes
model.fc = nn.Linear(in_features=512, out_features=num_classes, bias=True)
model.to(device)
# Data loading code
train_dir = DATASET_DIR / "train"
val_dir = DATASET_DIR / "val"
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
train_dataset = datasets.ImageFolder(
train_dir,
transforms.Compose(
[
transforms.Resize(image_size),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize,
]
),
)
val_dataset = datasets.ImageFolder(
val_dir,
transforms.Compose(
[
transforms.Resize(image_size),
transforms.ToTensor(),
normalize,
]
),
)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True, sampler=None
)
val_loader = torch.utils.data.DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True
)
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=init_lr)
if pretrained_on_tiny_imagenet:
#
# ** WARNING: torch.load functionality uses Python's pickling module that
# may be used to perform arbitrary code execution during unpickling. Only load data that you
# trust.
#
checkpoint = torch.load(str(fp32_pth_path), map_location="cpu")
model.load_state_dict(checkpoint["state_dict"], strict=True)
acc1_fp32 = checkpoint["acc1"]
else:
best_acc1 = 0
# Training loop
for epoch in range(0, epochs):
# run a single training epoch
train(train_loader, model, criterion, optimizer, epoch)
# evaluate on validation set
acc1 = validate(val_loader, model, criterion)
is_best = acc1 > best_acc1
best_acc1 = max(acc1, best_acc1)
if is_best:
checkpoint = {"state_dict": model.state_dict(), "acc1": acc1}
torch.save(checkpoint, fp32_pth_path)
acc1_fp32 = best_acc1
print(f"Accuracy of FP32 model: {acc1_fp32:.3f}")
dummy_input = torch.randn(1, 3, image_size, image_size).to(device)
torch.onnx.export(model, dummy_input, fp32_onnx_path)
print(f"FP32 ONNX model was exported to {fp32_onnx_path}.")
# Create and Initialize Quantization
nncf_config_dict = {
"input_info": {"sample_size": [1, 3, image_size, image_size]},
"log_dir": str(OUTPUT_DIR), # log directory for NNCF-specific logging outputs
"compression": {
"algorithm": "quantization", # specify the algorithm here
},
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)
nncf_config = register_default_init_args(nncf_config, train_loader)
compression_ctrl, model = create_compressed_model(model, nncf_config)
acc1 = validate(val_loader, model, criterion)
print(f"Accuracy of initialized INT8 model: {acc1:.3f}")
# Fine-tune the Compressed Model
compression_lr = init_lr / 10
optimizer = torch.optim.Adam(model.parameters(), lr=compression_lr)
# train for one epoch with NNCF
train(train_loader, model, criterion, optimizer, epoch=0)
# evaluate on validation set after Quantization-Aware Training (QAT case)
acc1_int8 = validate(val_loader, model, criterion)
print(f"Accuracy of tuned INT8 model: {acc1_int8:.3f}")
print(f"Accuracy drop of tuned INT8 model over pre-trained FP32 model: {acc1_fp32 - acc1_int8:.3f}")
# Export INT8 Model to ONNX
if not int8_onnx_path.exists():
warnings.filterwarnings("ignore", category=TracerWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# Export INT8 model to ONNX that is supported by the OpenVINO™ toolkit
compression_ctrl.export_model(int8_onnx_path)
print(f"INT8 ONNX model exported to {int8_onnx_path}.")
# Convert ONNX models to OpenVINO Intermediate Representation (IR)
if not fp32_ir_path.exists():
!mo --input_model $fp32_onnx_path --input_shape "[1,3, $image_size, $image_size]" --mean_values "[123.675, 116.28 , 103.53]" --scale_values "[58.395, 57.12 , 57.375]" --data_type FP16 --output_dir $OUTPUT_DIR
if not int8_ir_path.exists():
!mo --input_model $int8_onnx_path --input_shape "[1,3, $image_size, $image_size]" --mean_values "[123.675, 116.28 , 103.53]" --scale_values "[58.395, 57.12 , 57.375]" --data_type FP16 --output_dir $OUTPUT_DIR
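# (Illustrative check, not part of the original notebook.) Reading the converted IR
# files back with OpenVINO Runtime confirms that Model Optimizer produced loadable
# models and shows their input shapes.
check_core = Core()
for ir_path in (fp32_ir_path, int8_ir_path):
    ir_model = check_core.read_model(model=str(ir_path))
    print(ir_path.name, [model_input.partial_shape for model_input in ir_model.inputs])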
# Benchmark Model Performance by Computing Inference Time
def parse_benchmark_output(benchmark_output):
parsed_output = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line == "")]
print(*parsed_output, sep='\n')
print('Benchmark FP32 model (IR)')
benchmark_output = ! benchmark_app -m $fp32_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
print('Benchmark INT8 model (IR)')
benchmark_output = ! benchmark_app -m $int8_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
# Show CPU Information for reference
ie = Core()
ie.get_property(device_name="CPU", name="FULL_DEVICE_NAME")
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #tensorflow #quantization #nncf #optimization
305-tensorflow-quantization-aware-training: Optimizing TensorFlow models with Neural Network Compression Framework of OpenVINO by 8-bit quantization
# Imports and Settings
from pathlib import Path
import logging
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.python.keras import layers
from tensorflow.python.keras import models
from nncf import NNCFConfig
from nncf.tensorflow.helpers.model_creation import create_compressed_model
from nncf.tensorflow.initialization import register_default_init_args
from nncf.common.utils.logger import set_log_level
set_log_level(logging.ERROR)
MODEL_DIR = Path("model")
OUTPUT_DIR = Path("output")
MODEL_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
BASE_MODEL_NAME = "ResNet-18"
fp32_h5_path = Path(MODEL_DIR / (BASE_MODEL_NAME + "_fp32")).with_suffix(".h5")
fp32_sm_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_fp32"))
fp32_ir_path = Path(OUTPUT_DIR / "saved_model").with_suffix(".xml")
int8_pb_path = Path(OUTPUT_DIR / (BASE_MODEL_NAME + "_int8")).with_suffix(".pb")
int8_pb_name = Path(BASE_MODEL_NAME + "_int8").with_suffix(".pb")
int8_ir_path = int8_pb_path.with_suffix(".xml")
BATCH_SIZE = 128
IMG_SIZE = (64, 64)  # Input image size used for this model; Imagenette images are resized to 64x64
NUM_CLASSES = 10 # For Imagenette dataset
LR = 1e-5
MEAN_RGB = (0.485 * 255, 0.456 * 255, 0.406 * 255) # From Imagenet dataset
STDDEV_RGB = (0.229 * 255, 0.224 * 255, 0.225 * 255) # From Imagenet dataset
fp32_pth_url = "https://storage.openvinotoolkit.org/repositories/nncf/openvino_notebook_ckpts/305_resnet18_imagenette_fp32_v1.h5"
_ = tf.keras.utils.get_file(fp32_h5_path.resolve(), fp32_pth_url)
print(f'Absolute path where the model weights are saved:\n {fp32_h5_path.resolve()}')
# Dataset Preprocessing
datasets, datasets_info = tfds.load('imagenette/160px', shuffle_files=True, as_supervised=True, with_info=True,
read_config=tfds.ReadConfig(shuffle_seed=0))
train_dataset, validation_dataset = datasets['train'], datasets['validation']
fig = tfds.show_examples(train_dataset, datasets_info)
def preprocessing(image, label):
image = tf.image.resize(image, IMG_SIZE)
image = image - MEAN_RGB
image = image / STDDEV_RGB
label = tf.one_hot(label, NUM_CLASSES)
return image, label
train_dataset = (train_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
.batch(BATCH_SIZE)
.prefetch(tf.data.experimental.AUTOTUNE))
validation_dataset = (validation_dataset.map(preprocessing, num_parallel_calls=tf.data.experimental.AUTOTUNE)
.batch(BATCH_SIZE)
.prefetch(tf.data.experimental.AUTOTUNE))
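# (Illustrative check, not part of the original notebook.) One preprocessed batch
# should have images of shape (BATCH_SIZE, 64, 64, 3) and one-hot labels of shape
# (BATCH_SIZE, NUM_CLASSES).
for sample_images, sample_labels in train_dataset.take(1):
    print(sample_images.shape, sample_labels.shape)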
# Define a Floating-Point Model
def residual_conv_block(filters, stage, block, strides=(1, 1), cut='pre'):
def layer(input_tensor):
x = layers.BatchNormalization(epsilon=2e-5)(input_tensor)
x = layers.Activation('relu')(x)
# defining shortcut connection
if cut == 'pre':
shortcut = input_tensor
elif cut == 'post':
shortcut = layers.Conv2D(filters, (1, 1), strides=strides, kernel_initializer='he_uniform',
use_bias=False)(x)
# continue with convolution layers
x = layers.ZeroPadding2D(padding=(1, 1))(x)
x = layers.Conv2D(filters, (3, 3), strides=strides, kernel_initializer='he_uniform', use_bias=False)(x)
x = layers.BatchNormalization(epsilon=2e-5)(x)
x = layers.Activation('relu')(x)
x = layers.ZeroPadding2D(padding=(1, 1))(x)
x = layers.Conv2D(filters, (3, 3), kernel_initializer='he_uniform', use_bias=False)(x)
# add residual connection
x = layers.Add()([x, shortcut])
return x
return layer
def ResNet18(input_shape=None):
"""Instantiates the ResNet18 architecture."""
img_input = layers.Input(shape=input_shape, name='data')
# ResNet18 bottom
x = layers.BatchNormalization(epsilon=2e-5, scale=False)(img_input)
x = layers.ZeroPadding2D(padding=(3, 3))(x)
x = layers.Conv2D(64, (7, 7), strides=(2, 2), kernel_initializer='he_uniform', use_bias=False)(x)
x = layers.BatchNormalization(epsilon=2e-5)(x)
x = layers.Activation('relu')(x)
x = layers.ZeroPadding2D(padding=(1, 1))(x)
x = layers.MaxPooling2D((3, 3), strides=(2, 2), padding='valid')(x)
# ResNet18 body
repetitions = (2, 2, 2, 2)
for stage, rep in enumerate(repetitions):
for block in range(rep):
filters = 64 * (2 ** stage)
if block == 0 and stage == 0:
x = residual_conv_block(filters, stage, block, strides=(1, 1), cut='post')(x)
elif block == 0:
x = residual_conv_block(filters, stage, block, strides=(2, 2), cut='post')(x)
else:
x = residual_conv_block(filters, stage, block, strides=(1, 1), cut='pre')(x)
x = layers.BatchNormalization(epsilon=2e-5)(x)
x = layers.Activation('relu')(x)
# ResNet18 top
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(NUM_CLASSES)(x)
x = layers.Activation('softmax')(x)
# Create model
model = models.Model(img_input, x)
return model
IMG_SHAPE = IMG_SIZE + (3,)
model = ResNet18(input_shape=IMG_SHAPE)
# Pre-train Floating-Point Model
# Load the floating-point weights
model.load_weights(fp32_h5_path)
# Compile the floating-point model
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')])
# Validate the floating-point model
test_loss, acc_fp32 = model.evaluate(validation_dataset,
callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of FP32 model: {acc_fp32:.3f}")
model.save(fp32_sm_path)
print(f'Absolute path where the model is saved:\n {fp32_sm_path.resolve()}')
# Create and Initialize Quantization
nncf_config_dict = {
"input_info": {"sample_size": [1, 3] + list(IMG_SIZE)},
"log_dir": str(OUTPUT_DIR), # log directory for NNCF-specific logging outputs
"compression": {
"algorithm": "quantization", # specify the algorithm here
},
}
nncf_config = NNCFConfig.from_dict(nncf_config_dict)
nncf_config = register_default_init_args(nncf_config=nncf_config,
data_loader=train_dataset,
batch_size=BATCH_SIZE)
compression_ctrl, model = create_compressed_model(model, nncf_config)
# Compile the int8 model
model.compile(optimizer=tf.keras.optimizers.Adam(lr=LR),
loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
metrics=[tf.keras.metrics.CategoricalAccuracy(name='acc@1')])
# Validate the int8 model
test_loss, test_acc = model.evaluate(validation_dataset,
callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of INT8 model after initialization: {test_acc:.3f}")
# Fine-tune the Compressed Model
# Train the int8 model
model.fit(train_dataset,
epochs=2)
# Validate the int8 model
test_loss, acc_int8 = model.evaluate(validation_dataset,
callbacks=tf.keras.callbacks.ProgbarLogger(stateful_metrics=['acc@1']))
print(f"\nAccuracy of INT8 model after fine-tuning: {acc_int8:.3f}")
print(f"\nAccuracy drop of tuned INT8 model over pre-trained FP32 model: {acc_fp32 - acc_int8:.3f}")
compression_ctrl.export_model(int8_pb_path, 'frozen_graph')
print(f'Absolute path where the int8 model is saved:\n {int8_pb_path.resolve()}')
# Export Frozen Graph Models to OpenVINO Intermediate Representation (IR)
!mo --framework=tf --input_shape=[1,64,64,3] --input=data --saved_model_dir=$fp32_sm_path --output_dir=$OUTPUT_DIR
!mo --framework=tf --input_shape=[1,64,64,3] --input=Placeholder --input_model=$int8_pb_path --output_dir=$OUTPUT_DIR
# Benchmark Model Performance by Computing Inference Time
def parse_benchmark_output(benchmark_output):
parsed_output = [line for line in benchmark_output if not (line.startswith(r"[") or line.startswith(" ") or line == "")]
print(*parsed_output, sep='\n')
print('Benchmark FP32 model (IR)')
benchmark_output = ! benchmark_app -m $fp32_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
print('\nBenchmark INT8 model (IR)')
benchmark_output = ! benchmark_app -m $int8_ir_path -d CPU -api async -t 15
parse_benchmark_output(benchmark_output)
# Show CPU Information for reference
from openvino.runtime import Core
ie = Core()
ie.get_property(device_name='CPU', name="FULL_DEVICE_NAME")
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #ocr #paddle-paddle #paddle-ocr #nlp
405-paddle-ocr-webcam: PaddleOCR with OpenVINO
# Imports
import sys
import os
import cv2
import numpy as np
import paddle
import math
import time
import collections
from PIL import Image
from pathlib import Path
import tarfile
import urllib.request
from openvino.runtime import Core
from IPython import display
import copy
sys.path.append("../utils")
import notebook_utils as utils
import pre_post_processing as processing
# Models for PaddleOCR
# Define the function to download text detection and recognition models from PaddleOCR resources
def run_model_download(model_url, model_file_path):
"""
Download pre-trained models from PaddleOCR resources
Parameters:
model_url: url link to pre-trained models
model_file_path: file path to store the downloaded model
"""
model_name = model_url.split("/")[-1]
if model_file_path.is_file():
print("Model already exists")
else:
# Download the model from the server, and untar it.
print("Downloading the pre-trained model... May take a while...")
# create a directory
os.makedirs("model", exist_ok=True)
urllib.request.urlretrieve(model_url, f"model/{model_name}")
print("Model Downloaded")
file = tarfile.open(f"model/{model_name}")
res = file.extractall("model")
file.close()
if not res:
print(f"Model Extracted to {model_file_path}.")
else:
print("Error Extracting the model. Please check the network.")
# Download the Model for Text Detection
# URL and expected file path of the text detection model
det_model_url = "https://paddleocr.bj.bcebos.com/dygraph_v2.0/ch/ch_ppocr_mobile_v2.0_det_infer.tar"
det_model_file_path = Path("model/ch_ppocr_mobile_v2.0_det_infer/inference.pdmodel")
run_model_download(det_model_url, det_model_file_path)
# Load the Model for Text Detection
# initialize inference engine for text detection
core = Core()
det_model = core.read_model(model=det_model_file_path)
det_compiled_model = core.compile_model(model=det_model, device_name="CPU")
# get input and output nodes for text detection
det_input_layer = det_compiled_model.input(0)
det_output_layer = det_compiled_model.output(0)
# Download the Model for Text Recognition
rec_model_url = "https://paddleocr.bj.bcebos.com/dygraph_v2.0/ch/ch_ppocr_mobile_v2.0_rec_infer.tar"
rec_model_file_path = Path("model/ch_ppocr_mobile_v2.0_rec_infer/inference.pdmodel")
run_model_download(rec_model_url, rec_model_file_path)
# Load the Model for Text Recognition with Dynamic Shape
# read the model and corresponding weights from file
rec_model = core.read_model(model=rec_model_file_path)
# assign dynamic shapes to every input layer on the last dimension
for input_layer in rec_model.inputs:
input_shape = input_layer.partial_shape
input_shape[3] = -1
rec_model.reshape({input_layer: input_shape})
rec_compiled_model = core.compile_model(model=rec_model, device_name="CPU")
# get input and output nodes
rec_input_layer = rec_compiled_model.input(0)
rec_output_layer = rec_compiled_model.output(0)
# Preprocessing image functions for text detection and recognition
# Preprocess for text detection
def image_preprocess(input_image, size):
"""
Preprocess input image for text detection
Parameters:
input_image: input image
size: value for the image to be resized for text detection model
"""
img = cv2.resize(input_image, (size, size))
img = np.transpose(img, [2, 0, 1]) / 255
img = np.expand_dims(img, 0)
# NormalizeImage: {mean: [0.485, 0.456, 0.406], std: [0.229, 0.224, 0.225], is_scale: True}
img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
img -= img_mean
img /= img_std
return img.astype(np.float32)
# Preprocess for text recognition
def resize_norm_img(img, max_wh_ratio):
"""
Resize input image for text recognition
Parameters:
img: bounding box image from text detection
max_wh_ratio: value for the resizing for text recognition model
"""
rec_image_shape = [3, 32, 320]
imgC, imgH, imgW = rec_image_shape
assert imgC == img.shape[2]
character_type = "ch"
if character_type == "ch":
imgW = int((32 * max_wh_ratio))
h, w = img.shape[:2]
ratio = w / float(h)
if math.ceil(imgH * ratio) > imgW:
resized_w = imgW
else:
resized_w = int(math.ceil(imgH * ratio))
resized_image = cv2.resize(img, (resized_w, imgH))
resized_image = resized_image.astype('float32')
resized_image = resized_image.transpose((2, 0, 1)) / 255
resized_image -= 0.5
resized_image /= 0.5
padding_im = np.zeros((imgC, imgH, imgW), dtype=np.float32)
padding_im[:, :, 0:resized_w] = resized_image
return padding_im
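# (Illustrative check, not part of the original notebook.) A dummy 32x100 crop with
# max_wh_ratio = 100/32 is resized and right-padded into the fixed-height CHW tensor
# expected by the recognition model.
dummy_crop = np.zeros((32, 100, 3), dtype=np.uint8)
dummy_norm = resize_norm_img(dummy_crop, max_wh_ratio=100 / 32)
print(dummy_norm.shape)  # (3, 32, 100)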
def prep_for_rec(dt_boxes, frame):
"""
Preprocessing of the detected bounding boxes for text recognition
Parameters:
dt_boxes: detected bounding boxes from text detection
frame: original input frame
"""
ori_im = frame.copy()
img_crop_list = []
for bno in range(len(dt_boxes)):
tmp_box = copy.deepcopy(dt_boxes[bno])
img_crop = processing.get_rotate_crop_image(ori_im, tmp_box)
img_crop_list.append(img_crop)
img_num = len(img_crop_list)
# Calculate the aspect ratio of all text bars
width_list = []
for img in img_crop_list:
width_list.append(img.shape[1] / float(img.shape[0]))
# Sorting can speed up the recognition process
indices = np.argsort(np.array(width_list))
return img_crop_list, img_num, indices
def batch_text_box(img_crop_list, img_num, indices, beg_img_no, batch_num):
"""
Batch for text recognition
Parameters:
img_crop_list: processed detected bounding box images
img_num: number of bounding boxes from text detection
indices: sorting for bounding boxes to speed up text recognition
beg_img_no: the beginning number of bounding boxes for each batch of text recognition inference
batch_num: number of images for each batch
"""
norm_img_batch = []
max_wh_ratio = 0
end_img_no = min(img_num, beg_img_no + batch_num)
for ino in range(beg_img_no, end_img_no):
h, w = img_crop_list[indices[ino]].shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for ino in range(beg_img_no, end_img_no):
norm_img = resize_norm_img(img_crop_list[indices[ino]], max_wh_ratio)
norm_img = norm_img[np.newaxis, :]
norm_img_batch.append(norm_img)
norm_img_batch = np.concatenate(norm_img_batch)
norm_img_batch = norm_img_batch.copy()
return norm_img_batch
# Postprocessing image for text detection
def post_processing_detection(frame, det_results):
"""
Postprocess the results from text detection into bounding boxes
Parameters:
frame: input image
det_results: inference results from text detection model
"""
ori_im = frame.copy()
data = {'image': frame}
data_resize = processing.DetResizeForTest(data)
data_list = []
keep_keys = ['image', 'shape']
for key in keep_keys:
data_list.append(data_resize[key])
img, shape_list = data_list
shape_list = np.expand_dims(shape_list, axis=0)
pred = det_results[0]
if isinstance(pred, paddle.Tensor):
pred = pred.numpy()
segmentation = pred > 0.3
boxes_batch = []
for batch_index in range(pred.shape[0]):
src_h, src_w, ratio_h, ratio_w = shape_list[batch_index]
mask = segmentation[batch_index]
boxes, scores = processing.boxes_from_bitmap(pred[batch_index], mask, src_w, src_h)
boxes_batch.append({'points': boxes})
post_result = boxes_batch
dt_boxes = post_result[0]['points']
dt_boxes = processing.filter_tag_det_res(dt_boxes, ori_im.shape)
return dt_boxes
# Main processing function for PaddleOCR
def run_paddle_ocr(source=0, flip=False, use_popup=False, skip_first_frames=0):
"""
Main function to run PaddleOCR inference:
1. Create a video player to play with target fps (utils.VideoPlayer).
2. Prepare a set of frames for text detection and recognition.
3. Run AI inference for both text detection and recognition.
4. Visualize the results.
Parameters:
source: the webcam index (the primary webcam is "0") or a path to a video file.
flip: whether the VideoPlayer should flip the captured image.
use_popup: False for showing encoded frames over this notebook, True for creating a popup window.
skip_first_frames: Number of frames to skip at the beginning of the video.
"""
# create video player to play with target fps
player = None
try:
player = utils.VideoPlayer(source=source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
# Start video capturing
player.start()
if use_popup:
title = "Press ESC to Exit"
cv2.namedWindow(winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
processing_times = collections.deque()
while True:
# grab the frame
frame = player.next()
if frame is None:
print("Source ended")
break
# if the frame is larger than full HD, reduce its size to improve performance
scale = 1280 / max(frame.shape)
if scale < 1:
frame = cv2.resize(src=frame, dsize=None, fx=scale, fy=scale,
interpolation=cv2.INTER_AREA)
# preprocess image for text detection
test_image = image_preprocess(frame, 640)
# measure processing time for text detection
start_time = time.time()
# perform the inference step
det_results = det_compiled_model([test_image])[det_output_layer]
stop_time = time.time()
# Postprocessing for Paddle Detection
dt_boxes = post_processing_detection(frame, det_results)
processing_times.append(stop_time - start_time)
# use processing times from last 200 frames
if len(processing_times) > 200:
processing_times.popleft()
processing_time_det = np.mean(processing_times) * 1000
# Preprocess detection results for recognition
dt_boxes = processing.sorted_boxes(dt_boxes)
batch_num = 6
img_crop_list, img_num, indices = prep_for_rec(dt_boxes, frame)
# Storage for recognition results, which have two parts:
# txts holds the recognized text for each box, scores the recognition confidence levels
rec_res = [['', 0.0]] * img_num
txts = []
scores = []
for beg_img_no in range(0, img_num, batch_num):
# Recognition starts from here
norm_img_batch = batch_text_box(
img_crop_list, img_num, indices, beg_img_no, batch_num)
# Run inference for text recognition
rec_results = rec_compiled_model([norm_img_batch])[rec_output_layer]
# Postprocessing recognition results
postprocess_op = processing.build_post_process(processing.postprocess_params)
rec_result = postprocess_op(rec_results)
for rno in range(len(rec_result)):
rec_res[indices[beg_img_no + rno]] = rec_result[rno]
if rec_res:
txts = [rec_res[i][0] for i in range(len(rec_res))]
scores = [rec_res[i][1] for i in range(len(rec_res))]
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
boxes = dt_boxes
# draw text recognition results beside the image
draw_img = processing.draw_ocr_box_txt(
image,
boxes,
txts,
scores,
drop_score=0.5)
# Visualize PaddleOCR results
f_height, f_width = draw_img.shape[:2]
fps = 1000 / processing_time_det
cv2.putText(img=draw_img, text=f"Inference time: {processing_time_det:.1f}ms ({fps:.1f} FPS)",
org=(20, 40),fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=f_width / 1000,
color=(0, 0, 255), thickness=1, lineType=cv2.LINE_AA)
# use this workaround if there is flickering
if use_popup:
draw_img = cv2.cvtColor(draw_img, cv2.COLOR_RGB2BGR)
cv2.imshow(winname=title, mat=draw_img)
key = cv2.waitKey(1)
# escape = 27
if key == 27:
break
else:
# encode numpy array to jpg
draw_img = cv2.cvtColor(draw_img, cv2.COLOR_RGB2BGR)
_, encoded_img = cv2.imencode(ext=".jpg", img=draw_img,
params=[cv2.IMWRITE_JPEG_QUALITY, 100])
# create IPython image
i = display.Image(data=encoded_img)
# display the image in this notebook
display.clear_output(wait=True)
display.display(i)
# ctrl-c
except KeyboardInterrupt:
print("Interrupted")
# any different error
except RuntimeError as e:
print(e)
finally:
if player is not None:
# stop capturing
player.stop()
if use_popup:
cv2.destroyAllWindows()
# Run Live PaddleOCR with OpenVINO
run_paddle_ocr(source=0, flip=False, use_popup=False)
# Test OCR results on video file
video_file = "https://raw.githubusercontent.com/yoyowz/classification/master/images/test.mp4"
run_paddle_ocr(source=video_file, flip=False, use_popup=False, skip_first_frames=0)
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #object-detection #onnx #onnx-runtime #openvino-onnx-runtime #openvino-execution-provider-for-onnx #tiny-yolov4
Object detection with YOLOv4 in Python using OpenVINO™ Execution Provider
# pip3 install openvino
# Install ONNX Runtime for OpenVINO™ Execution Provider
# pip3 install onnxruntime-openvino==1.11.0
# pip3 install -r requirements.txt
# Running the ONNXRuntime OpenVINO™ Execution Provider sample
# python3 yolov4.py --device CPU_FP32 --video classroom.mp4 --model yolov4.onnx
'''
Copyright (C) 2021-2022, Intel Corporation
SPDX-License-Identifier: Apache-2.0
Major Portions of this code are copyright of their respective authors and released under the Apache License Version 2.0:
- onnx, Copyright 2021-2022. For licensing see https://github.com/onnx/models/blob/master/LICENSE
'''
import cv2
import numpy as np
from onnx import numpy_helper
import onnx
import onnxruntime as rt
import os
from PIL import Image
from scipy import special
import colorsys
import random
import argparse
import sys
import time
import platform
if platform.system() == "Windows":
from openvino import utils
utils.add_openvino_libs_to_path()
def image_preprocess(image, target_size, gt_boxes=None):
ih, iw = target_size
h, w, _ = image.shape
scale = min(iw/w, ih/h)
nw, nh = int(scale * w), int(scale * h)
image_resized = cv2.resize(image, (nw, nh))
image_padded = np.full(shape=[ih, iw, 3], fill_value=128.0)
dw, dh = (iw - nw) // 2, (ih-nh) // 2
image_padded[dh:nh+dh, dw:nw+dw, :] = image_resized
image_padded = image_padded / 255.
if gt_boxes is None:
return image_padded
else:
gt_boxes[:, [0, 2]] = gt_boxes[:, [0, 2]] * scale + dw
gt_boxes[:, [1, 3]] = gt_boxes[:, [1, 3]] * scale + dh
return image_padded, gt_boxes
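# (Illustrative check, not part of the original sample; the frame and target sizes
# are assumptions.) A dummy 720x1280 frame is letterboxed into a 416x416 network
# input with gray (128) padding bars and values scaled to [0, 1].
dummy_frame = np.zeros((720, 1280, 3), dtype=np.uint8)
dummy_input = image_preprocess(dummy_frame, target_size=(416, 416))
print(dummy_input.shape)  # (416, 416, 3)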
def postprocess_bbbox(pred_bbox):
'''decode raw predictions into absolute box coordinates using the anchor boxes (relies on the ANCHORS, STRIDES and XYSCALE globals)'''
for i, pred in enumerate(pred_bbox):
conv_shape = pred.shape
output_size = conv_shape[1]
conv_raw_dxdy = pred[:, :, :, :, 0:2]
conv_raw_dwdh = pred[:, :, :, :, 2:4]
xy_grid = np.meshgrid(np.arange(output_size), np.arange(output_size))
xy_grid = np.expand_dims(np.stack(xy_grid, axis=-1), axis=2)
xy_grid = np.tile(np.expand_dims(xy_grid, axis=0), [1, 1, 1, 3, 1])
xy_grid = xy_grid.astype(float)
pred_xy = ((special.expit(conv_raw_dxdy) * XYSCALE[i]) - 0.5 * (XYSCALE[i] - 1) + xy_grid) * STRIDES[i]
pred_wh = (np.exp(conv_raw_dwdh) * ANCHORS[i])
pred[:, :, :, :, 0:4] = np.concatenate([pred_xy, pred_wh], axis=-1)
pred_bbox = [np.reshape(x, (-1, np.shape(x)[-1])) for x in pred_bbox]
pred_bbox = np.concatenate(pred_bbox, axis=0)
return pred_bbox
def postprocess_boxes(pred_bbox, org_img_shape, input_size, score_threshold):
'''remove boundary boxes with a low detection probability'''
valid_scale=[0, np.inf]
pred_bbox = np.array(pred_bbox)
pred_xywh = pred_bbox[:, 0:4]
pred_conf = pred_bbox[:, 4]
pred_prob = pred_bbox[:, 5:]
# # (1) (x, y, w, h) --> (xmin, ymin, xmax, ymax)
pred_coor = np.concatenate([pred_xywh[:, :2] - pred_xywh[:, 2:] * 0.5,
pred_xywh[:, :2] + pred_xywh[:, 2:] * 0.5], axis=-1)
# # (2) (xmin, ymin, xmax, ymax) -> (xmin_org, ymin_org, xmax_org, ymax_org)
org_h, org_w = org_img_shape
resize_ratio = min(input_size / org_w, input_size / org_h)
dw = (input_size - resize_ratio * org_w) / 2
dh = (input_size - resize_ratio * org_h) / 2
pred_coor[:, 0::2] = 1.0 * (pred_coor[:, 0::2] - dw) / resize_ratio
pred_coor[:, 1::2] = 1.0 * (pred_coor[:, 1::2] - dh) / resize_ratio
# # (3) clip some boxes that are out of range
pred_coor = np.concatenate([np.maximum(pred_coor[:, :2], [0, 0]),
np.minimum(pred_coor[:, 2:], [org_w - 1, org_h - 1])], axis=-1)
invalid_mask = np.logical_or((pred_coor[:, 0] > pred_coor[:, 2]), (pred_coor[:, 1] > pred_coor[:, 3]))
pred_coor[invalid_mask] = 0
# # (4) discard some invalid boxes
bboxes_scale = np.sqrt(np.multiply.reduce(pred_coor[:, 2:4] - pred_coor[:, 0:2], axis=-1))
scale_mask = np.logical_and((valid_scale[0] < bboxes_scale), (bboxes_scale < valid_scale[1]))
# # (5) discard some boxes with low scores
classes = np.argmax(pred_prob, axis=-1)
scores = pred_conf * pred_prob[np.arange(len(pred_coor)), classes]
score_mask = scores > score_threshold
mask = np.logical_and(scale_mask, score_mask)
coors, scores, classes = pred_coor[mask], scores[mask], classes[mask]
return np.concatenate([coors, scores[:, np.newaxis], classes[:, np.newaxis]], axis=-1)
def bboxes_iou(boxes1, boxes2):
'''calculate the Intersection Over Union value'''
boxes1 = np.array(boxes1)
boxes2 = np.array(boxes2)
boxes1_area = (boxes1[..., 2] - boxes1[..., 0]) * (boxes1[..., 3] - boxes1[..., 1])
boxes2_area = (boxes2[..., 2] - boxes2[..., 0]) * (boxes2[..., 3] - boxes2[..., 1])
left_up = np.maximum(boxes1[..., :2], boxes2[..., :2])
right_down = np.minimum(boxes1[..., 2:], boxes2[..., 2:])
inter_section = np.maximum(right_down - left_up, 0.0)
inter_area = inter_section[..., 0] * inter_section[..., 1]
union_area = boxes1_area + boxes2_area - inter_area
ious = np.maximum(1.0 * inter_area / union_area, np.finfo(np.float32).eps)
return ious
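# Illustrative sketch, not part of the original sample: a hypothetical sanity check for
# bboxes_iou. For A = [0, 0, 10, 10] and B = [5, 5, 15, 15] the intersection area is
# 5 * 5 = 25 and the union is 100 + 100 - 25 = 175, so the IoU is 25 / 175 ≈ 0.143.
def _iou_example():
    box_a = np.array([0.0, 0.0, 10.0, 10.0])
    box_b = np.array([5.0, 5.0, 15.0, 15.0])
    return float(bboxes_iou(box_a[np.newaxis, :], box_b[np.newaxis, :])[0])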
def nms(bboxes, iou_threshold, sigma=0.3, method='nms'):
"""
:param bboxes: (xmin, ymin, xmax, ymax, score, class)
Note: soft-nms, https://arxiv.org/pdf/1704.04503.pdf
https://github.com/bharatsingh430/soft-nms
"""
classes_in_img = list(set(bboxes[:, 5]))
best_bboxes = []
for cls in classes_in_img:
cls_mask = (bboxes[:, 5] == cls)
cls_bboxes = bboxes[cls_mask]
while len(cls_bboxes) > 0:
max_ind = np.argmax(cls_bboxes[:, 4])
best_bbox = cls_bboxes[max_ind]
best_bboxes.append(best_bbox)
cls_bboxes = np.concatenate([cls_bboxes[: max_ind], cls_bboxes[max_ind + 1:]])
iou = bboxes_iou(best_bbox[np.newaxis, :4], cls_bboxes[:, :4])
weight = np.ones((len(iou),), dtype=np.float32)
assert method in ['nms', 'soft-nms']
if method == 'nms':
iou_mask = iou > iou_threshold
weight[iou_mask] = 0.0
if method == 'soft-nms':
weight = np.exp(-(1.0 * iou ** 2 / sigma))
cls_bboxes[:, 4] = cls_bboxes[:, 4] * weight
score_mask = cls_bboxes[:, 4] > 0.
cls_bboxes = cls_bboxes[score_mask]
return best_bboxes
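# Illustrative sketch, not part of the original sample: applying nms to two heavily
# overlapping boxes of the same class keeps only the higher-scoring one, while a box
# of another class is untouched. Rows follow the (xmin, ymin, xmax, ymax, score, class)
# layout described in the docstring above.
def _nms_example():
    demo_boxes = np.array([[0, 0, 10, 10, 0.9, 0],
                           [1, 1, 10, 10, 0.8, 0],
                           [50, 50, 60, 60, 0.7, 1]], dtype=np.float32)
    return nms(demo_boxes, iou_threshold=0.45, method='nms')  # two boxes survive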
def read_class_names(class_file_name):
'''loads class names from a file'''
names = {}
with open(class_file_name, 'r') as data:
for ID, name in enumerate(data):
names[ID] = name.strip('\n')
return names
def draw_bbox(image, bboxes, classes=read_class_names("coco.names"), show_label=True):
"""
bboxes: [x_min, y_min, x_max, y_max, probability, cls_id] format coordinates.
"""
num_classes = len(classes)
image_h, image_w, _ = image.shape
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
random.seed(0)
random.shuffle(colors)
random.seed(None)
for i, bbox in enumerate(bboxes):
coor = np.array(bbox[:4], dtype=np.int32)
fontScale = 0.5
score = bbox[4]
class_ind = int(bbox[5])
bbox_color = colors[class_ind]
bbox_thick = int(0.6 * (image_h + image_w) / 600)
c1, c2 = (coor[0], coor[1]), (coor[2], coor[3])
cv2.rectangle(image, c1, c2, bbox_color, bbox_thick)
if show_label:
bbox_mess = '%s: %.2f' % (classes[class_ind], score)
t_size = cv2.getTextSize(bbox_mess, 0, fontScale, thickness=bbox_thick//2)[0]
cv2.rectangle(image, c1, (c1[0] + t_size[0], c1[1] - t_size[1] - 3), bbox_color, -1)
cv2.putText(image, bbox_mess, (c1[0], c1[1]-2), cv2.FONT_HERSHEY_SIMPLEX,
fontScale, (0, 0, 0), bbox_thick//2, lineType=cv2.LINE_AA)
return image
def get_anchors(anchors_path, tiny=False):
'''loads the anchors from a file'''
with open(anchors_path) as f:
anchors = f.readline()
anchors = np.array(anchors.split(','), dtype=np.float32)
return anchors.reshape(3, 3, 2)
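# Illustrative sketch, not part of the original sample: get_anchors expects the anchors
# file to contain a single comma-separated line of 18 numbers (3 scales x 3 anchors x
# width,height). The values below are the commonly used YOLOv4 COCO anchors and serve
# only as an example of the expected format.
def _write_example_anchors(path="./yolov4_anchors.txt"):
    example_line = "12,16, 19,36, 40,28, 36,75, 76,55, 72,146, 142,110, 192,243, 459,401"
    with open(path, "w") as anchors_file:
        anchors_file.write(example_line)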
#Specify the path to anchors file on your machine
ANCHORS = "./yolov4_anchors.txt"
STRIDES = [8, 16, 32]
XYSCALE = [1.2, 1.1, 1.05]
ANCHORS = get_anchors(ANCHORS)
STRIDES = np.array(STRIDES)
def parse_arguments():
parser = argparse.ArgumentParser(description='Object Detection using YOLOv4 in OPENCV using OpenVINO Execution Provider for ONNXRuntime')
parser.add_argument('--device', default='CPU_FP32', help="Device to perform inference on 'cpu (MLAS)' or on devices supported by OpenVINO-EP [CPU_FP32, GPU_FP32, GPU_FP16, MYRIAD_FP16, VAD-M_FP16].")
parser.add_argument('--image', help='Path to image file.')
parser.add_argument('--video', help='Path to video file.')
parser.add_argument('--model', help='Path to model.')
args = parser.parse_args()
return args
def check_model_extension(fp):
# Split the extension from the path and normalise it to lowercase.
ext = os.path.splitext(fp)[-1].lower()
# Now we can simply use != to check for inequality, no need for wildcards.
if(ext != ".onnx"):
raise Exception(fp, "is an unknown file format. Use the model ending with .onnx format")
if not os.path.exists(fp):
raise Exception("[ ERROR ] Path of the onnx model file is Invalid")
def main():
# Process arguments
args = parse_arguments()
# Validate model file path
check_model_extension(args.model)
# Process inputs
win_name = 'Object detection with YOLOv4 using ONNXRuntime OpenVINO Execution Provider'
cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)
output_file = "yolo_out_py.avi"
if (args.image):
# Open the image file
if not os.path.isfile(args.image):
print("Input image file ", args.image, " doesn't exist")
sys.exit(1)
cap = cv2.VideoCapture(args.image)
output_file = args.image[:-4]+'_yolo_out_py.jpg'
elif (args.video):
# Open the video file
if not os.path.isfile(args.video):
print("Input video file ", args.video, " doesn't exist")
sys.exit(1)
cap = cv2.VideoCapture(args.video)
output_file = args.video[:-4]+'_yolo_out_py.avi'
else:
# Webcam input
cap = cv2.VideoCapture(0)
# Get the video writer initialized to save the output video
if (not args.image):
vid_writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc('M','J','P','G'), 30, (round(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
# Check the device information and create a session
device = args.device
so = rt.SessionOptions()
so.log_severity_level = 3
if(args.device == 'cpu'):
print("Device type selected is 'cpu' which is the default CPU Execution Provider (MLAS)")
#Specify the path to the ONNX model on your machine and register the CPU EP
sess = rt.InferenceSession(args.model, so, providers=['CPUExecutionProvider'])
else:
#Specify the path to the ONNX model on your machine and register the OpenVINO EP
sess = rt.InferenceSession(args.model, so, providers=['OpenVINOExecutionProvider'], provider_options=[{'device_type' : device}])
print("Device type selected is: " + device + " using the OpenVINO Execution Provider")
'''
other 'device_type' options are: (Any hardware target can be assigned if you have the access to it)
'CPU_FP32', 'GPU_FP32', 'GPU_FP16', 'MYRIAD_FP16', 'VAD-M_FP16'
'''
input_name = sess.get_inputs()[0].name
while cv2.waitKey(1) < 0:
# get frame from the video
has_frame, frame = cap.read()
# Stop the program if reached end of video
if not has_frame:
print("Done processing !!!")
print("Output file is stored as ", output_file)
has_frame=False
cv2.waitKey(3000)
# Release device
cap.release()
break
input_size = 416
original_image = frame
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
original_image_size = original_image.shape[:2]
image_data = image_preprocess(np.copy(original_image), [input_size, input_size])
image_data = image_data[np.newaxis, ...].astype(np.float32)
outputs = sess.get_outputs()
output_names = list(map(lambda output: output.name, outputs))
start = time.time()
detections = sess.run(output_names, {input_name: image_data})
end = time.time()
inference_time = end - start
pred_bbox = postprocess_bbbox(detections)
bboxes = postprocess_boxes(pred_bbox, original_image_size, input_size, 0.25)
bboxes = nms(bboxes, 0.213, method='nms')
image = draw_bbox(original_image, bboxes)
cv2.putText(image,device,(10,20),cv2.FONT_HERSHEY_COMPLEX,0.5,(255,255,255),1)
cv2.putText(image,'FPS: {}'.format(1.0/inference_time),(10,40),cv2.FONT_HERSHEY_COMPLEX,0.5,(255,255,255),1)
# Write the frame with the detection boxes
if (args.image):
cv2.imwrite(output_file, image.astype(np.uint8))
else:
vid_writer.write(image.astype(np.uint8))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
cv2.imshow(win_name, image)
if __name__ == "__main__":
main()
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #object-detection #onnx #onnx-runtime #openvino-onnx-runtime #yolov2 #openvino-execution-provider-for-onnx
Object detection with tinyYOLOv2 in Python using OpenVINO™ Execution Provider# pip3 install openvino
# Install ONNX Runtime for OpenVINO™ Execution Provider
# pip3 install onnxruntime-openvino==1.11.0
# pip3 install -r requirements.txt
# How to run the sample
# python3 tiny_yolov2_obj_detection_sample.py --h
# Running the ONNXRuntime OpenVINO™ Execution Provider sample
# python3 tiny_yolov2_obj_detection_sample.py --video face-demographics-walking-and-pause.mp4 --model tinyyolov2.onnx --device CPU_FP32
'''
Copyright (C) 2021-2022, Intel Corporation
SPDX-License-Identifier: Apache-2.0
'''
import numpy as np
import onnxruntime as rt
import cv2
import time
import os
import argparse
import platform
if platform.system() == "Windows":
from openvino import utils
utils.add_openvino_libs_to_path()
# color look up table for different classes for object detection sample
clut = [(0,0,0),(255,0,0),(255,0,255),(0,0,255),(0,255,0),(0,255,128),
(128,255,0),(128,128,0),(0,128,255),(128,0,128),
(255,0,128),(128,0,255),(255,128,128),(128,255,128),(255,255,0),
(255,128,128),(128,128,255),(255,128,128),(128,255,128),(128,255,128)]
# 20 labels that the tiny-yolov2 model can do the object_detection on
label = ["aeroplane","bicycle","bird","boat","bottle",
"bus","car","cat","chair","cow","diningtable",
"dog","horse","motorbike","person","pottedplant",
"sheep","sofa","train","tvmonitor"]
def parse_arguments():
parser = argparse.ArgumentParser(description='Object Detection using YOLOv2 in OPENCV using OpenVINO Execution Provider for ONNXRuntime')
parser.add_argument('--device', default='CPU_FP32', help="Device to perform inference on 'cpu (MLAS)' or on devices supported by OpenVINO-EP [CPU_FP32, GPU_FP32, GPU_FP16, MYRIAD_FP16, VAD-M_FP16].")
parser.add_argument('--video', help='Path to video file.')
parser.add_argument('--model', help='Path to model.')
args = parser.parse_args()
return args
def sigmoid(x, derivative=False):
return x*(1-x) if derivative else 1/(1+np.exp(-x))
def softmax(x):
score_mat_exp = np.exp(np.asarray(x))
return score_mat_exp / score_mat_exp.sum(0)
def check_model_extension(fp):
# Split the extension from the path and normalise it to lowercase.
ext = os.path.splitext(fp)[-1].lower()
# Now we can simply use != to check for inequality, no need for wildcards.
if(ext != ".onnx"):
raise Exception(fp, "is an unknown file format. Use the model ending with .onnx format")
if not os.path.exists(fp):
raise Exception("[ ERROR ] Path of the onnx model file is Invalid")
def check_video_file_extension(fp):
# Split the extension from the path and normalise it to lowercase.
ext = os.path.splitext(fp)[-1].lower()
# Now we can simply use != to check for inequality, no need for wildcards.
if(ext == ".mp4" or ext == ".avi" or ext == ".mov"):
pass
else:
raise Exception(fp, "is an unknown file format. Use the video file ending with .mp4 or .avi or .mov formats")
if not os.path.exists(fp):
raise Exception("[ ERROR ] Path of the video file is Invalid")
def image_preprocess(frame):
in_frame = cv2.resize(frame, (416, 416))
preprocessed_image = np.asarray(in_frame)
preprocessed_image = preprocessed_image.astype(np.float32)
preprocessed_image = preprocessed_image.transpose(2,0,1)
#Reshaping the input array to align with the input shape of the model
preprocessed_image = preprocessed_image.reshape(1,3,416,416)
return preprocessed_image
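# Illustrative sketch, not part of the original sample: whatever the source resolution,
# the frame is resized to 416x416, cast to float32 and rearranged from HWC to NCHW, so
# the array fed to the model always has shape (1, 3, 416, 416).
def _preprocess_shape_example():
    dummy_frame = np.zeros((360, 640, 3), dtype=np.uint8)
    return image_preprocess(dummy_frame).shape  # (1, 3, 416, 416)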
def postprocess_output(out, frame, x_scale, y_scale, i):
out = out[0][0]
num_classes = 20
anchors = [1.08, 1.19, 3.42, 4.41, 6.63, 11.38, 9.42, 5.11, 16.62, 10.52]
existing_labels = {l: [] for l in label}
#Inside this loop we compute the bounding box b for grid cell (cy, cx); a small worked example of this decoding follows after this function
for cy in range(0,13):
for cx in range(0,13):
for b in range(0,5):
# First we read the tx, ty, width(tw), and height(th) for the bounding box from the out array, as well as the confidence score
channel = b*(num_classes+5)
tx = out[channel ][cy][cx]
ty = out[channel+1][cy][cx]
tw = out[channel+2][cy][cx]
th = out[channel+3][cy][cx]
tc = out[channel+4][cy][cx]
x = (float(cx) + sigmoid(tx))*32
y = (float(cy) + sigmoid(ty))*32
w = np.exp(tw) * 32 * anchors[2*b]
h = np.exp(th) * 32 * anchors[2*b+1]
#calculating the confidence score
confidence = sigmoid(tc) # The confidence value for the bounding box is given by tc
classes = np.zeros(num_classes)
for c in range(0,num_classes):
classes[c] = out[channel + 5 +c][cy][cx]
# we take the softmax to turn the array into a probability distribution, and then pick the class with the largest score as the winner
classes = softmax(classes)
detected_class = classes.argmax()
# compute the final score for this bounding box and keep only the ones whose combined score exceeds the threshold
if 0.60 < classes[detected_class]*confidence:
color =clut[detected_class]
x = (x - w/2)*x_scale
y = (y - h/2)*y_scale
w *= x_scale
h *= y_scale
labelX = int((x+x+w)/2)
labelY = int((y+y+h)/2)
addLabel = True
lab_threshold = 100
for point in existing_labels[label[detected_class]]:
if labelX < point[0] + lab_threshold and labelX > point[0] - lab_threshold and \
labelY < point[1] + lab_threshold and labelY > point[1] - lab_threshold:
addLabel = False
#Adding class labels to the output of the frame and also drawing a rectangular bounding box around the object detected.
if addLabel:
cv2.rectangle(frame, (int(x),int(y)),(int(x+w),int(y+h)),color,2)
cv2.rectangle(frame, (int(x),int(y-13)),(int(x)+9*len(label[detected_class]),int(y)),color,-1)
cv2.putText(frame,label[detected_class],(int(x)+2,int(y)-3),cv2.FONT_HERSHEY_COMPLEX,0.4,(255,255,255),1)
existing_labels[label[detected_class]].append((labelX,labelY))
print('{} detected in frame {}'.format(label[detected_class],i))
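# Illustrative sketch, not part of the original sample: decoding a single raw prediction
# with the same formulas used in postprocess_output above. For grid cell (cy=6, cx=6),
# anchor b=0 and raw outputs tx=ty=tw=th=0, the box centre is (6 + sigmoid(0)) * 32 = 208
# pixels and the width is exp(0) * 32 * 1.08 ≈ 34.6 pixels (before scaling to the frame).
def _decode_single_box_example(cx=6, cy=6, tx=0.0, ty=0.0, tw=0.0, th=0.0, b=0):
    anchors = [1.08, 1.19, 3.42, 4.41, 6.63, 11.38, 9.42, 5.11, 16.62, 10.52]
    x = (float(cx) + sigmoid(tx)) * 32
    y = (float(cy) + sigmoid(ty)) * 32
    w = np.exp(tw) * 32 * anchors[2 * b]
    h = np.exp(th) * 32 * anchors[2 * b + 1]
    return x, y, w, h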
def show_bbox(device, frame, inference_time):
cv2.putText(frame,device,(10,20),cv2.FONT_HERSHEY_COMPLEX,0.5,(255,255,255),1)
cv2.putText(frame,'FPS: {}'.format(1.0/inference_time),(10,40),cv2.FONT_HERSHEY_COMPLEX,0.5,(255,255,255),1)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
cv2.imshow('frame',frame)
def main():
# Process arguments
args = parse_arguments()
# Validate model file path
check_model_extension(args.model)
so = rt.SessionOptions()
so.log_severity_level = 3
if (args.device == 'cpu'):
print("Device type selected is 'cpu' which is the default CPU Execution Provider (MLAS)")
#Specify the path to the ONNX model on your machine and register the CPU EP
sess = rt.InferenceSession(args.model, so, providers=['CPUExecutionProvider'])
elif (args.device == 'CPU_FP32' or args.device == 'GPU_FP32' or args.device == 'GPU_FP16' or args.device == 'MYRIAD_FP16' or args.device == 'VADM_FP16'):
#Specify the path to the ONNX model on your machine and register the OpenVINO EP
sess = rt.InferenceSession(args.model, so, providers=['OpenVINOExecutionProvider'], provider_options=[{'device_type' : args.device}])
print("Device type selected is: " + args.device + " using the OpenVINO Execution Provider")
'''
other 'device_type' options are: (Any hardware target can be assigned if you have the access to it)
'CPU_FP32', 'GPU_FP32', 'GPU_FP16', 'MYRIAD_FP16', 'VAD-M_FP16'
'''
else:
raise Exception("Device type selected is not [cpu, CPU_FP32, GPU_FP32, GPU_FP16, MYRIAD_FP16, VADM_FP16]")
# Get the input name of the model
input_name = sess.get_inputs()[0].name
#validate video file input path
check_video_file_extension(args.video)
#Path to video file has to be provided
cap = cv2.VideoCapture(args.video)
# capture the frame rate and frame size of the input video
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
x_scale = float(width)/416.0 # In the tiny-yolov2 documentation, the input shape of this network is (1,3,416,416).
y_scale = float(height)/416.0
# writing the inferencing output as a video to the local disk
fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_video_name = args.device + "_output.avi"
output_video = cv2.VideoWriter(output_video_name,fourcc, float(17.0), (640,360))
# capturing one frame at a time from the video feed and performing the inference
i = 0
while cv2.waitKey(1) < 0:
l_start = time.time()
ret, frame = cap.read()
if not ret:
break
initial_w = cap.get(3)
initial_h = cap.get(4)
# Preprocess the input frame and reshape it.
# The tiny-yolov2 documentation specifies an input shape of (1,3,416,416), so the frame is resized to that size.
preprocessed_image = image_preprocess(frame)
start = time.time()
#Running the session by passing in the input data of the model
out = sess.run(None, {input_name: preprocessed_image})
end = time.time()
inference_time = end - start
#Get the output
postprocess_output(out, frame, x_scale, y_scale, i)
#Show the Output
output_video.write(frame)
show_bbox(args.device, frame, inference_time)
#Press 'q' to quit the process
print('Processed Frame {}'.format(i))
i += 1
l_end = time.time()
print('Loop Time = {}'.format(l_end - l_start))
output_video.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #ocr #chinese #japanese #handwritten
209-handwritten-ocr: Handwritten Chinese and Japanese OCR# Imports
from collections import namedtuple
from itertools import groupby
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
from openvino.runtime import Core
# Settings
# Directories where data will be placed
model_folder = "model"
data_folder = "data"
charlist_folder = f"{data_folder}/charlists"
# Precision used by model
precision = "FP16"
Language = namedtuple(
typename="Language", field_names=["model_name", "charlist_name", "demo_image_name"]
)
chinese_files = Language(
model_name="handwritten-simplified-chinese-recognition-0001",
charlist_name="chinese_charlist.txt",
demo_image_name="handwritten_chinese_test.jpg",
)
japanese_files = Language(
model_name="handwritten-japanese-recognition-0001",
charlist_name="japanese_charlist.txt",
demo_image_name="handwritten_japanese_test.png",
)
# Select Language
# Select language by using either language='chinese' or language='japanese'
language = "chinese"
languages = {"chinese": chinese_files, "japanese": japanese_files}
selected_language = languages.get(language)
# Download Model
path_to_model_weights = Path(f'{model_folder}/intel/{selected_language.model_name}/{precision}/{selected_language.model_name}.bin')
if not path_to_model_weights.is_file():
download_command = f'omz_downloader --name {selected_language.model_name} --output_dir {model_folder} --precision {precision}'
print(download_command)
! $download_command
# Load Network and Execute
ie = Core()
path_to_model = path_to_model_weights.with_suffix(".xml")
model = ie.read_model(model=path_to_model)
# Select Device Name
# To check available device names run the line below
# print(ie.available_devices)
compiled_model = ie.compile_model(model=model, device_name="CPU")
# Fetch Information About Input and Output Layers
recognition_output_layer = compiled_model.output(0)
recognition_input_layer = compiled_model.input(0)
# Load an Image
# Read file name of demo file based on the selected model
file_name = selected_language.demo_image_name
# The text recognition model expects an image in grayscale format
# IMPORTANT! This model can read only one line of text at a time
# Read image
image = cv2.imread(filename=f"{data_folder}/{file_name}", flags=cv2.IMREAD_GRAYSCALE)
# Fetch shape
image_height, _ = image.shape
# B,C,H,W = batch size, number of channels, height, width
_, _, H, W = recognition_input_layer.shape
# Calculate scale ratio between input shape height and image height to resize image
scale_ratio = H / image_height
# Resize image to expected input sizes
resized_image = cv2.resize(
image, None, fx=scale_ratio, fy=scale_ratio, interpolation=cv2.INTER_AREA
)
# Pad image to match input size, without changing aspect ratio
resized_image = np.pad(
resized_image, ((0, 0), (0, W - resized_image.shape[1])), mode="edge"
)
# Reshape to the network input shape
input_image = resized_image[None, None, :, :]
# Visualise Input Image
plt.figure(figsize=(20, 1))
plt.axis("off")
plt.imshow(resized_image, cmap="gray", vmin=0, vmax=255);
# Prepare Charlist
# Get dictionary to encode output, based on model documentation
used_charlist = selected_language.charlist_name
# For both models, a blank symbol is added at index 0 of each charlist
blank_char = "~"
with open(f"{charlist_folder}/{used_charlist}", "r", encoding="utf-8") as charlist:
letters = blank_char + "".join(line.strip() for line in charlist)
# Run Inference
# Run inference on the model
predictions = compiled_model([input_image])[recognition_output_layer]
# Process Output Data
# Remove batch dimension
predictions = np.squeeze(predictions)
# Run argmax to pick the symbols with the highest probability
predictions_indexes = np.argmax(predictions, axis=1)
# Use groupby to collapse consecutive duplicate letters, as required by CTC greedy decoding
output_text_indexes = list(groupby(predictions_indexes))
# Remove grouper objects
output_text_indexes, _ = np.transpose(output_text_indexes, (1, 0))
# Remove blank symbols
output_text_indexes = output_text_indexes[output_text_indexes != 0]
# Assign letters to indexes from output array
output_text = [letters[letter_index] for letter_index in output_text_indexes]
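# Illustrative sketch, not part of the original notebook: the same groupby-based CTC
# greedy decoding applied to a toy index sequence. Consecutive repeats collapse first,
# then blanks (index 0) are dropped, e.g. (0, 3, 3, 0, 5, 5, 5) -> [3, 5].
def _ctc_groupby_example(indexes=(0, 3, 3, 0, 5, 5, 5)):
    collapsed = [key for key, _ in groupby(indexes)]
    return [index for index in collapsed if index != 0]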
# Print Output
plt.figure(figsize=(20, 1))
plt.axis("off")
plt.imshow(resized_image, cmap="gray", vmin=0, vmax=255)
print("".join(output_text))
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #ocr
208-optical-character-recognition: Optical Character Recognition (OCR) with OpenVINO# Imports
import shutil
import sys
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Markdown, display
from PIL import Image
from openvino.runtime import Core
from yaspin import yaspin
sys.path.append("../utils")
from notebook_utils import load_image
# Settings
ie = Core()
model_dir = Path("model")
precision = "FP16"
detection_model = "horizontal-text-detection-0001"
recognition_model = "text-recognition-resnet-fc"
base_model_dir = Path("~/open_model_zoo_models").expanduser()
omz_cache_dir = Path("~/open_model_zoo_cache").expanduser()
model_dir.mkdir(exist_ok=True)
# Download Models
download_command = f"omz_downloader --name {detection_model},{recognition_model} --output_dir {base_model_dir} --cache_dir {omz_cache_dir} --precision {precision}"
display(Markdown(f"Download command: `{download_command}`"))
with yaspin(text=f"Downloading {detection_model}, {recognition_model}") as sp:
download_result = !$download_command
print(download_result)
sp.text = f"Finished downloading {detection_model}, {recognition_model}"
sp.ok("✔")
# Convert Models
convert_command = f"omz_converter --name {recognition_model} --precisions {precision} --download_dir {base_model_dir} --output_dir {base_model_dir}"
display(Markdown(f"Convert command: `{convert_command}`"))
display(Markdown(f"Converting {recognition_model}..."))
! $convert_command
# Copy Models
models_info_output = %sx omz_info_dumper --name $detection_model,$recognition_model
print(f'sx omz_info_dumper --name {detection_model},{recognition_model}')
detection_model_info, recognition_model_info = [
{
"name": "horizontal-text-detection-0001",
"composite_model_name": None,
"description": "Horizontal text detector based on FCOS with light MobileNetV2 backbone",
"framework": "dldt",
"license_url": "https://raw.githubusercontent.com/openvinotoolkit/open_model_zoo/master/LICENSE",
"precisions": [
"FP16",
"FP16-INT8",
"FP32"
],
"quantization_output_precisions": [],
"subdirectory": "intel/horizontal-text-detection-0001",
"task_type": "detection"
},
{
"name": "text-recognition-resnet-fc",
"composite_model_name": None,
"description": "\"text-recognition-resnet-fc\" is a simple and preformant scene text recognition model based on ResNet with Fully Connected text recognition head. Source implementation on a PyTorch* framework could be found here <https://github.com/Media-Smart/vedastr>. Model is able to recognize alphanumeric text.",
"framework": "pytorch",
"license_url": "https://raw.githubusercontent.com/Media-Smart/vedastr/0fd2a0bd7819ae4daa2a161501e9f1c2ac67e96a/LICENSE",
"precisions": [
"FP16",
"FP32"
],
"quantization_output_precisions": [],
"subdirectory": "public/text-recognition-resnet-fc",
"task_type": "optical_character_recognition"
}
]
for model_info in (detection_model_info, recognition_model_info):
omz_dir = Path(model_info["subdirectory"])
omz_model_dir = base_model_dir / omz_dir / precision
print(omz_model_dir)
for model_file in omz_model_dir.iterdir():
try:
shutil.copyfile(model_file, model_dir / model_file.name)
except FileExistsError:
pass
detection_model_path = (model_dir / detection_model).with_suffix(".xml")
recognition_model_path = (model_dir / recognition_model).with_suffix(".xml")
# Load Detection Model
detection_model = ie.read_model(
model=detection_model_path, weights=detection_model_path.with_suffix(".bin")
)
detection_compiled_model = ie.compile_model(model=detection_model, device_name="CPU")
detection_input_layer = detection_compiled_model.input(0)
# Load an Image
# image_file can point to a URL or local image
image_file = "https://github.com/openvinotoolkit/openvino_notebooks/raw/main/notebooks/004-hello-detection/data/intel_rnb.jpg"
image = load_image(image_file)
# N,C,H,W = batch size, number of channels, height, width
N, C, H, W = detection_input_layer.shape
# Resize image to meet network expected input sizes
resized_image = cv2.resize(image, (W, H))
# Reshape to network input shape
input_image = np.expand_dims(resized_image.transpose(2, 0, 1), 0)
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB));
# Do Inference
output_key = detection_compiled_model.output("boxes")
boxes = detection_compiled_model([input_image])[output_key]
# Remove zero only boxes
boxes = boxes[~np.all(boxes == 0, axis=1)]
# Get Detection Results
def multiply_by_ratio(ratio_x, ratio_y, box):
return [
max(shape * ratio_y, 10) if idx % 2 else shape * ratio_x
for idx, shape in enumerate(box[:-1])
]
def run_preprocesing_on_crop(crop, net_shape):
temp_img = cv2.resize(crop, net_shape)
temp_img = temp_img.reshape((1,) * 2 + temp_img.shape)
return temp_img
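# Illustrative sketch, not part of the original notebook: run_preprocesing_on_crop resizes
# a grayscale crop to the recognition network's spatial size and prepends batch and channel
# dimensions, so a (40, 200) crop with net_shape=(120, 32) becomes a (1, 1, 32, 120) tensor.
def _crop_shape_example():
    dummy_crop = np.zeros((40, 200), dtype=np.uint8)
    return run_preprocesing_on_crop(dummy_crop, (120, 32)).shape  # (1, 1, 32, 120)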
def convert_result_to_image(bgr_image, resized_image, boxes, threshold=0.3, conf_labels=True):
# Define colors for boxes and descriptions
colors = {"red": (255, 0, 0), "green": (0, 255, 0), "white": (255, 255, 255)}
# Fetch image shapes to calculate ratio
(real_y, real_x), (resized_y, resized_x) = bgr_image.shape[:2], resized_image.shape[:2]
ratio_x, ratio_y = real_x / resized_x, real_y / resized_y
# Convert base image from bgr to rgb format
rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
# Iterate through non-zero boxes
for box, annotation in boxes:
# Pick confidence factor from last place in array
conf = box[-1]
if conf > threshold:
# Convert float to int and multiply position of each box by x and y ratio
(x_min, y_min, x_max, y_max) = map(int, multiply_by_ratio(ratio_x, ratio_y, box))
# Draw box based on position, parameters in rectangle function are: image, start_point, end_point, color, thickness
cv2.rectangle(rgb_image, (x_min, y_min), (x_max, y_max), colors["green"], 3)
# Add text to image based on position and confidence, parameters in putText function are: image, text, bottomleft_corner_textfield, font, font_scale, color, thickness, line_type
if conf_labels:
# Create background box based on annotation length
(text_w, text_h), _ = cv2.getTextSize(
f"{annotation}", cv2.FONT_HERSHEY_TRIPLEX, 0.8, 1
)
image_copy = rgb_image.copy()
cv2.rectangle(
image_copy,
(x_min, y_min - text_h - 10),
(x_min + text_w, y_min - 10),
colors["white"],
-1,
)
# Add weighted image copy with white boxes under text
cv2.addWeighted(image_copy, 0.4, rgb_image, 0.6, 0, rgb_image)
cv2.putText(
rgb_image,
f"{annotation}",
(x_min, y_min - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
colors["red"],
1,
cv2.LINE_AA,
)
return rgb_image
# Load Text Recognition Model
recognition_model = ie.read_model(
model=recognition_model_path, weights=recognition_model_path.with_suffix(".bin")
)
recognition_compiled_model = ie.compile_model(model=recognition_model, device_name="CPU")
recognition_output_layer = recognition_compiled_model.output(0)
recognition_input_layer = recognition_compiled_model.input(0)
# Get height and width of input layer
_, _, H, W = recognition_input_layer.shape
# Do Inference
# Calculate scale for image resizing
(real_y, real_x), (resized_y, resized_x) = image.shape[:2], resized_image.shape[:2]
ratio_x, ratio_y = real_x / resized_x, real_y / resized_y
# Convert image to grayscale for text recognition model
grayscale_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Get dictionary to encode output, based on model documentation
letters = "~0123456789abcdefghijklmnopqrstuvwxyz"
# Prepare empty list for annotations
annotations = list()
cropped_images = list()
# fig, ax = plt.subplots(len(boxes), 1, figsize=(5,15), sharex=True, sharey=True)
# For each crop, based on the boxes given by the detection model, get the annotations
for i, crop in enumerate(boxes):
# Get coordinates on corners of crop
(x_min, y_min, x_max, y_max) = map(int, multiply_by_ratio(ratio_x, ratio_y, crop))
image_crop = run_preprocesing_on_crop(grayscale_image[y_min:y_max, x_min:x_max], (W, H))
# Run inference with recognition model
result = recognition_compiled_model([image_crop])[recognition_output_layer]
# Squeeze output to remove unnecessary dimension
recognition_results_test = np.squeeze(result)
# Read annotation based on probabilities from output layer
annotation = list()
for letter in recognition_results_test:
parsed_letter = letters[letter.argmax()]
# Index 0 returned from argmax signals the end of the string
if parsed_letter == letters[0]:
break
annotation.append(parsed_letter)
annotations.append("".join(annotation))
cropped_image = Image.fromarray(image[y_min:y_max, x_min:x_max])
cropped_images.append(cropped_image)
boxes_with_annotations = list(zip(boxes, annotations))
# Show Detected Text Boxes and OCR Results for the Image
plt.figure(figsize=(12, 12))
plt.imshow(convert_result_to_image(image, resized_image, boxes_with_annotations, conf_labels=True));
# Show the OCR Result per Bounding Box
for cropped_image, annotation in zip(cropped_images, annotations):
display(cropped_image, Markdown("".join(annotation)))
# Print Annotations in Plain Text Format
[
annotation
for _, annotation in sorted(zip(boxes, annotations), key=lambda x: x[0][0] ** 2 + x[0][1] ** 2)
]
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #question-answering #bert
213-question-answering: Interactive Question Answering with OpenVINO# Imports
import operator
import time
from urllib import parse
import numpy as np
from openvino.runtime import Core
import html_reader as reader
import tokens_bert as tokens
# Download the model
# directory where model will be downloaded
base_model_dir = "model"
# desired precision
precision = "FP16-INT8"
# model name as named in Open Model Zoo
model_name = "bert-small-uncased-whole-word-masking-squad-int8-0002"
model_path = f"model/intel/{model_name}/{precision}/{model_name}.xml"
model_weights_path = f"model/intel/{model_name}/{precision}/{model_name}.bin"
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir} " \
f"--cache_dir {base_model_dir}"
! $download_command
# Load the model
# initialize inference engine
core = Core()
# read the network and corresponding weights from file
model = core.read_model(model=model_path, weights=model_weights_path)
# load the model on the CPU (you can use GPU as well)
compiled_model = core.compile_model(model=model, device_name="CPU")
# get input and output names of nodes
input_keys = list(compiled_model.inputs)
output_keys = list(compiled_model.outputs)
# get network input size
input_size = compiled_model.input(0).shape[1]
# Processing
# path to vocabulary file
vocab_file_path = "data/vocab.txt"
# create dictionary with words and their indices
vocab = tokens.load_vocab_file(vocab_file_path)
# define special tokens
cls_token = vocab["[CLS]"]
pad_token = vocab["[PAD]"]
sep_token = vocab["[SEP]"]
# function to load text from given urls
def load_context(sources):
input_urls = []
paragraphs = []
for source in sources:
result = parse.urlparse(source)
if all([result.scheme, result.netloc]):
input_urls.append(source)
else:
paragraphs.append(source)
paragraphs.extend(reader.get_paragraphs(input_urls))
# produce one big context string
return "\n".join(paragraphs)
# Preprocessing
# generator of a sequence of inputs
def prepare_input(question_tokens, context_tokens):
# length of question in tokens
question_len = len(question_tokens)
# context part size
context_len = input_size - question_len - 3
if context_len < 16:
raise RuntimeError("Question is too long in comparison to input size. No space for context")
# take parts of the context, with consecutive windows overlapping by half
for start in range(0, max(1, len(context_tokens) - context_len), context_len // 2):
# part of context
part_context_tokens = context_tokens[start:start + context_len]
# input: question and context separated by special tokens
input_ids = [cls_token] + question_tokens + [sep_token] + part_context_tokens + [sep_token]
# 1 for any index if there is no padding token, 0 otherwise
attention_mask = [1] * len(input_ids)
# 0 for question tokens, 1 for context part
token_type_ids = [0] * (question_len + 2) + [1] * (len(part_context_tokens) + 1)
# add padding at the end
(input_ids, attention_mask, token_type_ids), pad_number = pad(input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids)
# create input to feed the model
input_dict = {
"input_ids": np.array([input_ids], dtype=np.int32),
"attention_mask": np.array([attention_mask], dtype=np.int32),
"token_type_ids": np.array([token_type_ids], dtype=np.int32),
}
# some models require additional position_ids
if "position_ids" in [i_key.any_name for i_key in input_keys]:
position_ids = np.arange(len(input_ids))
input_dict["position_ids"] = np.array([position_ids], dtype=np.int32)
yield input_dict, pad_number, start
# function to add padding
def pad(input_ids, attention_mask, token_type_ids):
# how many padding tokens
diff_input_size = input_size - len(input_ids)
if diff_input_size > 0:
# add padding to all inputs
input_ids = input_ids + [pad_token] * diff_input_size
attention_mask = attention_mask + [0] * diff_input_size
token_type_ids = token_type_ids + [0] * diff_input_size
return (input_ids, attention_mask, token_type_ids), diff_input_size
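# Illustrative sketch, not part of the original notebook: how the sliding window in
# prepare_input splits a long context. Assuming a sequence length of 384, a 50-token
# question leaves 384 - 50 - 3 = 331 context tokens per window, and windows start every
# 331 // 2 = 165 tokens, so consecutive windows overlap by roughly half.
def _window_starts_example(question_len=50, num_context_tokens=1000, seq_len=384):
    context_len = seq_len - question_len - 3
    return list(range(0, max(1, num_context_tokens - context_len), context_len // 2))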
# Postprocessing
# based on https://github.com/openvinotoolkit/open_model_zoo/blob/bf03f505a650bafe8da03d2747a8b55c5cb2ef16/demos/common/python/openvino/model_zoo/model_api/models/bert.py#L163
def postprocess(output_start, output_end, question_tokens, context_tokens_start_end, padding, start_idx):
def get_score(logits):
out = np.exp(logits)
return out / out.sum(axis=-1)
# get start-end scores for context
score_start = get_score(output_start)
score_end = get_score(output_end)
# index of first context token in tensor
context_start_idx = len(question_tokens) + 2
# index of last+1 context token in tensor
context_end_idx = input_size - padding - 1
# find product of all start-end combinations to find the best one
max_score, max_start, max_end = find_best_answer_window(start_score=score_start,
end_score=score_end,
context_start_idx=context_start_idx,
context_end_idx=context_end_idx)
# convert to context text start-end index
max_start = context_tokens_start_end[max_start + start_idx][0]
max_end = context_tokens_start_end[max_end + start_idx][1]
return max_score, max_start, max_end
# based on https://github.com/openvinotoolkit/open_model_zoo/blob/bf03f505a650bafe8da03d2747a8b55c5cb2ef16/demos/common/python/openvino/model_zoo/model_api/models/bert.py#L188
def find_best_answer_window(start_score, end_score, context_start_idx, context_end_idx):
context_len = context_end_idx - context_start_idx
score_mat = np.matmul(
start_score[context_start_idx:context_end_idx].reshape((context_len, 1)),
end_score[context_start_idx:context_end_idx].reshape((1, context_len)),
)
# reset candidates with end before start
score_mat = np.triu(score_mat)
# reset long candidates (>16 words)
score_mat = np.tril(score_mat, 16)
# find the best start-end pair
max_s, max_e = divmod(score_mat.flatten().argmax(), score_mat.shape[1])
max_score = score_mat[max_s, max_e]
return max_score, max_s, max_e
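# Illustrative sketch, not part of the original notebook: a toy call to
# find_best_answer_window. With context start scores [0.1, 0.7, 0.2] and end scores
# [0.2, 0.1, 0.7] (after two leading entries standing in for [CLS] and the question),
# the outer product gives a 3x3 score matrix, np.triu discards end-before-start pairs
# and the argmax picks the pair (start=1, end=2) with score 0.7 * 0.7 = 0.49.
def _answer_window_example():
    start_scores = np.array([0.0, 0.0, 0.1, 0.7, 0.2])
    end_scores = np.array([0.0, 0.0, 0.2, 0.1, 0.7])
    return find_best_answer_window(start_score=start_scores, end_score=end_scores,
                                   context_start_idx=2, context_end_idx=5)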
def get_best_answer(question, context):
# convert context string to tokens
context_tokens, context_tokens_start_end = tokens.text_to_tokens(text=context.lower(),
vocab=vocab)
# convert question string to tokens
question_tokens, _ = tokens.text_to_tokens(text=question.lower(), vocab=vocab)
results = []
# iterate through different parts of context
for network_input, padding, start_idx in prepare_input(question_tokens=question_tokens,
context_tokens=context_tokens):
# get output layers
output_start_key = compiled_model.output("output_s")
output_end_key = compiled_model.output("output_e")
# openvino inference
result = compiled_model(network_input)
# postprocess the result getting the score and context range for the answer
score_start_end = postprocess(output_start=result[output_start_key][0],
output_end=result[output_end_key][0],
question_tokens=question_tokens,
context_tokens_start_end=context_tokens_start_end,
padding=padding,
start_idx=start_idx)
results.append(score_start_end)
# find the highest score
answer = max(results, key=operator.itemgetter(0))
# return the part of the context that is the answer, together with its score
return context[answer[1]:answer[2]], answer[0]
# Main Processing Function
def run_question_answering(sources):
print(f"Context: {sources}", flush=True)
context = load_context(sources)
if len(context) == 0:
print("Error: Empty context or outside paragraphs")
return
while True:
question = input()
# if no question - break
if question == "":
break
# measure processing time
start_time = time.perf_counter()
answer, score = get_best_answer(question=question, context=context)
end_time = time.perf_counter()
print(f"Question: {question}")
print(f"Answer: {answer}")
print(f"Score: {score:.2f}")
print(f"Time: {end_time - start_time:.2f}s")
# Run on local paragraphs
sources = ["Computational complexity theory is a branch of the theory of computation in theoretical computer "
"science that focuses on classifying computational problems according to their inherent difficulty, "
"and relating those classes to each other. A computational problem is understood to be a task that "
"is in principle amenable to being solved by a computer, which is equivalent to stating that the "
"problem may be solved by mechanical application of mathematical steps, such as an algorithm."]
run_question_answering(sources)
# Run on websites
sources = ["https://en.wikipedia.org/wiki/OpenVINO"]
run_question_answering(sources)
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #speechtotext
211-speech-to-text: Speech to Text with OpenVINO# Imports
from pathlib import Path
import IPython.display as ipd
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import scipy
from openvino.runtime import Core
# Settings
model_folder = "model"
download_folder = "output"
data_folder = "data"
precision = "FP16"
model_name = "quartznet-15x5-en"
# Download Model
# Check if model is already downloaded in download directory
path_to_model_weights = Path(f'{download_folder}/public/{model_name}/models')
downloaded_model_file = list(path_to_model_weights.glob('*.pth'))
if not path_to_model_weights.is_dir() or len(downloaded_model_file) == 0:
download_command = f"omz_downloader --name {model_name} --output_dir {download_folder} --precision {precision}"
! $download_command
# Convert Model
# Check if model is already converted in model directory
path_to_converted_weights = Path(f'{model_folder}/public/{model_name}/{precision}/{model_name}.bin')
if not path_to_converted_weights.is_file():
convert_command = f"omz_converter --name {model_name} --precisions {precision} --download_dir {download_folder} --output_dir {model_folder}"
! $convert_command
# Defining constants
audio_file_name = "edge_to_cloud.ogg"
alphabet = " abcdefghijklmnopqrstuvwxyz'~"
# Load Audio File
audio, sampling_rate = librosa.load(path=f'{data_folder}/{audio_file_name}', sr=16000)
ipd.Audio(audio, rate=sampling_rate)
# Visualise Audio File
plt.figure()
librosa.display.waveplot(y=audio, sr=sampling_rate, max_points=50000.0, x_axis='time', offset=0.0, max_sr=1000);
plt.show()
specto_audio = librosa.stft(audio)
specto_audio = librosa.amplitude_to_db(np.abs(specto_audio), ref=np.max)
print(specto_audio.shape)
librosa.display.specshow(specto_audio, sr=sampling_rate, x_axis='time', y_axis='hz');
# Change Type of Data
if max(np.abs(audio)) <= 1:
audio = (audio * (2**15 - 1))
audio = audio.astype(np.int16)
# Convert Audio to Mel Spectrum
def audio_to_mel(audio, sampling_rate):
assert sampling_rate == 16000, "Only 16 KHz audio supported"
preemph = 0.97
preemphased = np.concatenate([audio[:1], audio[1:] - preemph * audio[:-1].astype(np.float32)])
# Calculate window length
win_length = round(sampling_rate * 0.02)
# Based on previously calculated window length run short-time Fourier transform
spec = np.abs(librosa.core.spectrum.stft(preemphased, n_fft=512, hop_length=round(sampling_rate * 0.01),
win_length=win_length, center=True, window=scipy.signal.windows.hann(win_length), pad_mode='reflect'))
# Create mel filter-bank, produce transformation matrix to project current values onto Mel-frequency bins
mel_basis = librosa.filters.mel(sampling_rate, 512, n_mels=64, fmin=0.0, fmax=8000.0, htk=False)
return mel_basis, spec
def mel_to_input(mel_basis, spec, padding=16):
# Convert to logarithmic scale
log_melspectrum = np.log(np.dot(mel_basis, np.power(spec, 2)) + 2 ** -24)
# Normalize output
normalized = (log_melspectrum - log_melspectrum.mean(1)[:, None]) / (log_melspectrum.std(1)[:, None] + 1e-5)
# Calculate padding
remainder = normalized.shape[1] % padding
if remainder != 0:
return np.pad(normalized, ((0, 0), (0, padding - remainder)))[None]
return normalized[None]
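# Illustrative sketch, not part of the original notebook: mel_to_input pads the time axis
# up to the next multiple of `padding`, so a (64, 100)-shaped spectrogram becomes
# (1, 64, 112), because 100 % 16 == 4 and 16 - 4 = 12 extra frames are appended.
def _mel_padding_example():
    dummy_mel_basis = np.eye(64)                 # identity "filter bank", shapes only
    dummy_spec = np.random.rand(64, 100) + 1e-3  # strictly positive toy spectrogram
    return mel_to_input(dummy_mel_basis, dummy_spec).shape  # (1, 64, 112)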
# Run Conversion from Audio to Mel Format
mel_basis, spec = audio_to_mel(audio=audio.flatten(), sampling_rate=sampling_rate)
# Visualise Mel Spectrogram
librosa.display.specshow(data=spec, sr=sampling_rate, x_axis='time', y_axis='log');
plt.show();
librosa.display.specshow(data=mel_basis, sr=sampling_rate, x_axis='linear');
plt.ylabel('Mel filter');
# Adjust Mel scale to Input
audio = mel_to_input(mel_basis=mel_basis, spec=spec)
# Load Model
ie = Core()
model = ie.read_model(
model=f"{model_folder}/public/{model_name}/{precision}/{model_name}.xml"
)
model_input_layer = model.input(0)
shape = model_input_layer.partial_shape
shape[2] = -1
model.reshape({model_input_layer: shape})
compiled_model = ie.compile_model(model=model, device_name="CPU")
# Do Inference
output_layer_ir = compiled_model.output(0)
character_probabilities = compiled_model([audio])[output_layer_ir]
# Read Output
# Remove unnecessary dimension
character_probabilities = np.squeeze(character_probabilities)
# Run argmax to pick the most probable symbols
character_probabilities = np.argmax(character_probabilities, axis=1)
# Implementation of Decoding
def ctc_greedy_decode(predictions):
previous_letter_id = blank_id = len(alphabet) - 1
transcription = list()
for letter_index in predictions:
if previous_letter_id != letter_index != blank_id:
transcription.append(alphabet[letter_index])
previous_letter_id = letter_index
return ''.join(transcription)
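# Illustrative sketch, not part of the original notebook: for the alphabet defined above
# (" abcdefghijklmnopqrstuvwxyz'~", with '~' at index 28 acting as the CTC blank), the
# toy index sequence [8, 8, 28, 9, 9] collapses repeated symbols, drops the blank and
# decodes to "hi".
def _ctc_decode_example():
    return ctc_greedy_decode([8, 8, 28, 9, 9])  # -> "hi"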
# Run Decoding and Print Output
transcription = ctc_greedy_decode(character_probabilities)
print(transcription)
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #action-recognition
403-action-recognition-webcam: Human Action Recognition with OpenVINO# Imports
import collections
import os
import sys
import time
from typing import Tuple, List
import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core
from openvino.runtime.ie_api import CompiledModel
sys.path.append("../utils")
import notebook_utils as utils
# Download the models
# Directory where model will be downloaded
base_model_dir = "model"
# Model name as named in Open Model Zoo
model_name = "action-recognition-0001"
# Selected precision (FP32, FP16, FP16-INT8)
precision = "FP16"
model_path_decoder = (
f"model/intel/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"
)
model_path_encoder = (
f"model/intel/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
)
if not os.path.exists(model_path_decoder) or not os.path.exists(model_path_encoder):
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir}"
! $download_command
# Load your labels
labels = "data/kinetics.txt"
with open(labels) as f:
labels = [line.strip() for line in f]
print(labels[0:9], np.shape(labels))
# Model Initialization function
# Initialize inference engine
ie_core = Core()
def model_init(model_path: str) -> Tuple:
"""
Read the network and weights from file, load the
model on the CPU and get input and output names of nodes
:param: model: model architecture path *.xml
:returns:
compiled_model: Compiled model
input_key: Input node for model
output_key: Output node for model
"""
# Read the network and corresponding weights from file
model = ie_core.read_model(model=model_path)
# compile the model for the CPU (you can use GPU or MYRIAD as well)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# Get input and output names of nodes
input_keys = compiled_model.input(0)
output_keys = compiled_model.output(0)
return input_keys, output_keys, compiled_model
# Initialization for Encoder and Decoder
# Encoder initialization
input_key_en, output_keys_en, compiled_model_en = model_init(model_path_encoder)
# Decoder initialization
input_key_de, output_keys_de, compiled_model_de = model_init(model_path_decoder)
# Get input size - Encoder
height_en, width_en = list(input_key_en.shape)[2:]
# Get input size - Decoder
frames2decode = list(input_key_de.shape)[0:][1]
# Helper functions
def center_crop(frame: np.ndarray) -> np.ndarray:
"""
Center-crop the original frame to a square to standardize the input image for the encoder model
:param frame: input frame
:returns: center-crop-squared frame
"""
img_h, img_w, _ = frame.shape
min_dim = min(img_h, img_w)
start_x = int((img_w - min_dim) / 2.0)
start_y = int((img_h - min_dim) / 2.0)
roi = [start_y, (start_y + min_dim), start_x, (start_x + min_dim)]
return frame[start_y : (start_y + min_dim), start_x : (start_x + min_dim), ...], roi
def adaptive_resize(frame: np.ndarray, size: int) -> np.ndarray:
"""
Resize the frame so that its shorter side equals size
:param frame: input frame
:param size: input size to encoder model
:returns: resized frame, np.array type
"""
h, w, _ = frame.shape
scale = size / min(h, w)
w_scaled, h_scaled = int(w * scale), int(h * scale)
if w_scaled == w and h_scaled == h:
return frame
return cv2.resize(frame, (w_scaled, h_scaled))
def decode_output(probs: np.ndarray, labels: np.ndarray, top_k: int = 3) -> np.ndarray:
"""
Decodes top probabilities into corresponding label names
:param probs: confidence vector for 400 actions
:param labels: list of actions
:param top_k: The k most probable positions in the list of labels
:returns: decoded_labels: The k most probable actions from the labels list
decoded_top_probs: confidence for the k most probable actions
"""
top_ind = np.argsort(-1 * probs)[:top_k]
out_label = np.array(labels)[top_ind.astype(int)]
decoded_labels = [out_label[0][0], out_label[0][1], out_label[0][2]]
top_probs = np.array(probs)[0][top_ind.astype(int)]
decoded_top_probs = [top_probs[0][0], top_probs[0][1], top_probs[0][2]]
return decoded_labels, decoded_top_probs
def rec_frame_display(frame: np.ndarray, roi) -> np.ndarray:
"""
Draw a rectangular ROI frame over the actual frame
:param frame: input frame
:param roi: Region of interest, image section processed by the Encoder
:returns: frame with the drawn shape
"""
cv2.line(frame, (roi[2] + 3, roi[0] + 3), (roi[2] + 3, roi[0] + 100), (0, 200, 0), 2)
cv2.line(frame, (roi[2] + 3, roi[0] + 3), (roi[2] + 100, roi[0] + 3), (0, 200, 0), 2)
cv2.line(frame, (roi[3] - 3, roi[1] - 3), (roi[3] - 3, roi[1] - 100), (0, 200, 0), 2)
cv2.line(frame, (roi[3] - 3, roi[1] - 3), (roi[3] - 100, roi[1] - 3), (0, 200, 0), 2)
cv2.line(frame, (roi[3] - 3, roi[0] + 3), (roi[3] - 3, roi[0] + 100), (0, 200, 0), 2)
cv2.line(frame, (roi[3] - 3, roi[0] + 3), (roi[3] - 100, roi[0] + 3), (0, 200, 0), 2)
cv2.line(frame, (roi[2] + 3, roi[1] - 3), (roi[2] + 3, roi[1] - 100), (0, 200, 0), 2)
cv2.line(frame, (roi[2] + 3, roi[1] - 3), (roi[2] + 100, roi[1] - 3), (0, 200, 0), 2)
# Write ROI over actual frame
FONT_STYLE = cv2.FONT_HERSHEY_SIMPLEX
org = (roi[2] + 3, roi[1] - 3)
org2 = (roi[2] + 2, roi[1] - 2)
FONT_SIZE = 0.5
FONT_COLOR = (0, 200, 0)
FONT_COLOR2 = (0, 0, 0)
cv2.putText(frame, "ROI", org2, FONT_STYLE, FONT_SIZE, FONT_COLOR2)
cv2.putText(frame, "ROI", org, FONT_STYLE, FONT_SIZE, FONT_COLOR)
return frame
def display_text_fnc(frame: np.ndarray, display_text: str, index: int):
"""
Overlay text on the analyzed frame
:param frame: input frame
:param display_text: text to add on the frame
:param index: line index for placing the text
"""
# Configuration for displaying images with text
FONT_COLOR = (255, 255, 255)
FONT_COLOR2 = (0, 0, 0)
FONT_STYLE = cv2.FONT_HERSHEY_DUPLEX
FONT_SIZE = 0.7
TEXT_VERTICAL_INTERVAL = 25
TEXT_LEFT_MARGIN = 15
# ROI over actual frame
(processed, roi) = center_crop(frame)
# Draw a ROI over actual frame
frame = rec_frame_display(frame, roi)
# Put text over actual frame
text_loc = (TEXT_LEFT_MARGIN, TEXT_VERTICAL_INTERVAL * (index + 1))
text_loc2 = (TEXT_LEFT_MARGIN + 1, TEXT_VERTICAL_INTERVAL * (index + 1) + 1)
cv2.putText(frame, display_text, text_loc2, FONT_STYLE, FONT_SIZE, FONT_COLOR2)
cv2.putText(frame, display_text, text_loc, FONT_STYLE, FONT_SIZE, FONT_COLOR)
# AI Functions
def preprocessing(frame: np.ndarray, size: int) -> np.ndarray:
"""
Preparing frame before Encoder.
The image should be scaled to its shortest dimension at "size"
and cropped, centered, and squared so that both width and
height have lengths "size". Frame must be transposed from
Height-Width-Channels (HWC) to Channels-Height-Width (CHW).
:param frame: input frame
:param size: input size to encoder model
:returns: resized and cropped frame
"""
# Adaptive resize
preprocessed = adaptive_resize(frame, size)
# Center_crop
(preprocessed, roi) = center_crop(preprocessed)
# Transpose frame HWC -> CHW
preprocessed = preprocessed.transpose((2, 0, 1))[None,] # HWC -> CHW
return preprocessed, roi
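# Illustrative sketch, not part of the original notebook: a dummy 300x400 BGR frame passed
# through preprocessing with size=224 is resized, centre-cropped to a square and transposed,
# so the returned array has shape (1, 3, 224, 224), ready for the encoder model.
def _preprocessing_shape_example():
    dummy_frame = np.zeros((300, 400, 3), dtype=np.uint8)
    preprocessed_frame, _ = preprocessing(dummy_frame, 224)
    return preprocessed_frame.shape  # (1, 3, 224, 224)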
def encoder(
preprocessed: np.ndarray,
compiled_model: CompiledModel
) -> List:
"""
Encoder Inference per frame. This function calls the network previously
configured for the encoder model (compiled_model), extracts the data
from the output node, and appends it in an array to be used by the decoder.
:param: preprocessed: preprocessing frame
:param: compiled_model: Encoder model network
:returns: encoder_output: embedding layer that is appended with each arriving frame
"""
output_key_en = compiled_model.output(0)
# Get results on action-recognition-0001-encoder model
infer_result_encoder = compiled_model([preprocessed])[output_key_en]
return infer_result_encoder
def decoder(encoder_output: List, compiled_model_de: CompiledModel) -> List:
"""
Decoder inference per set of frames. This function concatenates the embedding layers
from the encoder output and transposes the array to match the decoder input size.
It calls the network previously configured for the decoder model (compiled_model_de),
extracts the logits and normalizes them to get confidence values along the specified axis,
then decodes the top probabilities into the corresponding label names
:param: encoder_output: embedding layer for 16 frames
:param: compiled_model_de: Decoder model network
:returns: decoded_labels: The k most probable actions from the labels list
decoded_top_probs: confidence for the k most probable actions
"""
# Concatenate sample_duration frames in just one array
decoder_input = np.concatenate(encoder_output, axis=0)
# Organize input shape vector to the Decoder (shape: [1x16x512])
decoder_input = decoder_input.transpose((2, 0, 1, 3))
decoder_input = np.squeeze(decoder_input, axis=3)
output_key_de = compiled_model_de.output(0)
# Get results on action-recognition-0001-decoder model
result_de = compiled_model_de([decoder_input])[output_key_de]
# Normalize logits to get confidence values along specified axis
probs = softmax(result_de - np.max(result_de))
# Decodes top probabilities into corresponding label names
decoded_labels, decoded_top_probs = decode_output(probs, labels, top_k=3)
return decoded_labels, decoded_top_probs
def softmax(x: np.ndarray) -> np.ndarray:
"""
Normalizes logits to get confidence values along specified axis
x: np.array, axis=None
"""
exp = np.exp(x)
return exp / np.sum(exp, axis=None)
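# Illustrative sketch, not part of the original notebook: softmax turns raw logits into a
# probability distribution. Subtracting the maximum first, as done in decoder() above,
# leaves the result unchanged but prevents overflow in np.exp for large logits.
def _softmax_example():
    logits = np.array([1.0, 2.0, 3.0])
    return softmax(logits - np.max(logits))  # ≈ [0.090, 0.245, 0.665]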
# Main Processing Function
def run_action_recognition(
source: str = "0",
flip: bool = True,
use_popup: bool = False,
compiled_model_en: CompiledModel = compiled_model_en,
compiled_model_de: CompiledModel = compiled_model_de,
skip_first_frames: int = 0,
):
"""
Use the "source" webcam or video file to run the complete pipeline for action-recognition problem
1. Create a video player to play with target fps
2. Prepare a set of frames to be encoded-decoded
3. Preprocess frame before Encoder
4. Encoder Inference per frame
5. Decoder inference per set of frames
6. Visualize the results
:param: source: webcam "0" or video path
:param: flip: to be used by VideoPlayer function for flipping capture image
:param: use_popup: False for showing encoded frames over this notebook, True for creating a popup window.
:param: skip_first_frames: Number of frames to skip at the beginning of the video.
:returns: display video over the notebook or in a popup window
"""
size = height_en # Encoder input size - From Cell 5_9
sample_duration = frames2decode # Decoder input size - From Cell 5_7
# Select frames per second of your source
fps = 30
player = None
try:
# Create a video player
player = utils.VideoPlayer(source, flip=flip, fps=fps, skip_first_frames=skip_first_frames)
# Start capturing
player.start()
if use_popup:
title = "Press ESC to Exit"
cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
processing_times = collections.deque()
processing_time = 0
encoder_output = []
decoded_labels = [0, 0, 0]
decoded_top_probs = [0, 0, 0]
counter = 0
# Create a text template to show inference results over video
text_inference_template = "Infer Time:{Time:.1f}ms,{fps:.1f}FPS"
text_template = "{label},{conf:.2f}%"
while True:
counter = counter + 1
# read a frame from the video stream
frame = player.next()
if frame is None:
print("Source ended")
break
scale = 1280 / max(frame.shape)
# Adaptive resize for visualization
if scale < 1:
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
# Select one frame every two for processing through the encoder.
# After 16 frames are processed, the decoder will find the action,
# and the label will be printed over the frames.
if counter % 2 == 0:
# Preprocess frame before Encoder
(preprocessed, _) = preprocessing(frame, size)
# Measure processing time
start_time = time.time()
# Encoder Inference per frame
encoder_output.append(encoder(preprocessed, compiled_model_en))
# Decoder inference per set of frames
# Wait for sample duration to work with decoder model
if len(encoder_output) == sample_duration:
decoded_labels, decoded_top_probs = decoder(encoder_output, compiled_model_de)
encoder_output = []
# Inference has finished. Display the results
stop_time = time.time()
# Calculate processing time
processing_times.append(stop_time - start_time)
# Use processing times from last 200 frames
if len(processing_times) > 200:
processing_times.popleft()
# Mean processing time [ms]
processing_time = np.mean(processing_times) * 1000
fps = 1000 / processing_time
# Visualize the results
for i in range(0, 3):
display_text = text_template.format(
label=decoded_labels[i],
conf=decoded_top_probs[i] * 100,
)
display_text_fnc(frame, display_text, i)
display_text = text_inference_template.format(Time=processing_time, fps=fps)
display_text_fnc(frame, display_text, 3)
# Use this workaround if you experience flickering
if use_popup:
cv2.imshow(title, frame)
key = cv2.waitKey(1)
# escape = 27
if key == 27:
break
else:
# Encode numpy array to jpg
_, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
# Create IPython image
i = display.Image(data=encoded_img)
# Display the image in this notebook
display.clear_output(wait=True)
display.display(i)
# ctrl-c
except KeyboardInterrupt:
print("Interrupted")
# Any different error
except RuntimeError as e:
print(e)
finally:
if player is not None:
# stop capturing
player.stop()
if use_popup:
cv2.destroyAllWindows()
# Run Action Recognition on a Video File
video_file = "https://archive.org/serve/ISSVideoResourceLifeOnStation720p/ISS%20Video%20Resource_LifeOnStation_720p.mp4"
run_action_recognition(source=video_file, flip=False, use_popup=False, skip_first_frames=600)
# Run Action Recognition using your webcam
run_action_recognition(source=0, flip=False, use_popup=False, skip_first_frames=0)
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection #pose-estimation
402-pose-estimation-webcam: Live Human Pose Estimation with OpenVINO# Imports
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
from openvino.runtime import Core
from decoder import OpenPoseDecoder
sys.path.append("../utils")
import notebook_utils as utils
# Download the model
# directory where model will be downloaded
base_model_dir = "model"
# model name as named in Open Model Zoo
model_name = "human-pose-estimation-0001"
# selected precision (FP32, FP16, FP16-INT8)
precision = "FP16-INT8"
model_path = f"model/intel/{model_name}/{precision}/{model_name}.xml"
model_weights_path = f"model/intel/{model_name}/{precision}/{model_name}.bin"
if not os.path.exists(model_path):
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir}"
! $download_command
# Load the model
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=model_path, weights=model_weights_path)
# load the model on the CPU (you can use GPU or MYRIAD as well)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get input and output names of nodes
input_layer = compiled_model.input(0)
output_layers = list(compiled_model.outputs)
# get input size
height, width = list(input_layer.shape)[2:]
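# Note (added for clarity; shape values are assumed): human-pose-estimation-0001 takes an
# NCHW input, e.g. 1x3x256x456, so shape[2:] yields height=256 and width=456.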
# Processing OpenPoseDecoder
decoder = OpenPoseDecoder()
# Process Results
# 2d pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
"""
2D Pooling
Parameters:
A: input 2D array
kernel_size: int, the size of the window
stride: int, the stride of the window
padding: int, implicit zero paddings on both sides of the input
pool_mode: string, 'max' or 'avg'
"""
# Padding
A = np.pad(A, padding, mode="constant")
# Window view of A
output_shape = (
(A.shape[0] - kernel_size) // stride + 1,
(A.shape[1] - kernel_size) // stride + 1,
)
kernel_size = (kernel_size, kernel_size)
A_w = as_strided(
A,
shape=output_shape + kernel_size,
strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides
)
A_w = A_w.reshape(-1, *kernel_size)
# Return the result of pooling
if pool_mode == "max":
return A_w.max(axis=(1, 2)).reshape(output_shape)
elif pool_mode == "avg":
return A_w.mean(axis=(1, 2)).reshape(output_shape)
# non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
return heatmaps * (heatmaps == pooled_heatmaps)
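# Toy example (added for illustration, not in the original notebook): max-pooling a small
# heatmap and keeping only the points equal to their pooled value leaves just the local
# maxima, which is exactly what heatmap_nms does with the model's keypoint heatmaps.
_hm = np.array([[0.1, 0.9, 0.2],
[0.3, 0.4, 0.8],
[0.0, 0.7, 0.1]], dtype=np.float32)
_pooled = pool2d(_hm, kernel_size=3, stride=1, padding=1, pool_mode="max")
print(heatmap_nms(_hm, _pooled))  # only 0.9, the maximum of its 3x3 neighbourhood, stays non-zero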
# get poses from results
def process_results(img, pafs, heatmaps):
# this processing comes from
# https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py
pooled_heatmaps = np.array(
[[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in heatmaps[0]]]
)
nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)
# decode poses
poses, scores = decoder(heatmaps, nms_heatmaps, pafs)
output_shape = list(compiled_model.output(index=0).partial_shape)
output_scale = img.shape[1] / output_shape[3].get_length(), img.shape[0] / output_shape[2].get_length()
# multiply coordinates by scaling factor
poses[:, :, :2] *= output_scale
return poses, scores
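# Worked example (illustrative; output size assumed): the network's spatial output is
# expected to be 32x57 (H x W), so for a 720x1280 frame the keypoint coordinates are
# multiplied by roughly (1280 / 57, 720 / 32) ≈ (22.5, 22.5) to map them back to pixels.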
# Draw Pose Overlays
colors = ((255, 0, 0), (255, 0, 255), (170, 0, 255), (255, 0, 85), (255, 0, 170), (85, 255, 0),
(255, 170, 0), (0, 255, 0), (255, 255, 0), (0, 255, 85), (170, 255, 0), (0, 85, 255),
(0, 255, 170), (0, 0, 255), (0, 255, 255), (85, 0, 255), (0, 170, 255))
default_skeleton = ((15, 13), (13, 11), (16, 14), (14, 12), (11, 12), (5, 11), (6, 12), (5, 6), (5, 7),
(6, 8), (7, 9), (8, 10), (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (3, 5), (4, 6))
def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
if poses.size == 0:
return img
img_limbs = np.copy(img)
for pose in poses:
points = pose[:, :2].astype(np.int32)
points_scores = pose[:, 2]
# Draw joints.
for i, (p, v) in enumerate(zip(points, points_scores)):
if v > point_score_threshold:
cv2.circle(img, tuple(p), 1, colors[i], 2)
# Draw limbs.
for i, j in skeleton:
if points_scores[i] > point_score_threshold and points_scores[j] > point_score_threshold:
cv2.line(img_limbs, tuple(points[i]), tuple(points[j]), color=colors[j], thickness=4)
cv2.addWeighted(img, 0.4, img_limbs, 0.6, 0, dst=img)
return img
# Main Processing Function
# main processing function to run pose estimation
def run_pose_estimation(source=0, flip=False, use_popup=False, skip_first_frames=0):
pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
player = None
try:
# create video player to play with target fps
player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
# start capturing
player.start()
if use_popup:
title = "Press ESC to Exit"
cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
processing_times = collections.deque()
while True:
# grab the frame
frame = player.next()
if frame is None:
print("Source ended")
break
# if frame larger than full HD, reduce size to improve the performance
scale = 1280 / max(frame.shape)
if scale < 1:
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
# resize image and change dims to fit neural network input
# (see https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/human-pose-estimation-0001)
input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
# create batch of images (size = 1)
input_img = input_img.transpose((2,0,1))[np.newaxis, ...]
# measure processing time
start_time = time.time()
# get results
results = compiled_model([input_img])
stop_time = time.time()
pafs = results[pafs_output_key]
heatmaps = results[heatmaps_output_key]
# get poses from network results
poses, scores = process_results(frame, pafs, heatmaps)
# draw poses on a frame
frame = draw_poses(frame, poses, 0.1)
processing_times.append(stop_time - start_time)
# use processing times from last 200 frames
if len(processing_times) > 200:
processing_times.popleft()
_, f_width = frame.shape[:2]
# mean processing time [ms]
processing_time = np.mean(processing_times) * 1000
fps = 1000 / processing_time
cv2.putText(frame, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
cv2.FONT_HERSHEY_COMPLEX, f_width / 1000, (0, 0, 255), 1, cv2.LINE_AA)
# use this workaround if there is flickering
if use_popup:
cv2.imshow(title, frame)
key = cv2.waitKey(1)
# escape = 27
if key == 27:
break
else:
# encode numpy array to jpg
_, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
# create IPython image
i = display.Image(data=encoded_img)
# display the image in this notebook
display.clear_output(wait=True)
display.display(i)
# ctrl-c
except KeyboardInterrupt:
print("Interrupted")
# any different error
except RuntimeError as e:
print(e)
finally:
if player is not None:
# stop capturing
player.stop()
if use_popup:
cv2.destroyAllWindows()
# Run Live Pose Estimation
run_pose_estimation(source=0, flip=True, use_popup=False)
# Run Pose Estimation on a Video File
video_file = "https://github.com/intel-iot-devkit/sample-videos/blob/master/store-aisle-detection.mp4?raw=true"
run_pose_estimation(video_file, flip=False, use_popup=False, skip_first_frames=500)
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection
401-object-detection-webcam: Live Object Detection with OpenVINO# Imports
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core
sys.path.append("../utils")
import notebook_utils as utils
# Download the Model
# directory where model will be downloaded
base_model_dir = "model"
# model name as named in Open Model Zoo
model_name = "ssdlite_mobilenet_v2"
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--output_dir {base_model_dir} " \
f"--cache_dir {base_model_dir}"
! $download_command
# Convert the Model
precision = "FP16"
# output path for the conversion
converted_model_path = f"model/public/{model_name}/{precision}/{model_name}.xml"
if not os.path.exists(converted_model_path):
convert_command = f"omz_converter " \
f"--name {model_name} " \
f"--download_dir {base_model_dir} " \
f"--precisions {precision}"
! $convert_command
# Load the Model
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=converted_model_path)
# compile the model for the CPU (you can choose manually CPU, GPU, MYRIAD etc.)
# or let the engine choose the best available device (AUTO)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get input and output nodes
input_layer = compiled_model.input(0)
output_layer = compiled_model.output(0)
# get input size
height, width = list(input_layer.shape)[1:3]
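# Note (added for clarity; shape values are assumed): the converted TensorFlow model keeps
# an NHWC input, e.g. 1x300x300x3, so shape[1:3] yields height=300 and width=300
# (unlike the NCHW models above, where shape[2:] is used).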
# Process Results
# https://tech.amikelive.com/node-718/what-object-categories-labels-are-in-coco-dataset/
classes = [
"background", "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
"truck", "boat", "traffic light", "fire hydrant", "street sign", "stop sign",
"parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant",
"bear", "zebra", "giraffe", "hat", "backpack", "umbrella", "shoe", "eye glasses",
"handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite",
"baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
"plate", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
"couch", "potted plant", "bed", "mirror", "dining table", "window", "desk", "toilet",
"door", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven",
"toaster", "sink", "refrigerator", "blender", "book", "clock", "vase", "scissors",
"teddy bear", "hair drier", "toothbrush", "hair brush"
]
# colors for above classes (Rainbow Color Map)
colors = cv2.applyColorMap(
src=np.arange(0, 255, 255 / len(classes), dtype=np.float32).astype(np.uint8),
colormap=cv2.COLORMAP_RAINBOW,
).squeeze()
def process_results(frame, results, thresh=0.6):
# size of the original frame
h, w = frame.shape[:2]
# results is a tensor [1, 1, 100, 7]
results = results.squeeze()
boxes = []
labels = []
scores = []
for _, label, score, xmin, ymin, xmax, ymax in results:
# create a box with pixels coordinates from the box with normalized coordinates [0,1]
boxes.append(
tuple(map(int, (xmin * w, ymin * h, (xmax - xmin) * w, (ymax - ymin) * h)))
)
labels.append(int(label))
scores.append(float(score))
# apply non-maximum suppression to get rid of many overlapping entities
# see https://paperswithcode.com/method/non-maximum-suppression
# this algorithm returns indices of objects to keep
indices = cv2.dnn.NMSBoxes(
bboxes=boxes, scores=scores, score_threshold=thresh, nms_threshold=0.6
)
# if there are no boxes
if len(indices) == 0:
return []
# filter detected objects
return [(labels[idx], scores[idx], boxes[idx]) for idx in indices.flatten()]
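# Minimal sketch (not from the original notebook) of how cv2.dnn.NMSBoxes behaves here:
# two heavily overlapping boxes collapse to the higher-scoring one, the distant box is kept.
_boxes = [(10, 10, 100, 100), (12, 12, 100, 100), (300, 300, 80, 80)]
_scores = [0.9, 0.8, 0.7]
_keep = cv2.dnn.NMSBoxes(bboxes=_boxes, scores=_scores, score_threshold=0.6, nms_threshold=0.6)
print(_keep.flatten())  # indices of the kept boxes, here [0, 2]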
def draw_boxes(frame, boxes):
for label, score, box in boxes:
# choose color for the label
color = tuple(map(int, colors[label]))
# draw box
x2 = box[0] + box[2]
y2 = box[1] + box[3]
cv2.rectangle(img=frame, pt1=box[:2], pt2=(x2, y2), color=color, thickness=3)
# draw label name inside the box
cv2.putText(
img=frame,
text=f"{classes[label]} {score:.2f}",
org=(box[0] + 10, box[1] + 30),
fontFace=cv2.FONT_HERSHEY_COMPLEX,
fontScale=frame.shape[1] / 1000,
color=color,
thickness=1,
lineType=cv2.LINE_AA,
)
return frame
# Main Processing Function
# main processing function to run object detection
def run_object_detection(source=0, flip=False, use_popup=False, skip_first_frames=0):
player = None
try:
# create video player to play with target fps
player = utils.VideoPlayer(
source=source, flip=flip, fps=30, skip_first_frames=skip_first_frames
)
# start capturing
player.start()
if use_popup:
title = "Press ESC to Exit"
cv2.namedWindow(
winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE
)
processing_times = collections.deque()
while True:
# grab the frame
frame = player.next()
if frame is None:
print("Source ended")
break
# if frame larger than full HD, reduce size to improve the performance
scale = 1280 / max(frame.shape)
if scale < 1:
frame = cv2.resize(
src=frame,
dsize=None,
fx=scale,
fy=scale,
interpolation=cv2.INTER_AREA,
)
# resize image and change dims to fit neural network input
input_img = cv2.resize(
src=frame, dsize=(width, height), interpolation=cv2.INTER_AREA
)
# create batch of images (size = 1)
input_img = input_img[np.newaxis, ...]
# measure processing time
start_time = time.time()
# get results
results = compiled_model([input_img])[output_layer]
stop_time = time.time()
# get detection boxes from network results
boxes = process_results(frame=frame, results=results)
# draw boxes on a frame
frame = draw_boxes(frame=frame, boxes=boxes)
processing_times.append(stop_time - start_time)
# use processing times from last 200 frames
if len(processing_times) > 200:
processing_times.popleft()
_, f_width = frame.shape[:2]
# mean processing time [ms]
processing_time = np.mean(processing_times) * 1000
fps = 1000 / processing_time
cv2.putText(
img=frame,
text=f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)",
org=(20, 40),
fontFace=cv2.FONT_HERSHEY_COMPLEX,
fontScale=f_width / 1000,
color=(0, 0, 255),
thickness=1,
lineType=cv2.LINE_AA,
)
# use this workaround if there is flickering
if use_popup:
cv2.imshow(winname=title, mat=frame)
key = cv2.waitKey(1)
# escape = 27
if key == 27:
break
else:
# encode numpy array to jpg
_, encoded_img = cv2.imencode(
ext=".jpg", img=frame, params=[cv2.IMWRITE_JPEG_QUALITY, 100]
)
# create IPython image
i = display.Image(data=encoded_img)
# display the image in this notebook
display.clear_output(wait=True)
display.display(i)
# ctrl-c
except KeyboardInterrupt:
print("Interrupted")
# any different error
except RuntimeError as e:
print(e)
finally:
if player is not None:
# stop capturing
player.stop()
if use_popup:
cv2.destroyAllWindows()
# Run Live Object Detection
run_object_detection(source=0, flip=True, use_popup=False)
# Run Object Detection on a Video File
video_file = "../201-vision-monodepth/data/Coco Walking in Berkeley.mp4"
run_object_detection(source=video_file, flip=False, use_popup=False)
#python #openvino #openvino-notebooks #live-inference #ct-scan #deeplearning #accelerated-inference
210-ct-scan-live-inference: Live Inference and Benchmark CT-scan Data with OpenVINO# Imports
import os
import sys
import zipfile
from pathlib import Path
import numpy as np
from monai.transforms import LoadImage
from openvino.inference_engine import IECore
sys.path.append("../utils")
from models.custom_segmentation import SegmentationModel
from notebook_utils import benchmark_model, download_file, show_live_inference
# Settings
# The directory that contains the IR model (xml and bin) files
MODEL_PATH = "pretrained_model/quantized_unet_kits19.xml"
# Uncomment the next line to use the FP16 model instead of the quantized model
# MODEL_PATH = "pretrained_model/unet_kits19.xml"
# Benchmark Model Performance
ie = IECore()
# By default, benchmark on MULTI:CPU,GPU if a GPU is available, otherwise on CPU.
device = "MULTI:CPU,GPU" if "GPU" in ie.available_devices else "CPU"
# Uncomment one of the options below to benchmark on other devices
# device = "GPU"
# device = "CPU"
# device = "AUTO"
# Benchmark model
benchmark_model(model_path=MODEL_PATH, device=device, seconds=15)
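# For reference (assumed equivalent, not part of the original notebook): benchmark_model
# wraps OpenVINO's benchmark_app; a similar measurement can be run directly from the
# command line. Uncomment the next line to try it.
# ! benchmark_app -m $MODEL_PATH -d $device -t 15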
# Download and Prepare Data
# Directory that contains the CT scan data. This directory should contain subdirectories
# case_00XXX where XXX is between 000 and 299
BASEDIR = Path("kits19_frames_1")
# The CT scan case number. For example: 16 for data from the case_00016 directory
# Currently only 117 is supported
CASE = 117
case_path = BASEDIR / f"case_{CASE:05d}"
if not case_path.exists():
filename = download_file(
f"https://storage.openvinotoolkit.org/data/test_data/openvino_notebooks/kits19/case_{CASE:05d}.zip"
)
with zipfile.ZipFile(filename, "r") as zip_ref:
zip_ref.extractall(path=BASEDIR)
os.remove(filename) # remove zipfile
print(f"Downloaded and extracted data for case_{CASE:05d}")
else:
print(f"Data for case_{CASE:05d} exists")
# Load model
ie = IECore()
segmentation_model = SegmentationModel(
ie=ie, model_path=Path(MODEL_PATH), sigmoid=True, rotate_and_flip=True
)
image_paths = sorted(case_path.glob("imaging_frames/*jpg"))
print(f"{case_path.name}, {len(image_paths)} images")
# Show Live Inference
# Possible options for device include "CPU", "GPU", "AUTO", "MULTI"
device = "MULTI:CPU,GPU" if "GPU" in ie.available_devices else "CPU"
reader = LoadImage(image_only=True, dtype=np.uint8)
show_live_inference(
ie=ie, image_paths=image_paths, model=segmentation_model, device=device, reader=reader
)
Sat Jun 18 2022 21:07:22 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/204-named-entity-recognition/204-named-entity-recognition.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #entity-recognition #bert
Sat Jun 18 2022 21:03:36 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/301-tensorflow-training-openvino/301-tensorflow-training-openvino.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #optimization #tensorflow
Sat Jun 18 2022 20:58:59 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/301-tensorflow-training-openvino/301-tensorflow-training-openvino-pot.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #optimization #tensorflow
Sat Jun 18 2022 20:53:15 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/302-pytorch-quantization-aware-training/302-pytorch-quantization-aware-training.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #quantization #nncf #optimization #pytorch
Sat Jun 18 2022 20:47:05 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/305-tensorflow-quantization-aware-training/305-tensorflow-quantization-aware-training.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #tensorflow #quantization #nncf #optimization
Sat Jun 18 2022 20:42:14 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/405-paddle-ocr-webcam/405-paddle-ocr-webcam.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #ocr #paddle-paddle #paddle-ocr #nlp
Fri Jun 17 2022 09:51:37 GMT+0000 (Coordinated Universal Time)
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #object-detection #onnx #onnx-runtime #openvino-onnx-runtime #openvino-execution-provider-for-onnx #tiny-yolov4
Fri Jun 17 2022 09:49:43 GMT+0000 (Coordinated Universal Time) https://github.com/microsoft/onnxruntime-inference-examples/blob/main/python/OpenVINO_EP/tiny_yolo_v2_object_detection/tiny_yolov2_obj_detection_sample.py
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #object-detection #onnx #onnx-runtime #openvino-onnx-runtime #yolov2 #openvino-execution-provider-for-onnx
Fri Jun 17 2022 05:10:39 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/209-handwritten-ocr/209-handwritten-ocr.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #ocr #chinese #japanese #handwritten
Fri Jun 17 2022 05:06:53 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/208-optical-character-recognition/208-optical-character-recognition.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #ocr
Fri Jun 17 2022 05:03:05 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/213-question-answering/213-question-answering.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #question-answering #bert
Fri Jun 17 2022 04:58:24 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/211-speech-to-text/211-speech-to-text.ipynb
#python #openvino #openvino-notebooks #deeplearning #accelerated-inference #nlp #speechtotext
Fri Jun 17 2022 04:49:49 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/403-action-recognition-webcam/403-action-recognition-webcam.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #action-recognition
Fri Jun 17 2022 04:28:34 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/402-pose-estimation-webcam/402-pose-estimation.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection #pose-estimation
Thu Jun 16 2022 14:49:51 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/401-object-detection-webcam/401-object-detection.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection
Thu Jun 16 2022 14:44:32 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/210-ct-scan-live-inference/210-ct-scan-live-inference.ipynb
#python #openvino #openvino-notebooks #live-inference #ct-scan #deeplearning #accelerated-inference

