Untitled
wyc1230
plain_text
a year ago
1.9 kB
15
Indexable
private List<StreamBatchInfoDTO> updateBatch(
List<MultiStreamBatchDTO> batchList,
MultiOptimizationContext context
) {
ProcessPath processPath = context.getProcessPath();
String traceId = context.getTraceId();
Long centerId = processPath.getCenterId();
List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
.filter(this::isPushAvailableBatch)
.map(batch -> asStreamBatchInfoDTO(batch, context))
.filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(batchInfoList)) {
return null;
}
// Check if the feature flag is enabled
if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
// Use threadPoolExecutor for parallel processing
List<Future<List<StreamBatchInfoDTO>>> futures = batchInfoList.stream()
.map(batchInfo -> threadPoolExecutor.submit(() -> processBatchInfo(batchInfo, processPath, traceId, centerId)))
.collect(Collectors.toList());
// Collect results from parallel tasks
List<StreamBatchInfoDTO> result = futures.stream()
.flatMap(future -> {
try {
return future.get().stream(); // Collect results
} catch (Exception e) {
log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e);
return Stream.empty();
}
})
.collect(Collectors.toList());
return result;
} else {
// Sequential processing
return batchInfoList.stream()
.flatMap(batchInfo -> processBatchInfo(batchInfo, processPath, traceId, centerId).stream())
.collect(Collectors.toList());
}
}Editor is loading...
Leave a Comment