Untitled
unknown
plain_text
2 years ago
16 kB
3
Indexable
#include <iostream> #include <map> #include <list> #include <vector> #include <string> #include <chrono> #include <thread> #include <memory> #include <unistd.h> #include "SimplePocoHandler.h" #include "amqp-cpp.h" using namespace std::chrono_literals; class HeartbitNode final { public: int64_t _id; bool _available; std::chrono::_V2::system_clock::time_point _start; int64_t _prev_time = 0; HeartbitNode(int64_t id, bool available, std::chrono::_V2::system_clock::time_point start) : _id(id), _available(available), _start(start) {} }; bool search_for_elem(std::map<int64_t, std::list<int64_t>> const & m, int64_t id) { for (auto i : m) { for (auto j : i.second) { if (j == id) { return true; } } } return false; } std::string get_main_node(std::map<int64_t, std::list<int64_t>> m, int64_t id) { for (auto i : m) { std::string way = ""; for (auto j : i.second) { way += std::to_string(j) + "-"; if (j == id) { std::string main_node_id_str = ""; for (char ch : way) { if (ch == '-') { return main_node_id_str; } main_node_id_str += ch; } } } } return ""; } void get_msg4(std::string const & msg, std::string& command1, std::string& command2, std::string& str) { int flag = 0; for (char ch : msg) { if (ch == ' ' && flag < 2) { flag += 1; continue; } if (flag == 0) { command1 += ch; } if (flag == 1) { command2 += ch; } if (flag == 2) { str += ch; } } } std::string get_last_arg(std::string const & msg) { std::string res = ""; for (int i = msg.size() - 1; i > 0; --i) { if (msg[i] == ' ') { return res; } res = msg[i] + res; } return res; } std::string get_first_arg(std::string const & msg) { std::string res = ""; for (char ch : msg) { if (ch == ' ') { return res; } res += ch; } return res; } void get_command3(std::string const & msg, std::string& command, std::string& arg1) { int flag = 0; for (char ch : msg) { if (ch == ' ') { flag++; continue; } if (flag == 0) { command += ch; } if (flag == 1) { arg1 += ch; } } } void create_computing_node(AMQP::Channel& channel, std::map<int64_t, std::list<int64_t>>& node_id_map, std::vector<HeartbitNode>& hb, int64_t node_id_child, int64_t node_id_parent = 0) { if (!search_for_elem(node_id_map, node_id_parent) && node_id_parent != 0) { std::cout << "There is not such parrent node" << std::endl; return; } if (search_for_elem(node_id_map, node_id_child)) { std::cout << "Node with id " << node_id_child << " has been already created" << std::endl; return; } std::string node_id_child_str = std::to_string(node_id_child); std::string node_id_parent_str = std::to_string(node_id_parent); std::string out_queue = node_id_child_str; std::string in_queue = node_id_parent_str; if (node_id_parent == 0) { channel.declareQueue(out_queue.c_str()); int pid = fork(); if (pid == 0) { std::string comp_node = "comp_node"; char *argv[] = {const_cast<char*>(comp_node.c_str()), const_cast<char*>(comp_node.c_str()), const_cast<char*>("-1"), const_cast<char*>(node_id_child_str.c_str()), const_cast<char*>(node_id_parent_str.c_str()), NULL}; if (execv(argv[0], argv) == -1) { std::cout << "Error: exec computing_node" << std::endl; } } node_id_map.insert(std::pair<int64_t, std::list<int64_t>>(node_id_child, {node_id_child})); hb.push_back(HeartbitNode(node_id_child, true, std::chrono::high_resolution_clock::now())); } else if (node_id_parent > 0) { std::string main_node_id_str = get_main_node(node_id_map, node_id_parent); int64_t main_node_id = stoi(main_node_id_str); std::string main_out_queue = main_node_id_str; if (node_id_map[main_node_id].back() == node_id_parent) { node_id_map[main_node_id].push_back(node_id_child); hb.push_back(HeartbitNode(node_id_child, true, std::chrono::high_resolution_clock::now())); std::string msg = "create " + node_id_child_str + " " + node_id_parent_str; channel.publish("", main_out_queue.c_str(), msg.c_str()); } else { std::cout << "Error: it is not tail of the list" << std::endl; } } } void kill_computing_node(AMQP::Channel& channel, std::map<int64_t, std::list<int64_t>>& node_id_map, std::vector<HeartbitNode>& hb, int64_t node_id) { if (!search_for_elem(node_id_map, node_id)) { std::cout << "Node with id " << node_id << " has not been created" << std::endl; return; } std::string main_node_id_str = get_main_node(node_id_map, node_id); std::string msg = "kill " + std::to_string(node_id); channel.publish("", main_node_id_str.c_str(), msg.c_str()); auto list_begin = node_id_map[stoi(main_node_id_str)].begin(); auto list_end = node_id_map[stoi(main_node_id_str)].end(); for (auto it = list_begin; it != list_end; ++it) { if (*it == node_id) { bool flag = false; for (auto el : node_id_map[stoi(main_node_id_str)]) { if (el == node_id) { flag = true; } if (flag) { for (int i = 0; i < hb.size(); ++i) { if (hb[i]._id == el) { hb[i]._available = false; } } } } node_id_map[stoi(main_node_id_str)].erase(it); break; } } // for (size_t i = 0; i < hb.begin(); ++i) { // if (hb[i]._id == node_id) { // hb[i]._available = false; // std::cout << "Heartbit: node " + std::to_string(hb_list[i]._id) + " is unavailable now" << std::endl; // break; // } // } // for (auto it = hb.begin(); it != hb.end(); ++it) { // if (it->_id == node_id) { // it->_available = false; // break; // } // } } void exec_timer(AMQP::Channel& channel, std::map<int64_t, std::list<int64_t>>& node_id_map, int64_t node_id, std::string action) { if (!search_for_elem(node_id_map, node_id)) { std::cout << "Node with id " << node_id << " has not been created" << std::endl; return; } std::string main_node_id_str = get_main_node(node_id_map, node_id); std::string msg = action + " " + std::to_string(node_id); channel.publish("", main_node_id_str.c_str(), msg.c_str()); } void heartbit(AMQP::Channel& channel, std::map<int64_t, std::list<int64_t>>& node_id_map, int64_t node_id, std::vector<HeartbitNode>& hb, int64_t heartbit_time) { std::string main_node_id_str = get_main_node(node_id_map, node_id); std::string msg = "heartbit " + std::to_string(node_id) + " " + std::to_string(heartbit_time) + " 0"; channel.publish("", main_node_id_str.c_str(), msg.c_str()); } int main() { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("control_node", "qwerty"), "/"); AMQP::Channel channel(&connection); channel.declareQueue("0"); std::map<int64_t, std::list<int64_t>> node_id_map; std::vector<HeartbitNode> hb_list; bool hb_started = false; std::thread th1([&]() { channel.onReady([&]() { std::cout << "Control node created" << std::endl; }); std::this_thread::sleep_for(100ms); while(true) { std::cout << "Menu:" << std::endl; std::string menu_command; std::cin >> menu_command; if (menu_command == "create") { std::string arg1; std::string arg2; std::cin >> arg1 >> arg2; create_computing_node(channel, node_id_map, hb_list, stoi(arg1), stoi(arg2)); } else if (menu_command == "kill") { std::string arg1; std::cin >> arg1; kill_computing_node(channel, node_id_map, hb_list, stoi(arg1)); } else if (menu_command == "exec") { std::string arg1; std::string arg2; std::cin >> arg1 >> arg2; std::cout << arg1 << " " << arg2 << std::endl; exec_timer(channel, node_id_map, stoi(arg1), arg2); } else if (menu_command == "heartbit") { std::cout << "Ok" << std::endl; int64_t arg1; std::cin >> arg1; hb_started = true; if (arg1 == -1) { hb_started = false; } // for (auto node : hb_list) { // heartbit(channel, node_id_map, node._id, hb_list, arg1); // node._start = std::chrono::high_resolution_clock::now(); // } for (size_t i = 0; i < hb_list.size(); ++i) { heartbit(channel, node_id_map, hb_list[i]._id, hb_list, arg1); hb_list[i]._start = std::chrono::high_resolution_clock::now(); } } else if (menu_command == "print_nodes") { std::cout << "map:" << std::endl; for (auto map_el : node_id_map) { for (auto list_el : map_el.second) { std::cout << list_el << ' '; } std::cout << std::endl; } std::cout << "list:" << std::endl; for (auto el : hb_list) { std::cout << el._id << ' '; } std::cout << std::endl; } else if (menu_command == "ping") { std::string node_id_str; std::cin >> node_id_str; int64_t node_id = stoi(node_id_str); for (auto el : hb_list) { if (el._id == node_id) { if (el._available == true) { std::cout << "Ok: 1" << std::endl; } else { std::cout << "Ok: 0" << std::endl; } break; } } } else { std::cout << "(" << menu_command << ")" << std::endl; std::cout << "Invalid command" << std::endl; } } }); std::thread th2([&]() { while(true) { if (!hb_started) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); for (size_t i = 0; i < hb_list.size(); ++i) { heartbit(channel, node_id_map, hb_list[i]._id, hb_list, -1); hb_list[i]._start = std::chrono::high_resolution_clock::now(); } // std::string main_node_id_str = get_main_node(node_id_map, node_id); // std::string msg = "heartbit " + std::to_string(node_id) + " " + std::to_string(heartbit_time) + " 0"; // channel.publish("", main_node_id_str.c_str(), msg.c_str()); } } }); channel.consume("0", AMQP::noack).onReceived( [&] (const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) { std::string received_message{message.body()}; received_message.resize(message.bodySize()); std::string command1 = ""; std::string command2 = ""; std::string msg = ""; get_msg4(received_message, command1, command2, msg); // std::cout << command1 << " " << command2 << "+" << msg << std::endl; if (command1 == "up") { if (command2 == "created") { std::cout << "Received " << msg << std::endl; } if (command2 == "killed") { std::string killed_node_id_str = get_last_arg(msg); channel.removeQueue(killed_node_id_str); std::cout << "Received " << msg << std::endl; } if (command2 == "exec") { std::cout << "Received " << msg << std::endl; } if (command2 == "heartbit") { // std::cout << "Received " << msg << std::endl; std::string node_id_str = ""; std::string heartbit_time = ""; std::string heartbit_timer = ""; // get_command3(msg, node_id_str, heartbit_time); get_msg4(msg, node_id_str, heartbit_time, heartbit_timer); bool killed = false; if (stoi(heartbit_timer) >= 4 * stoi(heartbit_time)) { killed = true; } // std::string node_id_str = get_first_arg(msg); // std::string heartbit_time = get_second_arg(msg); if (heartbit_time != "-1") { for (size_t i = 0; i < hb_list.size(); ++i) { if (hb_list[i]._id == stoi(node_id_str)) { auto end = std::chrono::high_resolution_clock::now(); int64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(end - hb_list[i]._start).count(); // std::cout << "time: " << time << std::endl; //Uncomment to see time if ((time > stoi(heartbit_time) * 4 && hb_list[i]._available == true) || killed) { std::cout << "Heartbit: node " + std::to_string(hb_list[i]._id) + " is unavailable now" << std::endl; hb_list[i]._start = end; hb_list[i]._available = false; break; } if (hb_list[i]._available == false) { hb_list[i]._available = true; std::cout << "Heartbit: node " + std::to_string(hb_list[i]._id) + " is available now" << std::endl; break; } } } heartbit(channel, node_id_map, stoi(node_id_str), hb_list, stoi(heartbit_time)); } // std::this_thread::sleep_for(std::chrono::milliseconds(stoi(heartbit_time))); } } } ); handler.loop(); th1.join(); th2.join(); return 0; }
Editor is loading...
Leave a Comment