Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
4.1 kB
2
Indexable
Never
   @Override
	
	
   public void processRecords(
	
	
           final ProcessRecordsInput processRecordsInput) {
	
	
     try {
	
		  
	
		  
	
       final List<Record> records = processRecordsInput.getRecords();
	
       final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
	
	
  
	
       logger_.trace("processRecords(" + records + ")");
	
	
       boolean skipJobCreation = false;
	
       List<ListenableFuture<Void>> continuations = new ArrayList<ListenableFuture<Void>>();
	
	
  
	
       if (BACKFILL_USE_CASE.equals(useCase_)) {
	
	
         skipJobCreation = true;
	
         logger_.debug("Skipping for DEMO record");
	
	
       }
	
	
  
	
	
       for (Record record : records) {
	
	
         logger_.info("PartitionKey: " + record.getPartitionKey());
	
	
         Request request;
	
	
         if (skipJobCreation) {
	
	
           logger_.debug("Record data : " + record.getData() + " Arrival Timestamp : " + record.getApproximateArrivalTimestamp()
	
	
                   + "Sequence Number : " + record.getSequenceNumber());
	
  
	
         } else {
	
           try {
	
	
             Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record);
	
	
             if (!possibleRequest.isPresent()) {
	
	
               return;
	
	
             }
	
	
             request = possibleRequest.get();
	
	
           } catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException |
	
	
                    IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException |
	
	
                    IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e) {
	
	
             throw new InternalException(e.getMessage(), e);
	
	
           }
		  
	
		  
		  
	
		  
		  
		  
	
	
  
	
           Job job = new JobImpl(request);
	
	
  
	
           ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics());
	
	
  
	
	
           try {
	
	
             continuationOrchestrator_.enqueue(job, continuation);
	
           } catch (OrchestrationException | InterruptedException e) {
	
	
             throw new InternalException(e.getMessage(), e);
	
           }
	
		  
		  
		  
	
	
  
	
           continuations.add(continuation);
	
         }
	
	
  
	
         ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations);
	
	
  
	
         Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>() {
	
           @Override
	
	
           public void onSuccess(
	
	
                   List<Void> result) {
	
	
             final Metrics successMetrics = metricsFactory_.newMetrics();
	
	
             try {
	
               checkpointer.checkpoint();
	
	
               successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE);
	
	
             } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException |
	
                      ShutdownException e) {
	
               logger_.error(e.getMessage(), e);
	
	
               successMetrics.addCount(this.getClass().getCanonicalName() + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE);
	
             } finally {
	
	
               successMetrics.close();
	
             }
		  
	
		  
		  
	
           }
		  
	
	
  
	
	
           @Override
	
	
           public void onFailure(
	
	
                   Throwable t) {
	
	
             try (final Metrics failureMetrics = metricsFactory_.newMetrics()) {
	
               logger_.error(t.getMessage(), t);
	
               failureMetrics.addCount(this.getClass().getCanonicalName() + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE);
	
             }
	
           }
	
         });
	
	
       }
	
	
     }
	
     catch(Throwable t)
	
	
       {
	
         logger_.error("Throwing back Exception in processRecords: " + t.getMessage());
	
         recordProcessorMetrics.addCount(this.getClass().getCanonicalName() + "_processRecordsThrewExceptionTimes", 1, Unit.ONE);
	
         throw t;
	
	
       }
	
     }