Untitled

 avatar
unknown
plain_text
a year ago
7.4 kB
4
Indexable
/**
 * Static helpers that populate an EMR {@code RunJobFlowRequest.Builder} from the project's
 * cluster ({@code ClusterTr}) and job-definition ({@code JobDefinitionTr}) transfer objects.
 *
 * <p>Not instantiable.
 */
public final class EMRClusterUtil {

    /**
     * Applications installed on every cluster, in addition to those requested by the job definition.
     *
     * <p>NOTE(review): this is a mutable array with {@code protected} (package-wide) access, so any
     * code in the package can overwrite its elements. Kept as-is for compatibility with existing callers.
     */
    protected static final String[] APPLICATIONS = {"Hadoop", "Spark"};

    /**
     * Idle timeout used for the auto-termination policy when {@code ClusterTr} does not specify one.
     * EMR interprets the auto-termination idle timeout in seconds.
     */
    private static final long DEFAULT_IDLE_TIMEOUT_SECONDS = 300L;

    private EMRClusterUtil() {
        // utility class: no instances
    }

    /**
     * Populates {@code clusterBuilder} with the cluster definition.
     *
     * <p>The definition covers the name, security configuration, applications, release label,
     * auto-termination policy, EMR configurations and instance layout. It also includes the
     * optional IAM roles, log URI, bootstrap actions, custom AMI and tags.
     *
     * @param clusterBuilder the {@code RunJobFlowRequest.Builder} to mutate
     * @param clusterTr      cluster settings (name, release label, size, tags, ...)
     * @param jobDefinition  job settings (extra applications, roles, subnet, security groups)
     * @throws IllegalStateException if {@code clusterTr} has no release label, no security
     *                               configuration, or no cluster size
     */
    public static void buildCluster(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) {
        final String releaseLabel = clusterTr.optReleaseLabel()
                .orElseThrow(() -> new IllegalStateException("The release label is mandatory"));

        clusterBuilder
                .name(clusterTr.getName())
                .securityConfiguration(clusterTr.optSecurityConfiguration()
                        .orElseThrow(() -> new IllegalStateException("There is no security configuration assigned")))
                .applications(listApplications(jobDefinition))
                .releaseLabel(releaseLabel)
                // Consumer-style overload: the lambda only mutates the nested builder, the SDK builds it.
                .autoTerminationPolicy(autoTermination -> autoTermination
                        .idleTimeout(clusterTr.optIdleTimeOut().orElse(DEFAULT_IDLE_TIMEOUT_SECONDS)))
                .configurations(buildEmrConfigurations(clusterTr))
                .instances(instancesBuilder -> buildInstances(instancesBuilder, clusterTr, jobDefinition));
        jobDefinition.optServiceRole().ifPresent(clusterBuilder::serviceRole);
        jobDefinition.optJobFlowRole().ifPresent(clusterBuilder::jobFlowRole);
        clusterTr.optLogUri().ifPresent(clusterBuilder::logUri);

        configureBootStrapActions(clusterBuilder, clusterTr);
        configureCustomAmiId(clusterBuilder, clusterTr);
        configureTags(clusterBuilder, clusterTr);
    }

    /**
     * Converts the cluster's EMR configuration entries (classification + properties) into SDK
     * {@code Configuration} objects.
     *
     * @param clusterTr cluster settings
     * @return the configurations, or an empty list when none are defined
     */
    private static List<Configuration> buildEmrConfigurations(ClusterTr clusterTr) {
        final List<Configuration> configurations = new ArrayList<>();
        clusterTr.optEmrConfiguration().ifPresent(emrConfigurationTrs ->
                emrConfigurationTrs.forEach(emrConfigurationTr ->
                        configurations.add(Configuration.builder()
                                .classification(emrConfigurationTr.getClassification())
                                .properties(emrConfigurationTr.getProperties())
                                .build())));

        return configurations;
    }

    /**
     * Sets the cluster tags, if any are defined.
     *
     * <p>The tag stream is read as alternating key/value elements (see {@link StreamToMap}).
     *
     * @param clusterBuilder the request builder to mutate
     * @param clusterTr      cluster settings
     */
    private static void configureTags(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
        clusterTr.optTags().ifPresent(streamTags -> {
            final Set<Tag> tags = new HashSet<>();
            new StreamToMap(streamTags).getMap().forEach((key, value) ->
                    tags.add(Tag.builder()
                            .key(key)
                            .value(value)
                            .build()));

            clusterBuilder.tags(tags);
        });
    }

    /**
     * Sets a custom AMI id on the request when the cluster defines one.
     *
     * @param clusterBuilder the request builder to mutate
     * @param clusterTr      cluster settings
     */
    private static void configureCustomAmiId(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
        clusterTr.optCustomAmiId().ifPresent(clusterBuilder::customAmiId);
    }

    /**
     * Converts the cluster's bootstrap actions, if any, and sets them on the request.
     *
     * @param clusterBuilder the request builder to mutate
     * @param clusterTr      cluster settings
     */
    private static void configureBootStrapActions(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
        clusterTr.optBootStrapActions().ifPresent(bootStraps -> {
            final List<BootstrapActionConfig> bootConfigs = new ArrayList<>();
            bootStraps.forEach(action -> buildBootStrapActions(bootConfigs, action));
            clusterBuilder.bootstrapActions(bootConfigs);
        });
    }

