Untitled
unknown
plain_text
a year ago
13 kB
10
Indexable
package com.amazon.physalia.processingservice.processors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Singleton; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.measure.unit.Unit; import com.amazon.coral.client.Calls; import com.amazon.coral.metrics.Metrics; import com.amazon.ebs.clients.healthtracker.Constants; import com.amazon.ebs.physaliahealthtracker.PhysaliaHealthTrackerServiceClient; import com.amazon.ebs.physaliahealthtracker.UpdateNodeOperation; import com.amazon.ebs.physaliahealthtracker.UpdateNodeStateRequest; import com.amazon.ebs.physaliahealthtracker.impl.UpdateNodeStateCall; import com.amazon.physalia.ServiceAddress; import com.amazon.physalia.colonydata.NodeInfo; import com.amazon.physalia.colonydataservice.colonydata.config.Config; import com.amazon.physalia.colonydata.CellInfo; import com.amazon.physalia.colonydataservice.colonydata.nodesweep.NodeScanContext; import com.amazon.physalia.colonydataservice.colonydata.oldrepair.CellStateClient; import com.amazon.physalia.processingservice.AbstractBaseProcessor; import com.amazon.physalia.processingservice.ProcessorFailedException; import com.amazon.physalia.processingservice.handlers.AsyncProcessor; import com.amazon.physaliahive.CellState; /** * Processor that contains the logic to determine if a node is safe for isolation. * https://w.amazon.com/index.php/EC2/EBS/Projects/Oracle/LocalBlessings/Health_Tracker_Is_Node_Isolation_Safe */ @Slf4j @Singleton public class IsNodeIsolationSafeProcessor extends AbstractBaseProcessor implements AsyncProcessor { private final CellStateClient cellStateClient; private final Config config; private final PhysaliaHealthTrackerServiceClient healthTrackerServiceClient; private final static String NAME = "IS-NODE-ISOLATION-SAFE"; private final static String CELLS_NOT_FOUND_IN_CACHE = "CellsNotFoundInCache"; private final static String ISOLATION_SAFE_NODE_COUNT = "IsolationSafeNodeCount"; private final static String ISOLATION_REASON = "NodeIsolationReason-%s"; private final static String ISOLATIONS_DISABLED = "IsolationsDisabled"; private final static String BATCH_UPDATE_FAILED = "BatchUpdateFailed"; private final static String UNREACHABLE_DP = "UnreachableDpInNodeIsolationSafeProcessor"; private final static int BATCH_SIZE = 10; public enum IsolationSafeReason { NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY, NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS, NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY, ONLY_ERASED_OR_ORPHAN_CELLS } @Inject public IsNodeIsolationSafeProcessor(CellStateClient cellStateClient, Config config, PhysaliaHealthTrackerServiceClient healthTrackerServiceClient) { // isAffectedByNodeScanThreshold is set True to signify that the Processor's execution depends on // whether there was a significant percentage of successful node scans in the current sweep super(true); this.cellStateClient = cellStateClient; this.config = config; this.healthTrackerServiceClient = healthTrackerServiceClient; } @Override public String getId() { return NAME; } @Override protected void processInternal(Metrics metrics, NodeScanContext ctx) throws InterruptedException, TimeoutException, ProcessorFailedException { List<String> nodesSafeForIsolation = new ArrayList<>(); try { nodesSafeForIsolation = getNodesSafeForIsolation(ctx, metrics); updateNodesAsSafeToIsolate(ctx.getScanId(), nodesSafeForIsolation, metrics); } finally { metrics.addCount(ISOLATION_SAFE_NODE_COUNT, nodesSafeForIsolation.size(), Unit.ONE); metrics.addCount(ISOLATIONS_DISABLED, 0, Unit.ONE); for (IsolationSafeReason reason : IsolationSafeReason.values()) { metrics.addCount(String.format(ISOLATION_REASON, reason.toString()), 0, Unit.ONE); } } } @VisibleForTesting protected List<String> getNodesSafeForIsolation(NodeScanContext ctx, Metrics metrics) { Map<String, Set<String>> dpViewOfRecyclingNodes = getDPViewOfRecyclingNodes(ctx, metrics); Set<String> nodeIdsInRecycling = ctx.getNodesInRecycling(); List<String> nodesSafeToIsolate = new ArrayList<>(); for (String nodeId : nodeIdsInRecycling) { boolean safeToIsolate = false; boolean doesNodeHaveNoCellsOrOrphanCellsOnly = false; NodeInfo nodeInfo = ctx.getNodeInfoMap().get(nodeId); // We treat a node reachable only when all pages returned successfully during scan. This is in line with existing behavior in HT boolean isReachable = nodeInfo.isReachable(); boolean isLocalViewEmpty = nodeInfo.getCells().isEmpty(); boolean isDPViewEmpty = isDPViewEmpty(dpViewOfRecyclingNodes, nodeId); if (nodeInfo.isSubzoneBlockedFromIsolation()) { safeToIsolate = false; } //Row 8 else if (isDPViewEmpty && isLocalViewEmpty && !isReachable) { metrics.addCount(String.format(ISOLATION_REASON, IsolationSafeReason.NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY.toString()), 1, Unit.ONE); safeToIsolate = true; } // Row 5 & 6 else if (isLocalViewEmpty && isReachable) { if (isDPViewEmpty) { metrics.addCount(String.format(ISOLATION_REASON, IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY.toString()), 1, Unit.ONE); } else { metrics.addCount(String.format(ISOLATION_REASON, IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS.toString()), 1, Unit.ONE); } safeToIsolate = true; } else { //At this point, we want to take a union of the local view of cells and dp view to check if it is either empty // or has only erased cells. doesNodeHaveNoCellsOrOrphanCellsOnly = doesNodeHaveNoCellsOrOrphanCellsOnly( isDPViewEmpty, isLocalViewEmpty, dpViewOfRecyclingNodes, nodeInfo, metrics); if (doesNodeHaveNoCellsOrOrphanCellsOnly) { metrics.addCount(String.format(ISOLATION_REASON, IsolationSafeReason.ONLY_ERASED_OR_ORPHAN_CELLS.toString()), 1, Unit.ONE); safeToIsolate = true; } } if (safeToIsolate) { nodesSafeToIsolate.add(nodeId); } log.info( "IsSafeToIsolate result for node {} safeToIsolate {}. isSubzoneBlockedFromIsolation {} isDPViewEmpty {} isLocalViewEmpty {} isReachable {} doesNodeHaveNoCellsOrOrphanCellsOnly {}", nodeId, safeToIsolate, nodeInfo.isSubzoneBlockedFromIsolation(), isDPViewEmpty, isLocalViewEmpty, isReachable, doesNodeHaveNoCellsOrOrphanCellsOnly); } return nodesSafeToIsolate; } protected boolean doesNodeHaveNoCellsOrOrphanCellsOnly(boolean isDPViewEmpty, boolean isLocalViewEmpty, Map<String, Set<String>> dpViewOfNodes, NodeInfo nodeInfo, Metrics metrics) { //The DP view and local view could intersect, in such cases a CompositeSet breaks and requires one of the sets to drop the intersecting keys from within it //To avoid that going with copying references to a new set. Set<String> cellsFromDPAndLocalView = new HashSet<>(); String nodeId = nodeInfo.getId(); if (!isDPViewEmpty) { cellsFromDPAndLocalView.addAll(dpViewOfNodes.get(nodeId)); } if (!isLocalViewEmpty) { cellsFromDPAndLocalView.addAll(nodeInfo.getCells().keySet()); } if (cellsFromDPAndLocalView.isEmpty()) { return true; } Map<String, CellStateClient.State> cellStatesByCellId = cellStateClient.getCellStates(cellsFromDPAndLocalView, metrics); if (cellsFromDPAndLocalView.size() > cellStatesByCellId.size()) { log.error("Mismatch found! cellsFromDPAndLocalView {} cellStatesByCellId from cache {}", cellsFromDPAndLocalView.size(), cellStatesByCellId.size()); metrics.addCount(CELLS_NOT_FOUND_IN_CACHE, 1, Unit.ONE); // If entry in cell cache was expired/evicted and cds failed to reach hive, then this node could falsely be marked for isIsolationSafe == true. // Cell are never deleted off the cell table we should always be able to verify the cell states. // Therefore, if cell information was not retrieved then we return false here to avoid falsely setting isIsolationSafe to true. return false; } return cellStatesByCellId.entrySet().stream().allMatch( entry -> entry.getValue().goalState.equalsIgnoreCase(CellState.ERASED)); } private boolean isDPViewEmpty(Map<String, Set<String>> dpViewOfNodes, String nodeId) { return !dpViewOfNodes.containsKey(nodeId) || dpViewOfNodes.get(nodeId).isEmpty(); } @VisibleForTesting protected void updateNodesAsSafeToIsolate(String scanId, List<String> nodesSafeToIsolate, Metrics metrics) throws ProcessorFailedException { if (config.areIsolationsEnabled()) { boolean batchFailed = false; metrics.addCount(BATCH_UPDATE_FAILED, 0, Unit.ONE); log.info("Number of nodes to isolate : {}", nodesSafeToIsolate.size()); List<List<String>> batchedNodeIdsForIsolation = Lists.partition(nodesSafeToIsolate, BATCH_SIZE); for (List<String> batchOfNodeIds : batchedNodeIdsForIsolation) { try { UpdateNodeStateRequest updateNodeStateRequest = UpdateNodeStateRequest.builder().withNodeIds(batchOfNodeIds).withUpdateNodeOperation( UpdateNodeOperation.MARK_ISOLATION_SAFE).withRequestId(scanId).build(); UpdateNodeStateCall updateNodeStateCall = healthTrackerServiceClient.newUpdateNodeStateCall(); updateNodeStateCall.setRequestId(scanId); updateNodeStateCall.setMetrics(metrics); updateNodeStateCall.addAttachment(Calls.retry(Constants.DEFAULT_RETRY_STRATEGY)); updateNodeStateCall.call(updateNodeStateRequest); } catch (Exception ex) { batchFailed = true; metrics.addCount(BATCH_UPDATE_FAILED, 1, Unit.ONE); log.error("Caught exception while calling HT", ex); } } if (batchFailed) { throw new ProcessorFailedException("One or more batch calls to HT for marking nodes as isolationSafe failed!"); } } else { metrics.addCount(ISOLATIONS_DISABLED, 1, Unit.ONE); } } protected Map<String, Set<String>> getDPViewOfRecyclingNodes(NodeScanContext context, Metrics metrics) { Map<String, Set<String>> nodeIdToSetOfCellsFromDPView = new HashMap<>(); for (Map.Entry<String, CellInfo> entry : context.getCellInfoMap().entrySet()) { Set<ServiceAddress> jurors; if (entry.getValue().isDPReachable()) { jurors = entry.getValue().getJurorsBasedOnDp(); } else { jurors = entry.getValue().getJurors(); metrics.addCount(UNREACHABLE_DP, 1, Unit.ONE); } Sets.SetView<String> intersection = Sets.intersection(context.getNodesInRecycling(), jurors.stream().map(sa -> sa.getNodeId()).collect(Collectors.toSet())); intersection.stream().forEach(nodeId -> { nodeIdToSetOfCellsFromDPView.putIfAbsent(nodeId, new HashSet<>()); nodeIdToSetOfCellsFromDPView.get(nodeId).add(entry.getKey()); }); } return nodeIdToSetOfCellsFromDPView; } }
Editor is loading...
Leave a Comment