// Gop tin ("merge messages") - Java source, approx. 16 kB. Paste-site page header residue converted to a comment.
package ScanCA;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jdbc.JDBCPool;

import mqaccess.MQAccess;
import uti.Func;
import uti.Para;

import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;

import dto.TMTranDTO;

/**
 * Flow 3 of the CA scan: continuously walks the in-memory lists of pending
 * transactions that flow 1 ({@code New1_ScanCA_JavaCompute}) fills, and turns
 * them into XML messages on the MQ queue {@code Q.CATRANS}.
 *
 * <p>For every transaction whose status is {@code STATUS_FORCREATE} the worker
 * looks for later transactions for which {@code TMTranDTO.compare} returns
 * true (per the original author's note: same account, sequence, teller and
 * time). A transaction with exactly one such partner is sent as a single
 * merged message (main transaction plus fee); a transaction with no partner
 * is sent on its own.
 *
 * <p>NOTE(review): the original header stated that merging happens only when
 * exactly two transactions match. When three or more match and the special
 * three-way rule does not apply, the inner loop still pairs the first
 * transaction with the last match on a later iteration. This behaviour is
 * kept as-is; confirm whether it is intended.
 */
public class New3_ScanCA_JavaCompute extends MbJavaComputeNode {

	/** Suffix removed from the remark when both merged transactions are CTLNH. */
	private static final String CTLNH_REMARK_SUFFIX = "-12-PMT-002";

	/** XML element names, in the same order as the values passed to sendToQueue. */
	private static final String[] COLUMN_NAMES = {
		"TMTXSTAT", "TMACCTNO", "TMTXAMT", "TMGLCUR",
		"TMDORC", "TMENTDT7", "TMTIMENT", "TMAPPTYPE", "TMEFTH",
		"HASFEE", // 0 OR 1
		"FEE", "FEEDORC",
		"TMTXCD"
	};

	// These flags are written by worker threads; volatile makes the writes
	// visible to whichever thread reads them.
	public static volatile boolean Flow3IsStop = false;
	public static volatile boolean Flow3IsStarted = false;

	private ExecutorService executor;

	/** Writes one line to the ".ca" log file. */
	private static void logCa(String message) {
		Func.log(Para.fullPathLog + ".ca", message);
	}

	/**
	 * Waits until flow 1 reports that its transaction lists are initialized,
	 * starts one {@link MergeWorker} per list and blocks until all workers
	 * have finished.
	 *
	 * @param assembly the incoming message assembly; only handed on to the workers
	 * @throws MbException declared by the broker API
	 */
	public void evaluate(MbMessageAssembly assembly) throws MbException {

		// Wait for the main thread (flow 1) to finish initialization.
		synchronized (New1_ScanCA_JavaCompute.arrayInitializedLock) {
			while (!New1_ScanCA_JavaCompute.isArrayInitialized) {
				try {
					New1_ScanCA_JavaCompute.arrayInitializedLock.wait();
				} catch (InterruptedException e) {
					// Restore the flag and give up: looping again with the
					// flag set would make wait() throw immediately forever.
					Thread.currentThread().interrupt();
					logCa("New3_ScanCA_JavaCompute interrupted while waiting for initialization: " + e.toString());
					return;
				}
			}
		}

		logCa(" New3_ScanCA_JavaCompute start!");

		// NOTE(review): workers run until a stop is requested, so if there are
		// more lists than threadNum the surplus workers would stay queued.
		// Presumably both values are equal - verify against New1_ScanCA_JavaCompute.
		executor = Executors.newFixedThreadPool(New1_ScanCA_JavaCompute.threadNum);
		MbOutputTerminal out = getOutputTerminal("out");
		if (New1_ScanCA_JavaCompute.collectTMTransArr != null) {
			for (int idx = 0; idx < New1_ScanCA_JavaCompute.collectTMTransArr.length; idx++) {
				executor.execute(new MergeWorker(
						New1_ScanCA_JavaCompute.collectTMTransArr[idx], assembly, out, idx));
			}
		}
		executor.shutdown();
		try {
			while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
				// Workers are still running; keep waiting.
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			logCa("New3_ScanCA_JavaCompute interrupted while waiting for workers: " + e.toString());
		}
	}

	/**
	 * Worker that repeatedly scans one list of pending transactions, merges
	 * matching ones and puts the resulting messages on the MQ queue. It runs
	 * until {@code New1_ScanCA_JavaCompute.RequestStopAll} is set and its list
	 * is empty.
	 */
	public class MergeWorker implements Runnable {

