Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
1.6 kB
2
Indexable
package com.amex.matching;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.Tuple2;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Spark job that copies a Hive table into a new table, prepending a sequential
 * record id column to every row.
 *
 * <p>Command-line arguments:
 * <ol>
 *   <li>{@code in_table}  - table to read (optionally {@code db.table});</li>
 *   <li>{@code out_table} - table to (re)create with the extra id column;</li>
 *   <li>{@code start_rid} - id assigned to the first row, parsed as a {@code long}.</li>
 * </ol>
 *
 * <p>The id of a row is {@code start_rid + index}, where {@code index} is the
 * position assigned by {@code zipWithIndex()}. The new column is named
 * {@value #RECID_COLUMN} and is placed first; if the input table already has a
 * column with that name, the final {@code create table} will be rejected by Spark.
 */
public final class GenRecId implements Serializable {
    /** Name of the generated id column (first column of the output table). */
    private static final String RECID_COLUMN = "recid";

    /** Temporary view through which the enriched rows are handed to the CTAS statement. */
    private static final String TEMP_VIEW = "sprk_to_hive";

    /** Accepts {@code table} or {@code db.table} made of letters, digits and underscores. */
    private static final String TABLE_NAME_REGEX = "[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)?";

    private static final String USAGE = "Usage: GenRecId <in_table> <out_table> <start_rid>";

    /**
     * Entry point of the job.
     *
     * @param args {@code in_table}, {@code out_table}, {@code start_rid} (see class comment)
     * @throws IllegalArgumentException if an argument is missing, a table name contains
     *         characters outside {@code [A-Za-z0-9_.]}, both table names are equal, or
     *         {@code start_rid} is not a valid {@code long}
     * @throws AnalysisException if Spark rejects one of the SQL statements
     */
    public static void main(String[] args) throws AnalysisException {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(USAGE);
        }
        final String in_table = requireTableName(args[0], "in_table");
        final String out_table = requireTableName(args[1], "out_table");
        // Parse once, on the driver, so a malformed value fails fast instead of
        // failing inside every executor task.
        final long startRid = parseStartRid(args[2]);

        // The output table is dropped before the (lazy) read of the input table is
        // executed, so identical names would destroy the data being copied.
        // NOTE(review): this does not detect "t" vs "db.t" referring to the same table.
        if (in_table.equalsIgnoreCase(out_table)) {
            throw new IllegalArgumentException(
                    "in_table and out_table must differ, both are: " + in_table);
        }

        String tablelocation = (new File(in_table)).getAbsolutePath();
        SparkSession spark = SparkSession.builder()
                .appName("Generate recid")
                // NOTE(review): "spark.sql.table.loc" does not look like a documented
                // Spark setting; kept unchanged from the original job - confirm it is needed.
                .config("spark.sql.table.loc", tablelocation)
                .enableHiveSupport()
                .getOrCreate();
        try {
            Dataset<Row> source = spark.sql("select * from " + in_table);
            StructType outSchema = withRecIdColumn(source.schema());

            // zipWithIndex() yields (row, index) pairs; turn each pair into a new Row
            // whose first value is the record id followed by the original values.
            JavaRDD<Row> withRecId = source.javaRDD().zipWithIndex().map(indexed -> {
                Row original = indexed._1();
                Object[] values = new Object[original.size() + 1];
                // addExact: fail loudly rather than wrap around into duplicate/negative ids.
                values[0] = Math.addExact(startRid, indexed._2().longValue());
                for (int i = 0; i < original.size(); i++) {
                    values[i + 1] = original.get(i);
                }
                return RowFactory.create(values);
            });

            Dataset<Row> wcID = spark.createDataFrame(withRecId, outSchema);
            wcID.createOrReplaceTempView(TEMP_VIEW);
            spark.sql("drop table if exists " + out_table);
            spark.sql("create table " + out_table + " as select * from " + TEMP_VIEW);
            System.out.println("End of Program!!!");
        } finally {
            // Release the Spark context even when one of the statements fails.
            spark.stop();
        }
    }

    /** Returns a copy of {@code input} with a non-nullable long id column prepended. */
    private static StructType withRecIdColumn(StructType input) {
        StructType result = new StructType().add(RECID_COLUMN, DataTypes.LongType, false);
        for (StructField field : input.fields()) {
            result = result.add(field);
        }
        return result;
    }

    /**
     * Validates a table name before it is concatenated into SQL text; identifiers
     * cannot be bound as query parameters, so they are restricted to a safe alphabet.
     */
    private static String requireTableName(String name, String argName) {
        if (name == null || !name.matches(TABLE_NAME_REGEX)) {
            throw new IllegalArgumentException(
                    "Invalid " + argName + " '" + name + "': expected table or db.table. " + USAGE);
        }
        return name;
    }

    /** Parses the starting record id, reporting the offending text on failure. */
    private static long parseStartRid(String text) {
        try {
            return Long.parseLong(text.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "start_rid must be a whole number, got '" + text + "'. " + USAGE, e);
        }
    }
}