trigger
unknown
java
3 years ago
5.4 kB
5
Indexable
package org.apache.cassandra.triggers; import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.SimpleBuilders.RowBuilder; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.ColumnData; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.json.simple.JSONObject; public class LogsTrigger implements ITrigger { private static final Logger logger = LoggerFactory.getLogger(LogsTrigger.class); public Collection<Mutation> augment(Partition partition) { CFMetaData cfm = partition.metadata(); String tableName = cfm.ksName; PartitionUpdate.SimpleBuilder audit = PartitionUpdate.simpleBuilder(Schema.instance.getCFMetaData("navbackoffice", "audit"),UUIDGen.getTimeUUID()); JSONObject obj = new JSONObject(); LocalDateTime now = LocalDateTime.now(); DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); try { UnfilteredRowIterator it = partition.unfilteredIterator(); Map<String, String> map = new HashMap<String, String>(); while (it.hasNext()) { Unfiltered un = it.next(); Clustering clt = (Clustering) un.clustering(); Iterator<Cell> cells = partition.getRow(clt).cells().iterator(); Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator(); Iterator<ColumnData> columndata = partition.getRow(clt).columnData().iterator(); while(columns.hasNext()) { ColumnDefinition columnDef = columns.next(); List<ColumnDefinition> clusteringKeyColumns = cfm.clusteringColumns(); ByteBuffer[] clusteringKeyValues = clt.getRawValues(); int i = 0; for (ColumnDefinition column : clusteringKeyColumns) { map.put(column.name.toString(), cfm.getKeyValidator().getString(clusteringKeyValues[i])); i++; } //if(columnDef.isPrimaryKeyColumn()) { Cell cell = cells.next(); String data = new String(cell.value().array()); // If cell type is text obj.put(columnDef.toString(), data); //} } logger.debug(map.toString()); //logger.debug(un.kind().toString()); switch (un.kind()) { case ROW: Row row = (Row) un; row. // logger.debug(row.deletion().isLive()); if (!row.deletion().isLive()) { logger.debug("row deletion"); } for (Cell cell : row.cells()) { //logger.debug(cell.toString()); if (cell.isTombstone() ){ // Cell deletion logger.debug("cell deletion"); } else { // Insert or Update if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE) { logger.debug("row insert"); } else { if (row.deletion().isLive()) { logger.debug("row update"); } } audit.row().add("client_id",Integer.parseInt(partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()))) .add("broker_id", Integer.parseInt(map.get("broker_id"))) .add("acc_id", Integer.parseInt(map.get("acc_id"))) .add("keyspace_name",partition.metadata().ksName) .add("table_name", partition.metadata().cfName); } } break; case RANGE_TOMBSTONE_MARKER: // Range Deletion < > break; } } } catch (Exception e) { logger.debug(e.getMessage()); } return Collections.singletonList(audit.buildAsMutation()); } }
Editor is loading...