Code main
unknown
plain_text
3 years ago
13 kB
5
Indexable
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input from tensorflow.keras.preprocessing.image import img_to_array from tensorflow.keras.models import load_model import numpy as np import concurrent.futures # import imutils import cv2 import os from Cam_lib.Stream_lib import Picam_lib import socket import sys import datetime # Sht_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Sht_sock.bind(("192.168.31.255", 5000)) class ShtCommunicate: def __init__(self): self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.Client_address = ["", int()] self.Server_address = ("192.168.31.255", 5000) self.Received_data = bytearray(100) self.Temperature = 0 def receive(self): try: self.Received_data, self.Client_address = self.Server_sock.recvfrom(1024) self.Temperature = self.Received_data.decode("utf-8") except Exception: pass return self.Temperature class LeptonThreadClass: def __init__(self): self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.Client_address = ["", int()] self.Server_address = ("127.0.0.1", 6000) self.Received_data = bytearray(10240) self.Serialized_bytes_received = np.zeros(4800) self.Img_received = np.zeros([60, 80], dtype=np.uint16) self.Calculated_temp_img = np.ones([60, 80], dtype=np.float) self.offset = 0 def receive(self): try: self.Received_data, self.Client_address = self.Server_sock.recvfrom(10240) self.Serialized_bytes_received = np.frombuffer(self.Received_data, dtype=np.uint16) self.Img_received = np.reshape(self.Serialized_bytes_received, newshape=(60, 80)) except Exception: return self.Calculated_temp_img self.Calculated_temp_img = default_temp(self.Img_received.astype(float), self.offset) return self.Calculated_temp_img class CameraParams: camera_width = 800 cam_scale_factor = camera_width / 400 lepton_left_top_pixel = (int(7*cam_scale_factor+0.5), int(58*cam_scale_factor+0.5)) lepton_right_bot_pixel = (int(320*cam_scale_factor+0.5), int(285*cam_scale_factor+0.5)) class VisualizeParams: cam_resolution = (800, 600) lepton_resolution = (800, 600) lept_scale_factor = lepton_resolution[0]/80 def nothing(): pass def default_temp(pixel_value, offset): temperature = pixel_value / 100 - 273.3 - 5 + offset return temperature def calc_lepton_coord(start_x_cam, start_y_cam, end_x_cam, end_y_cam, cam_scale_factor): start_x_lep = max(0, int((start_x_cam - 16*cam_scale_factor)/(3.875*cam_scale_factor) + 1 + 0.5)) end_x_lep = min(79, int((end_x_cam - 16*cam_scale_factor)/(3.875*cam_scale_factor)+ 0.5)) start_y_lep = max(0, int((start_y_cam - 46*cam_scale_factor)/(3.9*cam_scale_factor) + 0.5)) end_y_lep = min(59, int((end_y_cam - 46*cam_scale_factor)/(3.9*cam_scale_factor) + 0.5)) return start_x_lep, start_y_lep, end_x_lep, end_y_lep def calc_lept_resized_coord(start_pixel, end_pixel, scale_factor): start_pixel_resized = int(scale_factor*max(0, start_pixel-1)+1.5) end_pixel_resized = int(min(80*scale_factor - 1, scale_factor*(end_pixel-1)+1.5)) return start_pixel_resized, end_pixel_resized def run_model(): """Initialize Connection""" lepton_thread = LeptonThreadClass() lepton_thread.Server_sock.bind(lepton_thread.Server_address) lepton_thread.Server_sock.setblocking(False) """Initialize Connection""" """Initialize sensor connection""" sht_connection = ShtCommunicate() sht_connection.Server_sock.bind(sht_connection.Server_address) sht_connection.Server_sock.setblocking(False) """Initialize sensor connection""" """Start of initialize model""" print("[INFO] loading face detector model...") prototxt_path = os.path.sep.join(["Cam_lib", "Detection_lib", "deploy.prototxt"]) weights_path = os.path.sep.join(["Cam_lib", "Detection_lib", "res10_300x300_ssd_iter_140000.caffemodel"]) face_net = cv2.dnn.readNet(prototxt_path, weights_path) print("[INFO] loading face mask detector model...") mask_net = load_model("Cam_lib/Detection_lib/mask_detector_5k.model") print("[INFO] starting video stream...") """End of initialize model""" """Start of create windows""" cv2.namedWindow("Lepton frame") cv2.createTrackbar("Offset_calib (mK)", "Lepton frame", 5000, 10000, nothing) # cv2.setWindowProperty("Camera frame", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) """End of create windows""" timestamp_wait = 8 while True: temp = float(sht_connection.receive()) # print("Sensor temperature is: {}".format(temp)) lepton_thread.offset = cv2.getTrackbarPos("Offset_calib (mK)", "Lepton frame")/1000 lepton_frame = lepton_thread.receive() diff = temp - lepton_frame[0][0] lepton_frame += diff + lepton_thread.offset lepton_frame = (lepton_frame + 273.15)/0.987746 - 273.15 - 5 # print("Lepton temperature is: {}".format(lepton_frame[0][0])) # diff = temp - lepton_frame[0][0] # print(diff) if np.max(lepton_frame) > np.min(lepton_frame): lepton_frame_norm = (lepton_frame - np.min(lepton_frame))/(np.max(lepton_frame) - np.min(lepton_frame)) \ * 255 else: lepton_frame_norm = lepton_frame lepton_frame_norm = lepton_frame_norm.astype(np.uint8) lepton_frame_norm_resized = cv2.resize(lepton_frame_norm, VisualizeParams.lepton_resolution, interpolation=cv2.INTER_AREA) lepton_frame_norm_colored = cv2.applyColorMap(lepton_frame_norm_resized, cv2.COLORMAP_INFERNO) frame = cv2.imread("/mnt/ramdisk/out.bmp") frame = cv2.flip(frame, 0) if frame is not None: try: # frame = imutils.resize(frame, width=400) # print("Camera: {}".format(frame.shape)) # Define max initial max temp max_temp = 0 (locs, preds) = detect_and_predict_mask(frame, face_net, mask_net) for (box_cam, pred_cam) in zip(locs, preds): (startX_cam, startY_cam, endX_cam, endY_cam) = box_cam (mask, withoutMask) = pred_cam start_x_lept, start_y_lept, end_x_lept, end_y_lept = \ calc_lepton_coord(startX_cam, startY_cam, endX_cam, endY_cam, CameraParams.cam_scale_factor) face_temp = lepton_frame[start_y_lept:(end_y_lept + 1), start_x_lept:(end_x_lept + 1)] if face_temp.size != 0: temperature = np.max(face_temp) if temperature > max_temp: max_temp = temperature temp_text = "{:.2f} degree".format(temperature) # Put temperature text to camera frame cv2.putText(frame, temp_text, (startX_cam - 20, endY_cam + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 140, 255), 2) # Boxing and put temperature text to Lepton frame # Add box and temperature to lepton start_box_lept_resized = calc_lept_resized_coord(start_x_lept, start_y_lept, VisualizeParams.lept_scale_factor) end_box_lept_resized = calc_lept_resized_coord(end_x_lept, end_y_lept, VisualizeParams.lept_scale_factor) cv2.rectangle(lepton_frame_norm_colored, start_box_lept_resized, end_box_lept_resized, (255, 255, 255), 2) cv2.putText(lepton_frame_norm_colored, temp_text, (start_box_lept_resized[0] - 20, end_box_lept_resized[1] + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) label = "Mask" if mask > withoutMask else "No Mask" color = (0, 255, 0) if label == "Mask" else (0, 0, 255) label = "{}: {:.2f}%".format(label, max(mask, withoutMask) * 100) # Add facemask text cv2.putText(frame, label, (startX_cam-20, startY_cam - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2) cv2.rectangle(frame, (startX_cam, startY_cam), (endX_cam, endY_cam), color, 2) cv2.rectangle(frame, CameraParams.lepton_left_top_pixel, CameraParams.lepton_right_bot_pixel, 255, 1) cropped_frame = frame[117:572, 16:640, :] cropped_frame_resized = cv2.resize(cropped_frame, VisualizeParams.lepton_resolution, interpolation=cv2.INTER_AREA) if max_temp > 37.5: timestamp_wait -= 1 if timestamp_wait == 0: current_time = datetime.datetime.now() name = str(current_time) + ".jpeg" location = "/home/khaitam/time_stamp/" cv2.imwrite(location + name, cropped_frame_resized) timestamp_wait = 8 frame = cv2.resize(frame, VisualizeParams.cam_resolution, interpolation=cv2.INTER_AREA) cv2.imshow("Camera frame", frame) cv2.imshow("Lepton frame", lepton_frame_norm_colored) cv2.imshow("Cropped frame", cropped_frame_resized) key = cv2.waitKey(1) & 0xFF # if the `q` key was pressed, break from the loop if key == ord("q"): break except Exception as e: exc_type, exc_obj, exc_tb = sys.exc_info() file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, file_name, exc_tb.tb_lineno) print(e) # do a bit of cleanup cv2.destroyAllWindows() def detect_and_predict_mask(frame, facenet, masknet): # grab the dimensions of the frame and then construct a blob # from it (h, w) = frame.shape[:2] blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0)) # pass the blob through the network and obtain the face detections facenet.setInput(blob) detections = facenet.forward() # initialize our list of faces, their corresponding locations, # and the list of predictions from our face mask network faces = [] locs = [] preds = [] # loop over the detections for i in range(0, detections.shape[2]): # extract the confidence (i.e., probability) associated with # the detection confidence = detections[0, 0, i, 2] # filter out weak detections by ensuring the confidence is # greater than the minimum confidence if confidence > 0.5: # compute the (x, y)-coordinates of the bounding box for # the object box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) (startX, startY, endX, endY) = box.astype("int") # ensure the bounding boxes fall within the dimensions of # the frame (startX, startY) = (max(0, startX), max(0, startY)) (endX, endY) = (min(w - 1, endX), min(h - 1, endY)) # extract the face ROI, convert it from BGR to RGB channel # ordering, resize it to 224x224, and preprocess it face = frame[startY:endY, startX:endX] face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB) face = cv2.resize(face, (224, 224)) face = img_to_array(face) face = preprocess_input(face) # add the face and bounding boxes to their respective # lists faces.append(face) locs.append((startX, startY, endX, endY)) # only make a predictions if at least one face was detected if len(faces) > 0: # for faster inference we'll make batch predictions on *all* # faces at the same time rather than one-by-one predictions # in the above `for` loop faces = np.array(faces, dtype="float32") preds = masknet.predict(faces, batch_size=32) # return a 2-tuple of the face locations and their corresponding # locations return locs, preds def rev_and_run_model(): with concurrent.futures.ProcessPoolExecutor() as executor: executor.submit(Picam_lib.read_fifo) executor.submit(run_model) if __name__ == "__main__": rev_and_run_model()
Editor is loading...