Untitled

 avatar
unknown
plain_text
a year ago
13 kB
10
Indexable
package com.amazon.physalia.processingservice.processors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.measure.unit.Unit;
import com.amazon.coral.client.Calls;
import com.amazon.coral.metrics.Metrics;
import com.amazon.ebs.clients.healthtracker.Constants;
import com.amazon.ebs.physaliahealthtracker.PhysaliaHealthTrackerServiceClient;
import com.amazon.ebs.physaliahealthtracker.UpdateNodeOperation;
import com.amazon.ebs.physaliahealthtracker.UpdateNodeStateRequest;
import com.amazon.ebs.physaliahealthtracker.impl.UpdateNodeStateCall;
import com.amazon.physalia.ServiceAddress;
import com.amazon.physalia.colonydata.NodeInfo;
import com.amazon.physalia.colonydataservice.colonydata.config.Config;
import com.amazon.physalia.colonydata.CellInfo;
import com.amazon.physalia.colonydataservice.colonydata.nodesweep.NodeScanContext;
import com.amazon.physalia.colonydataservice.colonydata.oldrepair.CellStateClient;
import com.amazon.physalia.processingservice.AbstractBaseProcessor;
import com.amazon.physalia.processingservice.ProcessorFailedException;
import com.amazon.physalia.processingservice.handlers.AsyncProcessor;
import com.amazon.physaliahive.CellState;

/**
 * Processor that contains the logic to determine if a node is safe for isolation.
 * https://w.amazon.com/index.php/EC2/EBS/Projects/Oracle/LocalBlessings/Health_Tracker_Is_Node_Isolation_Safe
 *
 * <p>For every node the scan context reports as recycling, the processor combines three inputs:
 * <ul>
 *   <li>whether the node was reachable during the scan,</li>
 *   <li>the node's local view of cells ({@code NodeInfo#getCells()}), and</li>
 *   <li>the "DP view": the cells whose juror set names the node
 *       (see {@link #getDPViewOfRecyclingNodes(NodeScanContext, Metrics)}).</li>
 * </ul>
 * Nodes judged safe are then reported to Health Tracker in batches of {@link #BATCH_SIZE}.
 */
@Slf4j
@Singleton
public class IsNodeIsolationSafeProcessor extends AbstractBaseProcessor implements AsyncProcessor {

    private final CellStateClient cellStateClient;
    private final Config config;
    private final PhysaliaHealthTrackerServiceClient healthTrackerServiceClient;

    // Processor id and metric names.
    private static final String NAME = "IS-NODE-ISOLATION-SAFE";
    private static final String CELLS_NOT_FOUND_IN_CACHE = "CellsNotFoundInCache";
    private static final String ISOLATION_SAFE_NODE_COUNT = "IsolationSafeNodeCount";
    private static final String ISOLATION_REASON = "NodeIsolationReason-%s";
    private static final String ISOLATIONS_DISABLED = "IsolationsDisabled";
    private static final String BATCH_UPDATE_FAILED = "BatchUpdateFailed";
    private static final String UNREACHABLE_DP = "UnreachableDpInNodeIsolationSafeProcessor";

    /** Maximum number of node ids sent to Health Tracker in a single UpdateNodeState call. */
    private static final int BATCH_SIZE = 10;

    /** Reasons for which a node is declared safe to isolate; each one is published as its own metric. */
    public enum IsolationSafeReason {
        NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY,
        NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS,
        NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY,
        ONLY_ERASED_OR_ORPHAN_CELLS
    }

    @Inject
    public IsNodeIsolationSafeProcessor(CellStateClient cellStateClient, Config config, PhysaliaHealthTrackerServiceClient healthTrackerServiceClient) {
        // isAffectedByNodeScanThreshold is set True to signify that the Processor's execution depends on
        // whether there was a significant percentage of successful node scans in the current sweep
        super(true);
        this.cellStateClient = cellStateClient;
        this.config = config;
        this.healthTrackerServiceClient = healthTrackerServiceClient;
    }

    @Override
    public String getId() {
        return NAME;
    }

