Untitled
unknown
plain_text
2 years ago
43 kB
9
Indexable
#include <iostream>
#include <chrono>
#include <thread>
#include <ctime>
#include <random>
#include <string>
#include <climits>
#include <mongocxx/instance.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/stdx.hpp>
#include <mongocxx/uri.hpp>
#include <bsoncxx/types.hpp>
#include <bsoncxx/json.hpp>
#include <bsoncxx/types.hpp>
#include <bsoncxx/document/view.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/array.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <mongocxx/collection.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <json/json.h>
// Short aliases for the Boost.Beast / Asio names used by the HTTP client code in this file.
namespace beast = boost::beast;
namespace http = beast::http;
using tcp = boost::asio::ip::tcp;
// BSON builder helpers that are used unqualified when composing documents.
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::make_document;
// Global mongocxx driver instance, constructed during static initialization.
// (The mongocxx documentation requires an instance to exist before any other driver object is used.)
mongocxx::instance inst{};
// Shared random source for all simulated readings; it is default-constructed (no explicit seed).
// NOTE(review): the engine and the distributions below are plain globals without locking —
// confirm the publish functions are never run concurrently, or give each thread its own engine.
std::default_random_engine generator;
// Real-valued samples in [0.0, 1.0).
std::uniform_real_distribution<double> distribution(0.0, 1.0);
// Integer samples in the closed range 0..9.
std::uniform_int_distribution<int> intDistribution(0, 9);
// Integer samples in the closed range 0..2; not referenced in the part of the file visible here.
std::uniform_int_distribution<int> stateDistribution(0, 2);
// Asio event loop plus a timer constructed with an expiry of 1 second from construction time.
boost::asio::io_context io_context;
boost::asio::steady_timer timer(io_context, boost::asio::chrono::seconds(1));
// Handler slot with the signature of a timer completion handler.
// Presumably assigned and re-armed elsewhere in the file — not visible in this chunk.
std::function<void(const boost::system::error_code&)> periodic_function;
void changeStreamFunction() {
try
{
// Create an instance.
// Replace the connection string with your MongoDB deployment's connection string.
const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
// Set the version of the Stable API on the client.
mongocxx::options::client client_options;
const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
client_options.server_api_opts(api);
// Setup the connection and get a handle on the "admin" database.
mongocxx::client conn{ uri, client_options };
mongocxx::database db = conn["dso-gb"];
mongocxx::collection collection = db["webmsg"];
// Ping the database.
const auto ping_cmd = bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("ping", 1));
db.run_command(ping_cmd.view());
std::cout << "Pinged your deployment. You successfully connected to MongoDB!" << std::endl;
while (true) // Loop forever
{
try
{
mongocxx::options::change_stream options;
options.full_document("updateLookup");
const std::chrono::milliseconds await_time{1000};
options.max_await_time(await_time);
mongocxx::change_stream streamChanges = collection.watch(options);
for (const bsoncxx::document::view& event : streamChanges)
{
bsoncxx::document::element fullDocumentElement = event["fullDocument"];
if (fullDocumentElement) {
bsoncxx::document::view fullDocumentView = fullDocumentElement.get_document();
std::string fullDocumentJson = bsoncxx::to_json(fullDocumentView);
std::cout << "Full Document: " << fullDocumentJson << std::endl;
}
else {
std::cout << "Invalid" << std::endl;
}
}
}
catch (const std::exception& e)
{
std::cerr << "MongoDB watcher caught exception: " << e.what() << std::endl;
}
}
}
catch (const std::exception& e)
{
// Handle errors.
std::cout << "Exception: " << e.what() << std::endl;
}
}
/**
 * Publishes simulated RMUX readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - sets it as the "rmux" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"rmux": [...]} document into the "rmux" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void rmuxPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["rmux"];

        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("rmux_01" ... "rmux_10").
                const std::string desc = (i < 10 ? "rmux_0" : "rmux_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                subdoc.append(kvp("temperature", distribution(generator)));
                subdoc.append(kvp("dewpoint", distribution(generator)));
                subdoc.append(kvp("humidity", distribution(generator)));
                subdoc.append(kvp("leak_sensor1_status", intDistribution(generator) % 2));
                subdoc.append(kvp("leak_sensor2_status", intDistribution(generator) % 2));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("rmux", arr)))));
            graphCollection.insert_one(make_document(kvp("rmux", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at Rmux " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated chiller readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - sets it as the "chiller" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"chiller": [...]} document into the "chiller" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "chiller", the same name as the $set field;
 * confirm readers of the "chiller" collection look the array up under that key.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void chillerPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["chiller"];

        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("chiller_01" ... "chiller_10").
                const std::string desc = (i < 10 ? "chiller_0" : "chiller_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                // Boolean flags are true whenever the 0..9 draw is non-zero.
                subdoc.append(kvp("alarm_1", intDistribution(generator) != 0));
                subdoc.append(kvp("alarm_2", intDistribution(generator) != 0));
                subdoc.append(kvp("pump_1", intDistribution(generator) != 0));
                subdoc.append(kvp("pump_2", intDistribution(generator) != 0));
                subdoc.append(kvp("compressor", intDistribution(generator) != 0));
                subdoc.append(kvp("chiller_state", intDistribution(generator) % 3));
                subdoc.append(kvp("temperature_1", distribution(generator)));
                subdoc.append(kvp("temperature_2", distribution(generator)));
                subdoc.append(kvp("volume", distribution(generator)));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("chiller", arr)))));
            graphCollection.insert_one(make_document(kvp("chiller", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at Chiller " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated digital-I/O readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds an array with a single randomly generated unit reading and
 *  - sets it as the "dio" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"dio": [...]} document into the "dio" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "dio", the same name as the $set field;
 * confirm readers of the "dio" collection look the array up under that key.
 *
 * @param duration  run time in seconds
 */
