package com.amex.matching;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.Tuple2;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Spark batch job that copies a Hive table into a new table, prepending a
 * monotonically increasing record-id column ("recid") to every row.
 *
 * <p>Usage: {@code GenRecId <input_table> <output_table> <start_rid>}
 * <ul>
 *   <li>{@code input_table}  — Hive table to read</li>
 *   <li>{@code output_table} — Hive table to (re)create with the extra recid column</li>
 *   <li>{@code start_rid}    — numeric offset added to each row's position</li>
 * </ul>
 */
public final class GenRecId implements Serializable {

    /**
     * Entry point: reads the input table, assigns each row a unique long id
     * (zipWithIndex position + start_rid) as a new leading column, and writes
     * the result as a new Hive table.
     *
     * @param args {input_table, output_table, start_rid}
     * @throws AnalysisException        if a Spark SQL statement fails analysis
     * @throws IllegalArgumentException if fewer than three arguments are given
     * @throws NumberFormatException    if start_rid is not a valid long
     */
    public static void main(String[] args) throws AnalysisException {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                "Usage: GenRecId <input_table> <output_table> <start_rid>");
        }
        String inTable = args[0];
        String outTable = args[1];
        // Parse once up front so a bad argument fails fast on the driver,
        // instead of being re-parsed for every row inside the executor closure
        // (as the original code did).
        final long startRid = Long.parseLong(args[2]);

        String tableLocation = new File(inTable).getAbsolutePath();
        SparkSession spark = SparkSession.builder()
            .appName("Generate recid")
            .config("spark.sql.table.loc", tableLocation)
            .enableHiveSupport()
            .getOrCreate();

        // NOTE(review): table names come straight from argv and are concatenated
        // into SQL. Acceptable for an operator-run batch job, but do not expose
        // these arguments to untrusted callers.
        Dataset<Row> source = spark.sql("select * from " + inTable);
        StructType hiveSchema = source.schema();

        // New schema = recid (non-null long) followed by the original columns.
        // The original code passed the UNMODIFIED hive schema to the encoder
        // while emitting rows with an extra leading column — a schema mismatch.
        StructType withIdSchema = new StructType()
            .add("recid", DataTypes.LongType, false);
        for (StructField field : hiveSchema.fields()) {
            withIdSchema = withIdSchema.add(field);
        }

        // zipWithIndex assigns each row a stable 0-based position; offset it by
        // startRid and prepend it as the first column. A plain array copy avoids
        // the Scala-to-Java collection conversion: the original
        // new ArrayList<>(r.toSeq().toList()) did not compile because
        // scala.collection.immutable.List is not a java.util.Collection. The
        // original also used MapFunction + RowEncoder + toDS() on a JavaRDD,
        // which is not a valid Spark Java API combination.
        JavaRDD<Row> rowsWithId = source.javaRDD()
            .zipWithIndex()
            .map(pair -> {
                Row row = pair._1();
                Object[] values = new Object[row.length() + 1];
                values[0] = pair._2() + startRid;
                for (int i = 0; i < row.length(); i++) {
                    values[i + 1] = row.get(i);
                }
                return RowFactory.create(values);
            });

        Dataset<Row> withId = spark.createDataFrame(rowsWithId, withIdSchema);
        withId.createOrReplaceTempView("sprk_to_hive");

        // Recreate the output table from scratch each run.
        spark.sql("drop table if exists " + outTable);
        spark.sql("create table " + outTable + " as select * from sprk_to_hive");
        System.out.println("End of Program!!!");
    }
}