trigger

 avatar
unknown
java
2 years ago
5.4 kB
3
Indexable
package org.apache.cassandra.triggers;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SimpleBuilders.RowBuilder;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.json.simple.JSONObject;
public class LogsTrigger implements ITrigger {
	
	private static final Logger logger = LoggerFactory.getLogger(LogsTrigger.class);
    public Collection<Mutation> augment(Partition partition)
    {
    	CFMetaData cfm = partition.metadata();
    	String tableName = cfm.ksName;
    	
    	PartitionUpdate.SimpleBuilder audit = PartitionUpdate.simpleBuilder(Schema.instance.getCFMetaData("navbackoffice", "audit"),UUIDGen.getTimeUUID());
        JSONObject obj = new JSONObject();
        LocalDateTime now = LocalDateTime.now(); 
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
       
        try {
            UnfilteredRowIterator it = partition.unfilteredIterator();
            Map<String, String> map = new HashMap<String, String>();
            while (it.hasNext()) {
                Unfiltered un = it.next();
                Clustering clt = (Clustering) un.clustering();  
                Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
                Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
                Iterator<ColumnData> columndata = partition.getRow(clt).columnData().iterator();

			    while(columns.hasNext())
			    { 
			    	ColumnDefinition columnDef = columns.next(); 
			        List<ColumnDefinition> clusteringKeyColumns = cfm.clusteringColumns();
			    	ByteBuffer[] clusteringKeyValues = clt.getRawValues();
			    	int i = 0;
			    	 for (ColumnDefinition column : clusteringKeyColumns) {
                         map.put(column.name.toString(), cfm.getKeyValidator().getString(clusteringKeyValues[i]));
                         i++;
                       }
			    	//if(columnDef.isPrimaryKeyColumn()) {
			    		Cell  cell = cells.next(); 
				  		String data = new String(cell.value().array()); // If  cell type is text 
				  		obj.put(columnDef.toString(), data); 
			    	//}
			    }
			   
			    logger.debug(map.toString());
			   
			   //logger.debug(un.kind().toString());
                switch (un.kind()) {
                
                case ROW:
                    Row row = (Row) un;
                    
                    row.
                   
                   // logger.debug(row.deletion().isLive());
                    if (!row.deletion().isLive()) {
                    	logger.debug("row deletion");
                    }

                    for (Cell cell : row.cells()) {
                    	//logger.debug(cell.toString());
                        if (cell.isTombstone() ){
                            // Cell deletion
                        	logger.debug("cell deletion");
                        } else {
                            // Insert or Update
                        	if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE) {
                        		
                        		logger.debug("row insert");
                        	}
                        	else {
                        		if (row.deletion().isLive()) {
	                        		logger.debug("row update");
	                        		
	                        	}
	                        }
                        	audit.row().add("client_id",Integer.parseInt(partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())))
                        	  .add("broker_id", Integer.parseInt(map.get("broker_id")))
                        	  .add("acc_id", Integer.parseInt(map.get("acc_id")))
                        	  .add("keyspace_name",partition.metadata().ksName)
                        	  .add("table_name", partition.metadata().cfName);
                             
                        }  
                    }
                    break;
                case RANGE_TOMBSTONE_MARKER:
                    // Range Deletion < > 
                    break;
            }
            }
        } catch (Exception e) {
        	logger.debug(e.getMessage());
        }
       return Collections.singletonList(audit.buildAsMutation());
    }
	
}