Untitled
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; public class BatchProcessor { private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE"; private static final int PUBLISHER_CORE_POOL_SIZE = 5; private static final int PUBLISHER_MAXIMUM_POOL_SIZE = 20; private static final int PUBLISHER_KEEP_ALIVE_TIME = 60; private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor( PUBLISHER_CORE_POOL_SIZE, PUBLISHER_MAXIMUM_POOL_SIZE, PUBLISHER_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy() ); public List<StreamBatchInfoDTO> updateBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) { ProcessPath processPath = context.getProcessPath(); String traceId = context.getTraceId(); Long centerId = processPath.getCenterId(); List<StreamBatchInfoDTO> batchInfoList = batchList.stream() .filter(this::isPushAvailableBatch) .map(batch -> asStreamBatchInfoDTO(batch, context)) .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList())) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(batchInfoList)) { return null; } List<List<StreamBatchInfoDTO>> results = executeInParallelWithResult( batchInfoList, batchInfo -> processUpdateBatch(batchInfo, processPath, traceId, centerId), traceId, "STREAM UPDATE BATCH" ); return results.stream().flatMap(List::stream).collect(Collectors.toList()); } public List<StreamBatchInfoDTO> createBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) { List<StreamBatchInfoDTO> batchInfoList = batchList.stream() .filter(this::isPushAvailableBatch) .map(batch -> asStreamBatchInfoDTO(batch, context)) .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList())) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(batchInfoList)) { return null; } executeInParallelWithResult( batchInfoList, batchInfo -> { callOutboundCreateBatchApi( context.getProcessPath(), context.getNow(), Collections.singletonList(batchInfo), context.getTraceId() ); return null; // No result to collect }, context.getTraceId(), "STREAM CREATE BATCH" ); return batchInfoList; } private <T, R> List<R> executeInParallelWithResult( List<T> items, Function<T, R> taskProcessor, String traceId, String taskName ) { if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) { List<Future<R>> futures = items.stream() .map(item -> threadPoolExecutor.submit(() -> taskProcessor.apply(item))) .collect(Collectors.toList()); List<R> results = new ArrayList<>(); futures.forEach(future -> { try { R result = future.get(); if (result != null) { results.add(result); } } catch (Exception e) { logError(traceId, taskName, e); } }); return results; } else { return items.stream() .map(taskProcessor) .filter(Objects::nonNull) .collect(Collectors.toList()); } } private List<StreamBatchInfoDTO> processUpdateBatch( StreamBatchInfoDTO batchInfo, ProcessPath processPath, String traceId, Long centerId ) { try { RecommendStreamBatchDTO recommendBatches = asRecommendStreamBatchDTO(processPath, Collections.singletonList(batchInfo)); logInfo(traceId, "[STREAM UPDATE BATCH] REQUEST = {}", recommendBatches); PickingResponseEntity response = postApiStreamPickingV1UpdateBatch( recommendBatches, buildOutboundApiHeader(centerId) ); logInfo(traceId, "[STREAM UPDATE BATCH] RESPONSE = {}", response); if (!Objects.equals(response.getResultCode(), RestResultCode.SUCCESS.getValue())) { logWarn(traceId, "[STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {}", batchInfo.getBatchId(), response); } return recommendBatches.getBatchInfoDTOList(); } catch (Exception e) { logError(traceId, "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId(), e); } return Collections.emptyList(); } // Mock utility methods private boolean isFeatureEnabled(String featureFlag) { return "ENABLE_PARALLEL_UPDATE".equals(featureFlag); } private boolean isPushAvailableBatch(MultiStreamBatchDTO batch) { return true; // Replace with actual logic } private StreamBatchInfoDTO asStreamBatchInfoDTO(MultiStreamBatchDTO batch, MultiOptimizationContext context) { return new StreamBatchInfoDTO(); // Replace with actual logic } private RecommendStreamBatchDTO asRecommendStreamBatchDTO(ProcessPath processPath, List<StreamBatchInfoDTO> batchInfoList) { return new RecommendStreamBatchDTO(); // Replace with actual logic } private PickingResponseEntity postApiStreamPickingV1UpdateBatch(RecommendStreamBatchDTO dto, Object headers) { return new PickingResponseEntity(); // Replace with actual logic } private Object buildOutboundApiHeader(Long centerId) { return new Object(); // Replace with actual logic } private void callOutboundCreateBatchApi(ProcessPath processPath, Object now, List<StreamBatchInfoDTO> batchInfoList, String traceId) { // Implement API call } private void logInfo(String traceId, String message, Object... ar
Leave a Comment