Untitled
unknown
plain_text
2 years ago
13 kB
11
Indexable
package com.amazon.physalia.processingservice.processors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.measure.unit.Unit;
import com.amazon.coral.client.Calls;
import com.amazon.coral.metrics.Metrics;
import com.amazon.ebs.clients.healthtracker.Constants;
import com.amazon.ebs.physaliahealthtracker.PhysaliaHealthTrackerServiceClient;
import com.amazon.ebs.physaliahealthtracker.UpdateNodeOperation;
import com.amazon.ebs.physaliahealthtracker.UpdateNodeStateRequest;
import com.amazon.ebs.physaliahealthtracker.impl.UpdateNodeStateCall;
import com.amazon.physalia.ServiceAddress;
import com.amazon.physalia.colonydata.NodeInfo;
import com.amazon.physalia.colonydataservice.colonydata.config.Config;
import com.amazon.physalia.colonydata.CellInfo;
import com.amazon.physalia.colonydataservice.colonydata.nodesweep.NodeScanContext;
import com.amazon.physalia.colonydataservice.colonydata.oldrepair.CellStateClient;
import com.amazon.physalia.processingservice.AbstractBaseProcessor;
import com.amazon.physalia.processingservice.ProcessorFailedException;
import com.amazon.physalia.processingservice.handlers.AsyncProcessor;
import com.amazon.physaliahive.CellState;
/**
 * Processor that contains the logic to determine if a node is safe for isolation.
 * https://w.amazon.com/index.php/EC2/EBS/Projects/Oracle/LocalBlessings/Health_Tracker_Is_Node_Isolation_Safe
 *
 * <p>For every node reported by {@link NodeScanContext#getNodesInRecycling()} the processor combines the
 * node's local view of cells ({@link NodeInfo#getCells()}) with the "DP view" (cells whose jurors include
 * the node, see {@link #getDPViewOfRecyclingNodes}) and decides whether the node can be marked
 * isolation-safe. Nodes that qualify are reported to Health Tracker in batches of {@code BATCH_SIZE}
 * when isolations are enabled in {@link Config}.
 */
@Slf4j
@Singleton
public class IsNodeIsolationSafeProcessor extends AbstractBaseProcessor implements AsyncProcessor {

    private static final String NAME = "IS-NODE-ISOLATION-SAFE";

    // Metric names emitted by this processor.
    private static final String CELLS_NOT_FOUND_IN_CACHE = "CellsNotFoundInCache";
    private static final String ISOLATION_SAFE_NODE_COUNT = "IsolationSafeNodeCount";
    private static final String ISOLATION_REASON = "NodeIsolationReason-%s";
    private static final String ISOLATIONS_DISABLED = "IsolationsDisabled";
    private static final String BATCH_UPDATE_FAILED = "BatchUpdateFailed";
    private static final String UNREACHABLE_DP = "UnreachableDpInNodeIsolationSafeProcessor";

    /** Maximum number of node ids sent to Health Tracker in a single UpdateNodeState call. */
    private static final int BATCH_SIZE = 10;

    /** Reasons for which a node is declared isolation-safe; each one is emitted as its own metric. */
    public enum IsolationSafeReason {
        NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY,
        NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS,
        NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY,
        ONLY_ERASED_OR_ORPHAN_CELLS
    }

    private final CellStateClient cellStateClient;
    private final Config config;
    private final PhysaliaHealthTrackerServiceClient healthTrackerServiceClient;

    /**
     * @param cellStateClient            client used to look up the state of cells by cell id
     * @param config                     configuration; consulted for whether isolations are enabled
     * @param healthTrackerServiceClient client used to mark nodes as isolation-safe in Health Tracker
     */
    @Inject
    public IsNodeIsolationSafeProcessor(CellStateClient cellStateClient, Config config,
                                        PhysaliaHealthTrackerServiceClient healthTrackerServiceClient) {
        // isAffectedByNodeScanThreshold is set True to signify that the Processor's execution depends on
        // whether there was a significant percentage of successful node scans in the current sweep
        super(true);
        this.cellStateClient = cellStateClient;
        this.config = config;
        this.healthTrackerServiceClient = healthTrackerServiceClient;
    }

    /** @return the fixed identifier of this processor. */
    @Override
    public String getId() {
        return NAME;
    }

