Untitled
wyc1230
plain_text
a year ago
4.0 kB
17
Indexable
/** Name of the feature flag that {@code updateBatch} passes to {@code isFeatureEnabled} to choose the thread-pool path over the sequential loop. */
private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE";
/**
 * Converts the push-available batches to {@link StreamBatchInfoDTO}s, drops those without picker jobs,
 * and hands each remaining batch to {@code processBatchInfo}, either on {@code threadPoolExecutor}
 * (when {@code FEATURE_FLAG_PARALLEL_UPDATE} is enabled) or one by one on the calling thread.
 *
 * <p>In both modes the returned list follows the order of the filtered input: parallel tasks collect
 * into their own list and the lists are merged in submission order, so the result does not depend on
 * thread scheduling.
 *
 * <p>If the calling thread is interrupted while waiting for parallel tasks, the interrupt flag is
 * restored and waiting stops; results of tasks that were not yet awaited are not included in the
 * returned list (the tasks themselves are not cancelled).
 *
 * @param batchList candidate batches; must not be {@code null}
 * @param context   optimization context supplying the process path and trace id
 * @return the batch infos collected by {@code processBatchInfo}, or {@code null} when no batch
 *         survives filtering (kept as {@code null} for backward compatibility with existing callers)
 */
private List<StreamBatchInfoDTO> updateBatch(
        List<MultiStreamBatchDTO> batchList,
        MultiOptimizationContext context
) {
    ProcessPath processPath = context.getProcessPath();
    String traceId = context.getTraceId();
    Long centerId = processPath.getCenterId();

    List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
            .filter(this::isPushAvailableBatch)
            .map(batch -> asStreamBatchInfoDTO(batch, context))
            .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(batchInfoList)) {
        // Callers may distinguish "nothing to push" via null, so this is intentionally not an empty list.
        return null;
    }

    List<StreamBatchInfoDTO> result = new ArrayList<>();

    if (!isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
        // Sequential processing
        for (StreamBatchInfoDTO batchInfo : batchInfoList) {
            processBatchInfo(batchInfo, processPath, traceId, centerId, result);
        }
        return result;
    }

    // Parallel processing: every task writes into a private list, so no list is shared between threads.
    List<Future<List<StreamBatchInfoDTO>>> futures = new ArrayList<>(batchInfoList.size());
    for (StreamBatchInfoDTO batchInfo : batchInfoList) {
        Future<List<StreamBatchInfoDTO>> future = threadPoolExecutor.submit(() -> {
            List<StreamBatchInfoDTO> taskResult = new ArrayList<>();
            processBatchInfo(batchInfo, processPath, traceId, centerId, taskResult);
            return taskResult;
        });
        futures.add(future);
    }

    // Merge in submission order so the output order matches the sequential path.
    for (Future<List<StreamBatchInfoDTO>> future : futures) {
        try {
            result.addAll(future.get());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption, and stop
            // blocking: any further get() would throw immediately anyway.
            Thread.currentThread().interrupt();
            log.error("TraceId:{} [STREAM UPDATE BATCH] Interrupted while waiting for parallel execution", traceId, e);
            break;
        } catch (Exception e) {
            log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e);
        }
    }
    return result;
}
/**
 * Processes a single {@link StreamBatchInfoDTO}: wraps it in a {@code RecommendStreamBatchDTO},
 * posts it through {@code streamAdapter.postApiStreamPickingV1UpdateBatch}, logs request and
 * response, and appends the request's batch infos to {@code result}.
 *
 * <p>A non-success result code is only logged as a warning; the batch infos are still appended.
 * Any exception is reported through {@code UehMetricAndLogWrapper} and not rethrown, in which case
 * nothing is appended for this batch.
 *
 * @param batchInfo   the batch to push
 * @param processPath process path used to build the request
 * @param traceId     trace id included in log lines
 * @param centerId    center id used to build the outbound API header
 * @param result      accumulator that receives the pushed batch infos
 */
private void processBatchInfo(
        StreamBatchInfoDTO batchInfo,
        ProcessPath processPath,
        String traceId,
        Long centerId,
        List<StreamBatchInfoDTO> result
) {
    try {
        RecommendStreamBatchDTO request = asRecommendStreamBatchDTO(processPath, Lists.newArrayList(batchInfo));
        multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] REQUEST = {}", traceId, JsonUtils.convertObjectToString(request));

        PickingResponseEntity apiResponse = streamAdapter.postApiStreamPickingV1UpdateBatch(
                request, centerFacadeService.buildOutboundApiHeader(centerId)
        );
        multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] RESPONSE = {}", traceId, JsonUtils.convertObjectToString(apiResponse));

        boolean succeeded = Objects.equals(apiResponse.getResultCode(), RestResultCode.SUCCESS.getValue());
        if (!succeeded) {
            log.warn("TraceId:{} [STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {} - {} - {}", traceId, batchInfo.getBatchId(), apiResponse.getResultCode(), apiResponse.getErrorCode(), apiResponse.getMessage());
        }

        // Nothing to collect when the request carries no batch infos.
        if (CollectionUtils.isEmpty(request.getBatchInfoDTOList())) {
            return;
        }
        result.addAll(request.getBatchInfoDTOList());
    } catch (ApiException e) {
        String apiErrorMessage = "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " API ERROR! " + e.getMessage();
        UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0496", apiErrorMessage, e);
    } catch (Exception e) {
        String errorMessage = "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " ERROR! " + e.getMessage();
        UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0497", errorMessage, e);
    }
}
/**
 * Placeholder feature-flag check: reports a flag as enabled only when its name equals
 * {@code FEATURE_FLAG_PARALLEL_UPDATE}. Intended to be replaced with a real feature-flag lookup.
 *
 * @param featureFlag name of the flag to check; may be {@code null}
 * @return {@code true} if {@code featureFlag} is {@code "ENABLE_PARALLEL_UPDATE"}, otherwise {@code false}
 */
private boolean isFeatureEnabled(String featureFlag) {
    // TODO: replace this mock with the actual feature flag implementation.
    boolean enabled = FEATURE_FLAG_PARALLEL_UPDATE.equals(featureFlag);
    return enabled;
}
Editor is loading...
Leave a Comment