Untitled
unknown
plain_text
2 years ago
4.0 kB
6
Indexable
package com.amex.matching;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.collection.mutable.WrappedArray;
import java.io.Serializable;
public final class GenRecId implements Serializable {
public static void main(String[] args) throws AnalysisException {
String in_table = args[0];
String out_table = args[1];
String start_rid = args[2];
SparkSession spark = SparkSession.builder()
.appName("Generate recid")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> rddhive = spark.sql("select * from " + in_table);
StructType hiveschema = rddhive.schema();
// Convert RDD to CustomJavaPairRDD
CustomJavaPairRDD<Row, Object> inputPairRDD = new CustomJavaPairRDD<>(rddhive.rdd().toJavaRDD().rdd().toJavaRDD());
// Transformation function
Function<Tuple2<Row, Object>, Tuple2<Row, Object>> mapFunction = new Function<Tuple2<Row, Object>, Tuple2<Row, Object>>() {
@Override
public Tuple2<Row, Object> call(Tuple2<Row, Object> tuple) throws Exception {
Row r = tuple._1();
long recid = (Long) tuple._2();
long id1 = Long.parseLong(start_rid) + recid;
// Convert id1 to WrappedArray to include it in the row
WrappedArray<Long> wrappedArray = WrappedArray.make(new Long[]{id1});
// Create a new Row with id1 appended to the existing fields
Object[] rowData = new Object[r.size() + 1];
for (int i = 0; i < r.size(); i++) {
rowData[i] = r.get(i);
}
rowData[r.size()] = wrappedArray;
// Create a new Tuple2 with the updated Row and the original Object
Tuple2<Row, Object> updatedTuple = new Tuple2<>(RowFactory.create(rowData), tuple._2());
// Return the updated Tuple2
return updatedTuple;
}
};
// Apply the transformation using map transformation on CustomJavaPairRDD
CustomJavaPairRDD<Row, Object> transformedPairRDD = inputPairRDD.mapToPair(new PairFunction<Tuple2<Row, Object>, Row, Object>() {
@Override
public Tuple2<Row, Object> call(Tuple2<Row, Object> tuple) throws Exception {
return mapFunction.call(tuple);
}
});
// Extract the Rows from the updated Tuple2s
JavaRDD<Row> transformedRowsJavaRDD = transformedPairRDD.values().toJavaRDD();
StructField structField = new StructField("id", (DataType) LongType$.MODULE$, false,
StructField$.MODULE$.apply$default$4());
Dataset<Row> wcID = spark.createDataFrame(transformedRowsJavaRDD, new StructType(hiveschema.fields()).add(structField));
wcID.createOrReplaceTempView("sprk_to_hive");
spark.sql("drop table if exists " + out_table);
spark.sql("create table " + out_table +
" as select id as recid, pdate, listcd, vendorname, mid, seqnum, matchtype, business, businessid, " +
"person, personid, address, phone, email, siccode, stdstat, hashraw from sprk_to_hive");
System.out.println("End of Program!!!");
}
public static class CustomJavaPairRDD<K, V> extends JavaPairRDD<K, V> {
public CustomJavaPairRDD(RDD<Tuple2<K, V>> rdd) {
super(rdd, scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class));
}
}
}
Editor is loading...