Untitled
unknown
plain_text
a month ago
3.2 kB
2
Indexable
Never
@Override public void processRecords( final ProcessRecordsInput processRecordsInput) { try { final List<Record> records = processRecordsInput.getRecords(); final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer(); logger_.trace("processRecords(" + records + ")"); List<ListenableFuture<Void>> continuations = new ArrayList<ListenableFuture<Void>>(); for (Record record : records) { logger_.info("PartitionKey: " + record.getPartitionKey()); Request request; try { Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record); if(!possibleRequest.isPresent()){ return; } request = possibleRequest.get(); } catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException | IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException | IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e) { throw new InternalException(e.getMessage(), e); } Job job = new JobImpl(request); ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics()); try { continuationOrchestrator_.enqueue(job, continuation); } catch (OrchestrationException | InterruptedException e) { throw new InternalException(e.getMessage(), e); } continuations.add(continuation); } ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations); Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>() { @Override public void onSuccess( List<Void> result) { final Metrics successMetrics = metricsFactory_.newMetrics(); try { checkpointer.checkpoint(); successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { logger_.error(e.getMessage(), e); successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE); } finally { successMetrics.close(); } } @Override public void onFailure( Throwable t) { try (final Metrics failureMetrics = metricsFactory_.newMetrics()) { logger_.error(t.getMessage(), t); failureMetrics.addCount(this.getClass().getCanonicalName() + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE); } } }); } catch(Throwable t) { logger_.error("Throwing back Exception in processRecords: " + t.getMessage()); recordProcessorMetrics.addCount(this.getClass().getCanonicalName() + "_processRecordsThrewExceptionTimes", 1, Unit.ONE); throw t; } }