#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #action-recognition
403-action-recognition-webcam: Human Action Recognition with OpenVINO
# Imports
import collections
import os
import sys
import time
from typing import Tuple, List
import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core
from openvino.runtime.ie_api import CompiledModel
sys.path.append("../utils")
import notebook_utils as utils
# Download the models
# Directory where model will be downloaded
base_model_dir = "model"
# Model name as named in Open Model Zoo
model_name = "action-recognition-0001"
# Selected precision (FP32, FP16, FP16-INT8)
precision = "FP16"
# Paths to the encoder and decoder IR (*.xml) files that omz_downloader produces.
model_path_decoder = (
f"model/intel/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"
)
model_path_encoder = (
f"model/intel/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
)
# Download only when either IR file is missing.
if not os.path.exists(model_path_decoder) or not os.path.exists(model_path_encoder):
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir}"
# NOTE: "!" executes the command via the IPython shell (notebook cell magic).
! $download_command
# Load your labels
# NOTE(review): "labels" is first bound to the file path and then rebound
# to the list of label names read from that file.
labels = "data/kinetics.txt"
with open(labels) as f:
labels = [line.strip() for line in f]
print(labels[0:9], np.shape(labels))
# Model Initialization function
# Initialize inference engine
ie_core = Core()
def model_init(model_path: str) -> Tuple:
    """
    Read a network from file, compile it for the CPU and return its I/O nodes.

    :param model_path: path to the model architecture file (*.xml)
    :returns:
        input_key: input node of the model
        output_key: output node of the model
        compiled_model: the compiled model
    """
    # Read the network and corresponding weights from file.
    model = ie_core.read_model(model=model_path)
    # Compile the model for the CPU (GPU or MYRIAD can be used as well).
    compiled_model = ie_core.compile_model(model=model, device_name="CPU")
    # First input and output nodes of the compiled network.
    input_keys = compiled_model.input(0)
    output_keys = compiled_model.output(0)
    return input_keys, output_keys, compiled_model
# Initialization for Encoder and Decoder
# Encoder initialization
input_key_en, output_keys_en, compiled_model_en = model_init(model_path_encoder)
# Decoder initialization
input_key_de, output_keys_de, compiled_model_de = model_init(model_path_decoder)
# Get input size - Encoder: height and width from the NCHW input shape
height_en, width_en = list(input_key_en.shape)[2:]
# Get input size - Decoder: second dimension of the decoder input shape,
# i.e. the number of frames consumed per decoder inference.
frames2decode = list(input_key_de.shape)[0:][1]
# Helper functions
def center_crop(frame: np.ndarray) -> Tuple[np.ndarray, list]:
    """
    Center-crop the frame to a square to standardize the encoder input.

    :param frame: input frame of shape (height, width, channels)
    :returns: (cropped frame, roi) where roi = [y0, y1, x0, x1] is the
              region of the original frame that was kept
    """
    img_h, img_w, _ = frame.shape
    min_dim = min(img_h, img_w)
    start_x = int((img_w - min_dim) / 2.0)
    start_y = int((img_h - min_dim) / 2.0)
    roi = [start_y, start_y + min_dim, start_x, start_x + min_dim]
    # Slice using the roi bounds instead of recomputing them inline.
    return frame[roi[0] : roi[1], roi[2] : roi[3], ...], roi
def adaptive_resize(frame: np.ndarray, size: int) -> np.ndarray:
    """
    Resize the frame so that its shorter side equals `size`, keeping aspect ratio.

    :param frame: input frame of shape (height, width, channels)
    :param size: target length of the shorter side (encoder input size)
    :returns: resized frame (or the original object if already at target scale)
    """
    height, width, _ = frame.shape
    ratio = size / min(height, width)
    new_w = int(width * ratio)
    new_h = int(height * ratio)
    if (new_w, new_h) == (width, height):
        # Already at target scale; avoid an unnecessary copy.
        return frame
    return cv2.resize(frame, (new_w, new_h))
