Untitled
wyc1230
plain_text
a month ago
4.0 kB
4
Indexable
private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE"; private List<StreamBatchInfoDTO> updateBatch( List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context ) { ProcessPath processPath = context.getProcessPath(); String traceId = context.getTraceId(); Long centerId = processPath.getCenterId(); List<StreamBatchInfoDTO> batchInfoList = batchList.stream() .filter(this::isPushAvailableBatch) .map(batch -> asStreamBatchInfoDTO(batch, context)) .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList())) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(batchInfoList)) { return null; } List<StreamBatchInfoDTO> result = Collections.synchronizedList(new ArrayList<>()); // Check if the feature flag is enabled if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) { // Use threadPoolExecutor for parallel processing List<Future<Void>> futures = batchInfoList.stream() .map(batchInfo -> threadPoolExecutor.submit(() -> { processBatchInfo(batchInfo, processPath, traceId, centerId, result); return null; })) .collect(Collectors.toList()); // Wait for all tasks to complete for (Future<Void> future : futures) { try { future.get(); // Wait for task completion } catch (Exception e) { log.error("TraceId:{} [STREAM UPDATE BATCH] Error during parallel execution: {}", traceId, e.getMessage(), e); } } } else { // Sequential processing for (StreamBatchInfoDTO batchInfo : batchInfoList) { processBatchInfo(batchInfo, processPath, traceId, centerId, result); } } return result; } // Helper method to process individual StreamBatchInfoDTO private void processBatchInfo( StreamBatchInfoDTO batchInfo, ProcessPath processPath, String traceId, Long centerId, List<StreamBatchInfoDTO> result ) { try { RecommendStreamBatchDTO recommendBatches = asRecommendStreamBatchDTO(processPath, Lists.newArrayList(batchInfo)); multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] REQUEST = {}", traceId, JsonUtils.convertObjectToString(recommendBatches)); PickingResponseEntity response = streamAdapter.postApiStreamPickingV1UpdateBatch( recommendBatches, centerFacadeService.buildOutboundApiHeader(centerId) ); multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] RESPONSE = {}", traceId, JsonUtils.convertObjectToString(response)); if (!Objects.equals(response.getResultCode(), RestResultCode.SUCCESS.getValue())) { log.warn("TraceId:{} [STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {} - {} - {}", traceId, batchInfo.getBatchId(), response.getResultCode(), response.getErrorCode(), response.getMessage()); } if (CollectionUtils.isNotEmpty(recommendBatches.getBatchInfoDTOList())) { result.addAll(recommendBatches.getBatchInfoDTOList()); } } catch (ApiException e) { UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0496", "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " API ERROR! " + e.getMessage(), e); } catch (Exception e) { UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0497", "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " ERROR! " + e.getMessage(), e); } } // Mock feature flag check (replace with actual implementation) private boolean isFeatureEnabled(String featureFlag) { // Replace this logic with your feature flag implementation return "ENABLE_PARALLEL_UPDATE".equals(featureFlag); }
Editor is loading...
Leave a Comment