    /**
     * Computes the nodes that are safe to isolate and reports them to Health Tracker.
     *
     * <p>Whether or not the work succeeds, the {@code finally} block publishes the number of safe nodes
     * found so far plus a zero-valued data point for {@link #ISOLATIONS_DISABLED} and for every
     * {@link IsolationSafeReason} metric.
     *
     * @param metrics metrics sink for this run
     * @param ctx     scan context holding node and cell information
     * @throws ProcessorFailedException if one or more Health Tracker batch updates failed
     */
    @Override
    protected void processInternal(Metrics metrics, NodeScanContext ctx) throws InterruptedException, TimeoutException, ProcessorFailedException {
        List<String> nodesSafeForIsolation = new ArrayList<>();
        try {
            nodesSafeForIsolation = getNodesSafeForIsolation(ctx, metrics);
            updateNodesAsSafeToIsolate(ctx.getScanId(), nodesSafeForIsolation, metrics);
        } finally {
            metrics.addCount(ISOLATION_SAFE_NODE_COUNT, nodesSafeForIsolation.size(), Unit.ONE);
            metrics.addCount(ISOLATIONS_DISABLED, 0, Unit.ONE);
            for (IsolationSafeReason reason : IsolationSafeReason.values()) {
                metrics.addCount(String.format(ISOLATION_REASON, reason.toString()), 0, Unit.ONE);
            }
        }
    }

    /**
     * Evaluates every recycling node and returns the ids of those that are safe to isolate.
     *
     * <p>A node whose subzone is blocked from isolation is never safe. A recycling node that has no
     * entry in the context's node-info map cannot be evaluated and is conservatively skipped.
     *
     * @param ctx     scan context holding node and cell information
     * @param metrics metrics sink; one count is added per node for the reason it was declared safe
     * @return ids of the nodes that are safe to isolate (possibly empty, never null)
     */
    @VisibleForTesting
    protected List<String> getNodesSafeForIsolation(NodeScanContext ctx, Metrics metrics) {

        Map<String, Set<String>> dpViewOfRecyclingNodes = getDPViewOfRecyclingNodes(ctx, metrics);

        Set<String> nodeIdsInRecycling = ctx.getNodesInRecycling();
        List<String> nodesSafeToIsolate = new ArrayList<>();
        for (String nodeId : nodeIdsInRecycling) {
            NodeInfo nodeInfo = ctx.getNodeInfoMap().get(nodeId);
            if (nodeInfo == null) {
                // Without node information neither reachability nor the local view can be verified.
                // Skip the node rather than failing the whole evaluation with a NullPointerException.
                log.warn("No NodeInfo found for recycling node {}; treating it as not safe to isolate", nodeId);
                continue;
            }

            boolean safeToIsolate = false;
            boolean doesNodeHaveNoCellsOrOrphanCellsOnly = false;

            // We treat a node reachable only when all pages returned successfully during scan. This is in line with existing behavior in HT
            boolean isReachable = nodeInfo.isReachable();

            boolean isLocalViewEmpty = nodeInfo.getCells().isEmpty();
            boolean isDPViewEmpty = isDPViewEmpty(dpViewOfRecyclingNodes, nodeId);

            if (nodeInfo.isSubzoneBlockedFromIsolation()) {
                // Blocked subzones are never isolated, whatever the views look like.
                safeToIsolate = false;
            } else if (isDPViewEmpty && isLocalViewEmpty && !isReachable) {
                // Row 8 (row numbers presumably refer to the decision table on the wiki page linked from the class Javadoc)
                recordIsolationReason(metrics, IsolationSafeReason.NODE_UNREACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY);
                safeToIsolate = true;
            } else if (isLocalViewEmpty && isReachable) {
                // Row 5 & 6
                recordIsolationReason(metrics, isDPViewEmpty
                    ? IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EMPTY
                    : IsolationSafeReason.NODE_REACHABLE_LOCALVIEW_EMPTY_DPVIEW_EXISTS);
                safeToIsolate = true;
            } else {
                // At this point, we want to take a union of the local view of cells and dp view to check if it is either empty
                // or has only erased cells.
                doesNodeHaveNoCellsOrOrphanCellsOnly = doesNodeHaveNoCellsOrOrphanCellsOnly(
                    isDPViewEmpty, isLocalViewEmpty, dpViewOfRecyclingNodes, nodeInfo, metrics);
                if (doesNodeHaveNoCellsOrOrphanCellsOnly) {
                    recordIsolationReason(metrics, IsolationSafeReason.ONLY_ERASED_OR_ORPHAN_CELLS);
                    safeToIsolate = true;
                }
            }

            if (safeToIsolate) {
                nodesSafeToIsolate.add(nodeId);
            }
            log.info(
                "IsSafeToIsolate result for node {} safeToIsolate {}. isSubzoneBlockedFromIsolation {} isDPViewEmpty {} isLocalViewEmpty {} isReachable {} doesNodeHaveNoCellsOrOrphanCellsOnly {}",
                nodeId, safeToIsolate, nodeInfo.isSubzoneBlockedFromIsolation(), isDPViewEmpty, isLocalViewEmpty, isReachable, doesNodeHaveNoCellsOrOrphanCellsOnly);
        }
        return nodesSafeToIsolate;
    }

