Untitled
unknown
plain_text
2 years ago
2.0 kB
4
Indexable
package com.amex.matching;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.collection.Seq;

import java.io.File;
import java.io.Serializable;

/**
 * Spark driver that copies a Hive table into a new table, adding a sequential
 * record id to every row.
 *
 * <p>Command-line arguments:
 * <ol>
 *   <li>{@code in_table}  - table to read ({@code select * from in_table})</li>
 *   <li>{@code out_table} - table to (re)create; it is dropped first if it exists</li>
 *   <li>{@code start_rid} - long value added to each row's zero-based index to
 *       form its record id</li>
 * </ol>
 *
 * <p>The output table exposes the generated id as {@code recid}, followed by the
 * fixed column list in the CTAS statement below, so the input table must provide
 * every one of those columns.
 */
public final class GenRecId implements Serializable {

    /** Name of the generated id column inside the intermediate DataFrame. */
    private static final String ID_COLUMN = "id";

    /** Temp view through which the CTAS statement reads the intermediate DataFrame. */
    private static final String TEMP_VIEW = "sprk_to_hive";

    /**
     * Program entry point; see the class documentation for the argument layout.
     *
     * @param args {@code in_table}, {@code out_table}, {@code start_rid}
     * @throws IllegalArgumentException if fewer than three arguments are supplied, or
     *         (as {@link NumberFormatException}) if {@code start_rid} is not a valid long
     * @throws AnalysisException declared for compatibility with the Spark SQL calls
     */
    public static void main(String[] args) throws AnalysisException {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: GenRecId <in_table> <out_table> <start_rid>");
        }
        final String inTable = args[0];
        final String outTable = args[1];
        // Parse once on the driver so a malformed value fails before any Spark work
        // starts, rather than once per row inside executor tasks.
        final long startRid = Long.parseLong(args[2]);

        SparkSession spark = SparkSession.builder()
                .appName("Generate recid")
                .enableHiveSupport()
                .getOrCreate();
        try {
            // NOTE(review): table names are concatenated into SQL text; Spark SQL has no
            // bind parameters for identifiers, so these arguments must come from a trusted caller.
            Dataset<Row> input = spark.sql("select * from " + inTable);

            // The id field is appended LAST in the schema, so appendId() must also put the
            // value last in each row; otherwise values and schema are misaligned.
            StructType outputSchema = input.schema().add(ID_COLUMN, DataTypes.LongType, false);

            // The Java RDD API takes a Serializable function type, so the lambda can be
            // shipped to executors.
            JavaRDD<Row> rowsWithId = input.javaRDD()
                    .zipWithIndex()
                    .map(pair -> appendId(pair._1(), startRid + pair._2()));

            Dataset<Row> wcID = spark.createDataFrame(rowsWithId, outputSchema);
            wcID.createOrReplaceTempView(TEMP_VIEW);

            spark.sql("drop table if exists " + outTable);
            spark.sql("create table " + outTable
                    + " as select id as recid, pdate, listcd, vendorname, mid, seqnum, matchtype, business, businessid, "
                    + "person, personid, address, phone, email, siccode, stdstat, hashraw from sprk_to_hive");

            System.out.println("End of Program!!!");
        } finally {
            // Release the session even when one of the SQL statements fails.
            spark.stop();
        }
    }

    /**
     * Returns a copy of {@code row} with {@code id} added as one extra trailing value.
     *
     * @param row source row; its values are copied in their original order
     * @param id  record id to place after the last original value
     * @return a new row with {@code row.size() + 1} values
     */
    private static Row appendId(Row row, long id) {
        final int width = row.size();
        final Object[] values = new Object[width + 1];
        for (int i = 0; i < width; i++) {
            values[i] = row.get(i);
        }
        values[width] = id;
        return RowFactory.create(values);
    }
}
Editor is loading...