Untitled
wyc1230
plain_text
9 months ago
6.7 kB
9
Indexable
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
public class BatchProcessor {
// Name of the feature flag that selects parallel (thread-pool) execution in executeInParallelWithResult.
private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE";
// Number of threads the pool keeps alive even when idle.
private static final int PUBLISHER_CORE_POOL_SIZE = 5;
// Upper bound on the number of pool threads.
private static final int PUBLISHER_MAXIMUM_POOL_SIZE = 20;
// Idle time, in seconds (see TimeUnit.SECONDS below), after which threads above the core size are reclaimed.
private static final int PUBLISHER_KEEP_ALIVE_TIME = 60;
// Executor used for the per-batch tasks submitted by executeInParallelWithResult.
// A SynchronousQueue has no capacity, so tasks are never queued: each submit is handed straight to an
// idle or newly started thread, and once PUBLISHER_MAXIMUM_POOL_SIZE threads are busy the
// CallerRunsPolicy makes the submitting thread run the task itself instead of rejecting it.
// NOTE(review): no shutdown of this pool is visible in this part of the file - confirm it is released elsewhere.
private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
PUBLISHER_CORE_POOL_SIZE,
PUBLISHER_MAXIMUM_POOL_SIZE,
PUBLISHER_KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
 * Sends an update request for every pushable batch and returns the batch infos that were processed.
 *
 * <p>Batches are filtered with {@code isPushAvailableBatch}, converted with
 * {@code asStreamBatchInfoDTO}, and dropped when they carry no picker jobs. Each remaining batch is
 * handed to {@code processUpdateBatch} individually (in parallel or sequentially, depending on the
 * feature flag checked by {@code executeInParallelWithResult}) and the per-batch results are
 * flattened into one list.
 *
 * @param batchList candidate batches; {@code null} or empty yields an empty result
 * @param context   supplies the process path and trace id used for the outbound calls
 * @return the batch infos returned by the individual update calls; never {@code null}, and empty
 *         when there was nothing to push
 */
public List<StreamBatchInfoDTO> updateBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) {
    // Guard against missing input instead of failing with a NullPointerException on stream().
    if (batchList == null || batchList.isEmpty()) {
        return new ArrayList<>();
    }

    ProcessPath processPath = context.getProcessPath();
    String traceId = context.getTraceId();
    Long centerId = processPath.getCenterId();

    List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
            .filter(this::isPushAvailableBatch)
            .map(batch -> asStreamBatchInfoDTO(batch, context))
            .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
            .collect(Collectors.toList());

    // Return an empty, mutable list rather than null so callers can iterate or append without a null check.
    if (CollectionUtils.isEmpty(batchInfoList)) {
        return new ArrayList<>();
    }

    List<List<StreamBatchInfoDTO>> results = executeInParallelWithResult(
            batchInfoList,
            batchInfo -> processUpdateBatch(batchInfo, processPath, traceId, centerId),
            traceId,
            "STREAM UPDATE BATCH"
    );

    return results.stream().flatMap(List::stream).collect(Collectors.toList());
}
/**
 * Calls the outbound create-batch API once per pushable batch and returns the batches that were submitted.
 *
 * <p>Batches are filtered with {@code isPushAvailableBatch}, converted with
 * {@code asStreamBatchInfoDTO}, and dropped when they carry no picker jobs. The returned list is
 * the filtered input, not a list of confirmed successes: the individual create calls produce no
 * result, so a call that failed is still included.
 *
 * @param batchList candidate batches; {@code null} or empty yields an empty result
 * @param context   supplies the process path, timestamp and trace id passed to the outbound API
 * @return the batch infos that were submitted for creation; never {@code null}, and empty when
 *         there was nothing to push
 */