    /** Adds a count of one to the metric that corresponds to the given isolation reason. */
    private void recordIsolationReason(Metrics metrics, IsolationSafeReason reason) {
        metrics.addCount(String.format(ISOLATION_REASON, reason.toString()), 1, Unit.ONE);
    }

    /**
     * Checks whether the union of the node's DP view and local view is empty or consists solely of cells
     * whose goal state is {@code CellState.ERASED}.
     *
     * @param isDPViewEmpty    whether the DP view holds no cells for this node
     * @param isLocalViewEmpty whether the node itself reports no cells
     * @param dpViewOfNodes    node id to ids of cells whose jurors include that node
     * @param nodeInfo         information about the node under evaluation
     * @param metrics          metrics sink; {@link #CELLS_NOT_FOUND_IN_CACHE} is counted on a lookup mismatch
     * @return {@code true} if there are no cells at all, or every cell's state was retrieved and is erased;
     *         {@code false} if any cell state is missing, unknown or not erased
     */
    protected boolean doesNodeHaveNoCellsOrOrphanCellsOnly(boolean isDPViewEmpty, boolean isLocalViewEmpty, Map<String, Set<String>> dpViewOfNodes,
        NodeInfo nodeInfo, Metrics metrics) {
        //The DP view and local view could intersect, in such cases a CompositeSet breaks and requires one of the sets to drop the intersecting keys from within it
        //To avoid that going with copying references to a new set.
        Set<String> cellsFromDPAndLocalView = new HashSet<>();
        String nodeId = nodeInfo.getId();
        if (!isDPViewEmpty) {
            cellsFromDPAndLocalView.addAll(dpViewOfNodes.get(nodeId));
        }

        if (!isLocalViewEmpty) {
            cellsFromDPAndLocalView.addAll(nodeInfo.getCells().keySet());
        }

        if (cellsFromDPAndLocalView.isEmpty()) {
            return true;
        }

        Map<String, CellStateClient.State> cellStatesByCellId = cellStateClient.getCellStates(cellsFromDPAndLocalView, metrics);

        if (cellsFromDPAndLocalView.size() > cellStatesByCellId.size()) {
            log.error("Mismatch found! cellsFromDPAndLocalView {} cellStatesByCellId from cache {}", cellsFromDPAndLocalView.size(), cellStatesByCellId.size());
            metrics.addCount(CELLS_NOT_FOUND_IN_CACHE, 1, Unit.ONE);

            // If entry in cell cache was expired/evicted and cds failed to reach hive, then this node could falsely be marked for isIsolationSafe == true.
            // Cell are never deleted off the cell table we should always be able to verify the cell states.
            // Therefore, if cell information was not retrieved then we return false here to avoid falsely setting isIsolationSafe to true.
            return false;
        }

        // The constant is the receiver so that a missing state or goal state evaluates to "not erased"
        // (the conservative answer) instead of throwing a NullPointerException.
        return cellStatesByCellId.values().stream().allMatch(
            state -> state != null && CellState.ERASED.equalsIgnoreCase(state.goalState));
    }

    /** Returns {@code true} when the DP view has no entry, or an empty entry, for the given node. */
    private boolean isDPViewEmpty(Map<String, Set<String>> dpViewOfNodes, String nodeId) {
        return !dpViewOfNodes.containsKey(nodeId) || dpViewOfNodes.get(nodeId).isEmpty();
    }

