Untitled
unknown
plain_text
10 months ago
1.6 kB
2
Indexable
Never
package com.amex.matching;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.Tuple2;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Spark driver that copies a Hive table into a new table with a generated leading
 * {@code recid} column.
 *
 * <p>Usage: {@code GenRecId <in_table> <out_table> <start_rid>}. Row {@code i} (0-based,
 * in the order assigned by {@code zipWithIndex}) receives {@code recid = start_rid + i}.
 * {@code out_table} is dropped if it exists and recreated with CREATE TABLE AS SELECT.
 */
public final class GenRecId implements Serializable {

    /** Name of the generated id column, placed first in the output table. */
    private static final String RECID_COLUMN = "recid";

    /** Plain, unquoted table identifier, optionally database-qualified: {@code [db.]table}. */
    private static final Pattern TABLE_NAME =
            Pattern.compile("[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)?");

    /** Name of the temporary view the CTAS statement reads from. */
    private static final String TEMP_VIEW = "sprk_to_hive";

    private GenRecId() {
        // Entry-point holder; not instantiated.
    }

    /**
     * Reads {@code in_table}, prepends a sequential {@code recid} column, and writes the result
     * to {@code out_table}, replacing any existing table of that name.
     *
     * @param args {@code [in_table, out_table, start_rid]}; {@code start_rid} must parse as a
     *     {@code long}
     * @throws AnalysisException kept in the signature for compatibility with existing callers
     * @throws IllegalArgumentException if fewer than three arguments are given, {@code start_rid}
     *     is not a valid {@code long}, {@code out_table} is not a plain {@code [db.]table} name, or
     *     the input already has a column named {@code recid}
     */
    public static void main(String[] args) throws AnalysisException {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException("Usage: GenRecId <in_table> <out_table> <start_rid>");
        }
        final String inTable = args[0];
        final String outTable = args[1];
        // Parsed once on the driver instead of once per row inside the executor closure.
        final long startRid = parseStartRid(args[2]);

        // outTable is concatenated into DROP / CREATE statements below; identifiers cannot be
        // bound as parameters, so restrict it to a plain [db.]table name.
        if (!TABLE_NAME.matcher(outTable).matches()) {
            throw new IllegalArgumentException("Invalid output table name: " + outTable);
        }

        // NOTE(review): "spark.sql.table.loc" does not appear to be a standard Spark property, and
        // its value is in_table resolved as a local filesystem path. Kept as-is; confirm whether
        // anything actually reads it.
        final String tableLocation = new File(inTable).getAbsolutePath();

        SparkSession spark = SparkSession.builder()
                .appName("Generate recid")
                .config("spark.sql.table.loc", tableLocation)
                .enableHiveSupport()
                .getOrCreate();
        try {
            Dataset<Row> source = spark.table(inTable);
            StructType outSchema = prependRecIdField(source.schema());

            // zipWithIndex numbers rows by partition order, then by position within each
            // partition. Spark documents that it triggers an extra job when there is more
            // than one partition.
            JavaRDD<Row> withIds = source.javaRDD()
                    .zipWithIndex()
                    .map(pair -> prependValue(pair._1(), Math.addExact(startRid, pair._2())));

            // The schema must include the new leading column, otherwise row arity and schema disagree.
            spark.createDataFrame(withIds, outSchema).createOrReplaceTempView(TEMP_VIEW);
            spark.sql("drop table if exists " + outTable);
            spark.sql("create table " + outTable + " as select * from " + TEMP_VIEW);
            System.out.println("End of Program!!!");
        } finally {
            spark.stop();
        }
    }

    /** Parses the starting record id, failing fast with a descriptive message. */
    private static long parseStartRid(String text) {
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("start_rid must be a long integer, got: " + text, e);
        }
    }

    /** Returns a copy of {@code schema} with a non-nullable LongType {@code recid} field first. */
    private static StructType prependRecIdField(StructType schema) {
        // Spark compares column names case-insensitively by default, so a clash would surface
        // later as an obscure CTAS error; reject it here instead.
        boolean clash = Arrays.stream(schema.fieldNames()).anyMatch(RECID_COLUMN::equalsIgnoreCase);
        if (clash) {
            throw new IllegalArgumentException("Input table already has a column named " + RECID_COLUMN);
        }
        StructField[] original = schema.fields();
        StructField[] fields = new StructField[original.length + 1];
        fields[0] = DataTypes.createStructField(RECID_COLUMN, DataTypes.LongType, false);
        System.arraycopy(original, 0, fields, 1, original.length);
        return new StructType(fields);
    }

    /** Returns a new Row whose first value is {@code recid}, followed by every value of {@code row}. */
    private static Row prependValue(Row row, long recid) {
        Object[] values = new Object[row.size() + 1];
        values[0] = recid;
        for (int i = 0; i < row.size(); i++) {
            values[i + 1] = row.get(i);
        }
        return RowFactory.create(values);
    }
}