Untitled
private List<StreamBatchInfoDTO> updateBatch( List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context ) { ProcessPath processPath = context.getProcessPath(); String traceId = context.getTraceId(); Long centerId = processPath.getCenterId(); List<StreamBatchInfoDTO> batchInfoList = batchList.stream() .filter(this::isPushAvailableBatch) .map(batch -> asStreamBatchInfoDTO(batch, context)) .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList())) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(batchInfoList)) { return null; } // Check if the feature flag is enabled if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) { // Use threadPoolExecutor for parallel processing List<Future<List<StreamBatchInfoDTO>>> futures = batchInfoList.stream() .map(batchInfo -> threadPoolExecutor.submit(() -> processBatchInfo(batchInfo, processPath, traceId, centerId))) .collect(Collectors.toList()); // Collect results from parallel tasks List<StreamBatchInfoDTO> result = futures.stream() .flatMap(future -> { try { return future.get().stream(); // Collect results } catch (Exception e) { log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e); return Stream.empty(); } }) .collect(Collectors.toList()); return result; } else { // Sequential processing return batchInfoList.stream() .flatMap(batchInfo -> processBatchInfo(batchInfo, processPath, traceId, centerId).stream()) .collect(Collectors.toList()); } }
Leave a Comment