Untitled
unknown
plain_text
8 months ago
4.2 kB
7
Indexable
const { SQSClient, SendMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
const { LambdaClient, InvokeCommand } = require('@aws-sdk/client-lambda');
const https = require('https');
// Initialize AWS clients
const sqs = new SQSClient();
const s3 = new S3Client();
const lambda = new LambdaClient();
// Configuration
const MAX_RETRIES = process.env.MAX_RETRIES || 3;
const RETRY_DELAY_SECONDS = 900; // 15 minutes
const QUEUE_URL = process.env.SQS_QUEUE_URL;
const POST_LAMBDA_ARN = process.env.POST_LAMBDA_ARN;
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;
async function fetchPaginatedAPI(fromDate, toDate) {
let allResults = [];
let index = 0;
while(true) {
const url = new URL(process.env.API_ENDPOINT);
url.searchParams.append('from', fromDate);
url.searchParams.append('to', toDate);
url.searchParams.append('index', index);
const response = await new Promise((resolve, reject) => {
https.get(url, res => {
let data = '';
res.on('data', chunk => data += chunk);
res.on('end', () => resolve(JSON.parse(data)));
}).on('error', reject);
});
allResults = allResults.concat(response.items);
index += response.items.length;
if(index >= response.total) break;
}
return allResults;
}
async function uploadToS3(data, filename) {
await s3.send(new PutObjectCommand({
Bucket: OUTPUT_BUCKET,
Key: filename,
Body: JSON.stringify(data),
ContentType: 'application/json'
}));
return filename;
}
async function handleSuccess(id, receiptHandle) {
await lambda.send(new InvokeCommand({
FunctionName: POST_LAMBDA_ARN,
Payload: JSON.stringify({ id, status: 'SUCCESS' })
}));
if(receiptHandle) {
await sqs.send(new DeleteMessageCommand({
QueueUrl: QUEUE_URL,
ReceiptHandle: receiptHandle
}));
}
}
async function handleFailure(id, params, retryCount) {
if(retryCount >= MAX_RETRIES) {
await lambda.send(new InvokeCommand({
FunctionName: POST_LAMBDA_ARN,
Payload: JSON.stringify({ id, status: 'FAILED' })
}));
return;
}
await sqs.send(new SendMessageCommand({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify({
id,
params,
retryCount: retryCount + 1
}),
DelaySeconds: RETRY_DELAY_SECONDS
}));
}
exports.handler = async (event) => {
let params, id, receiptHandle;
try {
// Determine event source
if(event.Records?.[0]?.eventSource === 'aws:s3') {
// Initial execution from S3
params = {
fromDate: process.env.LAST_RUN_DATE,
toDate: new Date().toISOString()
};
id = `exec-${Date.now()}`;
} else if(event.Records?.[0]?.eventSource === 'aws:sqs') {
// Retry execution from SQS
const message = JSON.parse(event.Records[0].body);
params = message.params;
id = message.id;
receiptHandle = event.Records[0].receiptHandle;
} else {
throw new Error('Unknown event source');
}
// Core processing
const apiData = await fetchPaginatedAPI(params.fromDate, params.toDate);
const filename = `data-${params.toDate}.json`;
await uploadToS3(apiData, filename);
// Update last run date for initial executions
if(!receiptHandle) {
process.env.LAST_RUN_DATE = params.toDate;
}
await handleSuccess(id, receiptHandle);
return { status: 'COMPLETED' };
} catch(error) {
console.error('Processing failed:', error);
await handleFailure(id, params, receiptHandle ? event.Records[0].body.retryCount : 0);
return { status: 'FAILED', error: error.message };
}
};Editor is loading...
Leave a Comment