package org.apache.cassandra.triggers;

import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.utils.UUIDGen;
import org.json.simple.JSONObject;
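
/**
 * A trigger that records each mutation applied to the table it is attached to
 * as an entry in the {@code navbackoffice.audit} table.
 *
 * Attach it with (keyspace/table names are illustrative):
 *
 *   CREATE TRIGGER logs_trigger ON some_keyspace.some_table
 *   USING 'org.apache.cassandra.triggers.LogsTrigger';
 *
 * The audit table schema is not shown in this source; a minimal sketch
 * inferred from the columns written below, not a confirmed schema, would be:
 *
 *   CREATE TABLE navbackoffice.audit (
 *       id timeuuid PRIMARY KEY,
 *       client_id int,
 *       broker_id int,
 *       acc_id int,
 *       keyspace_name text,
 *       table_name text
 *   );
 */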
public class LogsTrigger implements ITrigger
{
    private static final Logger logger = LoggerFactory.getLogger(LogsTrigger.class);
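
    /**
     * Called by Cassandra once for every partition update on the table this
     * trigger is attached to. The mutations returned here are applied together
     * with the update that fired the trigger.
     */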
    @Override
    public Collection<Mutation> augment(Partition partition)
    {
        CFMetaData cfm = partition.metadata();
        // One audit entry per partition update, keyed by a fresh time-based UUID
        PartitionUpdate.SimpleBuilder audit = PartitionUpdate.simpleBuilder(
                Schema.instance.getCFMetaData("navbackoffice", "audit"), UUIDGen.getTimeUUID());
        // JSON snapshot of the updated row's cells; built below but not yet
        // persisted anywhere
        JSONObject obj = new JSONObject();
        // Prepared for timestamping audit entries; currently unused
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
        // The iterator is a closeable resource, hence try-with-resources
        try (UnfilteredRowIterator it = partition.unfilteredIterator())
        {
            Map<String, String> map = new HashMap<>();
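            // Each Unfiltered from the iterator is either a ROW (insert, update
            // or row deletion) or a RANGE_TOMBSTONE_MARKER (range deletion bound)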
            while (it.hasNext())
            {
                Unfiltered un = it.next();
                Clustering clt = (Clustering) un.clustering();
                // Fetch the row once instead of once per iterator
                Row current = partition.getRow(clt);
                Iterator<Cell> cells = current.cells().iterator();
                Iterator<ColumnDefinition> columns = current.columns().iterator();
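                // Record the clustering key values by column name; broker_id and
                // acc_id are read back from this map when the audit row is built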
                List<ColumnDefinition> clusteringKeyColumns = cfm.clusteringColumns();
                ByteBuffer[] clusteringKeyValues = clt.getRawValues();
                int i = 0;
                for (ColumnDefinition column : clusteringKeyColumns)
                {
                    // Use the clustering column's own type, not the partition
                    // key validator, to render the value as a string
                    map.put(column.name.toString(), column.type.getString(clusteringKeyValues[i]));
                    i++;
                }
                // Pair each present column with its cell; this assumes simple
                // (non-collection) columns with exactly one cell per column
                while (columns.hasNext())
                {
                    ColumnDefinition columnDef = columns.next();
                    Cell cell = cells.next();
                    // getString() decodes the cell for any column type; avoids
                    // new String(cell.value().array()), which ignores the buffer's
                    // position/limit and fails on non-array-backed buffers
                    String data = columnDef.type.getString(cell.value());
                    obj.put(columnDef.name.toString(), data);
                }
                logger.debug(map.toString());
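                // Classify the mutation kind: an INSERT carries primary-key
                // liveness info, an UPDATE does not, and a non-live row deletion
                // marks the whole row as deleted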
                switch (un.kind())
                {
                    case ROW:
                        Row row = (Row) un;
                        if (!row.deletion().isLive())
                        {
                            logger.debug("row deletion");
                        }
                        for (Cell cell : row.cells())
                        {
                            if (cell.isTombstone())
                            {
                                // Cell deletion (a column was set to null)
                                logger.debug("cell deletion");
                            }
                            else
                            {
                                // Insert or update
                                if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE)
                                    logger.debug("row insert");
                                else if (row.deletion().isLive())
                                    logger.debug("row update");
                                // Assumes the base table's partition key is a
                                // single value that parses as an int (client_id)
                                audit.row()
                                     .add("client_id", Integer.parseInt(cfm.getKeyValidator().getString(partition.partitionKey().getKey())))
                                     .add("broker_id", Integer.parseInt(map.get("broker_id")))
                                     .add("acc_id", Integer.parseInt(map.get("acc_id")))
                                     .add("keyspace_name", cfm.ksName)
                                     .add("table_name", cfm.cfName);
                            }
                        }
                        break;
                    case RANGE_TOMBSTONE_MARKER:
                        // Range deletion, e.g. DELETE ... WHERE ck > x AND ck < y
                        break;
                }
            }
        }
        catch (Exception e)
        {
            logger.error("LogsTrigger failed to audit partition update", e);
        }
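        // Cassandra applies the returned mutation alongside the original update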
        return Collections.singletonList(audit.buildAsMutation());
    }
}