Issue
I am referring to this repo to adapt the mmaction2 Grad-CAM demo from short-video offline inference to long-video online inference. The script is shown below:
Note: to make this script easy to reproduce, I commented out some code that requires many dependencies.
import cv2
import numpy as np
import torchvision.transforms as transforms
import sys
from PIL import Image
#from mmaction.apis import init_recognizer
#from utils.gradcam_utils import GradCAM
import torch
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# sys.path.append('./utils')
async def preprocess_img(arr):
    image = Image.fromarray(np.uint8(arr))
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    transform = transforms.Compose([
        transforms.Resize((model_input_height, model_input_width)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std, inplace=False),
    ])
    normalized_img = transform(image)
    img_np = normalized_img.numpy()
    return img_np


async def inference(frame_buffer):
    print("starting inference")
    # inputs = {}
    # input_tensor = torch.from_numpy(frame_buffer).type(torch.FloatTensor)
    # input_cuda_tensor = input_tensor.cuda()
    # inputs['imgs'] = input_cuda_tensor
    # results = gradcam(inputs)
    # display_buffer = np.squeeze(results[0].cpu().detach().numpy(), axis=0)
    # return display_buffer


async def run_blocking_func(loop_, queue_, frame_buffer):
    with ProcessPoolExecutor() as pool:
        blocking_func = partial(inference, frame_buffer)
        frame = await loop_.run_in_executor(pool, blocking_func)
        print(frame)
        await queue_.put(frame)
        await asyncio.sleep(0.01)


async def get_frames(capture):
    capture.grab()
    ret, frame = capture.retrieve()
    if not ret:
        print("empty frame")
        return
    for i in range(32):
        img = await preprocess_img(frame)
        expandimg = np.expand_dims(img, axis=(0, 1, 3))
        print(f'expandimg.shape{expandimg.shape}')
        frame_buffer[:, :, :, i, :, :] = expandimg[:, :, :, 0, :, :]
    return frame_buffer


async def show_frame(queue_: asyncio.LifoQueue):
    display_buffer = await queue_.get()
    for i in range(32):
        blended_image = display_buffer[i, :, :, :]
        cv2.imshow('Grad-CAM VIS', blended_image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break


async def produce(loop_, queue_, cap):
    while True:
        frame_buffer = await asyncio.create_task(get_frames(cap))
        # Apply Grad-CAM
        display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_, frame_buffer))
        await queue_.put(display_buffer)


async def consume(queue_):
    while True:
        if queue_.qsize():
            task1 = asyncio.create_task(show_frame(queue_))
            await asyncio.wait(task1)
            if cv2.waitKey(1) == 27:
                break
        else:
            await asyncio.sleep(0.01)


async def run(loop_, queue_, cap_):
    producer_task = asyncio.create_task(produce(loop_, queue_, cap_))
    consumer_task = asyncio.create_task(consume(queue_))
    await asyncio.gather(producer_task, consumer_task)


if __name__ == '__main__':
    # config = '/home/user/Repo/mmaction2/configs/recognition/i3d/i3d_r50_video_inference_32x2x1_100e_kinetics400_rgb.py'
    # checkpoint = '/home/user/Repo/mmaction2/checkpoints/i3d_r50_video_32x2x1_100e_kinetics400_rgb_20200826-e31c6f52.pth'
    # device = torch.device('cuda:0')
    # model = init_recognizer(config, checkpoint, device=device, use_frames=False)
    video_path = 'replace_with_your_video.mp4'
    model_input_height = 256
    model_input_width = 340
    # target_layer_name = 'backbone/layer4/1/relu'
    # gradcam = GradCAM(model, target_layer_name)
    cap = cv2.VideoCapture(video_path)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)  # float
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)  # float
    frame_buffer = np.zeros((1, 1, 3, 32, model_input_height, model_input_width))
    display_buffer = np.zeros((32, model_input_height, model_input_width, 3))  # (32, 256, 340, 3)
    loop = asyncio.get_event_loop()
    queue = asyncio.LifoQueue(maxsize=2)
    try:
        loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
    finally:
        print("shutdown service")
        loop.close()
But when I run it, it reports the following error:
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/user/miniconda3/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
exception=exception))
File "/home/user/miniconda3/lib/python3.7/multiprocessing/queues.py", line 358, in put
obj = _ForkingPickler.dumps(obj)
File "/home/user/miniconda3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 120, in <module>
loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
File "/home/user/miniconda3/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
return future.result()
File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 94, in run
await asyncio.gather(producer_task, consumer_task)
File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 76, in produce
display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 42, in run_blocking_func
frame = await loop_.run_in_executor(pool, blocking_func)
TypeError: can't pickle coroutine objects
Task was destroyed but it is pending!
task: <Task pending coro=<consume() running at /home/user/Repo/Python-AI-Action-Utils/temp2.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7cf1418cd0>()]> cb=[gather.<locals>._done_callback() at /home/user/miniconda3/lib/python3.7/asyncio/tasks.py:691]>
Process finished with exit code 1
Solution
If you use run_in_executor, the target function should not be async. Calling an async def function only returns a coroutine object, and the ProcessPoolExecutor worker cannot pickle that coroutine to send it back to the main process, which is exactly the TypeError: can't pickle coroutine objects in your traceback. You need to remove the async keyword from def inference().
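A minimal sketch of the fix, keeping the rest of your script as posted (the Grad-CAM calls stay commented out as placeholders, just like in the original):

# inference is now a plain synchronous function, so the worker process
# returns a picklable result (e.g. a NumPy array) instead of a coroutine.
def inference(frame_buffer):
    print("starting inference")
    # inputs = {}
    # inputs['imgs'] = torch.from_numpy(frame_buffer).type(torch.FloatTensor).cuda()
    # results = gradcam(inputs)
    # return np.squeeze(results[0].cpu().detach().numpy(), axis=0)


async def run_blocking_func(loop_, queue_, frame_buffer):
    with ProcessPoolExecutor() as pool:
        # partial(...) now wraps a regular function; run_in_executor awaits
        # the executor's Future and yields the function's return value.
        blocking_func = partial(inference, frame_buffer)
        frame = await loop_.run_in_executor(pool, blocking_func)
        await queue_.put(frame)
        await asyncio.sleep(0.01)

With this change the worker returns a regular value (such as a NumPy array), which the process pool can pickle back to the event loop instead of failing on a coroutine object.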
Answered By - alex_noname