def decode_output(probs: np.ndarray, labels: np.ndarray, top_k: int = 3) -> Tuple:
    """
    Decode the top probabilities into corresponding label names.

    :param probs: confidence vector of shape (1, n_actions)
    :param labels: list of action names
    :param top_k: the k most probable positions in the list of labels
    :returns: decoded_labels: the top_k most probable actions from the labels list
              decoded_top_probs: confidence for the top_k most probable actions
    """
    flat_probs = np.asarray(probs)[0]
    # Indices of the top_k highest confidences, best first.
    # (Previously "[:top_k]" sliced the rows of a (1, n) argsort result, so
    # top_k had no effect and exactly three entries were always returned.)
    top_ind = np.argsort(-1 * flat_probs)[:top_k]
    decoded_labels = list(np.asarray(labels)[top_ind])
    decoded_top_probs = list(flat_probs[top_ind])
    return decoded_labels, decoded_top_probs
def rec_frame_display(frame: np.ndarray, roi) -> np.ndarray:
    """
    Draw corner markers and an "ROI" caption around the region of interest.

    :param frame: input frame (drawn on in place)
    :param roi: [y0, y1, x0, x1] region processed by the encoder
    :returns: frame with the ROI markers drawn
    """
    GREEN = (0, 200, 0)
    y0, y1, x0, x1 = roi[0], roi[1], roi[2], roi[3]
    # (corner anchor, vertical arm end, horizontal arm end) for each of the
    # four corners, inset 3 px with 100 px long arms.
    corners = [
        ((x0 + 3, y0 + 3), (x0 + 3, y0 + 100), (x0 + 100, y0 + 3)),
        ((x1 - 3, y1 - 3), (x1 - 3, y1 - 100), (x1 - 100, y1 - 3)),
        ((x1 - 3, y0 + 3), (x1 - 3, y0 + 100), (x1 - 100, y0 + 3)),
        ((x0 + 3, y1 - 3), (x0 + 3, y1 - 100), (x0 + 100, y1 - 3)),
    ]
    for anchor, v_end, h_end in corners:
        cv2.line(frame, anchor, v_end, GREEN, 2)
        cv2.line(frame, anchor, h_end, GREEN, 2)
    # Caption drawn twice: black shadow offset by 1 px, then green text on top.
    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.putText(frame, "ROI", (x0 + 2, y1 - 2), font, 0.5, (0, 0, 0))
    cv2.putText(frame, "ROI", (x0 + 3, y1 - 3), font, 0.5, GREEN)
    return frame
def display_text_fnc(frame: np.ndarray, display_text: str, index: int):
    """
    Overlay one line of text on the frame and draw the ROI markers.

    :param frame: frame to draw on (modified in place)
    :param display_text: text to render
    :param index: zero-based line index controlling the vertical position
    """
    WHITE = (255, 255, 255)
    BLACK = (0, 0, 0)
    font = cv2.FONT_HERSHEY_DUPLEX
    font_size = 0.7
    line_step = 25
    left_margin = 15
    # Recompute the ROI for the current frame and draw its markers.
    _, roi = center_crop(frame)
    frame = rec_frame_display(frame, roi)
    # Shadowed text: black copy offset by 1 px, then white text on top.
    y = line_step * (index + 1)
    cv2.putText(frame, display_text, (left_margin + 1, y + 1), font, font_size, BLACK)
    cv2.putText(frame, display_text, (left_margin, y), font, font_size, WHITE)
# AI Functions
def preprocessing(frame: np.ndarray, size: int) -> np.ndarray:
    """
    Prepare a frame for the encoder.

    The frame is scaled so its shorter side equals `size`, center-cropped to a
    square, and transposed from Height-Width-Channels (HWC) to
    Channels-Height-Width (CHW) with a leading batch axis.
    :param frame: input frame
    :param size: encoder input size
    :returns: (preprocessed frame, roi used for the crop)
    """
    # Aspect-preserving resize followed by a square center crop.
    resized = adaptive_resize(frame, size)
    cropped, roi = center_crop(resized)
    # HWC -> NCHW (batch of one).
    batched = cropped.transpose((2, 0, 1))[None,]
    return batched, roi
def encoder(
    preprocessed: np.ndarray,
    compiled_model: CompiledModel
) -> List:
    """
    Run the encoder on a single preprocessed frame.

    :param preprocessed: preprocessed frame (NCHW, batch of one)
    :param compiled_model: compiled encoder network
    :returns: embedding produced by the encoder for this frame
    """
    output_node = compiled_model.output(0)
    # Run action-recognition-0001-encoder and extract the embedding tensor.
    return compiled_model([preprocessed])[output_node]
