Untitled

 avatar
unknown
plain_text
a year ago
1.0 kB
2
Indexable
import java.io.Serializable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
import scala.collection.mutable.WrappedArray;

/**
 * Spark map function that appends a generated record id to a {@link Row}.
 *
 * <p>Each input is a pair of a row and a {@code Long} offset (presumably the
 * index produced by {@code zipWithIndex} -- confirm against the caller). The
 * generated id is {@code start_rid + offset}; it is wrapped in a
 * single-element {@link WrappedArray} and added as one extra trailing field.
 */
public class GenRecIdMapper implements Function<Tuple2<Row, Object>, Row>, Serializable {
    private static final long serialVersionUID = 1L;

    /** Numeric value of the starting record id, parsed once at construction. */
    private final long startRid;

    /**
     * @param start_rid decimal string holding the first record id to hand out
     * @throws NumberFormatException if {@code start_rid} is null or is not a
     *         parsable {@code long}; raised here instead of once per record
     */
    public GenRecIdMapper(String start_rid) {
        // Parse eagerly: the value never changes, so there is no reason to
        // re-parse the same string for every record passed to call().
        this.startRid = Long.parseLong(start_rid);
    }

    /**
     * Builds a copy of the incoming row with the generated id appended.
     *
     * @param tuple pair of (source row, offset); the second element must be a
     *        non-null {@code Long}
     * @return a new row holding every field of the source row followed by a
     *         one-element {@code WrappedArray<Long>} containing the generated id
     * @throws Exception declared by the {@link Function} contract
     */
    @Override
    public Row call(Tuple2<Row, Object> tuple) throws Exception {
        Row r = tuple._1();
        long recid = (Long) tuple._2();
        long id1 = startRid + recid;

        // The id is stored as a single-element array value, not a bare long.
        WrappedArray<Long> wrappedArray = WrappedArray.make(new Long[]{id1});

        // Copy the existing fields and place the id in the extra last slot.
        int fieldCount = r.size();
        Object[] rowData = new Object[fieldCount + 1];
        for (int i = 0; i < fieldCount; i++) {
            rowData[i] = r.get(i);
        }
        rowData[fieldCount] = wrappedArray;

        return RowFactory.create(rowData);
    }
}