Untitled

 avatar
unknown
plain_text
a year ago
2.7 kB
4
Indexable
import socket
import threading
import json

class SensorGateway:
    def __init__(self, sensor_port, user_port):
        self.sensor_data = {}
        self.sensor_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sensor_socket.bind(('', sensor_port))
        self.user_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.user_socket.bind(('', user_port))
        self.user_socket.listen()
        self.trusted_devices = set()
        self.trusted_users = set()

    def start(self):
        threading.Thread(target=self.handle_sensor_data).start()
        threading.Thread(target=self.handle_user_requests).start()

    def handle_sensor_data(self):
        while True:
            data, addr = self.sensor_socket.recvfrom(1024)
            if addr[0] in self.trusted_devices:
                try:
                    sensor_info = json.loads(data.decode())
                    self.sensor_data[sensor_info['sensor_id']] = sensor_info
                except json.JSONDecodeError:
                    pass  # Handle invalid JSON format

    def handle_user_requests(self):
        while True:
            conn, addr = self.user_socket.accept()
            if addr[0] in self.trusted_users:
                threading.Thread(target=self.handle_user_connection, args=(conn,)).start()

    def handle_user_connection(self, conn):
        try:
            data = conn.recv(1024)
            request = json.loads(data.decode())
            action = request.get('action')
            if action == 'ask':
                sensor_id = request.get('sensor_id')
                response = self.sensor_data.get(sensor_id, {})
                conn.sendall(json.dumps(response).encode())
            elif action == 'register':
                # Implement registration logic
                pass
            elif action == 'logout':
                # Implement logout logic
                pass
        except json.JSONDecodeError:
            pass  # Handle invalid JSON format
        finally:
            conn.close()

    def authorize_device(self, ip_address):
        self.trusted_devices.add(ip_address)

    def authorize_user(self, ip_address):
        self.trusted_users.add(ip_address)

    def revoke_device(self, ip_address):
        self.trusted_devices.discard(ip_address)

    def revoke_user(self, ip_address):
        self.trusted_users.discard(ip_address)


if __name__ == "__main__":
    gateway = SensorGateway(sensor_port=12345, user_port=12346)
    gateway.authorize_device("192.168.1.1")
    gateway.authorize_user("192.168.1.100")
    gateway.start()
Editor is loading...
Leave a Comment