# Code main
#
#  avatar
# unknown
# plain_text
# 3 years ago
# 13 kB
# 5
# Indexable
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import load_model
import numpy as np
import concurrent.futures
# import imutils
import cv2
import os
from Cam_lib.Stream_lib import Picam_lib
import socket
import sys
import datetime

# Sht_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Sht_sock.bind(("192.168.31.255", 5000))


class ShtCommunicate:
    """UDP endpoint that receives temperature readings as UTF-8 text.

    The socket is only created here; the caller binds ``Server_sock`` to
    ``Server_address`` (and chooses blocking mode) before using
    :meth:`receive`.
    """

    def __init__(self):
        self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Placeholder until the first datagram arrives; afterwards holds the
        # sender address returned by ``recvfrom``.
        self.Client_address = ["", int()]
        self.Server_address = ("192.168.31.255", 5000)
        self.Received_data = bytearray(100)
        # Last decoded reading; stays 0 until a datagram has been decoded.
        self.Temperature = 0

    def receive(self):
        """Return the most recent temperature reading.

        Tries to read one datagram (up to 1024 bytes) and decode it as UTF-8.
        When nothing can be read or the payload is not valid UTF-8, the
        previously stored reading is returned unchanged.

        Returns:
            The decoded payload as ``str``, or the initial ``0`` if no
            datagram has been decoded yet.
        """
        try:
            self.Received_data, self.Client_address = self.Server_sock.recvfrom(1024)
            self.Temperature = self.Received_data.decode("utf-8")
        except (OSError, UnicodeDecodeError):
            # OSError covers "no data yet" on a non-blocking socket
            # (BlockingIOError) and other socket failures; anything else is a
            # programming error and is allowed to propagate.
            pass
        return self.Temperature


class LeptonThreadClass:
    """UDP endpoint that receives 60x80 ``uint16`` thermal frames.

    The socket is only created here; the caller binds ``Server_sock`` to
    ``Server_address`` (and chooses blocking mode) before using
    :meth:`receive`.
    """

    def __init__(self):
        self.Server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Placeholder until the first datagram arrives; afterwards holds the
        # sender address returned by ``recvfrom``.
        self.Client_address = ["", int()]
        self.Server_address = ("127.0.0.1", 6000)

        self.Received_data = bytearray(10240)
        self.Serialized_bytes_received = np.zeros(4800)
        self.Img_received = np.zeros([60, 80], dtype=np.uint16)
        # ``np.float`` was removed from NumPy; the builtin ``float`` is the
        # same 64-bit dtype and works on every NumPy version.
        self.Calculated_temp_img = np.ones([60, 80], dtype=float)
        # Added to every pixel by ``default_temp``; set by the caller.
        self.offset = 0

    def receive(self):
        """Return the latest thermal frame converted by ``default_temp``.

        Reads one datagram (up to 10240 bytes), interprets it as ``uint16``
        values and reshapes it to 60 rows x 80 columns. When no datagram is
        available or the payload does not have the expected size, the
        previously computed frame is returned instead.

        Returns:
            A (60, 80) float array. The same array object is kept as
            ``Calculated_temp_img`` and handed out again on later failures.
        """
        try:
            self.Received_data, self.Client_address = self.Server_sock.recvfrom(10240)
            self.Serialized_bytes_received = np.frombuffer(self.Received_data, dtype=np.uint16)
            # Positional shape: the ``newshape=`` keyword is deprecated in
            # recent NumPy releases.
            self.Img_received = np.reshape(self.Serialized_bytes_received, (60, 80))
        except (OSError, ValueError):
            # OSError: nothing to read on a non-blocking socket or a socket
            # failure. ValueError: payload is not exactly 60*80 uint16 values.
            return self.Calculated_temp_img
        self.Calculated_temp_img = default_temp(self.Img_received.astype(float), self.offset)
        return self.Calculated_temp_img


class CameraParams:
    """Camera geometry constants, expressed relative to a 400-pixel-wide reference."""

    # Width of the camera image in pixels.
    camera_width = 800
    # Ratio between the actual width and the 400-pixel reference width.
    cam_scale_factor = camera_width / 400
    # Corner points (x, y) of the rectangle that run_model draws on the
    # camera image; reference coordinates are scaled and rounded half-up.
    lepton_left_top_pixel = (
        int(0.5 + 7 * cam_scale_factor),
        int(0.5 + 58 * cam_scale_factor),
    )
    lepton_right_bot_pixel = (
        int(0.5 + 320 * cam_scale_factor),
        int(0.5 + 285 * cam_scale_factor),
    )


class VisualizeParams:
    """Display sizes used when showing the camera and thermal images."""

    # (width, height) the camera image is resized to before display.
    cam_resolution = (800, 600)
    # (width, height) the thermal image is resized to before display.
    lepton_resolution = (800, 600)
    # Display pixels per thermal pixel, derived from the 80-pixel frame width.
    lept_scale_factor = lepton_resolution[0] / 80


