Untitled
unknown
plain_text
10 months ago
3.1 kB
5
Indexable
Never
package com.amex.matching; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.LongType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; import scala.collection.mutable.WrappedArray; import java.io.Serializable; public final class GenRecId implements Serializable { public static void main(String[] args) throws AnalysisException { String in_table = args[0]; String out_table = args[1]; String start_rid = args[2]; SparkSession spark = SparkSession.builder() .appName("Generate recid") .enableHiveSupport() .getOrCreate(); Dataset<Row> rddhive = spark.sql("select * from " + in_table); StructType hiveschema = rddhive.schema(); // Convert RDD to JavaRDD JavaRDD<Tuple2<Row, Object>> inputJavaRDD = JavaRDD.fromRDD(rddhive.rdd(), scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class)); // Transformation function Function<Tuple2<Row, Object>, Row> mapFunction = new Function<Tuple2<Row, Object>, Row>() { @Override public Row call(Tuple2<Row, Object> tuple) throws Exception { Row r = tuple._1(); long recid = (Long) tuple._2(); long id1 = Long.parseLong(start_rid) + recid; // Convert id1 to WrappedArray to include it in the row WrappedArray<Long> wrappedArray = WrappedArray.make(new Long[]{id1}); // Create a new Row with id1 appended to the existing fields Object[] rowData = new Object[r.size() + 1]; for (int i = 0; i < r.size(); i++) { rowData[i] = r.get(i); } rowData[r.size()] = wrappedArray; // Return the new Row return RowFactory.create(rowData); } }; // Apply the transformation using map transformation on JavaRDD JavaRDD<Row> transformedRowsJavaRDD = inputJavaRDD.map(mapFunction); // Convert JavaRDD back to RDD RDD<Row> transformedRowsRDD = transformedRowsJavaRDD.rdd(); StructField structField = new StructField("id", (DataType) LongType$.MODULE$, false, StructField$.MODULE$.apply$default$4()); Dataset<Row> wcID = spark.createDataFrame(transformedRowsRDD, new StructType(hiveschema.fields()).add(structField)); wcID.createOrReplaceTempView("sprk_to_hive"); spark.sql("drop table if exists " + out_table); spark.sql("create table " + out_table + " as select id as recid, pdate, listcd, vendorname, mid, seqnum, matchtype, business, businessid, " + "person, personid, address, phone, email, siccode, stdstat, hashraw from sprk_to_hive"); System.out.println("End of Program!!!"); } }