Untitled

 avatar
unknown
python
2 years ago
1.9 kB
1
Indexable
import concurrent.futures
import socket
from google.protobuf.message import DecodeError

from my_message_pb2 import MyMessage

def validate_protobuf_message(expected_message: MyMessage, received_message: bytes) -> bool:
    """Check whether raw bytes decode to a message equal to the expected one.

    Args:
        expected_message: The message the received payload is compared against.
        received_message: Serialized bytes to parse as a ``MyMessage``.

    Returns:
        ``False`` if parsing raises ``DecodeError``; otherwise the result of
        comparing ``expected_message`` with the parsed message using ``==``.
    """
    try:
        parsed = MyMessage.FromString(received_message)
    except DecodeError:
        # Payload could not be parsed as a MyMessage at all.
        return False
    else:
        return expected_message == parsed

def run_validation_service(expected_message: MyMessage) -> None:
    """Serve validation requests for one expected message until an error occurs.

    Binds a TCP socket on localhost to an OS-assigned port, announces that
    port on stdout, and then handles connections one at a time. For each
    connection it reads a single chunk of up to 1024 bytes, validates it with
    ``validate_protobuf_message`` against ``expected_message``, replies with
    the UTF-8 text ``"True"`` or ``"False"``, and closes the connection.

    The accept loop has no exit condition, so this function only stops when
    an exception propagates out of it.

    Args:
        expected_message: The message every received payload is compared to.
    """
    # The context manager guarantees the listening socket is released even
    # when the loop below is interrupted by an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Port 0 lets the operating system choose a free port.
        server_socket.bind(("localhost", 0))
        server_socket.listen()

        # The port is chosen at bind time, so it has to be reported here;
        # otherwise clients have no way to learn where to connect.
        port = server_socket.getsockname()[1]
        print(f"Validation service listening on localhost:{port}", flush=True)

        while True:
            conn, _addr = server_socket.accept()

            # Close the connection even if receiving, validating or replying
            # raises.
            with conn:
                # NOTE: only a single recv of at most 1024 bytes is performed,
                # so larger or fragmented payloads are validated incomplete.
                # Reading more would require a framing convention shared with
                # the clients.
                received_message = conn.recv(1024)

                is_valid = validate_protobuf_message(expected_message, received_message)

                # sendall retries until the whole reply is written, whereas
                # send may transmit only part of it.
                conn.sendall(str(is_valid).encode("utf-8"))

if __name__ == "__main__":
    import sys

    # One validation service is started per expected message.
    expected_messages = [
        MyMessage(field1="hello", field2=123, field3=[1, 2, 3]),
        MyMessage(field1="world", field2=456, field3=[4, 5, 6]),
    ]

    # Each service loops forever and therefore occupies a worker thread
    # permanently. Size the pool to the number of services so none of them
    # stays queued behind another that never finishes.
    worker_count = max(1, len(expected_messages))
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(run_validation_service, expected_message)
            for expected_message in expected_messages
        ]

        # A service only finishes by raising. Report the failure as soon as
        # it happens instead of letting the discarded future hide it; the
        # remaining services keep running.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                print(f"Validation service stopped: {error!r}", file=sys.stderr)
Editor is loading...