Untitled

 avatar
wyc1230
plain_text
a month ago
4.0 kB
4
Indexable
private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE";

private List<StreamBatchInfoDTO> updateBatch(
        List<MultiStreamBatchDTO> batchList,
        MultiOptimizationContext context
) {
    ProcessPath processPath = context.getProcessPath();
    String traceId = context.getTraceId();
    Long centerId = processPath.getCenterId();
    List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
            .filter(this::isPushAvailableBatch)
            .map(batch -> asStreamBatchInfoDTO(batch, context))
            .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
            .collect(Collectors.toList());

    if (CollectionUtils.isEmpty(batchInfoList)) {
        return null;
    }

    List<StreamBatchInfoDTO> result = Collections.synchronizedList(new ArrayList<>());
    
    // Check if the feature flag is enabled
    if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
        // Use threadPoolExecutor for parallel processing
        List<Future<Void>> futures = batchInfoList.stream()
                .map(batchInfo -> threadPoolExecutor.submit(() -> {
                    processBatchInfo(batchInfo, processPath, traceId, centerId, result);
                    return null;
                }))
                .collect(Collectors.toList());

        // Wait for all tasks to complete
        for (Future<Void> future : futures) {
            try {
                future.get(); // Wait for task completion
            } catch (Exception e) {
                log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e);
            }
        }
    } else {
        // Sequential processing
        for (StreamBatchInfoDTO batchInfo : batchInfoList) {
            processBatchInfo(batchInfo, processPath, traceId, centerId, result);
        }
    }

    return result;
}

// Helper method to process individual StreamBatchInfoDTO
private void processBatchInfo(
        StreamBatchInfoDTO batchInfo,
        ProcessPath processPath,
        String traceId,
        Long centerId,
        List<StreamBatchInfoDTO> result
) {
    try {
        RecommendStreamBatchDTO recommendBatches = asRecommendStreamBatchDTO(processPath, Lists.newArrayList(batchInfo));
        multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] REQUEST = {}", traceId, JsonUtils.convertObjectToString(recommendBatches));
        PickingResponseEntity response = streamAdapter.postApiStreamPickingV1UpdateBatch(
                recommendBatches, centerFacadeService.buildOutboundApiHeader(centerId)
        );
        multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] RESPONSE = {}", traceId, JsonUtils.convertObjectToString(response));
        if (!Objects.equals(response.getResultCode(), RestResultCode.SUCCESS.getValue())) {
            log.warn("TraceId:{} [STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {} - {} - {}", traceId, batchInfo.getBatchId(), response.getResultCode(), response.getErrorCode(), response.getMessage());
        }
        if (CollectionUtils.isNotEmpty(recommendBatches.getBatchInfoDTOList())) {
            result.addAll(recommendBatches.getBatchInfoDTOList());
        }
    } catch (ApiException e) {
        UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0496",
                "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " API ERROR! " + e.getMessage(), e);
    } catch (Exception e) {
        UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0497",
                "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " ERROR! " + e.getMessage(), e);
    }
}

// Mock feature flag check (replace with actual implementation)
private boolean isFeatureEnabled(String featureFlag) {
    // Replace this logic with your feature flag implementation
    return "ENABLE_PARALLEL_UPDATE".equals(featureFlag);
}
Editor is loading...
Leave a Comment