    /**
     * Marks the given nodes as isolation safe in Health Tracker, {@link #BATCH_SIZE} nodes per call.
     *
     * <p>When isolations are disabled in the config, no call is made and {@link #ISOLATIONS_DISABLED} is
     * counted instead. A failing batch does not stop the remaining batches from being attempted; the
     * failures are reported together once every batch has been tried.
     *
     * @param scanId             id of the current scan, used as the request id of every call
     * @param nodesSafeToIsolate ids of the nodes to mark
     * @param metrics            metrics sink; {@link #BATCH_UPDATE_FAILED} is counted once per failed batch
     * @throws ProcessorFailedException if at least one batch call failed; the individual failures are
     *                                  attached as suppressed exceptions
     */
    @VisibleForTesting
    protected void updateNodesAsSafeToIsolate(String scanId, List<String> nodesSafeToIsolate, Metrics metrics) throws ProcessorFailedException {
        if (!config.areIsolationsEnabled()) {
            metrics.addCount(ISOLATIONS_DISABLED, 1, Unit.ONE);
            return;
        }

        List<Exception> batchFailures = new ArrayList<>();
        metrics.addCount(BATCH_UPDATE_FAILED, 0, Unit.ONE);
        log.info("Number of nodes to isolate : {}", nodesSafeToIsolate.size());
        List<List<String>> batchedNodeIdsForIsolation = Lists.partition(nodesSafeToIsolate, BATCH_SIZE);
        for (List<String> batchOfNodeIds : batchedNodeIdsForIsolation) {

            try {
                UpdateNodeStateRequest updateNodeStateRequest = UpdateNodeStateRequest.builder().withNodeIds(batchOfNodeIds).withUpdateNodeOperation(
                    UpdateNodeOperation.MARK_ISOLATION_SAFE).withRequestId(scanId).build();
                UpdateNodeStateCall updateNodeStateCall = healthTrackerServiceClient.newUpdateNodeStateCall();
                updateNodeStateCall.setRequestId(scanId);
                updateNodeStateCall.setMetrics(metrics);
                updateNodeStateCall.addAttachment(Calls.retry(Constants.DEFAULT_RETRY_STRATEGY));

                updateNodeStateCall.call(updateNodeStateRequest);
            } catch (Exception ex) {
                // Deliberately broad: one bad batch must not prevent the other batches from being sent.
                batchFailures.add(ex);
                metrics.addCount(BATCH_UPDATE_FAILED, 1, Unit.ONE);
                log.error("Caught exception while calling HT", ex);
            }
        }

        if (!batchFailures.isEmpty()) {
            ProcessorFailedException failure =
                new ProcessorFailedException("One or more batch calls to HT for marking nodes as isolationSafe failed!");
            // Keep the underlying causes reachable from the exception that callers actually see.
            batchFailures.forEach(failure::addSuppressed);
            throw failure;
        }
    }

    /**
     * Builds the "DP view" of the recycling nodes: for each recycling node, the ids of the cells whose
     * juror set contains that node.
     *
     * <p>For a cell whose DP is reachable the jurors come from {@code getJurorsBasedOnDp()}; otherwise
     * {@code getJurors()} is used and {@link #UNREACHABLE_DP} is counted.
     *
     * @param context scan context providing the cell-info map and the recycling node ids
     * @param metrics metrics sink
     * @return node id to cell ids; only recycling nodes that appear in at least one juror set have an entry
     */
    protected Map<String, Set<String>> getDPViewOfRecyclingNodes(NodeScanContext context, Metrics metrics) {

        Map<String, Set<String>> nodeIdToSetOfCellsFromDPView = new HashMap<>();
        Set<String> nodeIdsInRecycling = context.getNodesInRecycling();

        for (Map.Entry<String, CellInfo> entry : context.getCellInfoMap().entrySet()) {
            CellInfo cellInfo = entry.getValue();

            Set<ServiceAddress> jurors;
            if (cellInfo.isDPReachable()) {
                jurors = cellInfo.getJurorsBasedOnDp();
            } else {
                jurors = cellInfo.getJurors();
                metrics.addCount(UNREACHABLE_DP, 1, Unit.ONE);
            }

            // Walk the cell's jurors and probe the recycling set, rather than walking every recycling
            // node for every cell; this also avoids building a temporary set of juror node ids per cell.
            for (ServiceAddress juror : jurors) {
                String nodeId = juror.getNodeId();
                if (nodeId != null && nodeIdsInRecycling.contains(nodeId)) {
                    nodeIdToSetOfCellsFromDPView.computeIfAbsent(nodeId, id -> new HashSet<>()).add(entry.getKey());
                }
            }
        }

        return nodeIdToSetOfCellsFromDPView;
    }
}
Editor is loading...
Leave a Comment