/**
 * Handles one batch of records delivered through {@code processRecordsInput}.
 *
 * <p>When the configured use case equals {@code BACKFILL_USE_CASE} the records are only
 * logged. Otherwise a request is built from each record, wrapped in a {@code JobImpl} and
 * enqueued on the continuation orchestrator. After the whole batch has been walked, a single
 * callback is attached to the combined future of all continuations (see
 * {@link #checkpointWhenBatchCompletes}).
 *
 * <p>Any throwable raised while handling the batch is logged, counted under
 * {@code _processRecordsThrewExceptionTimes} and rethrown unchanged.
 *
 * @param processRecordsInput supplies the records of the batch and the checkpointer
 * @throws InternalException if building a request or enqueueing a job fails
 */
@Override
public void processRecords(
        final ProcessRecordsInput processRecordsInput) {
    // Resolved on the enclosing instance: inside an anonymous class
    // getClass().getCanonicalName() returns null, which would yield "null_..." metric names.
    final String metricPrefix = this.getClass().getCanonicalName();
    try {
        final List<Record> records = processRecordsInput.getRecords();
        final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
        logger_.trace("processRecords(" + records + ")");

        final boolean skipJobCreation = BACKFILL_USE_CASE.equals(useCase_);
        if (skipJobCreation) {
            logger_.debug("Skipping for DEMO record");
        }

        final List<ListenableFuture<Void>> continuations = new ArrayList<>();
        for (Record record : records) {
            logger_.info("PartitionKey: " + record.getPartitionKey());

            if (skipJobCreation) {
                logger_.debug("Record data : " + record.getData() + " Arrival Timestamp : " + record.getApproximateArrivalTimestamp()
                        + "Sequence Number : " + record.getSequenceNumber());
                continue;
            }

            Request request;
            try {
                Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record);
                if (!possibleRequest.isPresent()) {
                    // NOTE(review): this used to be `return`, which silently dropped every
                    // remaining record of the batch. Skipping only this record is assumed to
                    // be the intent - confirm against createRequest's contract.
                    continue;
                }
                request = possibleRequest.get();
            } catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException |
                    IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException |
                    IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e) {
                throw new InternalException(e.getMessage(), e);
            }

            Job job = new JobImpl(request);
            ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics());
            try {
                continuationOrchestrator_.enqueue(job, continuation);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can still observe it.
                Thread.currentThread().interrupt();
                throw new InternalException(e.getMessage(), e);
            } catch (OrchestrationException e) {
                throw new InternalException(e.getMessage(), e);
            }
            continuations.add(continuation);
        }

        // Register the completion callback exactly once per batch, after every record has been
        // enqueued. Registering it inside the loop allowed a checkpoint to happen while later
        // records of the same batch were still outstanding. An empty batch never reached the
        // callback before, so it still does not.
        if (!records.isEmpty()) {
            checkpointWhenBatchCompletes(continuations, checkpointer, metricPrefix);
        }
    } catch (Throwable t) {
        // Pass the throwable itself so the stack trace is not lost.
        logger_.error("Throwing back Exception in processRecords: " + t.getMessage(), t);
        recordProcessorMetrics.addCount(metricPrefix + "_processRecordsThrewExceptionTimes", 1, Unit.ONE);
        throw t;
    }
}

/**
 * Attaches one callback to the combined future of {@code continuations}.
 *
 * <p>On success the callback calls {@code checkpointer.checkpoint()} and counts either
 * {@code _allRecordsInBatchSucceededTimes} or, when the checkpoint call itself throws,
 * {@code _allRecordsInBatchSucceededButCheckpointFailedTimes}. On failure it logs the cause
 * and counts {@code _atLeastOneRecordInBatchFailedTimes}; no checkpoint is attempted.
 *
 * @param continuations futures of the jobs enqueued for the current batch (may be empty)
 * @param checkpointer  checkpointer handed in with the current batch
 * @param metricPrefix  prefix prepended to every metric name emitted by the callback
 */
private void checkpointWhenBatchCompletes(
        final List<ListenableFuture<Void>> continuations,
        final IRecordProcessorCheckpointer checkpointer,
        final String metricPrefix) {
    final ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations);
    Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>() {
        @Override
        public void onSuccess(
                List<Void> result) {
            try (final Metrics successMetrics = metricsFactory_.newMetrics()) {
                try {
                    checkpointer.checkpoint();
                    successMetrics.addCount(metricPrefix + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE);
                } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException |
                        ShutdownException e) {
                    // Checkpoint failures are logged and counted, not propagated.
                    logger_.error(e.getMessage(), e);
                    successMetrics.addCount(metricPrefix + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE);
                }
            }
        }

        @Override
        public void onFailure(
                Throwable t) {
            try (final Metrics failureMetrics = metricsFactory_.newMetrics()) {
                logger_.error(t.getMessage(), t);
                failureMetrics.addCount(metricPrefix + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE);
            }
        }
    });
}