    /**
     * Computes the isolation-safe nodes for the scan and reports them to Health Tracker.
     *
     * <p>The {@code finally} block always emits the safe-node count plus zero-valued data points for the
     * "isolations disabled" metric and every per-reason metric, so those metrics are published on every
     * run, including runs that fail part-way.
     *
     * @param metrics metrics sink for this run
     * @param ctx     context of the node scan being processed
     * @throws ProcessorFailedException if one or more Health Tracker batch updates failed
     */
    @Override
    protected void processInternal(Metrics metrics, NodeScanContext ctx)
            throws InterruptedException, TimeoutException, ProcessorFailedException {
        List<String> nodesSafeForIsolation = new ArrayList<>();
        try {
            nodesSafeForIsolation = getNodesSafeForIsolation(ctx, metrics);
            updateNodesAsSafeToIsolate(ctx.getScanId(), nodesSafeForIsolation, metrics);
        } finally {
            metrics.addCount(ISOLATION_SAFE_NODE_COUNT, nodesSafeForIsolation.size(), Unit.ONE);
            metrics.addCount(ISOLATIONS_DISABLED, 0, Unit.ONE);
            for (IsolationSafeReason reason : IsolationSafeReason.values()) {
                metrics.addCount(reasonMetricName(reason), 0, Unit.ONE);
            }
        }
    }

    /**
     * Evaluates every node in recycling and returns the ids of those that are safe to isolate.
     *
     * <p>A node whose {@link NodeInfo} is missing from the scan context is logged and treated as not safe,
     * so a single inconsistent entry cannot abort the evaluation of the remaining nodes.
     *
     * @param ctx     context of the node scan being processed
     * @param metrics metrics sink; one per-reason count is added for each node found safe
     * @return ids of the nodes that are safe to isolate, in iteration order of the recycling set
     */
    @VisibleForTesting
    protected List<String> getNodesSafeForIsolation(NodeScanContext ctx, Metrics metrics) {
        Map<String, Set<String>> dpViewOfRecyclingNodes = getDPViewOfRecyclingNodes(ctx, metrics);
        Set<String> nodeIdsInRecycling = ctx.getNodesInRecycling();
        List<String> nodesSafeToIsolate = new ArrayList<>();

        for (String nodeId : nodeIdsInRecycling) {
            NodeInfo nodeInfo = ctx.getNodeInfoMap().get(nodeId);
            if (nodeInfo == null) {
                // Without scan data nothing can be verified about the node; stay conservative.
                log.warn("No NodeInfo found for recycling node {}; treating it as not safe to isolate", nodeId);
                continue;
            }

            boolean safeToIsolate = false;
            boolean doesNodeHaveNoCellsOrOrphanCellsOnly = false;
            // We treat a node reachable only when all pages returned successfully during scan.
            // This is in line with existing behavior in HT
            boolean isReachable = nodeInfo.isReachable();
            boolean isLocalViewEmpty = nodeInfo.getCells().isEmpty();
            boolean isDPViewEmpty = isDPViewEmpty(dpViewOfRecyclingNodes, nodeId);
            boolean isSubzoneBlockedFromIsolation = nodeInfo.isSubzoneBlockedFromIsolation();

            // "Row N" below refers to rows of the decision table (presumably the one on the wiki page
            // linked in the class Javadoc).
            if (isSubzoneBlockedFromIsolation) {
                // Nodes in a blocked subzone are never isolated, regardless of their cell views.
                safeToIsolate = false;
            } else if (isDPViewEmpty && isLocalViewEmpty && !isReachable) {
                // Row 8
                metrics.addCount(
                        reasonMetricName(IsolationSafeReason.NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY), 1, Unit.ONE);
                safeToIsolate = true;
            } else if (isLocalViewEmpty && isReachable) {
                // Row 5 & 6
                IsolationSafeReason reason = isDPViewEmpty
                        ? IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY
                        : IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS;
                metrics.addCount(reasonMetricName(reason), 1, Unit.ONE);
                safeToIsolate = true;
            } else {
                // At this point, we want to take a union of the local view of cells and dp view to check
                // if it is either empty or has only erased cells.
                doesNodeHaveNoCellsOrOrphanCellsOnly = doesNodeHaveNoCellsOrOrphanCellsOnly(
                        isDPViewEmpty, isLocalViewEmpty, dpViewOfRecyclingNodes, nodeInfo, metrics);
                if (doesNodeHaveNoCellsOrOrphanCellsOnly) {
                    metrics.addCount(reasonMetricName(IsolationSafeReason.ONLY_ERASED_OR_ORPHAN_CELLS), 1, Unit.ONE);
                    safeToIsolate = true;
                }
            }

            if (safeToIsolate) {
                nodesSafeToIsolate.add(nodeId);
            }
            log.info(
                    "IsSafeToIsolate result for node {} safeToIsolate {}. isSubzoneBlockedFromIsolation {} isDPViewEmpty {} isLocalViewEmpty {} isReachable {} doesNodeHaveNoCellsOrOrphanCellsOnly {}",
                    nodeId, safeToIsolate, isSubzoneBlockedFromIsolation, isDPViewEmpty, isLocalViewEmpty, isReachable,
                    doesNodeHaveNoCellsOrOrphanCellsOnly);
        }
        return nodesSafeToIsolate;
    }

