Untitled

 avatar
unknown
plain_text
10 months ago
7.2 kB
1
Indexable
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Reads the most recently modified CSV file under {@link #PREFIX} in {@link #BUCKET_NAME},
 * validates every row, writes valid rows as CSV to {@link #OUTPUT_BUCKET_NAME}/{@link #OUTPUT_KEY},
 * stores invalid rows in the DynamoDB table {@link #DYNAMO_TABLE_NAME}, and logs the percentage
 * of repeated values in column 14 among the valid rows.
 */
public class S3CSVReader {
    private static final Logger LOGGER = LogManager.getLogger(S3CSVReader.class);
    private static final String BUCKET_NAME = "your-bucket-name";
    private static final String PREFIX = "folder/";
    private static final String OUTPUT_BUCKET_NAME = "output-bucket-name";
    private static final String OUTPUT_KEY = "output/valid_records.csv";
    private static final String DYNAMO_TABLE_NAME = "invalid_records";

    /** Minimum number of columns a row needs; columns up to index 13 are read below. */
    private static final int MIN_COLUMNS = 15;

    // Compiled once; declared before ROW_VALIDATOR so the lambda can reference them.
    /** Unsigned decimal number such as {@code 12} or {@code 12.5}. */
    private static final Pattern NUMBER = Pattern.compile("\\d+(\\.\\d+)?");
    /** Exactly two ASCII letters. */
    private static final Pattern TWO_LETTERS = Pattern.compile("[a-zA-Z]{2}");

    /**
     * Accepts a row when it has at least {@value #MIN_COLUMNS} non-null columns, column 3 is
     * exactly two letters, columns 12-14 are unsigned decimals, column 13 is greater than
     * column 12, and column 14 is less than 50. Column numbers are 1-based. Every rejection is
     * logged with the offending row.
     */
    private static final Predicate<String[]> ROW_VALIDATOR = row -> {
        // Length is checked first so the index accesses below cannot go out of bounds.
        if (row == null || row.length < MIN_COLUMNS) {
            LOGGER.error("Row does not have enough columns: {}", Arrays.toString(row));
            return false;
        }

        for (String value : row) {
            if (value == null) {
                LOGGER.error("Null value found in row: {}", Arrays.toString(row));
                return false;
            }
        }

        String column3 = row[2];
        String column12 = row[11];
        String column13 = row[12];
        String column14 = row[13];

        if (!TWO_LETTERS.matcher(column3).matches()) {
            LOGGER.error("Invalid value in column 3 of row: {}", Arrays.toString(row));
            return false;
        }

        if (!NUMBER.matcher(column12).matches() || !NUMBER.matcher(column13).matches()) {
            LOGGER.error("Invalid value in columns 12 or 13 of row: {}", Arrays.toString(row));
            return false;
        }

        // Without this check Double.parseDouble would throw and abort the whole run.
        if (!NUMBER.matcher(column14).matches()) {
            LOGGER.error("Invalid value in column 14 of row: {}", Arrays.toString(row));
            return false;
        }

        double column12Value = Double.parseDouble(column12);
        double column13Value = Double.parseDouble(column13);
        double column14Value = Double.parseDouble(column14);

        if (column13Value <= column12Value) {
            LOGGER.error("Column 13 value ({}) is not greater than column 12 value ({}) in row: {}",
                    column13Value, column12Value, Arrays.toString(row));
            return false;
        }

        if (column14Value >= 50.0) {
            LOGGER.error("Column 14 value ({}) is not less than 50% in row: {}", column14Value, Arrays.toString(row));
            return false;
        }

        return true;
    };

    /**
     * Entry point: processes the latest CSV file in the source bucket.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        try {
            Optional<S3ObjectSummary> latestFile = findLatestCSVFile(s3Client);
            if (!latestFile.isPresent()) {
                LOGGER.warn("No CSV files found in the bucket.");
                return;
            }

            try {
                // NOTE(review): every line, including a header line if the file has one,
                // goes through ROW_VALIDATOR; a header would be stored as an invalid record.
                List<String[]> records = readRecords(s3Client, latestFile.get().getKey());

                Map<Boolean, List<String[]>> partitionedRecords = records.stream()
                        .collect(Collectors.partitioningBy(ROW_VALIDATOR));
                List<String[]> validRecords = partitionedRecords.get(true);
                List<String[]> invalidRecords = partitionedRecords.get(false);

                LOGGER.info("Found {} valid records and {} invalid records in the CSV file.", validRecords.size(), invalidRecords.size());

                writeValidRecords(s3Client, validRecords);
                writeInvalidRecords(invalidRecords);
                logRepeatedColumn14Percentage(validRecords);
            } catch (IOException e) {
                LOGGER.error("Error reading, parsing, or writing CSV file.", e);
            }
        } finally {
            s3Client.shutdown();
        }
    }

    /** Downloads {@code key} from the source bucket and parses it as UTF-8 CSV. */
    private static List<String[]> readRecords(AmazonS3 s3Client, String key) throws IOException {
        try (S3Object s3Object = s3Client.getObject(BUCKET_NAME, key);
             CSVReader csvReader = new CSVReader(new BufferedReader(
                     new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8)))) {
            return csvReader.readAll();
        }
    }

    /** Serializes {@code validRecords} as UTF-8 CSV and uploads it to the output location. */
    private static void writeValidRecords(AmazonS3 s3Client, List<String[]> validRecords) throws IOException {
        ByteArrayOutputStream validOutputStream = new ByteArrayOutputStream();
        // Closing the writer flushes it into the byte buffer before toByteArray() is called.
        try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(validOutputStream, StandardCharsets.UTF_8))) {
            csvWriter.writeAll(validRecords);
        }

        byte[] csvBytes = validOutputStream.toByteArray();
        // An explicit content length stops the SDK from buffering the stream to compute it.
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(csvBytes.length);
        metadata.setContentType("text/csv; charset=utf-8");
        s3Client.putObject(OUTPUT_BUCKET_NAME, OUTPUT_KEY, new ByteArrayInputStream(csvBytes), metadata);
        LOGGER.info("Valid records written to S3 location: {}/{}", OUTPUT_BUCKET_NAME, OUTPUT_KEY);
    }

    /** Stores each invalid row as one item with a random UUID {@code id} and the row as {@code record}. */
    private static void writeInvalidRecords(List<String[]> invalidRecords) {
        AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.defaultClient();
        DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
        try {
            Table table = dynamoDB.getTable(DYNAMO_TABLE_NAME);
            for (String[] invalidRecord : invalidRecords) {
                Item item = new Item()
                        .withPrimaryKey("id", UUID.randomUUID().toString())
                        .withList("record", Arrays.asList(invalidRecord));
                table.putItem(item);
            }
        } finally {
            dynamoDB.shutdown();
        }

        LOGGER.info("Invalid records written to DynamoDB table: {}", DYNAMO_TABLE_NAME);
    }

    /**
     * Logs the share of column-14 values (in percent) that repeat an earlier value, that is,
     * {@code (rows - distinctValues) / rows * 100}.
     */
    private static void logRepeatedColumn14Percentage(List<String[]> validRecords) {
        if (validRecords.isEmpty()) {
            // Avoid 0/0 (NaN) when no rows passed validation.
            LOGGER.info("No valid records; skipping column 14 repetition statistics.");
            return;
        }

        Map<String, Long> column14ValueCounts = validRecords.stream()
                .collect(Collectors.groupingBy(row -> row[13], Collectors.counting()));

        long totalValues = validRecords.size();
        // Each distinct value contributes (occurrences - 1) repeats; unique values contribute 0.
        long repeatedValues = column14ValueCounts.values().stream()
                .mapToLong(count -> count - 1)
                .sum();

        double percentageRepeated = (double) repeatedValues / totalValues * 100;
        // log4j only understands "{}" placeholders, so the number is formatted up front.
        LOGGER.info("Percentage of values repeated in column 14: {}%",
                String.format(Locale.ROOT, "%.2f", percentageRepeated));
    }

    /**
     * Lists every object under {@link #PREFIX} across all result pages and returns the
     * {@code .csv} object with the latest last-modified time, or empty if there is none.
     */
    private static Optional<S3ObjectSummary> findLatestCSVFile(AmazonS3 s3Client) {
        ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(BUCKET_NAME).withPrefix(PREFIX);
        Comparator<S3ObjectSummary> byLastModified = Comparator.comparing(S3ObjectSummary::getLastModified);

        S3ObjectSummary latest = null;
        ListObjectsV2Result result;
        do {
            result = s3Client.listObjectsV2(req);
            // Track the running maximum so earlier pages are not discarded.
            for (S3ObjectSummary summary : result.getObjectSummaries()) {
                if (summary.getKey().endsWith(".csv")
                        && (latest == null || byLastModified.compare(summary, latest) > 0)) {
                    latest = summary;
                }
            }
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());

        return Optional.ofNullable(latest);
    }
}
Editor is loading...
Leave a Comment