Untitled

 avatar
wyc1230
plain_text
a month ago
2.6 kB
5
Indexable
	/** Core size of the publisher pool (first argument of the {@link ThreadPoolExecutor} below). */
	private static final int PUBLISHER_CORE_POOL_SIZE = 5;
	/** Maximum size of the publisher pool. */
	private static final int PUBLISHER_MAXIMUM_POOL_SIZE = 20;
	/** Keep-alive time for idle threads, expressed in seconds (see the {@link TimeUnit} passed below). */
	private static final int PUBLISHER_KEEP_ALIVE_TIME = 60;

	/**
	 * Executor configured from the publisher pool settings above.
	 *
	 * <p>The {@link SynchronousQueue} does not hold tasks, so each submission is handed directly to a
	 * thread; once the pool cannot accept more work, {@link ThreadPoolExecutor.CallerRunsPolicy}
	 * makes the submitting thread run the task itself instead of rejecting it.
	 */
	private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
			PUBLISHER_CORE_POOL_SIZE,
			PUBLISHER_MAXIMUM_POOL_SIZE,
			PUBLISHER_KEEP_ALIVE_TIME,
			TimeUnit.SECONDS,
			new SynchronousQueue<>(),
			new ThreadPoolExecutor.CallerRunsPolicy());


	/**
	 * Sends every eligible batch to the stream picking "update batch" API, one request per batch.
	 *
	 * <p>Candidates are narrowed with {@code isPushAvailableBatch}, converted with
	 * {@code asStreamBatchInfoDTO}, and dropped when the converted batch has no picker jobs.
	 * Each remaining batch is sent in its own request so that an exception on one batch is
	 * reported and skipped without stopping the others.
	 *
	 * <p>NOTE(review): a response whose result code is not SUCCESS is only logged as a warning;
	 * the batch is still added to the returned list. Confirm that callers expect this.
	 *
	 * @param batchList candidate batches; {@code null} or empty means there is nothing to push
	 * @param context   optimization context providing the process path and the trace id
	 * @return the batch infos taken from each request that completed without an exception, or
	 *         {@code null} when no batch was eligible (the {@code null} is kept for existing callers)
	 */
	private List<StreamBatchInfoDTO> updateBatch(
			List<MultiStreamBatchDTO> batchList,
			MultiOptimizationContext context
	) {
		ProcessPath processPath = context.getProcessPath();
		String traceId = context.getTraceId();
		Long centerId = processPath.getCenterId();

		// A null list previously failed with a NullPointerException on stream(); handle it like an empty list.
		if (batchList == null || batchList.isEmpty()) {
			return null;
		}

		List<StreamBatchInfoDTO> batchInfoList = batchList.stream()
				.filter(this::isPushAvailableBatch)
				.map(batch -> asStreamBatchInfoDTO(batch, context))
				.filter(batch -> CollectionUtils.isNotEmpty(batch.getPickerJobDTOList()))
				.collect(Collectors.toList());

		if (CollectionUtils.isEmpty(batchInfoList)) {
			return null;
		}

		List<StreamBatchInfoDTO> result = Lists.newArrayList();
		for (StreamBatchInfoDTO batchInfo : batchInfoList) {
			try {
				// One request per batch: the request DTO wraps a single-element list.
				RecommendStreamBatchDTO recommendBatches = asRecommendStreamBatchDTO(processPath, Lists.newArrayList(batchInfo));
				multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] REQUEST = {}", traceId, JsonUtils.convertObjectToString(recommendBatches));
				PickingResponseEntity response = streamAdapter.postApiStreamPickingV1UpdateBatch(
						recommendBatches, centerFacadeService.buildOutboundApiHeader(centerId)
				);
				multiStreamLogger.info("TraceId:{} [STREAM UPDATE BATCH] RESPONSE = {}", traceId, JsonUtils.convertObjectToString(response));
				if (response == null) {
					// Report a missing response through the generic error path below with a readable
					// message, rather than as an anonymous NullPointerException from getResultCode().
					throw new IllegalStateException("stream adapter returned no response");
				}
				if (!Objects.equals(response.getResultCode(), RestResultCode.SUCCESS.getValue())) {
					log.warn("TraceId:{} [STREAM UPDATE BATCH] BATCH ID {} RESPONSE ERROR! {} - {} - {}", traceId, batchInfo.getBatchId(), response.getResultCode(), response.getErrorCode(), response.getMessage());
				}
				if (CollectionUtils.isNotEmpty(recommendBatches.getBatchInfoDTOList())) {
					result.addAll(recommendBatches.getBatchInfoDTOList());
				}
			} catch (ApiException e) {
				// API-level failure: report it and continue with the next batch.
				UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0496",
						"[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " API ERROR! " + e.getMessage(), e);
			} catch (Exception e) {
				// Any other failure (including a missing response): report it and continue.
				UehMetricAndLogWrapper.metricAndLogWithErrorCode("WOB-ERR-0497",
						"[STREAM UPDATE BATCH] BATCH ID " + batchInfo.getBatchId() + " ERROR! " + e.getMessage(), e);
			}
		}
		return result;
	}
Leave a Comment