Untitled

 avatar
wyc1230
plain_text
24 days ago
6.7 kB
7
Indexable
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BatchProcessor {

    private static final String FEATURE_FLAG_PARALLEL_UPDATE = "ENABLE_PARALLEL_UPDATE";
    private static final int PUBLISHER_CORE_POOL_SIZE = 5;
    private static final int PUBLISHER_MAXIMUM_POOL_SIZE = 20;
    private static final int PUBLISHER_KEEP_ALIVE_TIME = 60;

    private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
            PUBLISHER_CORE_POOL_SIZE, 
            PUBLISHER_MAXIMUM_POOL_SIZE,
            PUBLISHER_KEEP_ALIVE_TIME, 
            TimeUnit.SECONDS, 
            new SynchronousQueue<>(), 
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public List<StreamBatchInfoDTO> updateBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) {
        ProcessPath processPath = context.getProcessPath();
        String traceId = context.getTraceId();
        Long centerId = processPath.getCenterId();

        List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
                .filter(this::isPushAvailableBatch)
                .map(batch -> asStreamBatchInfoDTO(batch, context))
                .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(batchInfoList)) {
            return null;
        }

        List<List<StreamBatchInfoDTO>> results = executeInParallelWithResult(
                batchInfoList,
                batchInfo -> processUpdateBatch(batchInfo, processPath, traceId, centerId),
                traceId,
                "STREAM UPDATE BATCH"
        );

        return results.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public List<StreamBatchInfoDTO> createBatch(List<MultiStreamBatchDTO> batchList, MultiOptimizationContext context) {
        List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
                .filter(this::isPushAvailableBatch)
                .map(batch -> asStreamBatchInfoDTO(batch, context))
                .filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(batchInfoList)) {
            return null;
        }

        executeInParallelWithResult(
                batchInfoList,
                batchInfo -> {
                    callOutboundCreateBatchApi(
                            context.getProcessPath(),
                            context.getNow(),
                            Collections.singletonList(batchInfo),
                            context.getTraceId()
                    );
                    return null; // No result to collect
                },
                context.getTraceId(),
                "STREAM CREATE BATCH"
        );

        return batchInfoList;
    }

    private <T, R> List<R> executeInParallelWithResult(
            List<T> items,
            Function<T, R> taskProcessor,
            String traceId,
            String taskName
    ) {
        if (isFeatureEnabled(FEATURE_FLAG_PARALLEL_UPDATE)) {
            List<Future<R>> futures = items.stream()
                    .map(item -> threadPoolExecutor.submit(() -> taskProcessor.apply(item)))
                    .collect(Collectors.toList());

            List<R> results = new ArrayList<>();
            futures.forEach(future -> {
                try {
                    R result = future.get();
                    if (result != null) {
                        results.add(result);
                    }
                } catch (Exception e) {
                    logError(traceId, taskName, e);
                }
            });
            return results;
        } else {
            return items.stream()
                    .map(taskProcessor)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }
    }

    private List<StreamBatchInfoDTO> processUpdateBatch(
            StreamBatchInfoDTO batchInfo,
            ProcessPath processPath,
            String traceId,
            Long centerId
    ) {
        try {
            RecommendStreamBatchDTO recommendBatches = asRecommendStreamBatchDTO(processPath, Collections.singletonList(batchInfo));
            logInfo(traceId, "[STREAM UPDATE BATCH] REQUEST = {}", recommendBatches);

            PickingResponseEntity response = postApiStreamPickingV1UpdateBatch(
                    recommendBatches, buildOutboundApiHeader(centerId)
            );
            logInfo(traceId, "[STREAM UPDATE BATCH] RESPONSE = {}", response);

            if (!Objects.equals(response.getResultCode(), RestResultCode.SUCCESS.getValue())) {
                logWarn(traceId, "[STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {}", batchInfo.getBatchId(), response);
            }

            return recommendBatches.getBatchInfoDTOList();
        } catch (Exception e) {
            logError(traceId, "[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId(), e);
        }
        return Collections.emptyList();
    }

    // Mock utility methods
    private boolean isFeatureEnabled(String featureFlag) {
        return "ENABLE_PARALLEL_UPDATE".equals(featureFlag);
    }

    private boolean isPushAvailableBatch(MultiStreamBatchDTO batch) {
        return true; // Replace with actual logic
    }

    private StreamBatchInfoDTO asStreamBatchInfoDTO(MultiStreamBatchDTO batch, MultiOptimizationContext context) {
        return new StreamBatchInfoDTO(); // Replace with actual logic
    }

    private RecommendStreamBatchDTO asRecommendStreamBatchDTO(ProcessPath processPath, List<StreamBatchInfoDTO> batchInfoList) {
        return new RecommendStreamBatchDTO(); // Replace with actual logic
    }

    private PickingResponseEntity postApiStreamPickingV1UpdateBatch(RecommendStreamBatchDTO dto, Object headers) {
        return new PickingResponseEntity(); // Replace with actual logic
    }

    private Object buildOutboundApiHeader(Long centerId) {
        return new Object(); // Replace with actual logic
    }

    private void callOutboundCreateBatchApi(ProcessPath processPath, Object now, List<StreamBatchInfoDTO> batchInfoList, String traceId) {
        // Implement API call
    }

    private void logInfo(String traceId, String message, Object... ar
Leave a Comment