S3ToDDBTransfer.java

 avatar
unknown
java
2 years ago
4.9 kB
1
Indexable
package com.example.awsdemo;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class S3ToDDBTransfer {
    private static final String ACCESS_KEY = "AKIAT4BKXD3GHF5EVGDC"; //TODO
    private static final String SECRET_ACCESS_KEY = "LlgvrjEAc6u5kU/Qxou8yfBbWt65vjT2WYl/jzR8"; //TODO

    private static final String BUCKET_NAME = "imdb-sample-data"; //TODO
    private static final String OBJECT_NAME = "imdb_moviesbatman_series_1992_2022.csv"; //TODO
    private static final String TABLE_NAME = "IMDB_Table"; //TODO

    public static void execute() {

        //Create AWS credentials
        AWSCredentials credentials = new BasicAWSCredentials(ACCESS_KEY, SECRET_ACCESS_KEY);

        //Create DDB client
        AmazonDynamoDB amazonDynamoDB = AmazonDynamoDBClient.builder()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(Regions.EU_WEST_1)
                .build();

        //Create S3 client
        AmazonS3 amazonS3 = AmazonS3Client.builder()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(Regions.EU_WEST_1)
                .build();

        GetObjectRequest getObjectRequest = new GetObjectRequest(BUCKET_NAME, OBJECT_NAME);
        List<List<String>> imdbEntities = parseCsv(amazonS3.getObject(getObjectRequest).getObjectContent(), ',');


        DynamoDB dynamoDB = new DynamoDB(amazonDynamoDB);
        Table table = dynamoDB.getTable(TABLE_NAME);

        //Put items in DDB table
        for(int i=1; i<imdbEntities.size(); i++) {
            Item item = new Item()
                    .withPrimaryKey("imdbId", imdbEntities.get(i).get(0))
                    .withString("title", imdbEntities.get(i).get(1))
                    .withString("poster", imdbEntities.get(i).get(2))
                    .withString("type", imdbEntities.get(i).get(3))
                    .withString("year", imdbEntities.get(i).get(4));
            table.putItem(item);
        }
    }

    public static List<List<String>> parseCsv(InputStream csvInput, char csvSeparator) {

        // Prepare.
        BufferedReader csvReader = null;
        List<List<String>> csvList = new ArrayList<>();
        String csvRecord;

        // Process records.
        try {
            csvReader = new BufferedReader(new InputStreamReader(csvInput, StandardCharsets.UTF_8));
            while ((csvRecord = csvReader.readLine()) != null) {
                csvList.add(parseCsvRecord(csvRecord, csvSeparator));
            }
        } catch (IOException e) {
            throw new RuntimeException("Reading CSV failed.", e);
        } finally {
            if (csvReader != null)
                try {
                    csvReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }

        return csvList;
    }

    private static List<String> parseCsvRecord(String record, char csvSeparator) {

        // Prepare.
        boolean quoted = false;
        StringBuilder fieldBuilder = new StringBuilder();
        List<String> fields = new ArrayList<>();

        // Process fields.
        for (int i = 0; i < record.length(); i++) {
            char c = record.charAt(i);
            fieldBuilder.append(c);

            if (c == '"') {
                quoted = !quoted; // Detect nested quotes.
            }

            if ((!quoted && c == csvSeparator) // The separator ..
                    || i + 1 == record.length()) // .. or, the end of record.
            {
                String field = fieldBuilder.toString() // Obtain the field, ..
                        .replaceAll(csvSeparator + "$", "") // .. trim ending separator, ..
                        .replaceAll("^\"|\"$", "") // .. trim surrounding quotes, ..
                        .replace("\"\"", "\""); // .. and un-escape quotes.
                fields.add(field.trim()); // Add field to List.
                fieldBuilder = new StringBuilder(); // Reset.
            }
        }

        return fields;
    }
}
Editor is loading...