Untitled
unknown
plain_text
a year ago
4.1 kB
2
Indexable
Never
@Override public void processRecords( final ProcessRecordsInput processRecordsInput) { try { final List<Record> records = processRecordsInput.getRecords(); final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer(); logger_.trace("processRecords(" + records + ")"); boolean skipJobCreation = false; List<ListenableFuture<Void>> continuations = new ArrayList<ListenableFuture<Void>>(); if (BACKFILL_USE_CASE.equals(useCase_)) { skipJobCreation = true; logger_.debug("Skipping for DEMO record"); } for (Record record : records) { logger_.info("PartitionKey: " + record.getPartitionKey()); Request request; if (skipJobCreation) { logger_.debug("Record data : " + record.getData() + " Arrival Timestamp : " + record.getApproximateArrivalTimestamp() + "Sequence Number : " + record.getSequenceNumber()); } else { try { Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record); if (!possibleRequest.isPresent()) { return; } request = possibleRequest.get(); } catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException | IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException | IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e) { throw new InternalException(e.getMessage(), e); } Job job = new JobImpl(request); ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics()); try { continuationOrchestrator_.enqueue(job, continuation); } catch (OrchestrationException | InterruptedException e) { throw new InternalException(e.getMessage(), e); } continuations.add(continuation); } ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations); Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>() { @Override public void onSuccess( List<Void> result) { final Metrics successMetrics = metricsFactory_.newMetrics(); try { checkpointer.checkpoint(); successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { logger_.error(e.getMessage(), e); successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE); } finally { successMetrics.close(); } } @Override public void onFailure( Throwable t) { try (final Metrics failureMetrics = metricsFactory_.newMetrics()) { logger_.error(t.getMessage(), t); failureMetrics.addCount(this.getClass().getCanonicalName() + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE); } } }); } } catch(Throwable t) { logger_.error("Throwing back Exception in processRecords: " + t.getMessage()); recordProcessorMetrics.addCount(this.getClass().getCanonicalName() + "_processRecordsThrewExceptionTimes", 1, Unit.ONE); throw t; } }