Untitled
unknown
javascript
3 years ago
8.3 kB
8
Indexable
const readMessage = async (message) => {
const key = get(message, 'key.id');
const topic = message.topic;
const op = get(message, 'value.op');
const before = get(message, 'value.before') || {};
const after = get(message, 'value.after') || {};
const tableName = mapping[message.topic];
const id = before.id || after.id;
if (
tableName === 'BridgeTransactions' &&
![
Transaction.TYPE.REDEMPTION,
Transaction.TYPE.VOID,
Transaction.TYPE.WALLET_REDEMPTION,
Transaction.TYPE.AMENDMENT_REDEEM
].includes(after.type)
) return;
// Save to mongo
switch (op) {
case 'c': // create
case 'u': { // update
const exist = await mongodb[tableName].findOne({ key: id, status: Status.CREATED });
const params = {
key,
topic,
data: after,
status: Status.CREATED,
createdAt: after.createdAt || Date.now(),
updatedAt: after.updatedAt || Date.now()
};
let value = {};
if (exist) {
// data = await mongodb[tableName].updateOne({ _id: exist._id }, params);
value = await mongodb[tableName].findByIdAndUpdate({ _id: exist._id }, params, { returnDocument: 'after' });
} else {
value = await mongodb[tableName].create(params);
}
console.log(tableName, '-- tableName --');
console.log(key, '-- value.key --');
// sync transaction
await syncTransaction({ tableName, value });
break;
}
case 'd': // delete
await mongodb[tableName].deleteMany({ key, status: Status.CREATED });
break;
}
};
const syncTransaction = async (params = {}) => {
try {
const condition = {
'data.type': {
$in: [
Transaction.TYPE.REDEMPTION,
Transaction.TYPE.VOID,
Transaction.TYPE.WALLET_REDEMPTION,
Transaction.TYPE.AMENDMENT_REDEEM
]
},
status: Status.CREATED
};
let BridgeTransactions = null;
if (params.tableName === 'BridgeTransactions') {
condition.key = params.value.key;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
// BridgeOrders
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.TransactionId': BridgeTransactions.key });
if (!BridgeOrders) return;
BridgeTransactions.BridgeOrders = BridgeOrders._id;
// BridgeBuyBacks
if (BridgeOrders.data.orderType === 'BuyBack') {
const BridgeBuyBacks = await mongodb.BridgeBuyBacks.findOne({ key: BridgeOrders.data.orderId});
if (BridgeBuyBacks) {
BridgeTransactions.BridgeBuyBacks = BridgeBuyBacks._id;
}
}
// Orders
const Orders = await mongodb.Orders.findOne({ key: BridgeOrders.data.orderId });
if (Orders) {
BridgeTransactions.Orders = Orders._id;
// OrderItems
const OrderItems = await mongodb.OrderItems.find({ 'data.OrderId': Orders.key });
for (const OrderItem of OrderItems) {
if (!BridgeTransactions.OrderItems.includes(OrderItem._id)) {
BridgeTransactions.OrderItems.push(OrderItem._id);
}
const AwbOrders = await mongodb.AwbOrders.findOne({ 'data.OrderItemId': OrderItem.key });
if (AwbOrders) {
OrderItem.AwbOrders = AwbOrders._id;
updateTransactions.push(OrderItem.save());
const Awbs = await mongodb.Awbs.findOne({ key: AwbOrders.data.AwbId });
if (Awbs) {
AwbOrders.Awbs = Awbs._id;
updateTransactions.push(AwbOrders.save());
}
}
}
// OrderPayments
const OrderPayments = await mongodb.OrderPayments.find({ 'data.OrderId': Orders.key });
OrderPayments.forEach(OrderPayment => {
if (!BridgeTransactions.OrderPayments.includes(OrderPayment._id)) {
BridgeTransactions.OrderPayments.push(OrderPayment._id);
}
});
// Fees
const Fees = await mongodb.Fees.find({ 'data.OrderId': Orders.key });
Fees.forEach(Fee => {
if (!BridgeTransactions.Fees.includes(Fee._id)) {
BridgeTransactions.Fees.push(Fee._id);
}
});
}
BridgeTransactions.OrderItems = removeDuplicate(BridgeTransactions.OrderItems);
BridgeTransactions.OrderPayments = removeDuplicate(BridgeTransactions.OrderPayments);
BridgeTransactions.Fees = removeDuplicate(BridgeTransactions.Fees);
await BridgeTransactions.save();
} else if (params.tableName === 'BridgeOrders') {
condition.key = params.value.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.BridgeOrders = params.value._id;
await BridgeTransactions.save();
}
} else if (params.tableName === 'BridgeBuyBacks') {
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.orderId': params.value.key, 'data.orderType': 'BuyBack' });
if (!BridgeOrders) return;
condition.key = BridgeOrders.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.BridgeBuyBacks = params.value._id;
await BridgeTransactions.save();
}
} else if (params.tableName === 'Orders') {
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.orderId': params.value.key, 'data.orderType': 'Normal' });
if (!BridgeOrders) return;
condition.key = BridgeOrders.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.Orders = params.value._id;
await BridgeTransactions.save();
}
} else if (params.tableName === 'OrderItems') {
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.orderId': params.value.data.OrderId, 'data.orderType': 'Normal' });
if (!BridgeOrders) return;
condition.key = BridgeOrders.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.OrderItems.push(params.value._id);
BridgeTransactions.OrderItems = removeDuplicate(BridgeTransactions.OrderItems);
await BridgeTransactions.save();
}
} else if (params.tableName === 'OrderPayments') {
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.orderId': params.value.data.OrderId, 'data.orderType': 'Normal' });
if (!BridgeOrders) return;
condition.key = BridgeOrders.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.OrderPayments.push(params.value._id);
BridgeTransactions.OrderPayments = removeDuplicate(BridgeTransactions.OrderPayments);
await BridgeTransactions.save();
}
} else if (params.tableName === 'Fees') {
const BridgeOrders = await mongodb.BridgeOrders.findOne({ 'data.orderId': params.value.data.OrderId, 'data.orderType': 'Normal' });
if (!BridgeOrders) return;
condition.key = BridgeOrders.data.TransactionId;
BridgeTransactions = await mongodb.BridgeTransactions.findOne(condition);
if (BridgeTransactions) {
BridgeTransactions.Fees.push(params.value._id);
BridgeTransactions.Fees = removeDuplicate(BridgeTransactions.Fees);
await BridgeTransactions.save();
}
} else if (params.tableName === 'AwbOrders') {
const OrderItems = await mongodb.OrderItems.findOne({ key: params.value.data.OrderItemId });
if (OrderItems) {
OrderItems.AwbOrders = params.value._id;
await OrderItems.save();
}
} else if (params.tableName === 'Awbs') {
const AwbOrders = await mongodb.AwbOrders.findOne({ 'data.AwbId': params.value.key });
if (AwbOrders) {
AwbOrders.Awbs = params.value._id;
await AwbOrders.save();
}
}
return BridgeTransactions;
} catch (error) {
log.error('[Synchronize Transaction Error] ', error);
throw error;
}
};Editor is loading...