		MQAccess mqaccess = new MQAccess();
		MQQueueManager mqmng = mqaccess.accessQMgr("BSMS_Q");
		MQQueue mq = null;
		MQMessage msgQ = new MQMessage();
		MQPutMessageOptions pmo = new MQPutMessageOptions();

		List<TMTranDTO> collectTMTrans;
		MbMessageAssembly assembly;
		MbOutputTerminal out;
		int i;

		/**
		 * @param collectTMTrans list of pending transactions this worker drains
		 * @param assembly       message assembly of the triggering flow invocation
		 * @param out            the node's "out" terminal
		 * @param i              index of this worker, used in the exception log file name
		 */
		public MergeWorker(List<TMTranDTO> collectTMTrans,
							MbMessageAssembly assembly,
							MbOutputTerminal out,
							int i) {
			this.collectTMTrans = collectTMTrans;
			this.assembly = assembly;
			this.out = out;
			this.i = i;
		}

		/**
		 * Returns the remark (EFTH) to use for a merged message.
		 *
		 * <p>When both transactions are CTLNH the remark is taken from the
		 * first transaction if its EFTH ends with {@code -12-PMT-002},
		 * otherwise from the second, and in both cases the trailing suffix
		 * length is cut off. In every other case the second transaction's
		 * EFTH is returned unchanged.
		 *
		 * @param transOne the main transaction
		 * @param transTwo the matching (fee) transaction
		 * @return the remark text; may be null if the source EFTH is null
		 */
		private String getRemark(TMTranDTO transOne, TMTranDTO transTwo) {
			if (transOne.isTransCTLNH() && transTwo.isTransCTLNH()) {
				String firstRemark = transOne.getEFTH();
				if (firstRemark != null && firstRemark.endsWith(CTLNH_REMARK_SUFFIX)) {
					return stripRemarkSuffix(firstRemark);
				}
				return stripRemarkSuffix(transTwo.getEFTH());
			}
			return transTwo.getEFTH();
		}

		/**
		 * Cuts the last {@code CTLNH_REMARK_SUFFIX.length()} characters off the
		 * remark. A null or shorter remark is returned unchanged instead of
		 * throwing, because at this point the transactions have already been
		 * removed from the pending list and an exception would lose them.
		 */
		private String stripRemarkSuffix(String remark) {
			int suffixLength = CTLNH_REMARK_SUFFIX.length();
			if (remark == null || remark.length() < suffixLength) {
				return remark;
			}
			return remark.substring(0, remark.length() - suffixLength);
		}

		/**
		 * Builds the XML message from the column names and the given values
		 * and puts it on the queue. Failures are logged, not rethrown.
		 *
		 * <p>NOTE(review): values are written into the XML without escaping;
		 * a remark containing {@code <} or {@code &} would yield malformed
		 * XML. Left unchanged because the consumer's expectations are not
		 * visible here.
		 *
		 * <p>NOTE(review): callers remove the transaction from the pending
		 * list before calling this method, so a failed put loses the message.
		 *
		 * @param assembly unused, kept for signature compatibility
		 * @param out      unused, kept for signature compatibility
		 * @param columns  XML element names, one per value, in value order
		 */
		private void sendToQueue(MbMessageAssembly assembly, MbOutputTerminal out,
				List<String> columns, String sStat, String sAccNo, String sAmt,
				String sCurr, String sDorc, String sDate, String sTime,
				String sTmapptype, String sEFTH, String sHasFee, String sFee,
				String sFeeDorc, String sTmtxCD) {

			try {
				String[] values = {
					sStat, sAccNo, sAmt, sCurr,
					sDorc, sDate, sTime, sTmapptype, sEFTH,
					sHasFee, sFee, sFeeDorc,
					sTmtxCD
				};

				StringBuilder sb = new StringBuilder();
				sb.append("<MSG><HDR><TABLE>XTABLEX</TABLE><ACT></ACT></HDR><DATA>");
				for (int col = 0; col < columns.size(); col++) {
					String tag = columns.get(col);
					sb.append("<").append(tag).append(">");
					sb.append(values[col]);
					sb.append("</").append(tag).append(">");
				}
				sb.append("</DATA></MSG>");

				Func.log(Para.fullPathLog + ".ca", "message=" + sb);

				msgQ.clearMessage();
				// put() stores the generated id back into the message object and
				// clearMessage() does not reset it, so without this every message
				// after the first would reuse the first message's id.
				msgQ.messageId = MQConstants.MQMI_NONE;
				msgQ.writeString(sb.toString());
				mq.put(msgQ, pmo);
			} catch (Exception e) {
				Func.log(Para.fullPathLog + ".ca", "sendToQueue:" + e.toString());
			}
		}