void dioPublish(int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["dio"];

        const int unitCount = 1; // exactly one DIO unit is simulated
        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= unitCount; ++i) {
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", "dio_0" + std::to_string(i)));
                // Boolean flags are true whenever the 0..9 draw is non-zero.
                subdoc.append(kvp("estop_1", intDistribution(generator) != 0));
                subdoc.append(kvp("estop_2", intDistribution(generator) != 0));
                subdoc.append(kvp("emon", intDistribution(generator) != 0));
                subdoc.append(kvp("emhigh", intDistribution(generator) != 0));
                subdoc.append(kvp("reset", intDistribution(generator) != 0));
                subdoc.append(kvp("door_interlock_1", intDistribution(generator) != 0));
                subdoc.append(kvp("door_interlock_2", intDistribution(generator) != 0));
                subdoc.append(kvp("flow_interlock_1", intDistribution(generator) != 0));
                subdoc.append(kvp("flow_interlock_2", intDistribution(generator) != 0));
                subdoc.append(kvp("safety_interlock_1", intDistribution(generator) != 0));
                subdoc.append(kvp("safety_interlock_2", intDistribution(generator) != 0));
                subdoc.append(kvp("ext_estop_1", intDistribution(generator) != 0));
                subdoc.append(kvp("ext_estop_2", intDistribution(generator) != 0));
                subdoc.append(kvp("ups_off", intDistribution(generator) != 0));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("dio", arr)))));
            graphCollection.insert_one(make_document(kvp("dio", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at Dio " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated laser-unit readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - sets it as the "laserUnit" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"laserUnit": [...]} document into the "laserUnit" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "laserUnit", the same name as the $set field;
 * confirm readers of the "laserUnit" collection look the array up under that key.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void laserUnitPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["laserUnit"];

        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("laserUnit_01" ... "laserUnit_10").
                const std::string desc = (i < 10 ? "laserUnit_0" : "laserUnit_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                subdoc.append(kvp("power", distribution(generator)));
                subdoc.append(kvp("current_usage_s", intDistribution(generator)));
                subdoc.append(kvp("current_usage_accum_s", intDistribution(generator)));
                subdoc.append(kvp("lu_state", intDistribution(generator) % 10));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("laserUnit", arr)))));
            graphCollection.insert_one(make_document(kvp("laserUnit", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at laserUnit " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated monitor-card readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - inserts it as a new {"monitorCard": [...]} document into the "monitorCard" collection, then
 *  - sets it as the "monitorCard" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed).
 * The thread then sleeps 50 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void monitorCardPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["monitorCard"];

        const std::chrono::milliseconds delay{50};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("monitorCard_01" ... "monitorCard_10").
                const std::string desc = (i < 10 ? "monitorCard_0" : "monitorCard_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                // Boolean flags are true whenever the 0..9 draw is non-zero.
                subdoc.append(kvp("is_seed_on", intDistribution(generator) != 0));
                subdoc.append(kvp("ld_temp", distribution(generator)));
                subdoc.append(kvp("cmb_temp", distribution(generator)));
                subdoc.append(kvp("cps_temp", distribution(generator)));
                subdoc.append(kvp("pd1", distribution(generator)));
                subdoc.append(kvp("pd2", distribution(generator)));
                subdoc.append(kvp("pd3", distribution(generator)));
                subdoc.append(kvp("pd4", distribution(generator)));
                subdoc.append(kvp("mon_state", intDistribution(generator) % 8));
                subdoc.append(kvp("psu_interlock_1", intDistribution(generator) != 0));
                subdoc.append(kvp("psu_interlock_2", intDistribution(generator) != 0));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // History record first, then the latest snapshot.
            graphCollection.insert_one(make_document(kvp("monitorCard", arr)));
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("monitorCard", arr)))));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at MonitorCard " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated seed-laser readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - sets it as the "seed" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"seed": [...]} document into the "seed" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "seed", the same name as the $set field;
 * confirm readers of the "seed" collection look the array up under that key.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void seedPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["seed"];

        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("seed_01" ... "seed_10").
                const std::string desc = (i < 10 ? "seed_0" : "seed_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                // The boolean flag is true whenever the 0..9 draw is non-zero.
                subdoc.append(kvp("laser_status", intDistribution(generator) != 0));
                subdoc.append(kvp("laser_voltage", distribution(generator)));
                subdoc.append(kvp("tec_current", distribution(generator)));
                subdoc.append(kvp("tec_voltage", distribution(generator)));
                subdoc.append(kvp("ther_res", distribution(generator)));
                subdoc.append(kvp("pd_current", distribution(generator)));
                subdoc.append(kvp("ain1", distribution(generator)));
                subdoc.append(kvp("ain2", distribution(generator)));
                subdoc.append(kvp("vtmax", distribution(generator)));
                subdoc.append(kvp("vtmin", distribution(generator)));
                subdoc.append(kvp("rtset", distribution(generator)));
                subdoc.append(kvp("rtmin", distribution(generator)));
                subdoc.append(kvp("rtmax", distribution(generator)));
                subdoc.append(kvp("ilaser", distribution(generator)));
                subdoc.append(kvp("ain", distribution(generator)));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("seed", arr)))));
            graphCollection.insert_one(make_document(kvp("seed", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at Seed " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated power-supply readings to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds one array holding `numUnit` randomly generated unit readings and
 *  - sets it as the "psu" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"psu": [...]} document into the "psu" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "psu", the same name as the $set field;
 * confirm readers of the "psu" collection look the array up under that key.
 *
 * @param numUnit   number of simulated units per cycle (idx 1..numUnit)
 * @param duration  run time in seconds
 */
void psuPublish(int numUnit, int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["psu"];

        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= numUnit; ++i) {
                // Two-digit, zero-padded unit number ("psu_01" ... "psu_10").
                const std::string desc = (i < 10 ? "psu_0" : "psu_") + std::to_string(i);
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", desc));
                subdoc.append(kvp("voltage_oper", distribution(generator)));
                subdoc.append(kvp("voltage_lim", distribution(generator)));
                subdoc.append(kvp("current_lim", distribution(generator)));
                subdoc.append(kvp("current_oper", distribution(generator)));
                subdoc.append(kvp("current_meas", distribution(generator)));
                subdoc.append(kvp("voltage_meas", distribution(generator)));
                subdoc.append(kvp("psu_state", intDistribution(generator) % 4));
                // Boolean flags are true whenever the 0..9 draw is non-zero.
                subdoc.append(kvp("outp", intDistribution(generator) != 0));
                subdoc.append(kvp("is_connected", intDistribution(generator) != 0));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("psu", arr)))));
            graphCollection.insert_one(make_document(kvp("psu", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at Psu " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
/**
 * Publishes simulated system-level status to MongoDB for roughly `duration` seconds.
 *
 * Each cycle builds an array with a single randomly generated status record and
 *  - sets it as the "system" field via update_one with an empty filter on
 *    "changestream_data" (no upsert option is passed), and
 *  - inserts it as a new {"system": [...]} document into the "system" collection.
 * The thread then sleeps 500 ms. At least one cycle always runs; the deadline is
 * evaluated with a fresh clock reading after every cycle.
 *
 * Any std::exception (connection or write failure) is logged to stdout and ends the function.
 *
 * NOTE(review): history documents are keyed "system", the same name as the $set field;
 * confirm readers of the "system" collection look the array up under that key.
 *
 * @param duration  run time in seconds
 */
void systemPublish(int duration) {
    try {
        // NOTE(review): credentials are hard-coded in the URI; consider reading them from configuration.
        const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
        // Set the version of the Stable API on the client.
        mongocxx::options::client client_options;
        const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
        client_options.server_api_opts(api);
        // Set up the connection and get a handle on the "dso-gb" database.
        mongocxx::client conn{ uri, client_options };
        mongocxx::database db = conn["dso-gb"];
        mongocxx::collection collection = db["changestream_data"];
        mongocxx::collection graphCollection = db["system"];

        const int unitCount = 1; // exactly one system record is simulated
        const std::chrono::milliseconds delay{500};
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
        int count = 0;
        do {
            bsoncxx::builder::basic::array arr{};
            for (int i = 1; i <= unitCount; ++i) {
                bsoncxx::builder::basic::document subdoc{};
                subdoc.append(kvp("idx", i));
                subdoc.append(kvp("desc", "system_0" + std::to_string(i)));
                subdoc.append(kvp("sys_state", intDistribution(generator) % 8));
                subdoc.append(kvp("mode_state", intDistribution(generator) % 4));
                subdoc.append(kvp("power_state", intDistribution(generator) % 6));
                subdoc.append(kvp("laser_power", distribution(generator)));
                subdoc.append(kvp("laser_on_s", intDistribution(generator)));
                subdoc.append(kvp("counter", count));
                subdoc.append(kvp("cbit", intDistribution(generator) % 3));
                subdoc.append(kvp("timestamp", bsoncxx::types::b_date(std::chrono::system_clock::now())));
                arr.append(subdoc);
            }
            // Latest snapshot first, then the history record.
            collection.update_one({}, make_document(kvp("$set", make_document(kvp("system", arr)))));
            graphCollection.insert_one(make_document(kvp("system", arr)));
            std::this_thread::sleep_for(delay);
            ++count;
            std::cout << "Inserted at System " << count << std::endl;
        } while (std::chrono::steady_clock::now() < deadline);
    }
    catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << std::endl;
    }
}
void make_request(boost::asio::io_context& io_context) {
// Create a resolver and connect to the server
tcp::resolver resolver(io_context);
tcp::socket socket(io_context);
beast::error_code ec;
std::vector<std::string> ports = { "4000", "4001", "4002", "4003" };
while (true) {
std::vector<Json::Value> rmux_data;
bool fault = false;
for (const std::string& port : ports) {
auto const results = resolver.resolve("localhost", port, ec);
if (ec) {
std::cerr << "Error resolving host: " << ec.message() << std::endl;
return;
}
boost::asio::connect(socket, results.begin(), results.end(), ec);
if (ec) {
std::cerr << "Error connecting to host: " << ec.message() << std::endl;
return;
}
// Set up the HTTP request
http::request<http::string_body> req{http::verb::get, "/appAll.json", 11};
req.set(http::field::host, "localhost");
req.set(http::field::user_agent, "Boost.Beast GET");
// Send the HTTP request
http::write(socket, req, ec);
if (ec) {
std::cerr << "Error sending HTTP request: " << ec.message() << std::endl;
return;
}
// Receive and print the HTTP response
beast::flat_buffer buffer;
http::response<http::string_body> res;
http::read(socket, buffer, res, ec);
if (ec) {
std::cerr << "Error receiving HTTP response: " << ec.message() << std::endl;
return;
}
try {
std::string responseBody = res.body();
//std::cout << "Body: " << responseBody << std::endl;
Json::Value root; // Create a JSON Value object
Json::Reader reader; // Create a JSON Reader
if (reader.parse(responseBody, root)) {
// Access JSON values
Json::Value responseJson;
responseJson["temp"] = std::stof(root["data"]["all"][2]["isens"][0]["val"].asString());
responseJson["humidity"] = std::stof(root["data"]["all"][2]["isens"][1]["val"].asString());
responseJson["dew"] = std::stof(root["data"]["all"][2]["isens"][2]["val"].asString());
responseJson["leak1"] = std::stoi(root["data"]["all"][2]["isens"][3]["val"].asString());
responseJson["leak2"] = std::stoi(root["data"]["all"][2]["isens"][4]["val"].asString());
responseJson["cbit"] = std::stoi(root["data"]["all"][2]["isens"][5]["val"].asString());
//std::cout << "Port: " << port << std::endl;
//std::cout << "Temp: " << responseJson["temp"] << std::endl;
//std::cout << "Humidity: " << responseJson["humidity"] << std::endl;
//std::cout << "Dew: " << responseJson["dew"] << std::endl;
rmux_data.push_back(responseJson);
if (responseJson["temp"].asFloat() > 50 || responseJson["humidity"].asFloat() > 50 || responseJson["dew"].asFloat() > 50) {
fault = true;
}
}
else {
std::cerr << "Failed to parse JSON data" << std::endl;
}
}
catch (const std::exception& e) {
std::cerr << "Error parsing data: " << e.what() << std::endl;
}
}
for (size_t i = 0; i < rmux_data.size(); ++i) {
std::cout << "Data from port " << ports[i] << ":" << std::endl;
std::cout << "Temperature: " << rmux_data[i]["temp"].asFloat() << std::endl;
std::cout << "Humidity: " << rmux_data[i]["humidity"].asFloat() << std::endl;
std::cout << "Dew: " << rmux_data[i]["dew"].asFloat() << std::endl;
std::cout << "Leak1: " << rmux_data[i]["leak1"].asInt() << std::endl;
std::cout << "Leak2: " << rmux_data[i]["leak2"].asInt() << std::endl;
std::cout << "CBIT: " << rmux_data[i]["cbit"].asInt() << std::endl;
std::cout << std::endl;
}
try {
// Replace the connection string with your MongoDB deployment's connection string.
const auto uri = mongocxx::uri{ "mongodb://gene:gene@127.0.0.1:30000/dso-gb?authSource=dso-gb&replicaSet=rs0" };
// Set the version of the Stable API on the client.
mongocxx::options::client client_options;
const auto api = mongocxx::options::server_api{ mongocxx::options::server_api::version::k_version_1 };
client_options.server_api_opts(api);
// Setup the connection and get a handle on the "admin" database.
mongocxx::client conn{ uri, client_options };
mongocxx::database db = conn["dso-gb"];
mongocxx::collection collection = db["changestream_data"];
try {
double delay = 0.2;
int count = 0;
auto doc = bsoncxx::builder::basic::document{};
bsoncxx::builder::basic::array system_arr{};
for (int i = 0; i < 1; ++i) { // Generate two "docs"
bsoncxx::builder::basic::document subdoc{};
subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1));
subdoc.append(bsoncxx::builder::basic::kvp("desc", "system_0" + std::to_string(i + 1)));
if (fault) {
subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 7));
}
else {
subdoc.append(bsoncxx::builder::basic::kvp("sys_state", 1));
}
subdoc.append(bsoncxx::builder::basic::kvp("mode_state", 0));
subdoc.append(bsoncxx::builder::basic::kvp("power_state", 0));
subdoc.append(bsoncxx::builder::basic::kvp("laser_power", distribution(generator)));
subdoc.append(bsoncxx::builder::basic::kvp("laser_on_s", count));
subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
subdoc.append(bsoncxx::builder::basic::kvp("cbit", intDistribution(generator) % 3));
system_arr.append(subdoc);
}
doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("system", system_arr))));
bsoncxx::builder::basic::array rmux_arr{};
for (int i = 0; i < 4; ++i) { // Generate two "docs"
bsoncxx::builder::basic::document subdoc{};
subdoc.append(bsoncxx::builder::basic::kvp("idx", i + 1));
subdoc.append(bsoncxx::builder::basic::kvp("desc", "rmux_0" + std::to_string(i + 1)));
subdoc.append(bsoncxx::builder::basic::kvp("temperature", rmux_data[i]["temp"].asFloat()));
subdoc.append(bsoncxx::builder::basic::kvp("dewpoint", rmux_data[i]["humidity"].asFloat()));
subdoc.append(bsoncxx::builder::basic::kvp("humidity", rmux_data[i]["dew"].asFloat()));
subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor1_status", rmux_data[i]["leak1"].asInt()));
subdoc.append(bsoncxx::builder::basic::kvp("leak_sensor2_status", rmux_data[i]["leak2"].asInt()));
subdoc.append(bsoncxx::builder::basic::kvp("counter", count));
subdoc.append(bsoncxx::builder::basic::kvp("cbit", rmux_data[i]["cbit"].asInt()));
rmux_arr.append(subdoc);
}
doc.append(bsoncxx::builder::basic::kvp("$set", make_document(kvp("rmux", rmux_arr))));
auto result = collection.update_one({}, doc.extract());
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(delay * 1000)));
count++;
std::cout << "Inserted at Rmux " << count << std::endl;
}
catch (const std::exception& e) {
std::cout << "Exception: " << e.what() << std::endl;
}
}
catch (const std::exception& e) {
std::cout << "Exception: " << e.what() << std::endl;
}
}
}
int main()
{
std::string userInput;
// Prompt the user for input
std::cout << "Select type of simulation:\n1.Observe Web Commands\n2.Publish Data\n3.Observe Web Commands + Publish Data\n4.Rmux Emulator\n";
// Read a full line of input from standard input
std::cin >> userInput;
switch (userInput[0]) {
case '1':
{
std::thread changeStreamThread(changeStreamFunction); // watch
changeStreamThread.join();
break;
}
case '2':
{
int luNum;
int chillerNum;
int rmuxNum;
int duration;
std::cout << "Number of Laser Units: ";
std::cin >> luNum;
std::cout << "Number of Chillers: ";
std::cin >> chillerNum;
std::cout << "Number of Rmux: ";
std::cin >> rmuxNum;
std::cout << "Duration:(0=infinite): ";
std::cin >> duration;
if (duration == 0) {
duration = INT_MAX;
}
std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1
std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2
std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3
std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4
std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5
std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6
std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7
std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8
seedCardPublishThread.join();
monitorCardPublishThread.join();
psuCardPublishThread.join();
chillerPublishThread.join();
rmuxPublishThread.join();
dioPublishThread.join();
laserUnitPublishThread.join();
systemPublishThread.join();
break;
}
case '3':
{
int luNum;
int chillerNum;
int rmuxNum;
int duration;
std::cout << "Number of Laser Units: ";
std::cin >> luNum;
std::cout << "Number of Chillers: ";
std::cin >> chillerNum;
std::cout << "Number of Rmux: ";
std::cin >> rmuxNum;
std::cout << "Duration:(0=infinite): ";
std::cin >> duration;
std::thread changeStreamThread(changeStreamFunction); // watch
std::thread seedCardPublishThread(std::bind(seedPublish, luNum, duration)); // 0.1
std::thread monitorCardPublishThread(std::bind(monitorCardPublish, luNum, duration)); // 0.2
std::thread psuCardPublishThread(std::bind(psuPublish, luNum, duration)); // 0.3
std::thread chillerPublishThread(std::bind(chillerPublish, chillerNum, duration)); // 0.4
std::thread rmuxPublishThread(std::bind(rmuxPublish, rmuxNum, duration)); // 0.5
std::thread dioPublishThread(std::bind(dioPublish, duration)); // 0.6
std::thread laserUnitPublishThread(std::bind(laserUnitPublish, luNum, duration)); // 0.7
std::thread systemPublishThread(std::bind(systemPublish, duration)); // 0.8
changeStreamThread.join();
seedCardPublishThread.join();
monitorCardPublishThread.join();
psuCardPublishThread.join();
chillerPublishThread.join();
rmuxPublishThread.join();
dioPublishThread.join();
laserUnitPublishThread.join();
systemPublishThread.join();
break;
}
case '4':
{
make_request(io_context);
return 0;
}
default:
std::cout << "Invalid choice." << std::endl;
}
return 0;
}Editor is loading...
Leave a Comment