    /**
     * Checks whether the union of the node's DP view and local view of cells is empty or consists solely
     * of cells whose goal state is {@code CellState.ERASED} (compared case-insensitively).
     *
     * @param isDPViewEmpty    whether the DP view of the node is empty; when true the DP view is not read
     * @param isLocalViewEmpty whether the node's local view is empty; when true the local view is not read
     * @param dpViewOfNodes    node id to the set of cell ids in the DP view
     * @param nodeInfo         scan information of the node being evaluated
     * @param metrics          metrics sink; a cache-miss count is added when cell states are missing
     * @return {@code true} if there are no cells or every cell is erased; {@code false} if any cell is not
     *         erased, if a cell state is missing/unknown, or if fewer states than cells were returned
     */
    protected boolean doesNodeHaveNoCellsOrOrphanCellsOnly(boolean isDPViewEmpty, boolean isLocalViewEmpty,
                                                           Map<String, Set<String>> dpViewOfNodes,
                                                           NodeInfo nodeInfo, Metrics metrics) {
        // The DP view and local view could intersect, in such cases a CompositeSet breaks and requires one
        // of the sets to drop the intersecting keys from within it.
        // To avoid that going with copying references to a new set.
        Set<String> cellsFromDPAndLocalView = new HashSet<>();
        String nodeId = nodeInfo.getId();
        if (!isDPViewEmpty) {
            Set<String> dpCells = dpViewOfNodes.get(nodeId);
            // Guard against a caller whose flag disagrees with the map contents.
            if (dpCells != null) {
                cellsFromDPAndLocalView.addAll(dpCells);
            }
        }
        if (!isLocalViewEmpty) {
            cellsFromDPAndLocalView.addAll(nodeInfo.getCells().keySet());
        }
        if (cellsFromDPAndLocalView.isEmpty()) {
            return true;
        }

        Map<String, CellStateClient.State> cellStatesByCellId =
                cellStateClient.getCellStates(cellsFromDPAndLocalView, metrics);
        if (cellsFromDPAndLocalView.size() > cellStatesByCellId.size()) {
            log.error("Mismatch found! cellsFromDPAndLocalView {} cellStatesByCellId from cache {}",
                    cellsFromDPAndLocalView.size(), cellStatesByCellId.size());
            metrics.addCount(CELLS_NOT_FOUND_IN_CACHE, 1, Unit.ONE);
            // If entry in cell cache was expired/evicted and cds failed to reach hive, then this node could
            // falsely be marked for isIsolationSafe == true.
            // Cells are never deleted off the cell table, so we should always be able to verify the cell states.
            // Therefore, if cell information was not retrieved then we return false here to avoid falsely
            // setting isIsolationSafe to true.
            return false;
        }

        // A missing state object or a null goal state is treated as "not erased": the comparison is made
        // from the constant side so unknown data yields false rather than a NullPointerException.
        return cellStatesByCellId.values().stream()
                .allMatch(state -> state != null && CellState.ERASED.equalsIgnoreCase(state.goalState));
    }

    /** @return {@code true} when the DP view has no entry for the node or the entry is an empty set. */
    private boolean isDPViewEmpty(Map<String, Set<String>> dpViewOfNodes, String nodeId) {
        Set<String> dpCells = dpViewOfNodes.get(nodeId);
        return dpCells == null || dpCells.isEmpty();
    }