		/** Sends a transaction on its own, flagged as having no fee part. */
		private void sendSingle(TMTranDTO t, List<String> columns) {
			sendToQueue(assembly, out, columns, t.getStat(),
					t.getAccNo(), t.getAmt(), t.getCurr(), t.getDorc(),
					t.getDate(), t.getTime(), t.getTmapptype(),
					t.getEFTH(), "0", "", "", t.getTmtxcd());
		}

		/** Marks the transaction as created and takes it out of the pending list. */
		private void markCreated(TMTranDTO t) {
			t.setStatus(TMTranDTO.STATUS_CREATED);
			collectTMTrans.remove(t);
		}

		/**
		 * True for the transaction codes of the three-way rule. Per the
		 * original author's note: 1960 - intermediary, 1920 - transfer,
		 * 1900 - cash withdrawal.
		 */
		private boolean isThreeWayTxCode(String txCode) {
			if (txCode == null) {
				return false;
			}
			String code = txCode.trim();
			return code.equals("1960") || code.equals("1920") || code.equals("1900");
		}

		/** True for host transaction codes 26 and 160, the ones that produce a message. */
		private boolean isSendableHostCode(String hostCode) {
			if (hostCode == null) {
				return false;
			}
			String code = hostCode.trim();
			return code.equals("26") || code.equals("160");
		}

		/** True for host transaction codes 26, 160 and 165, accepted by the three-way rule. */
		private boolean isThreeWayHostCode(String hostCode) {
			return isSendableHostCode(hostCode)
					|| (hostCode != null && hostCode.trim().equals("165"));
		}

		/**
		 * Returns the first non-null entry at or after {@code from} for which
		 * {@code compare(reference)} is true, or null if there is none.
		 */
		private TMTranDTO findNextMatch(List<TMTranDTO> snapshot, int from, TMTranDTO reference) {
			for (int pos = from; pos < snapshot.size(); pos++) {
				TMTranDTO candidate = snapshot.get(pos);
				if (candidate != null && candidate.compare(reference)) {
					return candidate;
				}
			}
			return null;
		}

		/**
		 * Three-way rule (original author's note: merge "TLBA" settlement
		 * messages). If all three transactions are still pending and all carry
		 * the accepted transaction and host codes, they are marked created and
		 * removed, and each one with host code 26 or 160 is sent on its own.
		 * Otherwise nothing happens.
		 */
		private void handleThreeMatches(TMTranDTO b, TMTranDTO c, TMTranDTO d, List<String> columns) {
			boolean applies = collectTMTrans.contains(b)
					&& collectTMTrans.contains(c)
					&& collectTMTrans.contains(d)
					&& isThreeWayTxCode(b.getTmtxcd())
					&& isThreeWayTxCode(c.getTmtxcd())
					&& isThreeWayTxCode(d.getTmtxcd())
					&& isThreeWayHostCode(b.getTmhosttxcd())
					&& isThreeWayHostCode(c.getTmhosttxcd())
					&& isThreeWayHostCode(d.getTmhosttxcd());
			if (!applies) {
				return;
			}

			b.setStatus(TMTranDTO.STATUS_CREATED);
			c.setStatus(TMTranDTO.STATUS_CREATED);
			d.setStatus(TMTranDTO.STATUS_CREATED);

			collectTMTrans.remove(b);
			collectTMTrans.remove(c);
			collectTMTrans.remove(d);

			if (isSendableHostCode(b.getTmhosttxcd())) {
				sendSingle(b, columns);
			}
			if (isSendableHostCode(c.getTmhosttxcd())) {
				sendSingle(c, columns);
			}
			if (isSendableHostCode(d.getTmhosttxcd())) {
				sendSingle(d, columns);
			}
		}