def nothing(*_args):
    """No-op callback for ``cv2.createTrackbar``.

    OpenCV invokes the trackbar callback with the new slider position, so the
    function must accept (and here ignores) positional arguments. Calling it
    with no arguments still works.
    """


def default_temp(pixel_value, offset):
    """Convert a raw thermal pixel value to a temperature.

    Computes ``pixel_value / 100 - 273.3 - 5 + offset``. Works element-wise
    when ``pixel_value`` is a NumPy array.

    Args:
        pixel_value: Raw value(s); presumably hundredths of a kelvin - TODO confirm.
        offset: Value added to the result.

    Returns:
        The converted temperature(s).
    """
    # NOTE(review): 273.3 differs from the 273.15 used in run_model; left
    # unchanged because it may be a deliberate calibration value.
    scaled = pixel_value / 100
    return scaled - 273.3 - 5 + offset


def calc_lepton_coord(start_x_cam, start_y_cam, end_x_cam, end_y_cam, cam_scale_factor):
    """Map a camera-image box to thermal-frame pixel indices.

    Each camera coordinate is shifted by an origin (16 horizontally, 46
    vertically), divided by a pixel pitch (3.875 horizontally, 3.9
    vertically) - all multiplied by ``cam_scale_factor`` - and rounded
    half-up. The left edge additionally gets +1. Results are clamped to
    columns 0..79 and rows 0..59.

    Returns:
        ``(start_x_lep, start_y_lep, end_x_lep, end_y_lep)`` as ints.
    """
    x_origin, x_pitch = 16 * cam_scale_factor, 3.875 * cam_scale_factor
    y_origin, y_pitch = 46 * cam_scale_factor, 3.9 * cam_scale_factor

    left = int((start_x_cam - x_origin) / x_pitch + 1 + 0.5)
    right = int((end_x_cam - x_origin) / x_pitch + 0.5)
    top = int((start_y_cam - y_origin) / y_pitch + 0.5)
    bottom = int((end_y_cam - y_origin) / y_pitch + 0.5)

    # Lower bounds apply to the start corner, upper bounds to the end corner.
    return max(0, left), max(0, top), min(79, right), min(59, bottom)


def calc_lept_resized_coord(start_pixel, end_pixel, scale_factor):
    """Scale two thermal-frame pixel indices to the enlarged display image.

    The first index is clamped so that ``start_pixel - 1`` is never negative;
    the second result is capped at ``80 * scale_factor - 1``. Both values are
    truncated to ``int``.

    Returns:
        ``(start_pixel_resized, end_pixel_resized)``.
    """
    first = scale_factor * max(0, start_pixel - 1) + 1.5
    upper_limit = 80 * scale_factor - 1
    last = min(upper_limit, scale_factor * (end_pixel - 1) + 1.5)
    return int(first), int(last)


