Untitled

mail@pastecode.io avatar
unknown
plain_text
7 months ago
43 kB
2
Indexable
Never
#include <iostream>
#include <chrono>
#include <thread>
#include <ctime>
#include <random>
#include <string>
#include <climits> 
#include <mongocxx/instance.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/stdx.hpp>
#include <mongocxx/uri.hpp>
#include <bsoncxx/types.hpp>
#include <bsoncxx/json.hpp>
#include <bsoncxx/types.hpp>
#include <bsoncxx/document/view.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/array.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <mongocxx/collection.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <json/json.h>

namespace beast = boost::beast;
namespace http = beast::http;
using tcp = boost::asio::ip::tcp;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::make_document;

mongocxx::instance inst{};
std::default_random_engine generator;
std::uniform_real_distribution<double> distribution(0.0, 1.0);
std::uniform_int_distribution<int> intDistribution(0, 9);
std::uniform_int_distribution<int> stateDistribution(0, 2);

boost::asio::io_context io_context;
boost::asio::steady_timer timer(io_context, boost::asio::chrono::seconds(1));
std::function<void(const boost::system::error_code&)> periodic_function;

void changeStreamFunction() {
	try
	{
		// Create an instance.

		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["webmsg"];


		// Ping the database.
		const auto ping_cmd = bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("ping", 1));
		db.run_command(ping_cmd.view());
		std::cout << "Pinged your deployment. You successfully connected to MongoDB!" << std::endl;

		while (true) // Loop forever
		{
			try
			{
				mongocxx::options::change_stream options;
				options.full_document("updateLookup");
				const std::chrono::milliseconds await_time{1000};
				options.max_await_time(await_time);
				mongocxx::change_stream streamChanges = collection.watch(options);
				for (const bsoncxx::document::view& event : streamChanges)
				{
					bsoncxx::document::element fullDocumentElement = event["fullDocument"];
					if (fullDocumentElement) {
						bsoncxx::document::view fullDocumentView = fullDocumentElement.get_document();
						std::string fullDocumentJson = bsoncxx::to_json(fullDocumentView);
						std::cout << "Full Document: " << fullDocumentJson << std::endl;
					}
					else {
						std::cout << "Invalid" << std::endl;
					}
				}

			}
			catch (const std::exception& e)
			{
				std::cerr << "MongoDB watcher caught exception: " << e.what() << std::endl;
			}
		}
	}
	catch (const std::exception& e)
	{
		// Handle errors.
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void rmuxPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["rmux"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};

				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "rmux_0" + std::to_string(i)));
					subdoc.append(bsoncxx::builder::basic::kvp("temperature", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("dewpoint", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("humidity", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor1_status", intDistribution(generator) % 2));
					subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor2_status", intDistribution(generator) % 2));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("rmux", arr))));
				//std::string docJson = bsoncxx::to_json(doc);
				//std::cout << "BSON Document: " << docJson << std::endl;
				auto result = collection.update_one({}, doc.extract());

				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Rmux " << count << std::endl;
				//std::cout << "Elapsed time: " << elapsedTime.count() << " milliseconds" << std::endl;

				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void chillerPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["chiller"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "chiller_0" + std::to_string(i)));
					subdoc.append(bsoncxx::builder::basic::kvp("alarm_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("alarm_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("pump_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("pump_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("compressor", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("chiller_state", intDistribution(generator) % 3));
					subdoc.append(bsoncxx::builder::basic::kvp("temperature_1", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("temperature_2", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("volume", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("chiller", arr))));

				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Chiller " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void dioPublish(int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["dio"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= 1; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "dio_0" + std::to_string(i)));
					subdoc.append(bsoncxx::builder::basic::kvp("estop_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("estop_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("emon", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("emhigh", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("reset", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("door_interlock_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("door_interlock_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("flow_interlock_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("flow_interlock_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("safety_interlock_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("safety_interlock_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("ext_estop_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("ext_estop_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("ups_off", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("dio", arr))));

				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());

				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Dio " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void laserUnitPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["laserUnit"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					std::string desc;
					if (i < 10) {
						desc = "laserUnit_0" + std::to_string(i);
					}
					else {
						desc = "laserUnit_" + std::to_string(i);
					}
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", desc));
					subdoc.append(bsoncxx::builder::basic::kvp("power", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("current_usage_s", intDistribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("current_usage_accum_s", intDistribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("lu_state", intDistribution(generator) % 10));

					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("laserUnit", arr))));

				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at laserUnit " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void monitorCardPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["monitorCard"];
		try {
			double delay = 0.05;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);



				auto doc = bsoncxx::builder::basic::document{};

				auto graphDoc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					std::string desc;
					if (i < 10) {
						desc = "monitorCard_0" + std::to_string(i);
					}
					else {
						desc = "monitorCard_" + std::to_string(i);
					}
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", desc));
					subdoc.append(bsoncxx::builder::basic::kvp("is_seed_on", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("ld_temp", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("cmb_temp", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("cps_temp", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("pd1", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("pd2", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("pd3", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("pd4", distribution(generator)));

					subdoc.append(bsoncxx::builder::basic::kvp("mon_state", intDistribution(generator) % 8));
					subdoc.append(bsoncxx::builder::basic::kvp("psu_interlock_1", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("psu_interlock_2", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("monitorCard", arr))));
				graphDoc.append(bsoncxx::builder::basic::kvp("monitorCard", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				auto result = collection.update_one({}, doc.extract());
				
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at MonitorCard " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void seedPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["seed"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"f
					bsoncxx::builder::basic::document subdoc{};
					std::string desc;
					if (i < 10) {
						desc = "seed_0" + std::to_string(i);
					}
					else {
						desc = "seed_" + std::to_string(i);
					}
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", desc));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_status", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_voltage", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("tec_current", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("tec_voltage", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("ther_res", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("pd_current", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("ain1", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("ain2", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("vtmax", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("vtmin", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("rtset", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("rtmin", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("rtmax", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("ilaser", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("ain", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("seed", arr))));
				//std::string docJson = bsoncxx::to_json(doc);
				//std::cout << "BSON Document: " << docJson << std::endl;
				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Seed " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void psuPublish(int numUnit, int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["psu"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= numUnit; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					std::string desc;
					if (i < 10) {
						desc = "psu_0" + std::to_string(i);
					}
					else {
						desc = "psu_" + std::to_string(i);
					}
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", desc));
					subdoc.append(bsoncxx::builder::basic::kvp("voltage_oper", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("voltage_lim", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("current_lim", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("current_oper", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("current_meas", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("voltage_meas", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("psu_state", intDistribution(generator) % 4));
					subdoc.append(bsoncxx::builder::basic::kvp("outp", bool(intDistribution(generator))));
					subdoc.append(bsoncxx::builder::basic::kvp("is_connected", bool(intDistribution(generator))));

					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("psu", arr))));

				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Psu " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void systemPublish(int duration) {
	try {
		// Replace the connection string with your MongoDB deployment's connection string.
		const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

		// Set the version of the Stable API on the client.
		mongocxx::options::client client_options;
		const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
		client_options.server_api_opts(api);

		// Setup the connection and get a handle on the "admin" database.
		mongocxx::client conn{ uri, client_options };
		mongocxx::database db = conn["dso-gb"];
		mongocxx::collection collection = db["changestream_data"];
		mongocxx::collection graphCollection = db["system"];

		try {
			double delay = 0.5;
			int count = 0;
			auto startTime = std::chrono::steady_clock::now();

			while (true) {
				auto currentTime = std::chrono::steady_clock::now();
				auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime);

				auto graphDoc = bsoncxx::builder::basic::document{};


				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array arr{};

				for (int i = 1; i <= 1; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "system_0" + std::to_string(i)));
					subdoc.append(bsoncxx::builder::basic::kvp("sys_state", intDistribution(generator) % 8));
					subdoc.append(bsoncxx::builder::basic::kvp("mode_state", intDistribution(generator) % 4));
					subdoc.append(bsoncxx::builder::basic::kvp("power_state", intDistribution(generator) % 6));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_power", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_on_s", intDistribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
					auto timestamp = std::chrono::system_clock::now();
					subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp)));
					arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("system", arr))));

				auto result = collection.update_one({}, doc.extract());
				graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr));
				auto graphResult = graphCollection.insert_one(graphDoc.view());
				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at System " << count << std::endl;
				if (elapsedTime.count() >= duration) {
					break; // Exit the loop after 10 seconds
				}
			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;

		}
	}
	catch (const std::exception& e) {
		std::cout << "Exception: " << e.what() << std::endl;
	}
}
void make_request(boost::asio::io_context& io_context) {
	// Create a resolver and connect to the server
	tcp::resolver resolver(io_context);
	tcp::socket socket(io_context);

	beast::error_code ec;
	std::vector<std::string> ports = { "4000", "4001", "4002", "4003" };

	while (true) {
		std::vector<Json::Value> rmux_data;
		bool fault = false;
		for (const std::string& port : ports) {

			auto const results = resolver.resolve("localhost", port, ec);

			if (ec) {
				std::cerr << "Error resolving host: " << ec.message() << std::endl;
				return;
			}

			boost::asio::connect(socket, results.begin(), results.end(), ec);

			if (ec) {
				std::cerr << "Error connecting to host: " << ec.message() << std::endl;
				return;
			}

			// Set up the HTTP request
			http::request<http::string_body> req{http::verb::get, "/appAll.json", 11};
			req.set(http::field::host, "localhost");
			req.set(http::field::user_agent, "Boost.Beast GET");

			// Send the HTTP request


			http::write(socket, req, ec);

			if (ec) {
				std::cerr << "Error sending HTTP request: " << ec.message() << std::endl;
				return;
			}

			// Receive and print the HTTP response
			beast::flat_buffer buffer;
			http::response<http::string_body> res;

			http::read(socket, buffer, res, ec);

			if (ec) {
				std::cerr << "Error receiving HTTP response: " << ec.message() << std::endl;
				return;
			}

			try {
				std::string responseBody = res.body();
				//std::cout << "Body: " << responseBody << std::endl;

				Json::Value root;  // Create a JSON Value object
				Json::Reader reader;  // Create a JSON Reader

				if (reader.parse(responseBody, root)) {
					// Access JSON values
					Json::Value responseJson;
					responseJson["temp"] = std::stof(root["data"]["all"][2]["isens"][0]["val"].asString());
					responseJson["humidity"] = std::stof(root["data"]["all"][2]["isens"][1]["val"].asString());
					responseJson["dew"] = std::stof(root["data"]["all"][2]["isens"][2]["val"].asString());
					responseJson["leak1"] = std::stoi(root["data"]["all"][2]["isens"][3]["val"].asString());
					responseJson["leak2"] = std::stoi(root["data"]["all"][2]["isens"][4]["val"].asString());
					responseJson["cbit"] = std::stoi(root["data"]["all"][2]["isens"][5]["val"].asString());
					//std::cout << "Port: " << port << std::endl;
					//std::cout << "Temp: " << responseJson["temp"] << std::endl;
					//std::cout << "Humidity: " << responseJson["humidity"] << std::endl;
					//std::cout << "Dew: " << responseJson["dew"] << std::endl;
					rmux_data.push_back(responseJson);
					if (responseJson["temp"].asFloat() > 50 || responseJson["humidity"].asFloat() > 50 || responseJson["dew"].asFloat() > 50) {
						fault = true;
					}
				}
				else {
					std::cerr << "Failed to parse JSON data" << std::endl;
				}
			}
			catch (const std::exception& e) {
				std::cerr << "Error parsing data: " << e.what() << std::endl;
			}
		}
		for (size_t i = 0; i < rmux_data.size(); ++i) {
			std::cout << "Data from port " << ports[i] << ":" << std::endl;
			std::cout << "Temperature: " << rmux_data[i]["temp"].asFloat() << std::endl;
			std::cout << "Humidity: " << rmux_data[i]["humidity"].asFloat() << std::endl;
			std::cout << "Dew: " << rmux_data[i]["dew"].asFloat() << std::endl;
			std::cout << "Leak1: " << rmux_data[i]["leak1"].asInt() << std::endl;
			std::cout << "Leak2: " << rmux_data[i]["leak2"].asInt() << std::endl;
			std::cout << "CBIT: " << rmux_data[i]["cbit"].asInt() << std::endl;
			std::cout << std::endl;
		}

		try {
			// Replace the connection string with your MongoDB deployment's connection string.
			const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };

			// Set the version of the Stable API on the client.
			mongocxx::options::client client_options;
			const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
			client_options.server_api_opts(api);

			// Setup the connection and get a handle on the "admin" database.
			mongocxx::client conn{ uri, client_options };
			mongocxx::database db = conn["dso-gb"];
			mongocxx::collection collection = db["changestream_data"];
			try {
				double delay = 0.2;
				int count = 0;

				auto doc = bsoncxx::builder::basic::document{};

				bsoncxx::builder::basic::array system_arr{};

				for (int i = 0; i < 1; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "system_0" + std::to_string(i + 1)));
					if (fault) {
						subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 7));

					}
					else {
						subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 1));
					}
					subdoc.append(bsoncxx::builder::basic::kvp("mode_state", 0));
					subdoc.append(bsoncxx::builder::basic::kvp("power_state", 0));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_power", distribution(generator)));
					subdoc.append(bsoncxx::builder::basic::kvp("laser_on_s", count));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));

					system_arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("system", system_arr))));


				bsoncxx::builder::basic::array rmux_arr{};

				for (int i = 0; i < 4; ++i) {  // Generate two "docs"
					bsoncxx::builder::basic::document subdoc{};
					subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1));
					subdoc.append(bsoncxx::builder::basic::kvp("desc", "rmux_0" + std::to_string(i + 1)));
					subdoc.append(bsoncxx::builder::basic::kvp("temperature", rmux_data[i]["temp"].asFloat()));
					subdoc.append(bsoncxx::builder::basic::kvp("dewpoint", rmux_data[i]["humidity"].asFloat()));
					subdoc.append(bsoncxx::builder::basic::kvp("humidity", rmux_data[i]["dew"].asFloat()));
					subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor1_status", rmux_data[i]["leak1"].asInt()));
					subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor2_status", rmux_data[i]["leak2"].asInt()));
					subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
					subdoc.append(bsoncxx::builder::basic::kvp("cbit", rmux_data[i]["cbit"].asInt()));

					rmux_arr.append(subdoc);
				}
				doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("rmux", rmux_arr))));
				auto result = collection.update_one({}, doc.extract());

				std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
				count++;

				std::cout << "Inserted at Rmux " << count << std::endl;



			}
			catch (const std::exception& e) {
				std::cout << "Exception: " << e.what() << std::endl;

			}
		}
		catch (const std::exception& e) {
			std::cout << "Exception: " << e.what() << std::endl;
		}
	}
}
int main()
{
	std::string userInput;

	// Prompt the user for input
	std::cout << "Select type of simulation:\n1.Observe Web Commands\n2.Publish Data\n3.Observe Web Commands + Publish Data\n4.Rmux Emulator\n";

	// Read a full line of input from standard input
	std::cin >> userInput;

	switch (userInput[0]) {
	case '1':
	{
		std::thread changeStreamThread(changeStreamFunction); // watch
		changeStreamThread.join();

		break;
	}

	case '2':
	{
		int luNum;
		int chillerNum;
		int rmuxNum;
		int duration;
		std::cout << "Number of Laser Units: ";
		std::cin >> luNum;
		std::cout << "Number of Chillers: ";
		std::cin >> chillerNum;
		std::cout << "Number of Rmux: ";
		std::cin >> rmuxNum;
		std::cout << "Duration:(0=infinite): ";
		std::cin >> duration;
		if (duration == 0) {
			duration = INT_MAX;
		}
		std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1
		std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2
		std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3
		std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4
		std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5
		std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6
		std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7
		std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8


		seedCardPublishThread.join();
		monitorCardPublishThread.join();
		psuCardPublishThread.join();
		chillerPublishThread.join();
		rmuxPublishThread.join();
		dioPublishThread.join();
		laserUnitPublishThread.join();
		systemPublishThread.join();

		break;
	}

	case '3':
	{
		int luNum;
		int chillerNum;
		int rmuxNum;
		int duration;
		std::cout << "Number of Laser Units: ";
		std::cin >> luNum;
		std::cout << "Number of Chillers: ";
		std::cin >> chillerNum;
		std::cout << "Number of Rmux: ";
		std::cin >> rmuxNum;
		std::cout << "Duration:(0=infinite): ";
		std::cin >> duration;
		std::thread changeStreamThread(changeStreamFunction); // watch
		std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1
		std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2
		std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3
		std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4
		std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5
		std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6
		std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7
		std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8

		changeStreamThread.join();
		seedCardPublishThread.join();
		monitorCardPublishThread.join();
		psuCardPublishThread.join();
		chillerPublishThread.join();
		rmuxPublishThread.join();
		dioPublishThread.join();
		laserUnitPublishThread.join();
		systemPublishThread.join();
		break;
	}

	case '4':
	{
		make_request(io_context);
		return 0;
	}

	default:
		std::cout << "Invalid choice." << std::endl;
	}

	return 0;
}
Leave a Comment