Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
3.2 kB
3
Indexable
/**
   * Processes one batch of records.
   *
   * <p>For every record a request is built via {@code createRequest}, wrapped in a
   * {@code JobImpl} and enqueued on the continuation orchestrator together with a
   * continuation. Once all continuations of the batch have completed successfully the
   * batch is checkpointed; if at least one of them fails, the failure is logged and
   * counted and no checkpoint is taken by this method.
   *
   * <p>A record for which {@code createRequest} yields no request is skipped; the
   * remaining records of the batch are still processed.
   *
   * @param processRecordsInput supplies the records of the batch and the checkpointer
   * @throws InternalException if a request cannot be created or a job cannot be enqueued
   */
  @Override
  public void processRecords(
          final ProcessRecordsInput processRecordsInput)
  {
    // Resolved here and not inside the callback below: within the anonymous
    // FutureCallback, this.getClass() is the anonymous class, whose canonical name is
    // null, which would yield metric names starting with "null_".
    final String metricPrefix = this.getClass().getCanonicalName();

    try
    {
      final List<Record> records = processRecordsInput.getRecords();
      final IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();

      logger_.trace("processRecords(" + records + ")");

      final List<ListenableFuture<Void>> continuations = new ArrayList<>();

      for (Record record : records)
      {
        logger_.info("PartitionKey: " + record.getPartitionKey());

        Request request;

        try
        {
          Optional<Request> possibleRequest = createRequest(metricsFactory_.newMetrics(), record);
          if (!possibleRequest.isPresent())
          {
            // Skip only this record. Returning here would drop the rest of the batch and
            // leave the already enqueued jobs without a checkpoint callback.
            continue;
          }
          request = possibleRequest.get();
        }
        catch (amazon.odin.InternalException | MaterialNotFoundException | IVRequiredException | IllegalArgumentException | IllegalBlockSizeException | MaterialAccessDeniedException | IllegalModeException | InvalidMaterialException | MaterialRevokedException | IOException e)
        {
          throw new InternalException(e.getMessage(), e);
        }

        Job job = new JobImpl(request);

        ContinuationHelper<Void> continuation = ContinuationHelper.create(job.getMetrics());

        try
        {
          continuationOrchestrator_.enqueue(job, continuation);
        }
        catch (InterruptedException e)
        {
          // Restore the interrupt flag so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          throw new InternalException(e.getMessage(), e);
        }
        catch (OrchestrationException e)
        {
          throw new InternalException(e.getMessage(), e);
        }

        continuations.add(continuation);
      }

      // Completes successfully only when every continuation of the batch succeeded.
      ListenableFuture<List<Void>> finalContinuation = Futures.allAsList(continuations);

      Futures.addCallback(finalContinuation, new FutureCallback<List<Void>>()
      {
        @Override
        public void onSuccess(
                List<Void> result)
        {
          final Metrics successMetrics = metricsFactory_.newMetrics();
          try
          {
            checkpointer.checkpoint();
            successMetrics.addCount(metricPrefix + "_allRecordsInBatchSucceededTimes", 1, Unit.ONE);
          }
          catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e)
          {
            // A failed checkpoint is logged and counted, not rethrown.
            logger_.error(e.getMessage(), e);
            successMetrics.addCount(metricPrefix + "_allRecordsInBatchSucceededButCheckpointFailedTimes", 1, Unit.ONE);
          }
          finally
          {
            successMetrics.close();
          }
        }

        @Override
        public void onFailure(
                Throwable t)
        {
          try (final Metrics failureMetrics = metricsFactory_.newMetrics())
          {
            logger_.error(t.getMessage(), t);
            failureMetrics.addCount(metricPrefix + "_atLeastOneRecordInBatchFailedTimes", 1, Unit.ONE);
          }
        }
      });
    }
    catch (Throwable t)
    {
      // Pass the throwable itself so the stack trace is logged, not just the message.
      logger_.error("Throwing back Exception in processRecords: " + t.getMessage(), t);
      recordProcessorMetrics.addCount(metricPrefix + "_processRecordsThrewExceptionTimes", 1, Unit.ONE);
      throw t;
    }
  }