Untitled
unknown
plain_text
a year ago
7.4 kB
4
Indexable
public abstract class EMRClusterUtil { protected static final String[] APPLICATIONS = {"Hadoop", "Spark"}; private EMRClusterUtil() { } /** * Method responsible for the cluster creation * @param clusterBuilder RunJobFlowRequest * @param clusterTr ClusterTr * @param jobDefinition JobDefinitionTr */ public static void buildCluster(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) { final String releaseLabel = clusterTr.optReleaseLabel() .orElseThrow(() -> new IllegalStateException("The release label is mandatory")); clusterBuilder .name(clusterTr.getName()) .securityConfiguration(clusterTr.optSecurityConfiguration() .orElseThrow(() -> new IllegalStateException("There is no security configuration assigned"))) .applications(listApplications(jobDefinition)) .releaseLabel(releaseLabel) .autoTerminationPolicy(autoTermination -> autoTermination .idleTimeout(clusterTr.optIdleTimeOut().orElse(300L)) .build()) .configurations(buildEmrConfigurations(clusterTr)) .applications(listApplications(jobDefinition)) .instances(instancesBuilder -> buildInstances(instancesBuilder, clusterTr, jobDefinition)); jobDefinition.optServiceRole().ifPresent(clusterBuilder::serviceRole); jobDefinition.optJobFlowRole().ifPresent(clusterBuilder::jobFlowRole); clusterTr.optLogUri().ifPresent(clusterBuilder::logUri); configureBootStrapActions(clusterBuilder, clusterTr); configureCustomAmiId(clusterBuilder, clusterTr); configureTags(clusterBuilder, clusterTr); } private static List<Configuration> buildEmrConfigurations(ClusterTr clusterTr) { final List<Configuration> configurations = new ArrayList<>(); clusterTr.optEmrConfiguration().ifPresent(emrConfigurationTrs -> emrConfigurationTrs.forEach(emrConfigurationTr -> configurations.add(Configuration.builder() .classification(emrConfigurationTr.getClassification()) .properties(emrConfigurationTr.getProperties()) .build()))); return configurations; } private static void configureTags(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) { clusterTr.optTags().ifPresent(streamTags -> { final StreamToMap streamToMap = new StreamToMap(streamTags); final Set<Tag> tags = new HashSet<>(); streamToMap.getMap().forEach((key, value) -> { Tag tag = Tag.builder() .key(key) .value(value) .build(); tags.add(tag); }); clusterBuilder.tags(tags); }); } private static void configureCustomAmiId(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) { clusterTr.optCustomAmiId().ifPresent(clusterBuilder::customAmiId); } private static void configureBootStrapActions(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) { clusterTr.optBootStrapActions().ifPresent(bootStraps -> { final List<BootstrapActionConfig> bootConfigs = new ArrayList<>(); bootStraps.forEach(action -> buildBootStrapActions(bootConfigs, action)); clusterBuilder.bootstrapActions(bootConfigs); }); } /** * Responsible for the bootstrap actions of the cluster * * @param bootConfigs List<ScriptBootstrapActionConfig> * @param bootStrapTr BootstrapActionConfig.Builder bootStrapActionBuilder */ private static void buildBootStrapActions(List<BootstrapActionConfig> bootConfigs, BootStrapActionTr bootStrapTr) { BootstrapActionConfig actionConfig = BootstrapActionConfig.builder() .name(bootStrapTr.getName()) .scriptBootstrapAction(scriptAction -> scriptAction .path(bootStrapTr.getPath()) .args(bootStrapTr.getArgs().split("\\s+")) .build()) .build(); bootConfigs.add(actionConfig); } /** * Inform the application that will be used by the cluster * @param jobDefinition JobDefinitionTr * @return Collection<Application> */ private static Collection<Application> listApplications(JobDefinitionTr jobDefinition) { final Stream<String> fixedApplications; final Stream<String> applications; fixedApplications = Stream.of(APPLICATIONS); applications = Stream.concat(fixedApplications, jobDefinition.applicationStream()); return applications.map(EMRClusterUtil::buildApplication) .collect(Collectors.toSet()); } /** * Name of the application * @param name String * @return Application */ private static Application buildApplication(String name) { return Application.builder() .name(name) .build(); } /** * Creates the instances of the cluster * @param builder JobFlowInstancesConfig * @param clusterTr ClusterTr * @param jobDefinition JobDefinitionTr */ private static void buildInstances(JobFlowInstancesConfig.Builder builder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) { final ClusterSizeTr clusterSizeTr; clusterSizeTr = clusterTr.optClusterSize() .orElseThrow(() -> new IllegalStateException("There is no size assigned")); builder.instanceCount(clusterSizeTr.getWorkers()) .keepJobFlowAliveWhenNoSteps(true) .ec2SubnetId(jobDefinition.getSubNet()) .terminationProtected(clusterTr.optIsTerminationProtected().orElse(false)) .additionalMasterSecurityGroups(jobDefinition.getSecurityGroupMaster()) .additionalSlaveSecurityGroups(jobDefinition.getSecurityGroupSlave()); clusterSizeTr.optDriverType() .map(NodeTr::getType) .ifPresent(builder::masterInstanceType); clusterSizeTr.optWorkerType() .map(NodeTr::getType) .ifPresent(builder::slaveInstanceType); } static class StreamToMap { private final Map<String, String> map; public StreamToMap(Stream<String> stream) { this.map = new HashMap<>(); populateMap(stream); } private void populateMap(Stream<String> stream) { // Extract parameters and values from the stream List<String> parametersAndValues = stream.collect(Collectors.toList()); // Populate the HashMap using Streams IntStream.range(0, parametersAndValues.size() / 2) .forEach(i -> map.put(parametersAndValues.get(i * 2), parametersAndValues.get(i * 2 + 1))); } public Map<String, String> getMap() { return map; } } }
Editor is loading...
Leave a Comment