Untitled

 avatar
wyc1230
plain_text
23 days ago
1.9 kB
6
Indexable
private List<StreamBatchInfoDTO> updateBatch(
        List<MultiStreamBatchDTO> batchList,
        MultiOptimizationContext context
) {
    ProcessPath processPath = context.getProcessPath();
    String traceId = context.getTraceId();
    Long centerId = processPath.getCenterId();
    List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
            .filter(this::isPushAvailableBatch)
            .map(batch -> asStreamBatchInfoDTO(batch, context))
            .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
            .collect(Collectors.toList());

    if (CollectionUtils.isEmpty(batchInfoList)) {
        return null;
    }

    // Check if the feature flag is enabled
    if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
        // Use threadPoolExecutor for parallel processing
        List<Future<List<StreamBatchInfoDTO>>> futures = batchInfoList.stream()
                .map(batchInfo -> threadPoolExecutor.submit(() -> processBatchInfo(batchInfo, processPath, traceId, centerId)))
                .collect(Collectors.toList());

        // Collect results from parallel tasks
        List<StreamBatchInfoDTO> result = futures.stream()
                .flatMap(future -> {
                    try {
                        return future.get().stream(); // Collect results
                    } catch (Exception e) {
                        log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e);
                        return Stream.empty();
                    }
                })
                .collect(Collectors.toList());
        return result;
    } else {
        // Sequential processing
        return batchInfoList.stream()
                .flatMap(batchInfo -> processBatchInfo(batchInfo, processPath, traceId, centerId).stream())
                .collect(Collectors.toList());
    }
}
Leave a Comment