def run_model():
    """Run the thermal-camera / face-mask monitoring loop until ``q`` is pressed.

    Each iteration:

    1. reads the latest sensor temperature and thermal frame over UDP,
    2. shifts the thermal frame so its top-left pixel equals the sensor
       reading plus the trackbar offset, then applies a fixed correction,
    3. loads the newest camera image from ``/mnt/ramdisk/out.bmp``,
    4. detects faces, classifies mask / no mask, and annotates both the
       camera image and the colour-mapped thermal image with the hottest
       thermal pixel found inside each face box,
    5. on every 8th frame whose hottest face exceeds 37.5, writes a
       timestamped snapshot to ``/home/khaitam/time_stamp/``.

    The sockets and OpenCV windows are released when the loop ends, whether
    normally or through an exception.
    """
    lepton_thread = LeptonThreadClass()
    sht_connection = ShtCommunicate()
    try:
        # Both UDP endpoints are non-blocking so that a missing datagram
        # never stalls the display loop; the receivers then return their
        # previous value.
        lepton_thread.Server_sock.bind(lepton_thread.Server_address)
        lepton_thread.Server_sock.setblocking(False)
        sht_connection.Server_sock.bind(sht_connection.Server_address)
        sht_connection.Server_sock.setblocking(False)

        # Load the face detector (Caffe SSD) and the mask classifier (Keras).
        print("[INFO] loading face detector model...")
        prototxt_path = os.path.sep.join(["Cam_lib", "Detection_lib", "deploy.prototxt"])
        weights_path = os.path.sep.join(["Cam_lib", "Detection_lib", "res10_300x300_ssd_iter_140000.caffemodel"])
        face_net = cv2.dnn.readNet(prototxt_path, weights_path)
        print("[INFO] loading face mask detector model...")
        mask_net = load_model("Cam_lib/Detection_lib/mask_detector_5k.model")
        print("[INFO] starting video stream...")

        # The thermal window hosts the calibration slider (0..10000,
        # starting at 5000); its value is divided by 1000 before use.
        cv2.namedWindow("Lepton frame")
        cv2.createTrackbar("Offset_calib (mK)", "Lepton frame", 5000, 10000, nothing)

        # Countdown of over-threshold frames until the next snapshot.
        timestamp_wait = 8
        # Last sensor reading that parsed as a number.
        temp = 0.0
        while True:
            try:
                temp = float(sht_connection.receive())
            except ValueError:
                # A malformed reading must not kill the loop; keep the last
                # value that parsed successfully.
                pass

            lepton_thread.offset = cv2.getTrackbarPos("Offset_calib (mK)", "Lepton frame")/1000
            lepton_frame = lepton_thread.receive()
            diff = temp - lepton_frame[0][0]
            # Build a new array instead of using ``+=``: receive() returns
            # the receiver's cached frame, which is handed out again when no
            # new datagram arrives and so must not be modified in place.
            lepton_frame = lepton_frame + (diff + lepton_thread.offset)
            lepton_frame = (lepton_frame + 273.15)/0.987746 - 273.15 - 5

            # Stretch the frame to 0..255 for display; a completely flat
            # frame is passed through to avoid dividing by zero.
            frame_min = np.min(lepton_frame)
            frame_max = np.max(lepton_frame)
            if frame_max > frame_min:
                lepton_frame_norm = (lepton_frame - frame_min)/(frame_max - frame_min) * 255
            else:
                lepton_frame_norm = lepton_frame
            lepton_frame_norm = lepton_frame_norm.astype(np.uint8)
            lepton_frame_norm_resized = cv2.resize(lepton_frame_norm, VisualizeParams.lepton_resolution,
                                                   interpolation=cv2.INTER_AREA)
            lepton_frame_norm_colored = cv2.applyColorMap(lepton_frame_norm_resized, cv2.COLORMAP_INFERNO)

            frame = cv2.imread("/mnt/ramdisk/out.bmp")
            if frame is None:
                # imread returns None when the file is missing or unreadable;
                # this must be checked before flipping, otherwise cv2.flip
                # raises and the loop dies.
                continue
            frame = cv2.flip(frame, 0)
            try:
                # Hottest face temperature seen in this frame.
                max_temp = 0
                (locs, preds) = detect_and_predict_mask(frame, face_net, mask_net)
                for (box_cam, pred_cam) in zip(locs, preds):
                    (startX_cam, startY_cam, endX_cam, endY_cam) = box_cam
                    (mask, withoutMask) = pred_cam
                    start_x_lept, start_y_lept, end_x_lept, end_y_lept = calc_lepton_coord(
                        startX_cam, startY_cam, endX_cam, endY_cam, CameraParams.cam_scale_factor)
                    face_temp = lepton_frame[start_y_lept:(end_y_lept + 1), start_x_lept:(end_x_lept + 1)]
                    # The mapped box can be empty when the face lies outside
                    # the thermal field of view; then no temperature is shown.
                    if face_temp.size != 0:
                        temperature = np.max(face_temp)
                        if temperature > max_temp:
                            max_temp = temperature
                        temp_text = "{:.2f} degree".format(temperature)
                        # Temperature label under the face box on the camera image.
                        cv2.putText(frame, temp_text, (startX_cam - 20, endY_cam + 25),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 140, 255), 2)
                        # Box and temperature label on the thermal image.
                        # NOTE(review): calc_lept_resized_coord names its
                        # arguments start/end pixel but is called here with
                        # the (x, y) of one corner - confirm this is intended.
                        start_box_lept_resized = calc_lept_resized_coord(start_x_lept, start_y_lept,
                                                                         VisualizeParams.lept_scale_factor)
                        end_box_lept_resized = calc_lept_resized_coord(end_x_lept, end_y_lept,
                                                                       VisualizeParams.lept_scale_factor)
                        cv2.rectangle(lepton_frame_norm_colored, start_box_lept_resized, end_box_lept_resized,
                                      (255, 255, 255), 2)
                        cv2.putText(lepton_frame_norm_colored,
                                    temp_text, (start_box_lept_resized[0] - 20, end_box_lept_resized[1] + 20),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1)
                    label = "Mask" if mask > withoutMask else "No Mask"
                    color = (0, 255, 0) if label == "Mask" else (0, 0, 255)
                    label = "{}: {:.2f}%".format(label, max(mask, withoutMask) * 100)
                    # Mask label above the face box, box in the matching colour.
                    cv2.putText(frame, label, (startX_cam-20, startY_cam - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                    cv2.rectangle(frame, (startX_cam, startY_cam), (endX_cam, endY_cam), color, 2)
                # Outline the configured thermal-camera region on the camera image.
                cv2.rectangle(frame, CameraParams.lepton_left_top_pixel, CameraParams.lepton_right_bot_pixel, 255, 1)
                cropped_frame = frame[117:572, 16:640, :]
                cropped_frame_resized = cv2.resize(cropped_frame, VisualizeParams.lepton_resolution,
                                                   interpolation=cv2.INTER_AREA)
                if max_temp > 37.5:
                    # Save only every 8th over-threshold frame; the counter
                    # is not reset by frames below the threshold.
                    timestamp_wait -= 1
                    if timestamp_wait == 0:
                        current_time = datetime.datetime.now()
                        name = str(current_time) + ".jpeg"
                        location = "/home/khaitam/time_stamp/"
                        cv2.imwrite(location + name, cropped_frame_resized)
                        timestamp_wait = 8
                frame = cv2.resize(frame, VisualizeParams.cam_resolution, interpolation=cv2.INTER_AREA)
                cv2.imshow("Camera frame", frame)
                cv2.imshow("Lepton frame", lepton_frame_norm_colored)
                cv2.imshow("Cropped frame", cropped_frame_resized)
                key = cv2.waitKey(1) & 0xFF

                # if the `q` key was pressed, break from the loop
                if key == ord("q"):
                    break
            except Exception as e:
                # Best effort: report where the frame failed and carry on
                # with the next one.
                exc_type, exc_obj, exc_tb = sys.exc_info()
                file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, file_name, exc_tb.tb_lineno)
                print(e)
    finally:
        # Release GUI windows and sockets on every exit path.
        cv2.destroyAllWindows()
        lepton_thread.Server_sock.close()
        sht_connection.Server_sock.close()


def detect_and_predict_mask(frame, facenet, masknet):
    """Detect faces in ``frame`` and classify each as mask / no mask.

    Args:
        frame: Image array whose first two dimensions are (height, width);
            it is sliced and converted with ``cv2.COLOR_BGR2RGB``.
        facenet: Face detector; used via ``setInput`` / ``forward``.
        masknet: Mask classifier; used via ``predict`` on 224x224 face crops.

    Returns:
        ``(locs, preds)`` where ``locs`` is a list of
        ``(startX, startY, endX, endY)`` boxes clamped to the frame and
        ``preds`` holds the classifier output for each box, in the same
        order. ``preds`` is an empty list when no face was kept.
    """
    # grab the dimensions of the frame and then construct a blob from it
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300),
                                 (104.0, 177.0, 123.0))

    # pass the blob through the network and obtain the face detections
    facenet.setInput(blob)
    detections = facenet.forward()

    # face crops, their bounding boxes, and the mask predictions
    faces = []
    locs = []
    preds = []

    for i in range(0, detections.shape[2]):
        # confidence (i.e., probability) associated with the detection
        confidence = detections[0, 0, i, 2]

        # ignore weak detections
        if confidence <= 0.5:
            continue

        # detector coordinates are scaled by the frame size to get pixels
        box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
        (startX, startY, endX, endY) = box.astype("int")

        # ensure the bounding box falls within the dimensions of the frame
        (startX, startY) = (max(0, startX), max(0, startY))
        (endX, endY) = (min(w - 1, endX), min(h - 1, endY))

        face = frame[startY:endY, startX:endX]
        if face.size == 0:
            # A box that is inverted or lies outside the frame produces an
            # empty crop, on which cv2.cvtColor raises; skip it so the other
            # faces in this frame are still processed.
            continue

        # convert from BGR to RGB channel ordering, resize to 224x224, and
        # apply the MobileNetV2 preprocessing
        face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
        face = cv2.resize(face, (224, 224))
        face = img_to_array(face)
        face = preprocess_input(face)

        # keep crop and box together so locs and preds stay aligned
        faces.append(face)
        locs.append((startX, startY, endX, endY))

    # only make predictions if at least one face was detected
    if len(faces) > 0:
        # batch prediction on all faces at once is faster than predicting
        # one by one inside the loop above
        faces = np.array(faces, dtype="float32")
        preds = masknet.predict(faces, batch_size=32)

    # return a 2-tuple of the face locations and their corresponding
    # predictions
    return locs, preds


def rev_and_run_model():
    """Run the camera FIFO reader and the detection loop in separate processes.

    Submits ``Picam_lib.read_fifo`` and ``run_model`` to a process pool and
    blocks until both have finished.
    """
    pool = concurrent.futures.ProcessPoolExecutor()
    try:
        pool.submit(Picam_lib.read_fifo)
        pool.submit(run_model)
    finally:
        # Same as leaving a ``with`` block: wait for both workers to end.
        pool.shutdown(wait=True)


# Script entry point: start the reader and detection processes only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    rev_and_run_model()
# Editor is loading...