// Paste-site export metadata (not program code): Untitled | unknown | plain_text | a year ago | 7.2 kB | 3 | Indexable
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class S3CSVReader {
private static final Logger LOGGER = LogManager.getLogger(S3CSVReader.class);
private static final String BUCKET_NAME = "your-bucket-name";
private static final String PREFIX = "folder/";
private static final String OUTPUT_BUCKET_NAME = "output-bucket-name";
private static final String OUTPUT_KEY = "output/valid_records.csv";
private static final String DYNAMO_TABLE_NAME = "invalid_records";
private static final Predicate<String[]> ROW_VALIDATOR = row -> {
if (!row[0].endsWith(".csv")) {
LOGGER.error("The file is not a CSV file.");
return false;
}
if (row.length < 15) {
LOGGER.error("Row does not have enough columns: {}", Arrays.toString(row));
return false;
}
for (String value : row) {
if (value == null) {
LOGGER.error("Null value found in row: {}", Arrays.toString(row));
return false;
}
}
String column3 = row[2];
String column12 = row[11];
String column13 = row[12];
String column14 = row[13];
if (column3.length() != 2 || !column3.matches("[a-zA-Z]+")) {
LOGGER.error("Invalid value in column 3 of row: {}", Arrays.toString(row));
return false;
}
if (!column12.matches("\\d+(\\.\\d+)?") || !column13.matches("\\d+(\\.\\d+)?")) {
LOGGER.error("Invalid value in columns 12 or 13 of row: {}", Arrays.toString(row));
return false;
}
double column12Value = Double.parseDouble(column12);
double column13Value = Double.parseDouble(column13);
double column14Value = Double.parseDouble(column14);
if (column13Value <= column12Value) {
LOGGER.error("Column 13 value ({}) is not greater than column 12 value ({}) in row: {}",
column13Value, column12Value, Arrays.toString(row));
return false;
}
if (column14Value >= 50.0) {
LOGGER.error("Column 14 value ({}) is not less than 50% in row: {}", column14Value, Arrays.toString(row));
return false;
}
return true;
};
public static void main(String[] args) {
AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
Optional<S3ObjectSummary> latestFile = findLatestCSVFile(s3Client);
if (latestFile.isPresent()) {
S3Object s3Object = s3Client.getObject(BUCKET_NAME, latestFile.get().getKey());
try (BufferedReader reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) {
CSVReader csvReader = new CSVReader(reader);
List<String[]> records = csvReader.readAll();
Map<Boolean, List<String[]>> partitionedRecords = records.stream()
.collect(Collectors.partitioningBy(ROW_VALIDATOR));
List<String[]> validRecords = partitionedRecords.get(true);
List<String[]> invalidRecords = partitionedRecords.get(false);
LOGGER.info("Found {} valid records and {} invalid records in the CSV file.", validRecords.size(), invalidRecords.size());
// Write valid records to a CSV file in an S3 location
ByteArrayOutputStream validOutputStream = new ByteArrayOutputStream();
try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(validOutputStream))) {
csvWriter.writeAll(validRecords);
}
byte[] csvBytes = validOutputStream.toByteArray();
s3Client.putObject(OUTPUT_BUCKET_NAME, OUTPUT_KEY, new ByteArrayInputStream(csvBytes), new Object());
LOGGER.info("Valid records written to S3 location: {}/{}", OUTPUT_BUCKET_NAME, OUTPUT_KEY);
// Write invalid records to a DynamoDB table
AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.defaultClient();
DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
Table table = dynamoDB.getTable(DYNAMO_TABLE_NAME);
for (String[] invalidRecord : invalidRecords) {
Item item = new Item()
.withPrimaryKey("id", UUID.randomUUID().toString())
.withList("record", Arrays.asList(invalidRecord));
table.putItem(item);
}
LOGGER.info("Invalid records written to DynamoDB table: {}", DYNAMO_TABLE_NAME);
Map<String, Long> column14ValueCounts = validRecords.stream()
.map(row -> row[13])
.collect(Collectors.groupingBy(value -> value, Collectors.counting()));
long totalValues = validRecords.size();
long repeatedValues = column14ValueCounts.values().stream()
.filter(count -> count > 1)
.map(count -> count - 1)
.mapToLong(value -> value)
.sum();
double percentageRepeated = (double) repeatedValues / totalValues * 100;
LOGGER.info("Percentage of values repeated in column 14: {:.2f}%", percentageRepeated);
} catch (IOException e) {
LOGGER.error("Error reading, parsing, or writing CSV file.", e);
}
} else {
LOGGER.warn("No CSV files found in the bucket.");
}
}
private static Optional<S3ObjectSummary> findLatestCSVFile(AmazonS3 s3Client) {
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(BUCKET_NAME).withPrefix(PREFIX);
ListObjectsV2Result result;
List<S3ObjectSummary> csvFiles;
do {
result = s3Client.listObjectsV2(req);
csvFiles = result.getObjectSummaries().stream()
.filter(obj -> obj.getKey().endsWith(".csv"))
.collect(Collectors.toList());
String token = result.getNextContinuationToken();
req.setContinuationToken(token);
} while (result.isTruncated());
return csvFiles.stream()
.max(Comparator.comparing(S3ObjectSummary::getLastModified));
}
}Editor is loading...
// Paste-site footer (not program code): Leave a Comment