		/**
		 * One scan over a snapshot of the pending list. The last entry of the
		 * snapshot is never used as the leading transaction; it is picked up
		 * by a later pass or by {@link #flushLoneTransaction}.
		 */
		private void mergePass(List<String> columns) {
			// Copy in one call instead of an index loop so that a concurrent
			// removal cannot make get(index) run past the end of the list.
			List<TMTranDTO> snapshot = new ArrayList<TMTranDTO>(collectTMTrans);

			for (int first = 0; first < snapshot.size() - 1; first++) {
				TMTranDTO b = snapshot.get(first);
				if (b == null || b.getStatus() != TMTranDTO.STATUS_FORCREATE) {
					continue;
				}

				for (int second = first + 1; second < snapshot.size(); second++) {
					TMTranDTO c = snapshot.get(second);
					if (c == null || !c.compare(b)) {
						continue;
					}

					// c matches b; see whether yet another one matches too.
					TMTranDTO d = findNextMatch(snapshot, second + 1, b);
					if (d != null) {
						handleThreeMatches(b, c, d, columns);
					} else if (collectTMTrans.contains(b) && collectTMTrans.contains(c)) {
						// Exactly one partner left: merge both into one message,
						// b as the main part and c as the fee part.
						b.setStatus(TMTranDTO.STATUS_CREATED);
						c.setStatus(TMTranDTO.STATUS_CREATED);
						collectTMTrans.remove(b);
						collectTMTrans.remove(c);

						sendToQueue(assembly, out, columns,
								b.getStat(), b.getAccNo(), b.getAmt(),
								b.getCurr(), b.getDorc(), b.getDate(),
								b.getTime(), b.getTmapptype(),
								getRemark(b, c), "1", c.getAmt(),
								c.getDorc(), b.getTmtxcd());
					}
				}

				if (b.getStatus() == TMTranDTO.STATUS_FORCREATE
						&& collectTMTrans.contains(b)) {
					// No fee transaction found: create a normal message.
					markCreated(b);
					sendSingle(b, columns);
				}
			}
		}

		/** Sends the only remaining transaction so that it does not wait indefinitely for a partner. */
		private void flushLoneTransaction(List<String> columns) {
			if (collectTMTrans.size() != 1) {
				return;
			}
			TMTranDTO only = collectTMTrans.get(0);
			if (only != null && only.getStatus() == TMTranDTO.STATUS_FORCREATE) {
				markCreated(only);
				sendSingle(only, columns);
			}
		}

		/** Closes the queue and disconnects the queue manager; each step is attempted even if the other fails. */
		private void closeQueueResources() {
			if (mq != null) {
				try {
					mq.close();
				} catch (Exception e) {
					Func.log(Para.fullPathLog + ".ca", e.toString());
				}
			}
			if (mqmng != null) {
				try {
					mqmng.disconnect();
				} catch (Exception e) {
					Func.log(Para.fullPathLog + ".ca", e.toString());
				}
			}
		}

		/**
		 * Opens the queue and then loops: merge pass, flush of a lone
		 * transaction, stop check, 10 ms pause. If the queue manager or queue
		 * is unavailable, a stop of all flows is requested instead.
		 */
		@Override
		public void run() {
			if (mqmng == null) {
				New1_ScanCA_JavaCompute.RequestStopAll = true;
				Func.log(Para.fullPathLog + ".ca",
						"QUEUE NOT AVAILABLE!!! flow 3 is stopped!");
				return;
			}

			try {
				mq = mqaccess.accessMQueue(mqmng, "Q.CATRANS");
				if (mq == null) {
					New1_ScanCA_JavaCompute.RequestStopAll = true;
					Func.log(Para.fullPathLog + ".ca",
							"QUEUE NOT AVAILABLE!!! flow 3 is stopped!");
					return;
				}
				msgQ.messageId = MQConstants.MQMI_NONE;

				Flow3IsStarted = true;

				List<String> columns = new ArrayList<String>(COLUMN_NAMES.length);
				for (String columnName : COLUMN_NAMES) {
					columns.add(columnName);
				}

				while (true) {
					mergePass(columns);
					flushLoneTransaction(columns);

					// Stop only once everything pending has been sent.
					if (New1_ScanCA_JavaCompute.RequestStopAll
							&& collectTMTrans.isEmpty()) {
						Func.log(Para.fullPathLog + ".ca", "flow 3 is stopped!");
						break;
					}

					try {
						Thread.sleep(10);
					} catch (InterruptedException e) {
						// Treat interruption as a request to stop this worker.
						Thread.currentThread().interrupt();
						Func.log(Para.fullPathLog + ".ca",
								"MergeWorker " + i + " interrupted: " + e.toString());
						break;
					}
				}
			} catch (Exception e) {
				Func.log(Para.fullPathLog + ".caException" + i,
						"MergeWorker " + i + ": " + Func.getStackTrace(e));
			} finally {
				// Release the MQ handles on every exit path, not only on a clean stop.
				closeQueueResources();
			}
		}

	}
}
// (paste-site footer "Editor is loading..." - not part of the Java source)