Untitled
unknown
python
2 years ago
1.9 kB
1
Indexable
"""Run one TCP validation service per expected protobuf message.

Each service listens on an OS-assigned localhost port, reads one message per
connection, compares it with its expected ``MyMessage`` and replies with the
text ``True`` or ``False``.
"""
import concurrent.futures
import logging
import socket

from google.protobuf.message import DecodeError

from my_message_pb2 import MyMessage

logger = logging.getLogger(__name__)


def validate_protobuf_message(expected_message: MyMessage, received_message: bytes) -> bool:
    """Return whether *received_message* decodes to *expected_message*.

    Args:
        expected_message: The message the sender is expected to transmit.
        received_message: Raw bytes read from the connection.

    Returns:
        ``True`` when the bytes parse as a ``MyMessage`` that compares equal
        to ``expected_message``; ``False`` when parsing raises ``DecodeError``
        or the parsed message differs.
    """
    try:
        # FromString is a class-level parser; no throwaway instance is needed.
        decoded_message = MyMessage.FromString(received_message)
    except DecodeError:
        # The received bytes are not a valid protobuf message.
        return False
    return expected_message == decoded_message


def run_validation_service(expected_message: MyMessage, max_message_size: int = 1024) -> None:
    """Serve validation requests for *expected_message* until interrupted.

    Binds a TCP socket to ``localhost`` on a port chosen by the operating
    system, logs that port, then handles connections one at a time: read one
    chunk, validate it, and send ``"True"`` or ``"False"`` back as UTF-8.

    Args:
        expected_message: The message every incoming payload is compared to.
        max_message_size: Maximum number of bytes read from each connection.
            Only a single ``recv`` is issued, so a payload larger than this
            is validated in truncated form.

    This function loops forever; it only ends if the listening socket itself
    fails (for example, ``accept`` raising ``OSError``).
    """
    # The context manager closes the listening socket even if the loop exits
    # through an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Port 0 lets the operating system choose a free port.
        server_socket.bind(("localhost", 0))
        server_socket.listen()

        # Report the chosen port; without this no client can find the service.
        port = server_socket.getsockname()[1]
        logger.info("Validation service listening on localhost:%d", port)

        while True:
            conn, addr = server_socket.accept()
            # Close the connection on every path, including failures below.
            with conn:
                try:
                    received_message = conn.recv(max_message_size)
                    is_valid = validate_protobuf_message(expected_message, received_message)
                    # sendall keeps writing until the whole reply is sent;
                    # plain send may transmit only part of it.
                    conn.sendall(bytes(str(is_valid), "utf-8"))
                except OSError:
                    # One misbehaving client must not stop the service.
                    logger.exception("Connection from %s failed", addr)


def main() -> None:
    """Start one validation service thread per expected message."""
    logging.basicConfig(level=logging.INFO)

    expected_messages = [
        MyMessage(field1="hello", field2=123, field3=[1, 2, 3]),
        MyMessage(field1="world", field2=456, field3=[4, 5, 6]),
    ]

    # Each service never returns, so every one needs its own worker thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(expected_messages)) as executor:
        futures = [
            executor.submit(run_validation_service, expected_message)
            for expected_message in expected_messages
        ]
        # A future only completes if its service crashed; surface the error
        # instead of letting it vanish inside the executor.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error("Validation service stopped unexpectedly", exc_info=error)


if __name__ == "__main__":
    main()
Editor is loading...