def decoder(encoder_output: List, compiled_model_de: CompiledModel) -> List:
    """
    Run the decoder over a set of per-frame embeddings.

    Concatenates the encoder embeddings, rearranges them into the decoder's
    input layout, runs the decoder, converts the logits to confidences and
    decodes the top-3 label names.
    :param encoder_output: list of per-frame embeddings (16 frames)
    :param compiled_model_de: compiled decoder network
    :returns: decoded_labels: the 3 most probable actions from the labels list
              decoded_top_probs: confidence for those actions
    """
    # Stack the sample_duration embeddings into a single array.
    stacked = np.concatenate(encoder_output, axis=0)
    # Rearrange to the decoder input shape [1, 16, 512].
    stacked = stacked.transpose((2, 0, 1, 3))
    decoder_input = np.squeeze(stacked, axis=3)
    output_node = compiled_model_de.output(0)
    # Run action-recognition-0001-decoder.
    logits = compiled_model_de([decoder_input])[output_node]
    # Shift by the max before softmax for numerical stability.
    probs = softmax(logits - np.max(logits))
    # Decode the top probabilities into label names.
    return decode_output(probs, labels, top_k=3)
def softmax(x: np.ndarray) -> np.ndarray:
    """
    Normalize logits to confidence values over all elements.

    The max is subtracted before exponentiating for numerical stability;
    this is mathematically a no-op for softmax but prevents overflow for
    large logits.
    :param x: array of logits
    :returns: array of the same shape whose elements sum to 1 overall
    """
    shifted = x - np.max(x)
    exp = np.exp(shifted)
    return exp / np.sum(exp, axis=None)
