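# Face-mask detection with thermal temperature overlay.
# A visible-light frame (written to /mnt/ramdisk/out.bmp by a separate camera
# process) is scanned for faces, each face is classified as Mask / No Mask,
# and the temperature of the face region is read from a Lepton thermal frame
# received over UDP. Faces measured above 37.5 C are periodically saved to disk.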
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import load_model
import numpy as np
import concurrent.futures
# import imutils
import cv2
import os
from Cam_lib.Stream_lib import Picam_lib
import socket
import sys
import datetime
# Sht_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Sht_sock.bind(("192.168.31.255", 5000))
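# Receives the reference temperature string broadcast over UDP on port 5000
# (presumably from an SHT-series ambient sensor); it is used in the main loop
# to calibrate the Lepton frame.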
class ShtCommunicate:
    def __init__(self):
        self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.Client_address = ["", int()]
        self.Server_address = ("192.168.31.255", 5000)
        self.Received_data = bytearray(100)
        self.Temperature = 0

    def receive(self):
        try:
            self.Received_data, self.Client_address = self.Server_sock.recvfrom(1024)
            self.Temperature = self.Received_data.decode("utf-8")
        except Exception:
            pass
        return self.Temperature
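# Receives one raw 80x60 Lepton frame (uint16 pixel values) over UDP on
# localhost port 6000 and converts it to a temperature image via default_temp().
# The socket is non-blocking, so receive() returns the previous result when
# no new datagram has arrived.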
class LeptonThreadClass:
    def __init__(self):
        self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.Client_address = ["", int()]
        self.Server_address = ("127.0.0.1", 6000)
        self.Received_data = bytearray(10240)
        self.Serialized_bytes_received = np.zeros(4800)
        self.Img_received = np.zeros([60, 80], dtype=np.uint16)
        self.Calculated_temp_img = np.ones([60, 80], dtype=np.float64)
        self.offset = 0

    def receive(self):
        try:
            self.Received_data, self.Client_address = self.Server_sock.recvfrom(10240)
            self.Serialized_bytes_received = np.frombuffer(self.Received_data, dtype=np.uint16)
            self.Img_received = np.reshape(self.Serialized_bytes_received, newshape=(60, 80))
        except Exception:
            # No new frame available on the non-blocking socket: return the last result
            return self.Calculated_temp_img
        self.Calculated_temp_img = default_temp(self.Img_received.astype(float), self.offset)
        return self.Calculated_temp_img
class CameraParams:
    camera_width = 800
    cam_scale_factor = camera_width / 400
    lepton_left_top_pixel = (int(7 * cam_scale_factor + 0.5), int(58 * cam_scale_factor + 0.5))
    lepton_right_bot_pixel = (int(320 * cam_scale_factor + 0.5), int(285 * cam_scale_factor + 0.5))


class VisualizeParams:
    cam_resolution = (800, 600)
    lepton_resolution = (800, 600)
    lept_scale_factor = lepton_resolution[0] / 80
def nothing(_):
    # Dummy callback for cv2.createTrackbar (OpenCV passes the trackbar position)
    pass
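# Convert raw Lepton pixel values to degrees Celsius. The raw output is
# presumably in centi-Kelvin (hence the / 100 and the ~273 subtraction); the
# extra -5 and the trackbar offset look like empirical calibration corrections.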
def default_temp(pixel_value, offset):
    temperature = pixel_value / 100 - 273.3 - 5 + offset
    return temperature
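# Map a face bounding box from camera-frame pixel coordinates to the 80x60
# Lepton grid. The constants (16, 46, 3.875, 3.9) appear to describe the
# measured offset and scale between the two fields of view.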
def calc_lepton_coord(start_x_cam, start_y_cam, end_x_cam, end_y_cam, cam_scale_factor):
    start_x_lep = max(0, int((start_x_cam - 16 * cam_scale_factor) / (3.875 * cam_scale_factor) + 1 + 0.5))
    end_x_lep = min(79, int((end_x_cam - 16 * cam_scale_factor) / (3.875 * cam_scale_factor) + 0.5))
    start_y_lep = max(0, int((start_y_cam - 46 * cam_scale_factor) / (3.9 * cam_scale_factor) + 0.5))
    end_y_lep = min(59, int((end_y_cam - 46 * cam_scale_factor) / (3.9 * cam_scale_factor) + 0.5))
    return start_x_lep, start_y_lep, end_x_lep, end_y_lep
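# Map a Lepton-grid coordinate to the upscaled Lepton display image so the
# bounding box can be drawn on the colored thermal frame.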
def calc_lept_resized_coord(start_pixel, end_pixel, scale_factor):
    start_pixel_resized = int(scale_factor * max(0, start_pixel - 1) + 1.5)
    end_pixel_resized = int(min(80 * scale_factor - 1, scale_factor * (end_pixel - 1) + 1.5))
    return start_pixel_resized, end_pixel_resized
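# Main loop: read the sensor temperature and the Lepton frame from their UDP
# sockets, calibrate the thermal image, detect faces and masks in the visible
# frame from the ramdisk, overlay per-face temperatures, and periodically save
# a snapshot when a temperature above 37.5 C is seen.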
def run_model():
    """Initialize Connection"""
    lepton_thread = LeptonThreadClass()
    lepton_thread.Server_sock.bind(lepton_thread.Server_address)
    lepton_thread.Server_sock.setblocking(False)
    """Initialize Connection"""

    """Initialize sensor connection"""
    sht_connection = ShtCommunicate()
    sht_connection.Server_sock.bind(sht_connection.Server_address)
    sht_connection.Server_sock.setblocking(False)
    """Initialize sensor connection"""

    """Start of initialize model"""
    print("[INFO] loading face detector model...")
    prototxt_path = os.path.sep.join(["Cam_lib", "Detection_lib", "deploy.prototxt"])
    weights_path = os.path.sep.join(["Cam_lib", "Detection_lib", "res10_300x300_ssd_iter_140000.caffemodel"])
    face_net = cv2.dnn.readNet(prototxt_path, weights_path)
    print("[INFO] loading face mask detector model...")
    mask_net = load_model("Cam_lib/Detection_lib/mask_detector_5k.model")
    print("[INFO] starting video stream...")
    """End of initialize model"""

    """Start of create windows"""
    cv2.namedWindow("Lepton frame")
    cv2.createTrackbar("Offset_calib (mK)", "Lepton frame", 5000, 10000, nothing)
    # cv2.setWindowProperty("Camera frame", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    """End of create windows"""

    timestamp_wait = 8
    while True:
        temp = float(sht_connection.receive())
        # print("Sensor temperature is: {}".format(temp))
        lepton_thread.offset = cv2.getTrackbarPos("Offset_calib (mK)", "Lepton frame") / 1000
        lepton_frame = lepton_thread.receive()
        diff = temp - lepton_frame[0][0]
        lepton_frame += diff + lepton_thread.offset
        lepton_frame = (lepton_frame + 273.15) / 0.987746 - 273.15 - 5
        # print("Lepton temperature is: {}".format(lepton_frame[0][0]))
        # diff = temp - lepton_frame[0][0]
        # print(diff)
        if np.max(lepton_frame) > np.min(lepton_frame):
            lepton_frame_norm = (lepton_frame - np.min(lepton_frame)) / (np.max(lepton_frame) - np.min(lepton_frame)) \
                * 255
        else:
            lepton_frame_norm = lepton_frame
        lepton_frame_norm = lepton_frame_norm.astype(np.uint8)
        lepton_frame_norm_resized = cv2.resize(lepton_frame_norm, VisualizeParams.lepton_resolution,
                                               interpolation=cv2.INTER_AREA)
        lepton_frame_norm_colored = cv2.applyColorMap(lepton_frame_norm_resized, cv2.COLORMAP_INFERNO)

        frame = cv2.imread("/mnt/ramdisk/out.bmp")
        if frame is not None:
            try:
                # Flip only after the None check so a missing frame does not crash cv2.flip
                frame = cv2.flip(frame, 0)
                # frame = imutils.resize(frame, width=400)
                # print("Camera: {}".format(frame.shape))
                # Define initial max temp
                max_temp = 0
                (locs, preds) = detect_and_predict_mask(frame, face_net, mask_net)
                for (box_cam, pred_cam) in zip(locs, preds):
                    (startX_cam, startY_cam, endX_cam, endY_cam) = box_cam
                    (mask, withoutMask) = pred_cam
                    start_x_lept, start_y_lept, end_x_lept, end_y_lept = \
                        calc_lepton_coord(startX_cam, startY_cam, endX_cam, endY_cam, CameraParams.cam_scale_factor)
                    face_temp = lepton_frame[start_y_lept:(end_y_lept + 1), start_x_lept:(end_x_lept + 1)]
                    if face_temp.size != 0:
                        temperature = np.max(face_temp)
                        if temperature > max_temp:
                            max_temp = temperature
                        temp_text = "{:.2f} degree".format(temperature)
                        # Put temperature text on the camera frame
                        cv2.putText(frame, temp_text, (startX_cam - 20, endY_cam + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
                                    (0, 140, 255), 2)
                        # Draw box and temperature text on the Lepton frame
                        start_box_lept_resized = calc_lept_resized_coord(start_x_lept, start_y_lept,
                                                                         VisualizeParams.lept_scale_factor)
                        end_box_lept_resized = calc_lept_resized_coord(end_x_lept, end_y_lept,
                                                                       VisualizeParams.lept_scale_factor)
                        cv2.rectangle(lepton_frame_norm_colored, start_box_lept_resized, end_box_lept_resized,
                                      (255, 255, 255), 2)
                        cv2.putText(lepton_frame_norm_colored,
                                    temp_text, (start_box_lept_resized[0] - 20, end_box_lept_resized[1] + 20),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1)
                    label = "Mask" if mask > withoutMask else "No Mask"
                    color = (0, 255, 0) if label == "Mask" else (0, 0, 255)
                    label = "{}: {:.2f}%".format(label, max(mask, withoutMask) * 100)
                    # Add face mask label and bounding box
                    cv2.putText(frame, label, (startX_cam - 20, startY_cam - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                    cv2.rectangle(frame, (startX_cam, startY_cam), (endX_cam, endY_cam), color, 2)
                cv2.rectangle(frame, CameraParams.lepton_left_top_pixel, CameraParams.lepton_right_bot_pixel, 255, 1)
                cropped_frame = frame[117:572, 16:640, :]
                cropped_frame_resized = cv2.resize(cropped_frame, VisualizeParams.lepton_resolution,
                                                   interpolation=cv2.INTER_AREA)
                if max_temp > 37.5:
                    timestamp_wait -= 1
                    if timestamp_wait == 0:
                        current_time = datetime.datetime.now()
                        name = str(current_time) + ".jpeg"
                        location = "/home/khaitam/time_stamp/"
                        cv2.imwrite(location + name, cropped_frame_resized)
                        timestamp_wait = 8
                frame = cv2.resize(frame, VisualizeParams.cam_resolution, interpolation=cv2.INTER_AREA)
                cv2.imshow("Camera frame", frame)
                cv2.imshow("Lepton frame", lepton_frame_norm_colored)
                cv2.imshow("Cropped frame", cropped_frame_resized)
                key = cv2.waitKey(1) & 0xFF
                # if the `q` key was pressed, break from the loop
                if key == ord("q"):
                    break
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, file_name, exc_tb.tb_lineno)
                print(e)

    # do a bit of cleanup
    cv2.destroyAllWindows()
def detect_and_predict_mask(frame, facenet, masknet):
    # grab the dimensions of the frame and then construct a blob
    # from it
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300),
                                 (104.0, 177.0, 123.0))

    # pass the blob through the network and obtain the face detections
    facenet.setInput(blob)
    detections = facenet.forward()

    # initialize our list of faces, their corresponding locations,
    # and the list of predictions from our face mask network
    faces = []
    locs = []
    preds = []

    # loop over the detections
    for i in range(0, detections.shape[2]):
        # extract the confidence (i.e., probability) associated with
        # the detection
        confidence = detections[0, 0, i, 2]

        # filter out weak detections by ensuring the confidence is
        # greater than the minimum confidence
        if confidence > 0.5:
            # compute the (x, y)-coordinates of the bounding box for
            # the object
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")

            # ensure the bounding boxes fall within the dimensions of
            # the frame
            (startX, startY) = (max(0, startX), max(0, startY))
            (endX, endY) = (min(w - 1, endX), min(h - 1, endY))

            # extract the face ROI, convert it from BGR to RGB channel
            # ordering, resize it to 224x224, and preprocess it
            face = frame[startY:endY, startX:endX]
            face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
            face = cv2.resize(face, (224, 224))
            face = img_to_array(face)
            face = preprocess_input(face)

            # add the face and bounding boxes to their respective
            # lists
            faces.append(face)
            locs.append((startX, startY, endX, endY))

    # only make predictions if at least one face was detected
    if len(faces) > 0:
        # for faster inference we'll make batch predictions on *all*
        # faces at the same time rather than one-by-one predictions
        # in the above `for` loop
        faces = np.array(faces, dtype="float32")
        preds = masknet.predict(faces, batch_size=32)

    # return a 2-tuple of the face locations and their corresponding
    # predictions
    return locs, preds
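# Run the camera FIFO reader (which presumably writes frames to
# /mnt/ramdisk/out.bmp) and the detection loop in separate processes.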
def rev_and_run_model():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.submit(Picam_lib.read_fifo)
        executor.submit(run_model)


if __name__ == "__main__":
    rev_and_run_model()