public List<StreamBatchInfoDTO> createBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) {
    // Guard against missing input instead of failing with a NullPointerException on stream().
    if (batchList == null || batchList.isEmpty()) {
        return new ArrayList<>();
    }

    List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
            .filter(this::isPushAvailableBatch)
            .map(batch -> asStreamBatchInfoDTO(batch, context))
            .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
            .collect(Collectors.toList());

    // Return an empty, mutable list rather than null so callers can iterate or append without a null check.
    if (CollectionUtils.isEmpty(batchInfoList)) {
        return new ArrayList<>();
    }

    executeInParallelWithResult(
            batchInfoList,
            batchInfo -> {
                // One API call per batch so the calls can be distributed across the pool.
                callOutboundCreateBatchApi(
                        context.getProcessPath(),
                        context.getNow(),
                        Collections.singletonList(batchInfo),
                        context.getTraceId()
                );
                return null; // No result to collect
            },
            context.getTraceId(),
            "STREAM CREATE BATCH"
    );

    return batchInfoList;
}
/**
 * Applies {@code taskProcessor} to every item and collects the non-null results.
 *
 * <p>When the {@code FEATURE_FLAG_PARALLEL_UPDATE} flag is enabled, each item is submitted to
 * {@code threadPoolExecutor} and the results are gathered in submission order; a task that fails
 * is logged and skipped. When the flag is disabled, the items are processed sequentially on the
 * calling thread and any exception thrown by {@code taskProcessor} propagates to the caller.
 *
 * <p>If the calling thread is interrupted while waiting, the method still waits for the remaining
 * tasks, but it restores the thread's interrupt status before returning so the caller can observe
 * the interruption. The result of the task whose wait was interrupted is not collected.
 *
 * @param items         inputs to process
 * @param taskProcessor function applied to each item; a {@code null} return value is dropped
 * @param traceId       trace id used when logging failures
 * @param taskName      label used when logging failures
 * @param <T>           input item type
 * @param <R>           result type
 * @return the non-null results, in item order
 */
