@Override
public void processRecords(
final ProcessRecordsInput processRecordsInput)
{
try
{
final List<Record> records = processRecordsInput.getRecords();
final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
logger_.trace("processRecords(" + records + ")");
List<ListenableFuture<Void>> continuations = new ArrayList<ListenableFuture<Void>>();
for (Record record : records)
{
logger_.info("PartitionKey: " + record.getPartitionKey());
Request request;
try
{
Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record);
if(!possibleRequest.isPresent()){
return;
}
request = possibleRequest.get();
}
catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException | IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException | IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e)
{
throw new InternalException(e.getMessage(), e);
}
Job job = new JobImpl(request);
ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics());
try
{
continuationOrchestrator_.enqueue(job, continuation);
}
catch (OrchestrationException | InterruptedException e)
{
throw new InternalException(e.getMessage(), e);
}
continuations.add(continuation);
}
ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations);
Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>()
{
@Override
public void onSuccess(
List<Void> result)
{
final Metrics successMetrics = metricsFactory_.newMetrics();
try
{
checkpointer.checkpoint();
successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE);
}
catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e)
{
logger_.error(e.getMessage(), e);
successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE);
} finally {
successMetrics.close();
}
}
@Override
public void onFailure(
Throwable t)
{
try (final Metrics failureMetrics = metricsFactory_.newMetrics()) {
logger_.error(t.getMessage(), t);
failureMetrics.addCount(this.getClass().getCanonicalName() + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE);
}
}
});
}
catch(Throwable t)
{
logger_.error("Throwing back Exception in processRecords: " + t.getMessage());
recordProcessorMetrics.addCount(this.getClass().getCanonicalName() + "_processRecordsThrewExceptionTimes", 1, Unit.ONE);
throw t;
}
}