Untitled
unknown
plain_text
a year ago
7.4 kB
9
Indexable
public abstract class EMRClusterUtil {
protected static final String[] APPLICATIONS = {"Hadoop", "Spark"};
private EMRClusterUtil() {
}
/**
* Method responsible for the cluster creation
* @param clusterBuilder RunJobFlowRequest
* @param clusterTr ClusterTr
* @param jobDefinition JobDefinitionTr
*/
public static void buildCluster(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) {
final String releaseLabel = clusterTr.optReleaseLabel()
.orElseThrow(() -> new IllegalStateException("The release label is mandatory"));
clusterBuilder
.name(clusterTr.getName())
.securityConfiguration(clusterTr.optSecurityConfiguration()
.orElseThrow(() -> new IllegalStateException("There is no security configuration assigned")))
.applications(listApplications(jobDefinition))
.releaseLabel(releaseLabel)
.autoTerminationPolicy(autoTermination -> autoTermination
.idleTimeout(clusterTr.optIdleTimeOut().orElse(300L))
.build())
.configurations(buildEmrConfigurations(clusterTr))
.applications(listApplications(jobDefinition))
.instances(instancesBuilder -> buildInstances(instancesBuilder, clusterTr, jobDefinition));
jobDefinition.optServiceRole().ifPresent(clusterBuilder::serviceRole);
jobDefinition.optJobFlowRole().ifPresent(clusterBuilder::jobFlowRole);
clusterTr.optLogUri().ifPresent(clusterBuilder::logUri);
configureBootStrapActions(clusterBuilder, clusterTr);
configureCustomAmiId(clusterBuilder, clusterTr);
configureTags(clusterBuilder, clusterTr);
}
private static List<Configuration> buildEmrConfigurations(ClusterTr clusterTr) {
final List<Configuration> configurations = new ArrayList<>();
clusterTr.optEmrConfiguration().ifPresent(emrConfigurationTrs ->
emrConfigurationTrs.forEach(emrConfigurationTr ->
configurations.add(Configuration.builder()
.classification(emrConfigurationTr.getClassification())
.properties(emrConfigurationTr.getProperties())
.build())));
return configurations;
}
private static void configureTags(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
clusterTr.optTags().ifPresent(streamTags -> {
final StreamToMap streamToMap = new StreamToMap(streamTags);
final Set<Tag> tags = new HashSet<>();
streamToMap.getMap().forEach((key, value) -> {
Tag tag = Tag.builder()
.key(key)
.value(value)
.build();
tags.add(tag);
});
clusterBuilder.tags(tags);
});
}
private static void configureCustomAmiId(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
clusterTr.optCustomAmiId().ifPresent(clusterBuilder::customAmiId);
}
private static void configureBootStrapActions(RunJobFlowRequest.Builder clusterBuilder, ClusterTr clusterTr) {
clusterTr.optBootStrapActions().ifPresent(bootStraps -> {
final List<BootstrapActionConfig> bootConfigs = new ArrayList<>();
bootStraps.forEach(action -> buildBootStrapActions(bootConfigs, action));
clusterBuilder.bootstrapActions(bootConfigs);
});
}
/**
* Responsible for the bootstrap actions of the cluster
*
* @param bootConfigs List<ScriptBootstrapActionConfig>
* @param bootStrapTr BootstrapActionConfig.Builder bootStrapActionBuilder
*/
private static void buildBootStrapActions(List<BootstrapActionConfig> bootConfigs, BootStrapActionTr bootStrapTr) {
BootstrapActionConfig actionConfig = BootstrapActionConfig.builder()
.name(bootStrapTr.getName())
.scriptBootstrapAction(scriptAction -> scriptAction
.path(bootStrapTr.getPath())
.args(bootStrapTr.getArgs().split("\\s+"))
.build())
.build();
bootConfigs.add(actionConfig);
}
/**
* Inform the application that will be used by the cluster
* @param jobDefinition JobDefinitionTr
* @return Collection<Application>
*/
private static Collection<Application> listApplications(JobDefinitionTr jobDefinition) {
final Stream<String> fixedApplications;
final Stream<String> applications;
fixedApplications = Stream.of(APPLICATIONS);
applications = Stream.concat(fixedApplications, jobDefinition.applicationStream());
return applications.map(EMRClusterUtil::buildApplication)
.collect(Collectors.toSet());
}
/**
* Name of the application
* @param name String
* @return Application
*/
private static Application buildApplication(String name) {
return Application.builder()
.name(name)
.build();
}
/**
* Creates the instances of the cluster
* @param builder JobFlowInstancesConfig
* @param clusterTr ClusterTr
* @param jobDefinition JobDefinitionTr
*/
private static void buildInstances(JobFlowInstancesConfig.Builder builder, ClusterTr clusterTr, JobDefinitionTr jobDefinition) {
final ClusterSizeTr clusterSizeTr;
clusterSizeTr = clusterTr.optClusterSize()
.orElseThrow(() -> new IllegalStateException("There is no size assigned"));
builder.instanceCount(clusterSizeTr.getWorkers())
.keepJobFlowAliveWhenNoSteps(true)
.ec2SubnetId(jobDefinition.getSubNet())
.terminationProtected(clusterTr.optIsTerminationProtected().orElse(false))
.additionalMasterSecurityGroups(jobDefinition.getSecurityGroupMaster())
.additionalSlaveSecurityGroups(jobDefinition.getSecurityGroupSlave());
clusterSizeTr.optDriverType()
.map(NodeTr::getType)
.ifPresent(builder::masterInstanceType);
clusterSizeTr.optWorkerType()
.map(NodeTr::getType)
.ifPresent(builder::slaveInstanceType);
}
static class StreamToMap {
private final Map<String, String> map;
public StreamToMap(Stream<String> stream) {
this.map = new HashMap<>();
populateMap(stream);
}
private void populateMap(Stream<String> stream) {
// Extract parameters and values from the stream
List<String> parametersAndValues = stream.collect(Collectors.toList());
// Populate the HashMap using Streams
IntStream.range(0, parametersAndValues.size() / 2)
.forEach(i -> map.put(parametersAndValues.get(i * 2), parametersAndValues.get(i * 2 + 1)));
}
public Map<String, String> getMap() {
return map;
}
}
}Editor is loading...
Leave a Comment