Untitled
unknown
plain_text
3 months ago
4.6 kB
5
Indexable
public void processMessage(Exchange exchange) throws Exception {
Socket socket = null;
InputStream inputStream = null;
DataOutputStream dataOut = null;
String resp = null;
final String host =
exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_HOST, String.class);
final String port =
exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_PORT, String.class);
final Boolean providerLevelTimeoutHandle =
(exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_LEVEL_TIMEOUT_HANDLE,
Boolean.class) != null) ? exchange.getProperty(
OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_LEVEL_TIMEOUT_HANDLE, Boolean.class)
: false;
Integer readTimeout = (int) config.getReadTimeout().toMillis();
if (Boolean.TRUE.equals(providerLevelTimeoutHandle)) {
readTimeout = exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_READ_TIMEOUT,
Integer.class);
}
int prefixMessageLength =
exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_SOCKET_MESSAGE_PREFIX_LENGTH, int.class);
try {
byte[] request_data = exchange.getIn().getBody(byte[].class);
socket = new Socket(host, Integer.parseInt(port));
socket.setSoTimeout(readTimeout);
dataOut = new DataOutputStream(socket.getOutputStream());
dataOut.write(request_data, 0, request_data.length);
dataOut.flush();
inputStream = socket.getInputStream();
if (prefixMessageLength == 0) {
byte[] response = inputStream.readAllBytes();
resp = SocketMessageHandler.bytesToHexString(response);
exchange.getIn().setBody(resp);
} else {
byte[] msgLenSize = new byte[prefixMessageLength];
int msgLenBytes = inputStream.read(msgLenSize);
int msgLen = 0;
LOG.debug("Message length (bytes): {}", msgLenBytes);
String msgLenHex = null;
if (msgLenBytes == 4) {
msgLenHex = SocketMessageHandler.bytesToHexString(msgLenSize);
msgLen = SocketMessageHandler.hexStringToNumber(msgLenHex);
LOG.debug("Message length: {}", msgLen);
}
byte[] resp_data = new byte[msgLen];
int totalBytesRead = 0;
while (totalBytesRead < msgLen) {
int bytesRead = inputStream.read(resp_data, totalBytesRead, msgLen - totalBytesRead);
if (bytesRead == -1) {
break;
}
totalBytesRead += bytesRead;
}
LOG.debug("Bytes read in buffer: {}", totalBytesRead);
resp = SocketMessageHandler.bytesToHexString(resp_data);
resp = new StringBuffer().append(msgLenHex).append(resp).toString();
exchange.getIn().setBody(resp);
}
LOG.debug("Message from provider: {}", resp);
} catch (Exception e) {
LOG.error("Socket Connection error: {}", e.getMessage(), e);
final String serviceProviderImplName =
exchange.getProperty(OBRHExchangeProperties.SERVICE_PROVIDER_IMPL_NAME, String.class);
throw new OBRHException(
String.format("Failed to get response from ServiceProviderImpl[%s]. %s",
serviceProviderImplName, e.getMessage()),
e, ErrorCodeEnum.ERROR_ROUTE_CLIENT_RESPONSE_FAILED, Arrays.asList(serviceProviderImplName));
} finally {
if (dataOut != null) {
try {
dataOut.close();
} catch (Exception e) {
LOG.error("Socket OutputStream error: {}", e.toString());
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception e) {
LOG.error("Socket InputStream error: {}", e.toString());
}
}
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
LOG.error("Socket Close error: {}", e.toString());
}
}
}
}Editor is loading...
Leave a Comment