    /**
     * Builds one script bootstrap action and appends it to {@code bootConfigs}.
     *
     * @param bootConfigs the {@code List<BootstrapActionConfig>} to append to
     * @param bootStrapTr the bootstrap action definition (name, script path, whitespace-separated args)
     */
    private static void buildBootStrapActions(List<BootstrapActionConfig> bootConfigs, BootStrapActionTr bootStrapTr) {
        final BootstrapActionConfig actionConfig = BootstrapActionConfig.builder()
                .name(bootStrapTr.getName())
                .scriptBootstrapAction(scriptAction -> scriptAction
                        .path(bootStrapTr.getPath())
                        .args(splitBootstrapArgs(bootStrapTr.getArgs())))
                .build();

        bootConfigs.add(actionConfig);
    }

    /**
     * Splits a whitespace-separated argument string into individual arguments.
     *
     * @param rawArgs the raw argument string; may be {@code null}
     * @return the arguments, or an empty array when {@code rawArgs} is {@code null} or blank
     */
    private static String[] splitBootstrapArgs(String rawArgs) {
        if (rawArgs == null) {
            return new String[0];
        }
        // Trim first: String.split keeps a leading empty token when the input starts with whitespace,
        // and "".split(...) returns [""], which would pass a spurious empty argument to the script.
        final String trimmed = rawArgs.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    /**
     * Builds the set of applications to install: the fixed {@link #APPLICATIONS} plus those
     * requested by the job definition.
     *
     * <p>Duplicates are removed by collecting into a set.
     *
     * @param jobDefinition job settings providing the extra application names
     * @return the {@code Application} objects to install
     */
    private static Collection<Application> listApplications(JobDefinitionTr jobDefinition) {
        return Stream.concat(Stream.of(APPLICATIONS), jobDefinition.applicationStream())
                .map(EMRClusterUtil::buildApplication)
                .collect(Collectors.toSet());
    }

    /**
     * Creates an SDK {@code Application} with the given name.
     *
     * @param name application name, e.g. {@code "Spark"}
     * @return the application
     */
    private static Application buildApplication(String name) {
        return Application.builder()
                .name(name)
                .build();
    }

    /**
     * Fills the instance configuration of the cluster.
     *
     * <p>The configuration covers instance count, subnet, termination protection, additional
     * security groups and master/worker instance types.
     *
     * @param builder       the {@code JobFlowInstancesConfig.Builder} to mutate
     * @param clusterTr     cluster settings (size, termination protection)
     * @param jobDefinition job settings (subnet, security groups)
     * @throws IllegalStateException if {@code clusterTr} has no cluster size
     */
    private static void buildInstances(JobFlowInstancesConfig.Builder builder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) {
        final ClusterSizeTr clusterSizeTr = clusterTr.optClusterSize()
                .orElseThrow(() -> new IllegalStateException("There is no size assigned"));

        // NOTE(review): EMR's InstanceCount is the total number of EC2 instances, including the master.
        // Confirm that getWorkers() is meant as the total and not as the number of worker nodes.
        builder.instanceCount(clusterSizeTr.getWorkers())
                // Keep the cluster alive between steps; idle shutdown is left to the auto-termination
                // policy configured in buildCluster.
                .keepJobFlowAliveWhenNoSteps(true)
                .ec2SubnetId(jobDefinition.getSubNet())
                .terminationProtected(clusterTr.optIsTerminationProtected().orElse(false))
                .additionalMasterSecurityGroups(jobDefinition.getSecurityGroupMaster())
                .additionalSlaveSecurityGroups(jobDefinition.getSecurityGroupSlave());
        clusterSizeTr.optDriverType()
                .map(NodeTr::getType)
                .ifPresent(builder::masterInstanceType);
        clusterSizeTr.optWorkerType()
                .map(NodeTr::getType)
                .ifPresent(builder::slaveInstanceType);
    }

    /**
     * Turns a stream of alternating key/value strings ({@code k1, v1, k2, v2, ...}) into a map.
     *
     * <p>If the stream has an odd number of elements, the unpaired last element is ignored.
     * If a key repeats, its last value wins.
     */
    static final class StreamToMap {
        private final Map<String, String> map;

        /**
         * @param stream alternating key/value elements; consumed by this constructor
         */
        public StreamToMap(Stream<String> stream) {
            this.map = new HashMap<>();
            populateMap(stream);
        }

        private void populateMap(Stream<String> stream) {
            final List<String> parametersAndValues = stream.collect(Collectors.toList());

            // Pair index i -> (element 2i as key, element 2i+1 as value); integer division drops
            // an unpaired trailing element.
            IntStream.range(0, parametersAndValues.size() / 2)
                    .forEach(i -> map.put(parametersAndValues.get(i * 2), parametersAndValues.get(i * 2 + 1)));
        }

        /**
         * @return the backing (mutable) key/value map
         */
        public Map<String, String> getMap() {
            return map;
        }
    }
}
Editor is loading...
Leave a Comment