# Paste-site metadata (scrape residue, not part of the program):
# Untitled | unknown | python | 3 years ago | 1.9 kB | 5 | Indexable
import concurrent.futures
import socket
from google.protobuf.message import DecodeError
from my_message_pb2 import MyMessage
def validate_protobuf_message(expected_message: MyMessage, received_message: bytes) -> bool:
    """Check whether raw bytes decode to a message equal to the expected one.

    The payload is parsed with the expected message's own class, through the
    class-level ``FromString`` constructor, rather than by building a
    throwaway ``MyMessage()`` instance first. The check therefore works for
    whichever generated message class the caller supplies.

    Args:
        expected_message: The message the payload must match.
        received_message: Serialized bytes received from a peer.

    Returns:
        True if the payload parses and compares equal to
        ``expected_message``; False if parsing raises ``DecodeError`` or the
        parsed message differs.
    """
    message_type = type(expected_message)
    try:
        decoded_message = message_type.FromString(received_message)
    except DecodeError:
        # The payload is not a well-formed serialization of this message type.
        return False
    return expected_message == decoded_message
def run_validation_service(expected_message: MyMessage):
    """Serve validation requests for one expected message over TCP, forever.

    Binds a listening socket on localhost to an OS-assigned port, announces
    that port on stdout so clients can find it, and then handles one
    connection at a time: read a payload, validate it against
    ``expected_message``, reply with the UTF-8 text ``True`` or ``False``,
    and close the connection.

    Args:
        expected_message: The message every received payload is compared to.

    This function does not return normally; it only exits by raising (for
    example an ``OSError`` from a socket call). Both the listening socket
    and the active connection are closed on the way out.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Port 0 lets the operating system choose a free port.
        server_socket.bind(("localhost", 0))
        server_socket.listen()

        # The port is only known after binding; without reporting it no
        # client could ever connect to this service.
        port = server_socket.getsockname()[1]
        print(f"Validation service listening on localhost:{port}", flush=True)

        while True:
            conn, _ = server_socket.accept()
            # The context manager closes the connection even if receiving,
            # validating or replying raises.
            with conn:
                # NOTE(review): a single recv of at most 1024 bytes means
                # larger payloads, or payloads split across TCP segments, are
                # validated truncated. Reading to EOF or using a length
                # prefix would fix that but changes the wire protocol.
                received_message = conn.recv(1024)
                is_valid = validate_protobuf_message(expected_message, received_message)
                # sendall keeps writing until the whole reply is sent;
                # plain send may transmit only part of it.
                conn.sendall(str(is_valid).encode("utf-8"))
if __name__ == "__main__":
    # One validation service is started per expected message.
    expected_messages = [
        MyMessage(field1="hello", field2=123, field3=[1, 2, 3]),
        MyMessage(field1="world", field2=456, field3=[4, 5, 6]),
    ]
    # Each service loops forever and so occupies its worker thread
    # permanently. Size the pool to the number of services so none of them is
    # left queued behind a worker that never becomes free.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(expected_messages)) as executor:
        for expected_message in expected_messages:
            executor.submit(run_validation_service, expected_message)
    # Leaving the ``with`` block waits for all workers, so the script keeps
    # running for as long as the services do.