# Main Processing Function
def run_action_recognition(
    source: str = "0",
    flip: bool = True,
    use_popup: bool = False,
    compiled_model_en: CompiledModel = compiled_model_en,
    compiled_model_de: CompiledModel = compiled_model_de,
    skip_first_frames: int = 0,
):
    """
    Run the complete action-recognition pipeline on a webcam or video file.

    1. Create a video player to play with target fps.
    2. Collect a set of frames to be encoded-decoded.
    3. Preprocess each frame before the encoder.
    4. Run encoder inference per frame.
    5. Run decoder inference per set of frames.
    6. Visualize the results.

    :param source: webcam "0" or video path
    :param flip: passed to VideoPlayer to flip the captured image
    :param use_popup: False to show frames inside this notebook, True for a popup window
    :param compiled_model_en: compiled encoder network
    :param compiled_model_de: compiled decoder network
    :param skip_first_frames: number of frames to skip at the beginning of the video
    :returns: displays the annotated video in the notebook or a popup window
    """
    size = height_en  # encoder input size
    sample_duration = frames2decode  # frames consumed per decoder inference
    # Target frames per second for the source.
    fps = 30
    player = None
    try:
        # Create a video player.
        player = utils.VideoPlayer(source, flip=flip, fps=fps, skip_first_frames=skip_first_frames)
        # Start capturing.
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        processing_times = collections.deque()
        processing_time = 0
        encoder_output = []
        decoded_labels = [0, 0, 0]
        decoded_top_probs = [0, 0, 0]
        counter = 0
        # Templates for the inference-time and label overlays.
        text_inference_template = "Infer Time:{Time:.1f}ms,{fps:.1f}FPS"
        text_template = "{label},{conf:.2f}%"
        while True:
            counter += 1
            # Read a frame from the video stream.
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # Downscale frames larger than full HD for visualization.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
            # Process every second frame through the encoder. After
            # sample_duration frames are collected, the decoder finds the
            # action and the label is printed over the frames.
            if counter % 2 == 0:
                # Preprocess frame before the encoder.
                preprocessed, _ = preprocessing(frame, size)
                # Measure processing time.
                start_time = time.time()
                # Encoder inference per frame.
                encoder_output.append(encoder(preprocessed, compiled_model_en))
                # Decoder inference once a full set of frames is available.
                if len(encoder_output) == sample_duration:
                    decoded_labels, decoded_top_probs = decoder(encoder_output, compiled_model_de)
                    encoder_output = []
                stop_time = time.time()
                processing_times.append(stop_time - start_time)
                # Keep only the last 200 measurements.
                if len(processing_times) > 200:
                    processing_times.popleft()
                # Mean processing time [ms] and the resulting FPS.
                processing_time = np.mean(processing_times) * 1000
                fps = 1000 / processing_time
            # Visualize the results: top-3 labels plus the inference stats.
            for i in range(0, 3):
                display_text = text_template.format(
                    label=decoded_labels[i],
                    conf=decoded_top_probs[i] * 100,
                )
                display_text_fnc(frame, display_text, i)
            display_text = text_inference_template.format(Time=processing_time, fps=fps)
            display_text_fnc(frame, display_text, 3)
            # Use this workaround if you experience flickering.
            if use_popup:
                cv2.imshow(title, frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # Encode the frame to jpg and show it inline in the notebook.
                _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                notebook_image = display.Image(data=encoded_img)
                display.clear_output(wait=True)
                display.display(notebook_image)
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # Any different error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # Stop capturing.
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()
# Run Action Recognition on a Video File
video_file = "https://archive.org/serve/ISSVideoResourceLifeOnStation720p/ISS%20Video%20Resource_LifeOnStation_720p.mp4"
run_action_recognition(source=video_file, flip=False, use_popup=False, skip_first_frames=600)
# Run Action Recognition using your webcam
# NOTE: source=0 selects the first webcam; this requires a camera to be attached.
run_action_recognition(source=0, flip=False, use_popup=False, skip_first_frames=0)
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection #pose-estimation
402-pose-estimation-webcam: Live Human Pose Estimation with OpenVINO
# Imports
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
from openvino.runtime import Core
from decoder import OpenPoseDecoder
sys.path.append("../utils")
import notebook_utils as utils
# Download the model
# directory where model will be downloaded
base_model_dir = "model"
# model name as named in Open Model Zoo
model_name = "human-pose-estimation-0001"
# selected precision (FP32, FP16, FP16-INT8)
precision = "FP16-INT8"
# Paths to the IR architecture and weights files produced by omz_downloader.
model_path = f"model/intel/{model_name}/{precision}/{model_name}.xml"
model_weights_path = f"model/intel/{model_name}/{precision}/{model_name}.bin"
# Download only when the IR file is missing.
if not os.path.exists(model_path):
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--precision {precision} " \
f"--output_dir {base_model_dir}"
# NOTE: "!" executes the command via the IPython shell (notebook cell magic).
! $download_command
# Load the model
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=model_path, weights=model_weights_path)
# load the model on the CPU (you can use GPU or MYRIAD as well)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get input and output names of nodes
input_layer = compiled_model.input(0)
output_layers = list(compiled_model.outputs)
# get input size: height and width from the NCHW input shape
height, width = list(input_layer.shape)[2:]
# Processing OpenPoseDecoder
decoder = OpenPoseDecoder()
# Process Results
# 2d pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    2D pooling over a 2D array (adapted from https://stackoverflow.com/a/54966908/1624463).

    :param A: input 2D array
    :param kernel_size: int, the size of the pooling window
    :param stride: int, the stride of the window
    :param padding: int, implicit zero paddings on both sides of the input
    :param pool_mode: string, 'max' or 'avg'
    :returns: pooled 2D array
    :raises ValueError: if pool_mode is not 'max' or 'avg'
    """
    # Zero-pad the input on all sides.
    A = np.pad(A, padding, mode="constant")
    # Shape of the pooled output.
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    kernel_size = (kernel_size, kernel_size)
    # Zero-copy sliding-window view of A.
    A_w = as_strided(
        A,
        shape=output_shape + kernel_size,
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides
    )
    A_w = A_w.reshape(-1, *kernel_size)
    # Return the result of pooling.
    if pool_mode == "max":
        return A_w.max(axis=(1, 2)).reshape(output_shape)
    if pool_mode == "avg":
        return A_w.mean(axis=(1, 2)).reshape(output_shape)
    # Previously an unknown mode silently returned None; fail loudly instead.
    raise ValueError(f"Unsupported pool_mode: {pool_mode!r}")
# non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    """
    Non-maximum suppression: keep only local peaks of the heatmaps.

    A position survives when its raw value equals the max-pooled value,
    i.e. it is a local maximum; every other position is zeroed.
    """
    peak_mask = heatmaps == pooled_heatmaps
    return heatmaps * peak_mask
# get poses from results
def process_results(img, pafs, heatmaps):
    """
    Convert raw network outputs into poses and scores.

    Processing follows open_model_zoo's open_pose.py demo:
    https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py
    :param img: original frame, used only to compute the coordinate scale
    :param pafs: part-affinity fields from the network
    :param heatmaps: keypoint heatmaps from the network
    :returns: (poses, scores) with pose coordinates scaled to image pixels
    """
    # Max-pool each heatmap so NMS can isolate local peaks.
    pooled = np.array(
        [[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in heatmaps[0]]]
    )
    nms_heatmaps = heatmap_nms(heatmaps, pooled)
    # Decode poses from heatmaps and PAFs.
    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)
    output_shape = list(compiled_model.output(index=0).partial_shape)
    output_scale = (
        img.shape[1] / output_shape[3].get_length(),
        img.shape[0] / output_shape[2].get_length(),
    )
    # Rescale keypoint coordinates to the original image size.
    poses[:, :, :2] *= output_scale
    return poses, scores
# Draw Pose Overlays
colors = ((255, 0, 0), (255, 0, 255), (170, 0, 255), (255, 0, 85), (255, 0, 170), (85, 255, 0),
(255, 170, 0), (0, 255, 0), (255, 255, 0), (0, 255, 85), (170, 255, 0), (0, 85, 255),
(0, 255, 170), (0, 0, 255), (0, 255, 255), (85, 0, 255), (0, 170, 255))
default_skeleton = ((15, 13), (13, 11), (16, 14), (14, 12), (11, 12), (5, 11), (6, 12), (5, 6), (5, 7),
(6, 8), (7, 9), (8, 10), (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (3, 5), (4, 6))
def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    """
    Draw keypoints and limbs for each detected pose onto the image.

    :param img: frame to draw on (modified in place)
    :param poses: array of poses; each pose row holds (x, y, score) per keypoint
    :param point_score_threshold: minimum keypoint score required to draw it
    :param skeleton: pairs of keypoint indices forming the limbs
    :returns: the annotated frame
    """
    if poses.size == 0:
        return img
    # Limbs go on a copy so they can be alpha-blended over the joints.
    overlay = np.copy(img)
    for pose in poses:
        coords = pose[:, :2].astype(np.int32)
        confidences = pose[:, 2]
        # Joints: small filled circles for sufficiently confident keypoints.
        for idx, (pt, conf) in enumerate(zip(coords, confidences)):
            if conf > point_score_threshold:
                cv2.circle(img, tuple(pt), 1, colors[idx], 2)
        # Limbs: draw only when both end points are confident.
        for a, b in skeleton:
            if confidences[a] > point_score_threshold and confidences[b] > point_score_threshold:
                cv2.line(overlay, tuple(coords[a]), tuple(coords[b]), color=colors[b], thickness=4)
    cv2.addWeighted(img, 0.4, overlay, 0.6, 0, dst=img)
    return img
# Main Processing Function
# main processing function to run pose estimation
def run_pose_estimation(source=0, flip=False, use_popup=False, skip_first_frames=0):
    """
    Run the pose-estimation pipeline on a webcam or video file.

    :param source: webcam index or video path/URL
    :param flip: flip the captured image (useful for mirrored webcams)
    :param use_popup: True to show frames in a popup window instead of the notebook
    :param skip_first_frames: number of frames to skip at the start of the video
    """
    pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
    heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
    player = None
    try:
        # Create a video player that produces frames at the target fps.
        player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        # Start capturing.
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        processing_times = collections.deque()
        while True:
            # Grab the next frame.
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # Downscale frames larger than full HD to improve performance.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
            # Resize to the network input and move channels first (batch of one).
            # (see https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/human-pose-estimation-0001)
            input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
            input_img = input_img.transpose((2, 0, 1))[np.newaxis, ...]
            # Time only the inference call.
            start_time = time.time()
            results = compiled_model([input_img])
            stop_time = time.time()
            pafs = results[pafs_output_key]
            heatmaps = results[heatmaps_output_key]
            # Decode poses from the network results and draw them on the frame.
            poses, scores = process_results(frame, pafs, heatmaps)
            frame = draw_poses(frame, poses, 0.1)
            processing_times.append(stop_time - start_time)
            # Keep only the last 200 measurements.
            if len(processing_times) > 200:
                processing_times.popleft()
            _, f_width = frame.shape[:2]
            # Mean processing time [ms] and the resulting FPS.
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(frame, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
                        cv2.FONT_HERSHEY_COMPLEX, f_width / 1000, (0, 0, 255), 1, cv2.LINE_AA)
            # Use this workaround if there is flickering.
            if use_popup:
                cv2.imshow(title, frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # Encode the frame to jpg and show it inline in the notebook.
                _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                notebook_image = display.Image(data=encoded_img)
                display.clear_output(wait=True)
                display.display(notebook_image)
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # any different error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # Stop capturing.
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()
# Run Live Pose Estimation
# NOTE: source=0 selects the first webcam; this requires a camera to be attached.
run_pose_estimation(source=0, flip=True, use_popup=False)
# Run Pose Estimation on a Video File
video_file = "https://github.com/intel-iot-devkit/sample-videos/blob/master/store-aisle-detection.mp4?raw=true"
run_pose_estimation(video_file, flip=False, use_popup=False, skip_first_frames=500)
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection
401-object-detection-webcam: Live Object Detection with OpenVINO
# Imports
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core
sys.path.append("../utils")
import notebook_utils as utils
# Download the Model
# directory where model will be downloaded
base_model_dir = "model"
# model name as named in Open Model Zoo
model_name = "ssdlite_mobilenet_v2"
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--output_dir {base_model_dir} " \
f"--cache_dir {base_model_dir}"
# NOTE: "!" executes the command via the IPython shell (notebook cell magic).
! $download_command
# Convert the Model
precision = "FP16"
# output path for the conversion
converted_model_path = f"model/public/{model_name}/{precision}/{model_name}.xml"
# Convert to OpenVINO IR only when the converted model does not exist yet.
if not os.path.exists(converted_model_path):
convert_command = f"omz_converter " \
f"--name {model_name} " \
f"--download_dir {base_model_dir} " \
f"--precisions {precision}"
! $convert_command
# Load the Model
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=converted_model_path)
# compile the model for the CPU (you can choose manually CPU, GPU, MYRIAD etc.)
# or let the engine choose the best available device (AUTO)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get input and output nodes
input_layer = compiled_model.input(0)
output_layer = compiled_model.output(0)
# get input size: NHWC layout, indices 1 and 2 are height and width
height, width = list(input_layer.shape)[1:3]
# Process Results
# COCO class names; see
# https://tech.amikelive.com/node-718/what-object-categories-labels-are-in-coco-dataset/
classes = [
"background", "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
"truck", "boat", "traffic light", "fire hydrant", "street sign", "stop sign",
"parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant",
"bear", "zebra", "giraffe", "hat", "backpack", "umbrella", "shoe", "eye glasses",
"handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite",
"baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
"plate", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
"couch", "potted plant", "bed", "mirror", "dining table", "window", "desk", "toilet",
"door", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven",
"toaster", "sink", "refrigerator", "blender", "book", "clock", "vase", "scissors",
"teddy bear", "hair drier", "toothbrush", "hair brush"
]
# colors for above classes (Rainbow Color Map), one BGR color per class index
colors = cv2.applyColorMap(
src=np.arange(0, 255, 255 / len(classes), dtype=np.float32).astype(np.uint8),
colormap=cv2.COLORMAP_RAINBOW,
).squeeze()
def process_results(frame, results, thresh=0.6):
    """
    Convert raw SSD detections into (label, score, box) triples.

    :param frame: original frame, used to scale normalized coordinates to pixels
    :param results: detection tensor of shape [1, 1, 100, 7]
    :param thresh: minimum confidence for a detection to survive NMS
    :returns: list of (label_index, score, (x, y, w, h)) for kept detections
    """
    # Size of the original frame.
    h, w = frame.shape[:2]
    # Drop the leading singleton dimensions -> (100, 7).
    detections = results.squeeze()
    boxes, labels, scores = [], [], []
    for _, label, score, xmin, ymin, xmax, ymax in detections:
        # Convert normalized [0, 1] corners into a pixel (x, y, w, h) box.
        boxes.append(
            tuple(map(int, (xmin * w, ymin * h, (xmax - xmin) * w, (ymax - ymin) * h)))
        )
        labels.append(int(label))
        scores.append(float(score))
    # Non-maximum suppression removes heavily overlapping detections.
    # see https://paperswithcode.com/method/non-maximum-suppression
    keep = cv2.dnn.NMSBoxes(
        bboxes=boxes, scores=scores, score_threshold=thresh, nms_threshold=0.6
    )
    # No boxes survived.
    if len(keep) == 0:
        return []
    # Filter the detected objects down to the kept indices.
    return [(labels[idx], scores[idx], boxes[idx]) for idx in keep.flatten()]
def draw_boxes(frame, boxes):
    """
    Draw labelled bounding boxes onto the frame.

    :param frame: frame to draw on (modified in place)
    :param boxes: list of (label_index, score, (x, y, w, h)) triples
    :returns: the annotated frame
    """
    for label, score, box in boxes:
        # Per-class color from the rainbow colormap.
        color = tuple(map(int, colors[label]))
        x, y, box_w, box_h = box
        # Bounding rectangle.
        cv2.rectangle(img=frame, pt1=(x, y), pt2=(x + box_w, y + box_h), color=color, thickness=3)
        # Class name and confidence inside the box.
        cv2.putText(
            img=frame,
            text=f"{classes[label]} {score:.2f}",
            org=(x + 10, y + 30),
            fontFace=cv2.FONT_HERSHEY_COMPLEX,
            fontScale=frame.shape[1] / 1000,
            color=color,
            thickness=1,
            lineType=cv2.LINE_AA,
        )
    return frame
# Main Processing Function
# main processing function to run object detection
def run_object_detection(source=0, flip=False, use_popup=False, skip_first_frames=0):
    """
    Run the object-detection pipeline on a webcam or video file.

    :param source: webcam index or video path/URL
    :param flip: flip the captured image (useful for mirrored webcams)
    :param use_popup: True to show frames in a popup window instead of the notebook
    :param skip_first_frames: number of frames to skip at the start of the video
    """
    player = None
    try:
        # Create a video player that produces frames at the target fps.
        player = utils.VideoPlayer(
            source=source, flip=flip, fps=30, skip_first_frames=skip_first_frames
        )
        # Start capturing.
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        processing_times = collections.deque()
        while True:
            # Grab the next frame.
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # Downscale frames larger than full HD to improve performance.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(
                    src=frame, dsize=None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA
                )
            # Resize to the network input and add a batch axis (batch of one).
            input_img = cv2.resize(src=frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
            input_img = input_img[np.newaxis, ...]
            # Time only the inference call.
            start_time = time.time()
            results = compiled_model([input_img])[output_layer]
            stop_time = time.time()
            # Convert raw detections to labelled boxes and draw them.
            boxes = process_results(frame=frame, results=results)
            frame = draw_boxes(frame=frame, boxes=boxes)
            processing_times.append(stop_time - start_time)
            # Keep only the last 200 measurements.
            if len(processing_times) > 200:
                processing_times.popleft()
            _, f_width = frame.shape[:2]
            # Mean processing time [ms] and the resulting FPS.
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(
                img=frame,
                text=f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)",
                org=(20, 40),
                fontFace=cv2.FONT_HERSHEY_COMPLEX,
                fontScale=f_width / 1000,
                color=(0, 0, 255),
                thickness=1,
                lineType=cv2.LINE_AA,
            )
            # Use this workaround if there is flickering.
            if use_popup:
                cv2.imshow(winname=title, mat=frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # Encode the frame to jpg and show it inline in the notebook.
                _, encoded_img = cv2.imencode(
                    ext=".jpg", img=frame, params=[cv2.IMWRITE_JPEG_QUALITY, 100]
                )
                notebook_image = display.Image(data=encoded_img)
                display.clear_output(wait=True)
                display.display(notebook_image)
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # any different error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # Stop capturing.
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()
# Run Live Object Detection
# NOTE: source=0 selects the first webcam; this requires a camera to be attached.
run_object_detection(source=0, flip=True, use_popup=False)
# Run Object Detection on a Video File
video_file = "../201-vision-monodepth/data/Coco Walking in Berkeley.mp4"
run_object_detection(source=video_file, flip=False, use_popup=False)
#python #openvino #openvino-notebooks #live-inference #ct-scan #deeplearning #accelerated-inference
210-ct-scan-live-inference: Live Inference and Benchmark CT-scan Data with OpenVINO
# Imports
import os
import sys
import zipfile
from pathlib import Path
import numpy as np
from monai.transforms import LoadImage
from openvino.inference_engine import IECore
sys.path.append("../utils")
from models.custom_segmentation import SegmentationModel
from notebook_utils import benchmark_model, download_file, show_live_inference
# Settings
# The directory that contains the IR model (xml and bin) files
MODEL_PATH = "pretrained_model/quantized_unet_kits19.xml"
# Uncomment the next line to use the FP16 model instead of the quantized model
# MODEL_PATH = "pretrained_model/unet_kits19.xml"
# Benchmark Model Performance
ie = IECore()
# By default, benchmark on MULTI:CPU,GPU if a GPU is available, otherwise on CPU.
device = "MULTI:CPU,GPU" if "GPU" in ie.available_devices else "CPU"
# Uncomment one of the options below to benchmark on other devices
# device = "GPU"
# device = "CPU"
# device = "AUTO"
# Benchmark the model for 15 seconds on the selected device.
benchmark_model(model_path=MODEL_PATH, device=device, seconds=15)
# Download and Prepare Data
# Directory that contains the CT scan data. This directory should contain subdirectories
# case_00XXX where XXX is between 000 and 299
BASEDIR = Path("kits19_frames_1")
# The CT scan case number. For example: 16 for data from the case_00016 directory
# Currently only 117 is supported
CASE = 117
case_path = BASEDIR / f"case_{CASE:05d}"
# Download and extract the case data only when it is not already present.
if not case_path.exists():
filename = download_file(
f"https://storage.openvinotoolkit.org/data/test_data/openvino_notebooks/kits19/case_{CASE:05d}.zip"
)
with zipfile.ZipFile(filename, "r") as zip_ref:
zip_ref.extractall(path=BASEDIR)
os.remove(filename)  # remove zipfile
print(f"Downloaded and extracted data for case_{CASE:05d}")
else:
print(f"Data for case_{CASE:05d} exists")
# Load model
# NOTE(review): a second IECore is created here although one was created above —
# probably an artifact of separate notebook cells.
ie = IECore()
segmentation_model = SegmentationModel(
ie=ie, model_path=Path(MODEL_PATH), sigmoid=True, rotate_and_flip=True
)
# All jpg frames for the selected case.
image_paths = sorted(case_path.glob("imaging_frames/*jpg"))
print(f"{case_path.name}, {len(image_paths)} images")
# Show Live Inference
# Possible options for device include "CPU", "GPU", "AUTO", "MULTI"
device = "MULTI:CPU,GPU" if "GPU" in ie.available_devices else "CPU"
# MONAI image reader that loads frames as uint8 arrays (dtype=np.uint8).
reader = LoadImage(image_only=True, dtype=np.uint8)
show_live_inference(
ie=ie, image_paths=image_paths, model=segmentation_model, device=device, reader=reader
)
Fri Jun 17 2022 04:49:49 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/403-action-recognition-webcam/403-action-recognition-webcam.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #action-recognition
Fri Jun 17 2022 04:28:34 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/402-pose-estimation-webcam/402-pose-estimation.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection #pose-estimation
Thu Jun 16 2022 14:49:51 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/401-object-detection-webcam/401-object-detection.ipynb
#python #openvino #openvino-notebooks #live-inference #deeplearning #accelerated-inference #object-detection
Thu Jun 16 2022 14:44:32 GMT+0000 (Coordinated Universal Time) https://github.com/openvinotoolkit/openvino_notebooks/blob/main/notebooks/210-ct-scan-live-inference/210-ct-scan-live-inference.ipynb
#python #openvino #openvino-notebooks #live-inference #ct-scan #deeplearning #accelerated-inference

