/*
 * Untitled
 *
 * avatar
 * unknown
 * plain_text
 * 2 years ago
 * 2.0 kB
 * 4
 * Indexable
 */
package com.amex.matching;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.collection.Seq;

import java.io.File;
import java.io.Serializable;

/**
 * Spark batch job that copies rows from a Hive table into a new table, tagging every
 * row with a sequential record id.
 *
 * <p>Each input row receives {@code start_rid + index}, where {@code index} is the
 * zero-based position assigned by {@code zipWithIndex}. The result is written with a
 * {@code create table ... as select} statement that exposes the id as {@code recid}
 * followed by a fixed list of columns, so the input table must provide those columns.
 *
 * <p>NOTE(review): the table names are interpolated directly into SQL text; they are
 * expected to come from a trusted job configuration, not from end users.
 */
public final class GenRecId implements Serializable {

    /**
     * Name of the generated id column in the intermediate view. The final
     * {@code select id as recid ...} statement refers to this name literally, so the two
     * must stay in sync.
     */
    private static final String ID_COLUMN = "id";

    /**
     * Entry point.
     *
     * @param args {@code args[0]} input table name, {@code args[1]} output table name
     *             (dropped and recreated), {@code args[2]} first record id as a decimal long
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     * @throws NumberFormatException    if {@code args[2]} is not a valid long
     * @throws AnalysisException        declared for compatibility with existing callers
     */
    public static void main(String[] args) throws AnalysisException {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: GenRecId <in_table> <out_table> <start_rid>");
        }
        final String inTable = args[0];
        final String outTable = args[1];
        // Parse once on the driver so a malformed start id fails fast, before any
        // cluster resources are acquired, instead of once per row on the executors.
        final long startId = Long.parseLong(args[2]);

        SparkSession spark = SparkSession.builder()
                .appName("Generate recid")
                .enableHiveSupport()
                .getOrCreate();
        try {
            Dataset<Row> source = spark.sql("select * from " + inTable);

            // Use the Java RDD API: its function interfaces are Serializable, so the
            // lambda can be shipped to executors.
            JavaRDD<Row> numberedRows = source.javaRDD()
                    .zipWithIndex()
                    .map(pair -> appendId(pair._1(), startId + pair._2()));

            // The id field is added after the source columns; appendId() places the
            // value in the same (last) position so data and schema line up.
            StructType numberedSchema =
                    source.schema().add(ID_COLUMN, DataTypes.LongType, false);
            Dataset<Row> numbered = spark.createDataFrame(numberedRows, numberedSchema);

            numbered.createOrReplaceTempView("sprk_to_hive");
            spark.sql("drop table if exists " + outTable);
            spark.sql("create table " + outTable +
                    " as select id as recid, pdate, listcd, vendorname, mid, seqnum, matchtype, business, businessid, " +
                    "person, personid, address, phone, email, siccode, stdstat, hashraw from sprk_to_hive");

            System.out.println("End of Program!!!");
        } finally {
            // Release the session even when one of the SQL statements fails.
            spark.stop();
        }
    }

    /**
     * Returns a copy of {@code row} with {@code id} appended as an extra trailing column.
     *
     * @param row source row; its values are copied in their original order
     * @param id  record id to store in the new last position
     * @return a new row holding the original values followed by {@code id}
     */
    private static Row appendId(Row row, long id) {
        int width = row.size();
        Object[] values = new Object[width + 1];
        for (int i = 0; i < width; i++) {
            values[i] = row.get(i);
        }
        values[width] = id;
        return RowFactory.create(values);
    }
}
// Editor is loading...