import cv2 as cv
import numpy as np
import math
import requests
import os
from datetime import datetime
from collections import deque
class ProcessVideo:
    """Watch one RTSP camera stream for moving objects and raise alerts.

    Frames are binarised, fed through a MOG2 background subtractor and the
    resulting foreground contours are tracked inside a monitoring rectangle
    that is inset by 1/9 of the frame size on every side. When a tracked
    box reaches the left, right or bottom edge of that rectangle, a
    snapshot is saved, uploaded, and a notification request is sent.

    Daily log files are written to ``./log`` and snapshots to ``./pic/``.
    """

    def __init__(self, rtsp, camid, instanceId, config):
        """Store the stream settings and reset all tracking state.

        Args:
            rtsp: Source passed to ``cv.VideoCapture``.
            camid: Camera identifier used in window titles, log messages
                and the notification request.
            instanceId: Value sent as ``instanceId`` in the notification.
            config: Mapping read with the keys ``"appkey"``,
                ``"uploadsingleurl"`` and ``"notifyurl"``.
        """
        self.rtsp = rtsp
        self.camid = camid
        self.instanceId = instanceId
        self.config = config
        # Monitoring rectangle; filled in once the frame size is known.
        self.left_x = 0
        self.right_x = 0
        self.top_y = 0
        self.bottom_y = 0
        # Most recent bounding boxes of the tracked object (newest first).
        self.pts = deque(maxlen=64)
        # Timestamps and log paths, refreshed on every processed frame.
        self.day_time = None
        self.warn_filename = None
        self.error_filename = None
        self.cur_time = None

    def process_video(self):
        """Open the stream and run detection until ESC is pressed.

        Raises:
            Exception: If the stream cannot be opened, or if reading a
                frame fails (including the very first one). The failure
                is appended to the daily error log before raising.
        """
        path = "./log"
        self.mkdir(path)
        window = f"cam{self.camid}"
        cap = cv.VideoCapture(self.rtsp)
        try:
            if not cap.isOpened():
                self._refresh_timestamps(path)
                msg = f"在{self.cur_time}时,摄像头{self.camid}无法连接rtsp视频流。\n"
                print(msg, end="")
                self.create_or_write_file(self.error_filename, msg)
                raise Exception(msg)

            ok, first_frame = cap.read()
            if not ok:
                # Without this check a failed first read would pass None
                # into cvtColor and die with an unrelated OpenCV error.
                self._refresh_timestamps(path)
                self._raise_stream_interrupted()

            width = cap.get(cv.CAP_PROP_FRAME_WIDTH)
            height = cap.get(cv.CAP_PROP_FRAME_HEIGHT)
            self.left_x = int(width / 9)
            self.right_x = int(width / 9 * 8)
            self.top_y = int(height / 9)
            self.bottom_y = int(height / 9 * 8)

            mog = cv.createBackgroundSubtractorMOG2()
            kernel = cv.getStructuringElement(cv.MORPH_RECT, (5, 5))
            # Prime the background model with the first frame.
            mog.apply(self.process_frame(first_frame))

            cv.namedWindow(window)
            try:
                self._watch(cap, mog, kernel, path, window)
            finally:
                # Best-effort cleanup: the window may already be gone,
                # and a failure here must not mask the real exception.
                try:
                    cv.destroyWindow(window)
                except cv.error:
                    pass
        finally:
            cap.release()

    def _refresh_timestamps(self, log_dir):
        """Update day/time stamps and log paths from one clock reading."""
        now = datetime.now()
        self.day_time = now.date().isoformat()
        # "YYYY-MM-DD HH-MM-SS": colons are replaced so the value can be
        # used inside file names.
        stamp = now.isoformat(sep=" ", timespec="seconds")
        self.cur_time = stamp.replace(":", "-")
        self.warn_filename = log_dir + "/" + self.day_time + " warn.log"
        self.error_filename = log_dir + "/" + self.day_time + " error.log"

    def _raise_stream_interrupted(self):
        """Log a stream interruption to the error log and raise."""
        msg = f"在{self.cur_time}时,摄像头{self.camid}rtsp视频流中断。\n"
        self.create_or_write_file(self.error_filename, msg)
        raise Exception(f"摄像头{self.camid}rtsp视频流中断!")

    def _watch(self, cap, mog, kernel, log_dir, window):
        """Run the per-frame detection loop until ESC or a read failure."""
        lens = []
        last_day_time = None
        while True:
            self._refresh_timestamps(log_dir)
            if self.day_time != last_day_time:
                # Make sure today's log files exist; once per day is
                # enough because later writes append to the same paths.
                self.create_or_write_file(self.warn_filename, "")
                self.create_or_write_file(self.error_filename, "")
                last_day_time = self.day_time

            ok, frame = cap.read()
            if not ok:
                self._raise_stream_interrupted()

            mask = self.process_frame(frame)
            # The second positional parameter of apply() is the output
            # mask, so the learning rate has to be passed by keyword.
            fg_mask = mog.apply(mask, learningRate=0.01)
            fg_mask = cv.morphologyEx(fg_mask, cv.MORPH_OPEN, kernel)
            fg_mask = cv.dilate(fg_mask, kernel, iterations=7)
            contours, _ = cv.findContours(
                fg_mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE
            )

            cv.rectangle(
                frame,
                (self.left_x, self.top_y),
                (self.right_x, self.bottom_y),
                (0, 0, 255),
                2,
            )
            for cnt in contours:
                if not cv.contourArea(cnt):
                    continue
                x, y, w, h = cv.boundingRect(cnt)
                center_x = x + w // 2
                center_y = y + h // 2
                # Only track boxes whose centre is strictly inside the
                # monitoring rectangle.
                if (self.left_x < center_x < self.right_x
                        and self.top_y < center_y < self.bottom_y):
                    self.draw_trajectory(frame, (x, y, w, h))

            # Every 10 frames, drop the trajectory if its length was the
            # same at samples 0 and 3 of the window.
            # NOTE(review): comparing index 3 rather than the last sample
            # looks unusual; kept as in the original, confirm the intent.
            lens.append(len(self.pts))
            if len(lens) == 10:
                if lens[0] == lens[3]:
                    self.pts.clear()
                lens.clear()

            cv.imshow(window, frame)
            if cv.waitKey(1) & 0xff == 27:  # 27 == ESC
                break

    def process_frame(self, frame):
        """Return an adaptive-threshold binary image of a BGR frame."""
        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        binary = cv.adaptiveThreshold(
            gray, 255, cv.ADAPTIVE_THRESH_GAUSSIAN_C, cv.THRESH_BINARY, 25, 7
        )
        return binary

    def draw_trajectory(self, frame, boundingBox):
        """Record a box, draw the trajectory and alert at the region edge.

        The box is pushed onto ``self.pts`` and every stored box is drawn
        on ``frame``. If the box centre lies within 50 px inside the
        bottom, left or right edge of the monitoring rectangle and more
        than 5 boxes are stored, the trajectory is cleared, a warning is
        logged, the frame is saved under ``./pic/`` and uploaded, and a
        notification is sent when the upload returned a URL.

        Args:
            frame: Image to draw on; also the snapshot that is saved.
            boundingBox: ``(x, y, w, h)`` tuple of the detected object.
        """
        self.pts.appendleft(boundingBox)
        for box in self.pts:
            cv.rectangle(frame, box, (0, 0, 255), 2)

        x, y, w, h = boundingBox
        center_x = x + w // 2
        center_y = y + h // 2
        near_bottom = 0 <= self.bottom_y - center_y <= 50
        near_left = 0 <= center_x - self.left_x <= 50
        near_right = 0 <= self.right_x - center_x <= 50
        if not ((near_bottom or near_left or near_right)
                and len(self.pts) > 5):
            return

        self.pts.clear()
        path = "./pic/"
        self.mkdir(path)
        filename = path + self.cur_time + ".png"
        msg = f"在{self.cur_time}时,可能有抛物,摄像头id:{self.camid}。\n"
        print(msg, end="")
        self.create_or_write_file(self.warn_filename, msg)
        cv.imwrite(filename, frame)
        image_url = self.post_picture(filename)
        if image_url is not None:
            self.post_notify(image_url)
        else:
            msg = f"在{self.cur_time}时,未获得图片url,无法发起通知,摄像头id:{self.camid}。\n"
            print(msg, end="")
            self.create_or_write_file(self.error_filename, msg)

    def post_picture(self, filename):
        """Upload a snapshot and return the ``"data"`` field of the reply.

        Args:
            filename: Path of the PNG file to upload.

        Returns:
            The value of ``"data"`` in the JSON response, or ``None`` if
            anything went wrong (the failure is printed and appended to
            the error log).
        """
        try:
            params = {"appKey": self.config["appkey"]}
            # The context manager closes the image file even when the
            # request fails.
            with open(filename, "rb") as image:
                image_file = {"file": (filename, image, "image/png")}
                r = requests.post(
                    self.config["uploadsingleurl"],
                    params=params,
                    files=image_file,
                )
            r.raise_for_status()
            print(f"post_picture_url:{r.url}")
            return r.json()["data"]
        except Exception as e:
            msg = f"在{self.cur_time}时,上传图片时出现问题:{e}. 摄像头id:{self.camid}\n"
            print(msg, end="")
            self.create_or_write_file(self.error_filename, msg)
            return None

    def post_notify(self, image_url):
        """Send the alert notification; failures are logged, not raised.

        Args:
            image_url: URL of the uploaded snapshot.
        """
        try:
            params = {
                "appKey": self.config["appkey"],
                "instanceId": self.instanceId,
                "cameraId": self.camid,
                "imageUrl": image_url,
            }
            r = requests.post(self.config["notifyurl"], params=params)
            r.raise_for_status()
            print(f"post_notify_url:{r.url}")
        except Exception as e:
            msg = f"在{self.cur_time}时,发送notify请求时出现问题:{e}. 摄像头id:{self.camid}\n"
            print(msg, end="")
            self.create_or_write_file(self.error_filename, msg)

    def mkdir(self, path):
        """Create ``path`` (with parents) if it does not exist yet."""
        os.makedirs(path, exist_ok=True)

    def create_or_write_file(self, filename, msg):
        """Append ``msg`` to ``filename`` as UTF-8, creating the file."""
        with open(filename, "a", encoding="utf-8") as f:
            f.write(msg)
# Web-page pagination residue ("- 1", "- 2", "- 3", "前往页", i.e. "go to page"); not part of the program.