# Imports
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
from openvino.runtime import Core
from decoder import OpenPoseDecoder
sys.path.append("../utils")
import notebook_utils as utils
# Download the model
# directory where the model will be downloaded
base_model_dir = "model"
# model name as listed in Open Model Zoo
model_name = "human-pose-estimation-0001"
# selected precision (FP32, FP16, FP16-INT8)
precision = "FP16-INT8"
model_path = f"{base_model_dir}/intel/{model_name}/{precision}/{model_name}.xml"
model_weights_path = f"{base_model_dir}/intel/{model_name}/{precision}/{model_name}.bin"
if not os.path.exists(model_path):
    download_command = f"omz_downloader " \
                       f"--name {model_name} " \
                       f"--precision {precision} " \
                       f"--output_dir {base_model_dir}"
    ! $download_command
# Load the model
# initialize OpenVINO Runtime
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=model_path, weights=model_weights_path)
# load the model on the CPU (you can use GPU or MYRIAD as well)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
# get the input and output nodes
input_layer = compiled_model.input(0)
output_layers = list(compiled_model.outputs)
# get input size
height, width = list(input_layer.shape)[2:]
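# Optional sanity check (an illustrative addition, not part of the original notebook): print the
# shapes the compiled model expects and produces. For human-pose-estimation-0001 the input should
# be 1x3x256x456 and the two outputs are the part affinity fields (PAFs) and keypoint heatmaps.
print(f"Input shape: {input_layer.shape}")
for output_layer in output_layers:
    print(f"Output '{output_layer.any_name}': {output_layer.partial_shape}")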
# OpenPoseDecoder
# the decoder transforms the raw network results (heatmaps and PAFs) into pose estimations
decoder = OpenPoseDecoder()
# Process Results
# 2d pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    2D Pooling

    Parameters:
        A: input 2D array
        kernel_size: int, the size of the window
        stride: int, the stride of the window
        padding: int, implicit zero paddings on both sides of the input
        pool_mode: string, 'max' or 'avg'
    """
    # Padding
    A = np.pad(A, padding, mode="constant")

    # Window view of A
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    kernel_size = (kernel_size, kernel_size)
    A_w = as_strided(
        A,
        shape=output_shape + kernel_size,
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides,
    )
    A_w = A_w.reshape(-1, *kernel_size)

    # Return the result of pooling
    if pool_mode == "max":
        return A_w.max(axis=(1, 2)).reshape(output_shape)
    elif pool_mode == "avg":
        return A_w.mean(axis=(1, 2)).reshape(output_shape)
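# A minimal sanity check for pool2d (an illustrative addition, not from the original notebook):
# max-pooling a 4x4 ramp with a 2x2 window and stride 2 keeps the largest value in each window.
test_array = np.arange(16, dtype=np.float32).reshape(4, 4)
assert np.array_equal(
    pool2d(test_array, kernel_size=2, stride=2, padding=0, pool_mode="max"),
    np.array([[5.0, 7.0], [13.0, 15.0]]),
)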
# non-maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    return heatmaps * (heatmaps == pooled_heatmaps)
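# Illustrative example of the NMS step (made-up values, added for clarity): after max-pooling,
# only positions whose value equals the max of their 3x3 neighbourhood (the local peaks) survive.
demo_heatmap = np.array([[[[0.1, 0.9, 0.1],
                           [0.2, 0.1, 0.1],
                           [0.1, 0.1, 0.5]]]], dtype=np.float32)
demo_pooled = np.array(
    [[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in demo_heatmap[0]]]
)
# only the two local maxima (0.9 and 0.5) remain non-zero
print(heatmap_nms(demo_heatmap, demo_pooled))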
# get poses from results
def process_results(img, pafs, heatmaps):
    # this processing comes from
    # https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py
    pooled_heatmaps = np.array(
        [[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in heatmaps[0]]]
    )
    nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)

    # decode poses
    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)
    output_shape = list(compiled_model.output(index=0).partial_shape)
    output_scale = img.shape[1] / output_shape[3].get_length(), img.shape[0] / output_shape[2].get_length()
    # multiply coordinates by the scaling factor
    poses[:, :, :2] *= output_scale
    return poses, scores
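# Worked example of the scaling step above (illustrative numbers): the decoder returns keypoint
# coordinates on the network output grid (32x57 for this model), so with a 720x1280 frame each
# decoded x coordinate is multiplied by 1280 / 57 and each y coordinate by 720 / 32 to map the
# poses back to image pixels.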
# Draw Pose Overlays
colors = ((255, 0, 0), (255, 0, 255), (170, 0, 255), (255, 0, 85), (255, 0, 170), (85, 255, 0),
(255, 170, 0), (0, 255, 0), (255, 255, 0), (0, 255, 85), (170, 255, 0), (0, 85, 255),
(0, 255, 170), (0, 0, 255), (0, 255, 255), (85, 0, 255), (0, 170, 255))
default_skeleton = ((15, 13), (13, 11), (16, 14), (14, 12), (11, 12), (5, 11), (6, 12), (5, 6), (5, 7),
(6, 8), (7, 9), (8, 10), (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (3, 5), (4, 6))
def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    if poses.size == 0:
        return img

    img_limbs = np.copy(img)
    for pose in poses:
        points = pose[:, :2].astype(np.int32)
        points_scores = pose[:, 2]
        # Draw joints.
        for i, (p, v) in enumerate(zip(points, points_scores)):
            if v > point_score_threshold:
                cv2.circle(img, tuple(p), 1, colors[i], 2)
        # Draw limbs.
        for i, j in skeleton:
            if points_scores[i] > point_score_threshold and points_scores[j] > point_score_threshold:
                cv2.line(img_limbs, tuple(points[i]), tuple(points[j]), color=colors[j], thickness=4)
    cv2.addWeighted(img, 0.4, img_limbs, 0.6, 0, dst=img)
    return img
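# Illustrative usage of draw_poses (hypothetical data, added for clarity): draw one dummy
# 17-keypoint pose on a blank canvas; each keypoint row is (x, y, score), matching the
# layout the decoder returns.
rng = np.random.default_rng(0)
dummy_pose = np.hstack([rng.integers(20, 220, size=(17, 2)), np.ones((17, 1))]).astype(np.float32)
canvas = draw_poses(np.zeros((240, 320, 3), dtype=np.uint8), dummy_pose[np.newaxis, ...], point_score_threshold=0.5)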
# Main Processing Function
# main processing function to run pose estimation
def run_pose_estimation(source=0, flip=False, use_popup=False, skip_first_frames=0):
    pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
    heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
    player = None
    try:
        # create a video player that plays at the target fps
        player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        # start capturing
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
        processing_times = collections.deque()
        while True:
            # grab the frame
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # if the frame is large, scale it down (longest side to 1280 px) to improve performance
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
            # resize the image and change dims to fit the neural network input
            # (see https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/human-pose-estimation-0001)
            input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
            # create a batch of images (size = 1)
            input_img = input_img.transpose((2, 0, 1))[np.newaxis, ...]
            # measure processing time
            start_time = time.time()
            # get results
            results = compiled_model([input_img])
            stop_time = time.time()
            pafs = results[pafs_output_key]
            heatmaps = results[heatmaps_output_key]
            # get poses from network results
            poses, scores = process_results(frame, pafs, heatmaps)
            # draw poses on the frame
            frame = draw_poses(frame, poses, 0.1)
            processing_times.append(stop_time - start_time)
            # use processing times from the last 200 frames
            if len(processing_times) > 200:
                processing_times.popleft()
            _, f_width = frame.shape[:2]
            # mean processing time [ms]
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(frame, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
                        cv2.FONT_HERSHEY_COMPLEX, f_width / 1000, (0, 0, 255), 1, cv2.LINE_AA)
            # use this workaround if there is flickering
            if use_popup:
                cv2.imshow(title, frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # encode the numpy array to jpg
                _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                # create an IPython image
                i = display.Image(data=encoded_img)
                # display the image in this notebook
                display.clear_output(wait=True)
                display.display(i)
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # any other error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # stop capturing
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()
# Run Live Pose Estimation
# use the primary webcam (source=0) as the video input; flip=True mirrors the image
run_pose_estimation(source=0, flip=True, use_popup=False)
# Run Pose Estimation on a Video File
video_file = "https://github.com/intel-iot-devkit/sample-videos/blob/master/store-aisle-detection.mp4?raw=true"
run_pose_estimation(video_file, flip=False, use_popup=False, skip_first_frames=500)