Untitled
unknown
plain_text
2 years ago
43 kB
6
Indexable
#include <iostream> #include <chrono> #include <thread> #include <ctime> #include <random> #include <string> #include <climits> #include <mongocxx/instance.hpp> #include <mongocxx/client.hpp> #include <mongocxx/stdx.hpp> #include <mongocxx/uri.hpp> #include <bsoncxx/types.hpp> #include <bsoncxx/json.hpp> #include <bsoncxx/types.hpp> #include <bsoncxx/document/view.hpp> #include <bsoncxx/builder/basic/document.hpp> #include <bsoncxx/builder/basic/array.hpp> #include <bsoncxx/builder/basic/kvp.hpp> #include <mongocxx/collection.hpp> #include <boost/beast/core.hpp> #include <boost/beast/http.hpp> #include <boost/asio/io_context.hpp> #include <boost/asio/steady_timer.hpp> #include <boost/asio/ip/tcp.hpp> #include <json/json.h> namespace beast = boost::beast; namespace http = beast::http; using tcp = boost::asio::ip::tcp; using bsoncxx::builder::basic::kvp; using bsoncxx::builder::basic::make_array; using bsoncxx::builder::basic::make_document; mongocxx::instance inst{}; std::default_random_engine generator; std::uniform_real_distribution<double> distribution(0.0, 1.0); std::uniform_int_distribution<int> intDistribution(0, 9); std::uniform_int_distribution<int> stateDistribution(0, 2); boost::asio::io_context io_context; boost::asio::steady_timer timer(io_context, boost::asio::chrono::seconds(1)); std::function<void(const boost::system::error_code&)> periodic_function; void changeStreamFunction() { try { // Create an instance. // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["webmsg"]; // Ping the database. const auto ping_cmd = bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("ping", 1)); db.run_command(ping_cmd.view()); std::cout << "Pinged your deployment. You successfully connected to MongoDB!" << std::endl; while (true) // Loop forever { try { mongocxx::options::change_stream options; options.full_document("updateLookup"); const std::chrono::milliseconds await_time{1000}; options.max_await_time(await_time); mongocxx::change_stream streamChanges = collection.watch(options); for (const bsoncxx::document::view& event : streamChanges) { bsoncxx::document::element fullDocumentElement = event["fullDocument"]; if (fullDocumentElement) { bsoncxx::document::view fullDocumentView = fullDocumentElement.get_document(); std::string fullDocumentJson = bsoncxx::to_json(fullDocumentView); std::cout << "Full Document: " << fullDocumentJson << std::endl; } else { std::cout << "Invalid" << std::endl; } } } catch (const std::exception& e) { std::cerr << "MongoDB watcher caught exception: " << e.what() << std::endl; } } } catch (const std::exception& e) { // Handle errors. std::cout << "Exception: " << e.what() << std::endl; } } void rmuxPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["rmux"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "rmux_0" + std::to_string(i))); subdoc.append(bsoncxx::builder::basic::kvp("temperature", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("dewpoint", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("humidity", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor1_status", intDistribution(generator) % 2)); subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor2_status", intDistribution(generator) % 2)); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("rmux", arr)))); //std::string docJson = bsoncxx::to_json(doc); //std::cout << "BSON Document: " << docJson << std::endl; auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Rmux " << count << std::endl; //std::cout << "Elapsed time: " << elapsedTime.count() << " milliseconds" << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void chillerPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["chiller"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "chiller_0" + std::to_string(i))); subdoc.append(bsoncxx::builder::basic::kvp("alarm_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("alarm_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("pump_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("pump_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("compressor", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("chiller_state", intDistribution(generator) % 3)); subdoc.append(bsoncxx::builder::basic::kvp("temperature_1", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("temperature_2", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("volume", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("chiller", arr)))); auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Chiller " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void dioPublish(int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["dio"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= 1; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "dio_0" + std::to_string(i))); subdoc.append(bsoncxx::builder::basic::kvp("estop_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("estop_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("emon", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("emhigh", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("reset", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("door_interlock_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("door_interlock_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("flow_interlock_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("flow_interlock_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("safety_interlock_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("safety_interlock_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("ext_estop_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("ext_estop_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("ups_off", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("dio", arr)))); auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Dio " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void laserUnitPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["laserUnit"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; std::string desc; if (i < 10) { desc = "laserUnit_0" + std::to_string(i); } else { desc = "laserUnit_" + std::to_string(i); } subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", desc)); subdoc.append(bsoncxx::builder::basic::kvp("power", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("current_usage_s", intDistribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("current_usage_accum_s", intDistribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("lu_state", intDistribution(generator) % 10)); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("laserUnit", arr)))); auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at laserUnit " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void monitorCardPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["monitorCard"]; try { double delay = 0.05; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto doc = bsoncxx::builder::basic::document{}; auto graphDoc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; std::string desc; if (i < 10) { desc = "monitorCard_0" + std::to_string(i); } else { desc = "monitorCard_" + std::to_string(i); } subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", desc)); subdoc.append(bsoncxx::builder::basic::kvp("is_seed_on", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("ld_temp", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("cmb_temp", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("cps_temp", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("pd1", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("pd2", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("pd3", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("pd4", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("mon_state", intDistribution(generator) % 8)); subdoc.append(bsoncxx::builder::basic::kvp("psu_interlock_1", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("psu_interlock_2", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("monitorCard", arr)))); graphDoc.append(bsoncxx::builder::basic::kvp("monitorCard", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); auto result = collection.update_one({}, doc.extract()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at MonitorCard " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void seedPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["seed"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs"f bsoncxx::builder::basic::document subdoc{}; std::string desc; if (i < 10) { desc = "seed_0" + std::to_string(i); } else { desc = "seed_" + std::to_string(i); } subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", desc)); subdoc.append(bsoncxx::builder::basic::kvp("laser_status", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("laser_voltage", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("tec_current", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("tec_voltage", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("ther_res", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("pd_current", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("ain1", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("ain2", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("vtmax", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("vtmin", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("rtset", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("rtmin", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("rtmax", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("ilaser", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("ain", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("seed", arr)))); //std::string docJson = bsoncxx::to_json(doc); //std::cout << "BSON Document: " << docJson << std::endl; auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Seed " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void psuPublish(int numUnit, int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["psu"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= numUnit; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; std::string desc; if (i < 10) { desc = "psu_0" + std::to_string(i); } else { desc = "psu_" + std::to_string(i); } subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", desc)); subdoc.append(bsoncxx::builder::basic::kvp("voltage_oper", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("voltage_lim", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("current_lim", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("current_oper", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("current_meas", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("voltage_meas", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("psu_state", intDistribution(generator) % 4)); subdoc.append(bsoncxx::builder::basic::kvp("outp", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("is_connected", bool(intDistribution(generator)))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("psu", arr)))); auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Psu " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void systemPublish(int duration) { try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; mongocxx::collection graphCollection = db["system"]; try { double delay = 0.5; int count = 0; auto startTime = std::chrono::steady_clock::now(); while (true) { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(currentTime - startTime); auto graphDoc = bsoncxx::builder::basic::document{}; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array arr{}; for (int i = 1; i <= 1; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "system_0" + std::to_string(i))); subdoc.append(bsoncxx::builder::basic::kvp("sys_state", intDistribution(generator) % 8)); subdoc.append(bsoncxx::builder::basic::kvp("mode_state", intDistribution(generator) % 4)); subdoc.append(bsoncxx::builder::basic::kvp("power_state", intDistribution(generator) % 6)); subdoc.append(bsoncxx::builder::basic::kvp("laser_power", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("laser_on_s", intDistribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); auto timestamp = std::chrono::system_clock::now(); subdoc.append(bsoncxx::builder::basic::kvp("timestamp", bsoncxx::types::b_date(timestamp))); arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("system", arr)))); auto result = collection.update_one({}, doc.extract()); graphDoc.append(bsoncxx::builder::basic::kvp("rmux", arr)); auto graphResult = graphCollection.insert_one(graphDoc.view()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at System " << count << std::endl; if (elapsedTime.count() >= duration) { break; // Exit the loop after 10 seconds } } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } void make_request(boost::asio::io_context& io_context) { // Create a resolver and connect to the server tcp::resolver resolver(io_context); tcp::socket socket(io_context); beast::error_code ec; std::vector<std::string> ports = { "4000", "4001", "4002", "4003" }; while (true) { std::vector<Json::Value> rmux_data; bool fault = false; for (const std::string& port : ports) { auto const results = resolver.resolve("localhost", port, ec); if (ec) { std::cerr << "Error resolving host: " << ec.message() << std::endl; return; } boost::asio::connect(socket, results.begin(), results.end(), ec); if (ec) { std::cerr << "Error connecting to host: " << ec.message() << std::endl; return; } // Set up the HTTP request http::request<http::string_body> req{http::verb::get, "/appAll.json", 11}; req.set(http::field::host, "localhost"); req.set(http::field::user_agent, "Boost.Beast GET"); // Send the HTTP request http::write(socket, req, ec); if (ec) { std::cerr << "Error sending HTTP request: " << ec.message() << std::endl; return; } // Receive and print the HTTP response beast::flat_buffer buffer; http::response<http::string_body> res; http::read(socket, buffer, res, ec); if (ec) { std::cerr << "Error receiving HTTP response: " << ec.message() << std::endl; return; } try { std::string responseBody = res.body(); //std::cout << "Body: " << responseBody << std::endl; Json::Value root; // Create a JSON Value object Json::Reader reader; // Create a JSON Reader if (reader.parse(responseBody, root)) { // Access JSON values Json::Value responseJson; responseJson["temp"] = std::stof(root["data"]["all"][2]["isens"][0]["val"].asString()); responseJson["humidity"] = std::stof(root["data"]["all"][2]["isens"][1]["val"].asString()); responseJson["dew"] = std::stof(root["data"]["all"][2]["isens"][2]["val"].asString()); responseJson["leak1"] = std::stoi(root["data"]["all"][2]["isens"][3]["val"].asString()); responseJson["leak2"] = std::stoi(root["data"]["all"][2]["isens"][4]["val"].asString()); responseJson["cbit"] = std::stoi(root["data"]["all"][2]["isens"][5]["val"].asString()); //std::cout << "Port: " << port << std::endl; //std::cout << "Temp: " << responseJson["temp"] << std::endl; //std::cout << "Humidity: " << responseJson["humidity"] << std::endl; //std::cout << "Dew: " << responseJson["dew"] << std::endl; rmux_data.push_back(responseJson); if (responseJson["temp"].asFloat() > 50 || responseJson["humidity"].asFloat() > 50 || responseJson["dew"].asFloat() > 50) { fault = true; } } else { std::cerr << "Failed to parse JSON data" << std::endl; } } catch (const std::exception& e) { std::cerr << "Error parsing data: " << e.what() << std::endl; } } for (size_t i = 0; i < rmux_data.size(); ++i) { std::cout << "Data from port " << ports[i] << ":" << std::endl; std::cout << "Temperature: " << rmux_data[i]["temp"].asFloat() << std::endl; std::cout << "Humidity: " << rmux_data[i]["humidity"].asFloat() << std::endl; std::cout << "Dew: " << rmux_data[i]["dew"].asFloat() << std::endl; std::cout << "Leak1: " << rmux_data[i]["leak1"].asInt() << std::endl; std::cout << "Leak2: " << rmux_data[i]["leak2"].asInt() << std::endl; std::cout << "CBIT: " << rmux_data[i]["cbit"].asInt() << std::endl; std::cout << std::endl; } try { // Replace the connection string with your MongoDB deployment's connection string. const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" }; // Set the version of the Stable API on the client. mongocxx::options::client client_options; const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 }; client_options.server_api_opts(api); // Setup the connection and get a handle on the "admin" database. mongocxx::client conn{ uri, client_options }; mongocxx::database db = conn["dso-gb"]; mongocxx::collection collection = db["changestream_data"]; try { double delay = 0.2; int count = 0; auto doc = bsoncxx::builder::basic::document{}; bsoncxx::builder::basic::array system_arr{}; for (int i = 0; i < 1; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "system_0" + std::to_string(i + 1))); if (fault) { subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 7)); } else { subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 1)); } subdoc.append(bsoncxx::builder::basic::kvp("mode_state", 0)); subdoc.append(bsoncxx::builder::basic::kvp("power_state", 0)); subdoc.append(bsoncxx::builder::basic::kvp("laser_power", distribution(generator))); subdoc.append(bsoncxx::builder::basic::kvp("laser_on_s", count)); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3)); system_arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("system", system_arr)))); bsoncxx::builder::basic::array rmux_arr{}; for (int i = 0; i < 4; ++i) { // Generate two "docs" bsoncxx::builder::basic::document subdoc{}; subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1)); subdoc.append(bsoncxx::builder::basic::kvp("desc", "rmux_0" + std::to_string(i + 1))); subdoc.append(bsoncxx::builder::basic::kvp("temperature", rmux_data[i]["temp"].asFloat())); subdoc.append(bsoncxx::builder::basic::kvp("dewpoint", rmux_data[i]["humidity"].asFloat())); subdoc.append(bsoncxx::builder::basic::kvp("humidity", rmux_data[i]["dew"].asFloat())); subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor1_status", rmux_data[i]["leak1"].asInt())); subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor2_status", rmux_data[i]["leak2"].asInt())); subdoc.append(bsoncxx::builder::basic::kvp("counter", count)); subdoc.append(bsoncxx::builder::basic::kvp("cbit", rmux_data[i]["cbit"].asInt())); rmux_arr.append(subdoc); } doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("rmux", rmux_arr)))); auto result = collection.update_one({}, doc.extract()); std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000))); count++; std::cout << "Inserted at Rmux " << count << std::endl; } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; } } } int main() { std::string userInput; // Prompt the user for input std::cout << "Select type of simulation:\n1.Observe Web Commands\n2.Publish Data\n3.Observe Web Commands + Publish Data\n4.Rmux Emulator\n"; // Read a full line of input from standard input std::cin >> userInput; switch (userInput[0]) { case '1': { std::thread changeStreamThread(changeStreamFunction); // watch changeStreamThread.join(); break; } case '2': { int luNum; int chillerNum; int rmuxNum; int duration; std::cout << "Number of Laser Units: "; std::cin >> luNum; std::cout << "Number of Chillers: "; std::cin >> chillerNum; std::cout << "Number of Rmux: "; std::cin >> rmuxNum; std::cout << "Duration:(0=infinite): "; std::cin >> duration; if (duration == 0) { duration = INT_MAX; } std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1 std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2 std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3 std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4 std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5 std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6 std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7 std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8 seedCardPublishThread.join(); monitorCardPublishThread.join(); psuCardPublishThread.join(); chillerPublishThread.join(); rmuxPublishThread.join(); dioPublishThread.join(); laserUnitPublishThread.join(); systemPublishThread.join(); break; } case '3': { int luNum; int chillerNum; int rmuxNum; int duration; std::cout << "Number of Laser Units: "; std::cin >> luNum; std::cout << "Number of Chillers: "; std::cin >> chillerNum; std::cout << "Number of Rmux: "; std::cin >> rmuxNum; std::cout << "Duration:(0=infinite): "; std::cin >> duration; std::thread changeStreamThread(changeStreamFunction); // watch std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1 std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2 std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3 std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4 std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5 std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6 std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7 std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8 changeStreamThread.join(); seedCardPublishThread.join(); monitorCardPublishThread.join(); psuCardPublishThread.join(); chillerPublishThread.join(); rmuxPublishThread.join(); dioPublishThread.join(); laserUnitPublishThread.join(); systemPublishThread.join(); break; } case '4': { make_request(io_context); return 0; } default: std::cout << "Invalid choice." << std::endl; } return 0; }
Editor is loading...
Leave a Comment