Untitled

 avatar
unknown
plain_text
2 months ago
4.6 kB
4
Indexable
public void processMessage(Exchange exchange) throws Exception {
        Socket socket = null;
        InputStream inputStream = null;
        DataOutputStream dataOut = null;
        String resp = null;

        final String host =
                exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_HOST, String.class);
        final String port =
                exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_PORT, String.class);
        final Boolean providerLevelTimeoutHandle =
                (exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_LEVEL_TIMEOUT_HANDLE,
                        Boolean.class) != null) ? exchange.getProperty(
                        OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_LEVEL_TIMEOUT_HANDLE, Boolean.class)
                        : false;
        Integer readTimeout = (int) config.getReadTimeout().toMillis();
        if (Boolean.TRUE.equals(providerLevelTimeoutHandle)) {
            readTimeout = exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_READ_TIMEOUT,
                    Integer.class);
        }
        int prefixMessageLength =
                exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_SOCKET_MESSAGE_PREFIX_LENGTH, int.class);

        try {
            byte[] request_data = exchange.getIn().getBody(byte[].class);
            socket = new Socket(host, Integer.parseInt(port));
            socket.setSoTimeout(readTimeout);
            dataOut = new DataOutputStream(socket.getOutputStream());
            dataOut.write(request_data, 0, request_data.length);
            dataOut.flush();

            inputStream = socket.getInputStream();
            if (prefixMessageLength == 0) {
                byte[] response = inputStream.readAllBytes();
                resp = SocketMessageHandler.bytesToHexString(response);
                exchange.getIn().setBody(resp);
            } else {
                byte[] msgLenSize = new byte[prefixMessageLength];
                int msgLenBytes = inputStream.read(msgLenSize);
                int msgLen = 0;
                LOG.debug("Message length (bytes): {}", msgLenBytes);
                String msgLenHex = null;
                if (msgLenBytes == 4) {
                    msgLenHex = SocketMessageHandler.bytesToHexString(msgLenSize);
                    msgLen = SocketMessageHandler.hexStringToNumber(msgLenHex);
                    LOG.debug("Message length: {}", msgLen);
                }

                byte[] resp_data = new byte[msgLen];
                int totalBytesRead = 0;
                while (totalBytesRead < msgLen) {
                    int bytesRead = inputStream.read(resp_data, totalBytesRead, msgLen - totalBytesRead);
                    if (bytesRead == -1) {
                        break;
                    }
                    totalBytesRead += bytesRead;
                }
                LOG.debug("Bytes read in buffer: {}", totalBytesRead);
                resp = SocketMessageHandler.bytesToHexString(resp_data);
                resp = new StringBuffer().append(msgLenHex).append(resp).toString();
                exchange.getIn().setBody(resp);
            }
            LOG.debug("Message from provider: {}", resp);
        } catch (Exception e) {
            LOG.error("Socket Connection error: {}", e.getMessage(), e);
            final String serviceProviderImplName =
                    exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_NAME, String.class);
            throw new OBRHException(
                    String.format("Failed to get response from ServiceProviderImpl[%s]. %s",
                            serviceProviderImplName, e.getMessage()),
                    e, ErrorCodeEnum.ERROR_ROUTE_CLIENT_RESPONSE_FAILED, Arrays.asList(serviceProviderImplName));
        } finally {
            if (dataOut != null) {
                try {
                    dataOut.close();
                } catch (Exception e) {
                    LOG.error("Socket OutputStream error: {}", e.toString());
                }
            }
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Exception e) {
                    LOG.error("Socket InputStream error: {}", e.toString());
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e) {
                    LOG.error("Socket Close error: {}", e.toString());
                }
            }
        }
    }
Editor is loading...
Leave a Comment