private <T, R> List<R> executeInParallelWithResult(
        List<T> items,
        Function<T, R> taskProcessor,
        String traceId,
        String taskName
) {
    if (!isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
        return items.stream()
                .map(taskProcessor)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Submit everything first so the tasks overlap, then wait for them in order.
    List<Future<R>> futures = items.stream()
            .map(item -> threadPoolExecutor.submit(() -> taskProcessor.apply(item)))
            .collect(Collectors.toList());

    List<R> results = new ArrayList<>(futures.size());
    boolean interrupted = false;
    try {
        for (Future<R> future : futures) {
            try {
                R result = future.get();
                if (result != null) {
                    results.add(result);
                }
            } catch (InterruptedException e) {
                // get() cleared the interrupt flag; remember it so it can be restored below.
                interrupted = true;
                logError(traceId, taskName, e);
            } catch (ExecutionException | CancellationException e) {
                logError(traceId, taskName, e);
            }
        }
    } finally {
        if (interrupted) {
            // Do not swallow the interruption: hand it back to the caller's thread.
            Thread.currentThread().interrupt();
        }
    }
    return results;
}
/**
 * Sends the update-batch request for a single batch and returns the batch infos from that request.
 *
 * <p>A response whose result code is not {@code RestResultCode.SUCCESS} is only logged as a
 * warning; the request's batch infos are still returned. Any exception raised while building or
 * sending the request is logged and turned into an empty list.
 *
 * @param batchInfo   the batch to update
 * @param processPath process path used to build the request
 * @param traceId     trace id used for logging
 * @param centerId    center id used to build the outbound API header
 * @return the batch infos carried by the request, or an empty list if an exception occurred
 */
private List<StreamBatchInfoDTO> processUpdateBatch(
        StreamBatchInfoDTO batchInfo,
        ProcessPath processPath,
        String traceId,
        Long centerId
) {
    try {
        List<StreamBatchInfoDTO> singleBatch = Collections.singletonList(batchInfo);
        RecommendStreamBatchDTO request = asRecommendStreamBatchDTO(processPath, singleBatch);
        logInfo(traceId, "[STREAM UPDATE BATCH] REQUEST = {}", request);

        Object headers = buildOutboundApiHeader(centerId);
        PickingResponseEntity reply = postApiStreamPickingV1UpdateBatch(request, headers);
        logInfo(traceId, "[STREAM UPDATE BATCH] RESPONSE = {}", reply);

        boolean succeeded = Objects.equals(reply.getResultCode(), RestResultCode.SUCCESS.getValue());
        if (!succeeded) {
            logWarn(traceId, "[STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {}", batchInfo.getBatchId(), reply);
        }
        return request.getBatchInfoDTOList();
    } catch (Exception e) {
        logError(traceId, "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId(), e);
        return Collections.emptyList();
    }
}
// Mock utility methods
/**
 * Mock feature-flag lookup: reports only the parallel-update flag as enabled.
 *
 * @param featureFlag name of the flag to check; may be {@code null}
 * @return {@code true} only when {@code featureFlag} is exactly {@code "ENABLE_PARALLEL_UPDATE"}
 */
private boolean isFeatureEnabled(String featureFlag) {
    final String enabledFlag = "ENABLE_PARALLEL_UPDATE";
    return Objects.equals(enabledFlag, featureFlag);
}
/**
 * Placeholder eligibility check used as the first filter in updateBatch and createBatch.
 * The current stub accepts every batch without inspecting it.
 *
 * @param batch the batch to check (unused by the stub)
 * @return always {@code true} in this placeholder implementation
 */
private boolean isPushAvailableBatch(MultiStreamBatchDTO batch) {
return true; // Replace with actual logic
}
/**
 * Placeholder conversion from a batch to its {@code StreamBatchInfoDTO}.
 * The current stub ignores both arguments and returns a freshly constructed DTO.
 *
 * @param batch   the source batch (unused by the stub)
 * @param context the optimization context (unused by the stub)
 * @return a new {@code StreamBatchInfoDTO} created with the no-argument constructor
 */
private StreamBatchInfoDTO asStreamBatchInfoDTO(MultiStreamBatchDTO batch, MultiOptimizationContext context) {
return new StreamBatchInfoDTO(); // Replace with actual logic
}
/**
 * Placeholder builder for the update-batch request payload.
 * The current stub ignores both arguments and returns a freshly constructed DTO.
 *
 * @param processPath   the process path (unused by the stub)
 * @param batchInfoList the batch infos to include (unused by the stub)
 * @return a new {@code RecommendStreamBatchDTO} created with the no-argument constructor
 */
private RecommendStreamBatchDTO asRecommendStreamBatchDTO(ProcessPath processPath, List<StreamBatchInfoDTO> batchInfoList) {
return new RecommendStreamBatchDTO(); // Replace with actual logic
}
/**
 * Placeholder for the outbound update-batch API call.
 * The current stub performs no call and returns a freshly constructed response object.
 *
 * @param dto     the request payload (unused by the stub)
 * @param headers the request headers (unused by the stub)
 * @return a new {@code PickingResponseEntity} created with the no-argument constructor
 */
private PickingResponseEntity postApiStreamPickingV1UpdateBatch(RecommendStreamBatchDTO dto, Object headers) {
return new PickingResponseEntity(); // Replace with actual logic
}
/**
 * Placeholder for building the headers passed to the outbound API.
 * The current stub ignores the center id and returns a bare {@code Object}.
 *
 * @param centerId the center id (unused by the stub)
 * @return a new {@code Object} standing in for the real header value
 */
private Object buildOutboundApiHeader(Long centerId) {
return new Object(); // Replace with actual logic
}
/**
 * Placeholder for the outbound create-batch API call; the current stub does nothing.
 *
 * @param processPath   the process path taken from the optimization context
 * @param now           the timestamp value taken from the optimization context
 * @param batchInfoList the batch infos to create (createBatch passes a single-element list)
 * @param traceId       trace id for the call
 */
private void callOutboundCreateBatchApi(ProcessPath processPath, Object now, List<StreamBatchInfoDTO> batchInfoList, String traceId) {
// Implement API call
}
private void logInfo(String traceId, String message, Object... ar
Editor is loading...
Leave a Comment