    /**
     * Marks the given nodes as isolation-safe in Health Tracker, {@code BATCH_SIZE} node ids per call.
     *
     * <p>When isolations are disabled in {@link Config} no call is made and only the "isolations disabled"
     * metric is incremented. A failing batch does not stop the remaining batches; failures are counted,
     * logged, and surfaced once all batches have been attempted.
     *
     * @param scanId             id of the scan, used as the request id of every call
     * @param nodesSafeToIsolate ids of the nodes to mark as isolation-safe
     * @param metrics            metrics sink for this run
     * @throws ProcessorFailedException if at least one batch call failed
     */
    @VisibleForTesting
    protected void updateNodesAsSafeToIsolate(String scanId, List<String> nodesSafeToIsolate, Metrics metrics)
            throws ProcessorFailedException {
        if (!config.areIsolationsEnabled()) {
            metrics.addCount(ISOLATIONS_DISABLED, 1, Unit.ONE);
            return;
        }

        boolean batchFailed = false;
        // Zero data point so the metric is present even when every batch succeeds.
        metrics.addCount(BATCH_UPDATE_FAILED, 0, Unit.ONE);
        log.info("Number of nodes to isolate : {}", nodesSafeToIsolate.size());

        for (List<String> batchOfNodeIds : Lists.partition(nodesSafeToIsolate, BATCH_SIZE)) {
            try {
                UpdateNodeStateRequest updateNodeStateRequest = UpdateNodeStateRequest.builder()
                        .withNodeIds(batchOfNodeIds)
                        .withUpdateNodeOperation(UpdateNodeOperation.MARK_ISOLATION_SAFE)
                        .withRequestId(scanId)
                        .build();
                UpdateNodeStateCall updateNodeStateCall = healthTrackerServiceClient.newUpdateNodeStateCall();
                updateNodeStateCall.setRequestId(scanId);
                updateNodeStateCall.setMetrics(metrics);
                updateNodeStateCall.addAttachment(Calls.retry(Constants.DEFAULT_RETRY_STRATEGY));
                updateNodeStateCall.call(updateNodeStateRequest);
            } catch (Exception ex) {
                // Deliberately broad: one failed batch must not prevent the other batches from being sent.
                batchFailed = true;
                metrics.addCount(BATCH_UPDATE_FAILED, 1, Unit.ONE);
                log.error("Caught exception while calling HT", ex);
            }
        }

        if (batchFailed) {
            throw new ProcessorFailedException("One or more batch calls to HT for marking nodes as isolationSafe failed!");
        }
    }

    /**
     * Builds, for every node in recycling, the set of cell ids that list the node among their jurors.
     *
     * <p>For a cell whose DP is reachable the jurors come from {@code getJurorsBasedOnDp()}; otherwise
     * {@code getJurors()} is used and the unreachable-DP metric is incremented.
     *
     * @param context context of the node scan being processed
     * @param metrics metrics sink for this run
     * @return node id to cell ids; nodes that are not a juror of any cell have no entry
     */
    protected Map<String, Set<String>> getDPViewOfRecyclingNodes(NodeScanContext context, Metrics metrics) {
        Map<String, Set<String>> nodeIdToSetOfCellsFromDPView = new HashMap<>();
        // Read once: the recycling set does not depend on the cell being examined.
        Set<String> nodesInRecycling = context.getNodesInRecycling();

        for (Map.Entry<String, CellInfo> entry : context.getCellInfoMap().entrySet()) {
            CellInfo cellInfo = entry.getValue();
            Set<ServiceAddress> jurors;
            if (cellInfo.isDPReachable()) {
                jurors = cellInfo.getJurorsBasedOnDp();
            } else {
                jurors = cellInfo.getJurors();
                metrics.addCount(UNREACHABLE_DP, 1, Unit.ONE);
            }

            Sets.SetView<String> recyclingJurors = Sets.intersection(nodesInRecycling,
                    jurors.stream().map(sa -> sa.getNodeId()).collect(Collectors.toSet()));
            for (String nodeId : recyclingJurors) {
                nodeIdToSetOfCellsFromDPView.computeIfAbsent(nodeId, key -> new HashSet<>()).add(entry.getKey());
            }
        }
        return nodeIdToSetOfCellsFromDPView;
    }

    /** Formats the per-reason metric name for the given isolation reason. */
    private static String reasonMetricName(IsolationSafeReason reason) {
        return String.format(ISOLATION_REASON, reason.toString());
    }
}
Editor is loading...
Leave a Comment