// Paste-site metadata (not part of the program):
// Untitled
// unknown
// plain_text
// 2 years ago
// 16 kB
// 9
// Indexable
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "SimplePocoHandler.h"
#include "amqp-cpp.h"
using namespace std::chrono_literals;
/// Bookkeeping record the control node keeps for one computing node.
///
/// All members are public and are read/written directly by the rest of the
/// file; the class carries no invariants of its own.
class HeartbitNode final {
public:
    /// Time point type of the clock callers use when constructing or
    /// refreshing a record (`std::chrono::high_resolution_clock::now()`).
    /// Spelled through the public clock name instead of the libstdc++-internal
    /// `std::chrono::_V2` namespace so the class compiles on any standard
    /// library.
    using time_point = std::chrono::high_resolution_clock::time_point;

    int64_t _id;             ///< Identifier of the computing node.
    bool _available;         ///< Last known availability of the node.
    time_point _start;       ///< Reference time point supplied by the caller.
    int64_t _prev_time = 0;  ///< Initialised to 0; not used in the visible code.

    /// @param id        identifier of the computing node
    /// @param available initial availability flag
    /// @param start     initial reference time point
    HeartbitNode(int64_t id,
                 bool available,
                 time_point start) : _id(id),
                                     _available(available),
                                     _start(start) {}
};
/// Reports whether `id` appears in any of the lists stored in `m`.
///
/// @param m  map whose mapped lists are searched (keys are not compared)
/// @param id value to look for
/// @return true if at least one list contains `id`, false otherwise
bool search_for_elem(std::map<int64_t,
                              std::list<int64_t>> const & m,
                     int64_t id)
{
    // Iterate by const reference: iterating by value would deep-copy every
    // list on each pass.
    for (const auto& entry : m) {
        const std::list<int64_t>& chain = entry.second;
        if (std::find(chain.begin(), chain.end(), id) != chain.end()) {
            return true;
        }
    }
    return false;
}
/// Finds the list that contains `id` and returns its first element as text.
///
/// Lists are visited in ascending key order; the first list containing `id`
/// wins. The head is converted directly with std::to_string, so negative
/// heads are returned intact (a "-"-separated intermediate string would
/// mistake the minus sign for a separator and yield "").
///
/// @param m  map of lists to search; taken by const reference to avoid
///           copying the whole map on every call
/// @param id value to look for inside the lists
/// @return decimal text of the head of the matching list, or "" if no list
///         contains `id`
std::string get_main_node(std::map<int64_t,
                                   std::list<int64_t>> const & m,
                          int64_t id)
{
    for (const auto& entry : m) {
        const std::list<int64_t>& chain = entry.second;
        if (std::find(chain.begin(), chain.end(), id) != chain.end()) {
            // A list that contains `id` is non-empty, so front() is valid.
            return std::to_string(chain.front());
        }
    }
    return "";
}
/// Splits `msg` at its first two spaces into three pieces.
///
/// The text before the first space is appended to `command1`, the text
/// between the first and second space to `command2`, and everything after the
/// second space (further spaces included) to `str`. The outputs are appended
/// to, not cleared.
///
/// @param msg      message to split
/// @param command1 receives the first token
/// @param command2 receives the second token
/// @param str      receives the remainder of the message
void get_msg4(std::string const & msg, std::string& command1, std::string& command2, std::string& str) {
    const std::string::size_type first_gap = msg.find(' ');
    command1 += msg.substr(0, first_gap);
    if (first_gap == std::string::npos) {
        return;
    }

    const std::string::size_type second_gap = msg.find(' ', first_gap + 1);
    if (second_gap == std::string::npos) {
        command2 += msg.substr(first_gap + 1);
        return;
    }
    command2 += msg.substr(first_gap + 1, second_gap - first_gap - 1);

    str += msg.substr(second_gap + 1);
}
/// Returns the text that follows the last space in `msg`.
///
/// If `msg` contains no space the whole message is returned. A backwards loop
/// that stops before index 0 would drop the first character in that case.
///
/// @param msg message to inspect
/// @return the last space-separated token; empty if `msg` is empty or ends
///         with a space
std::string get_last_arg(std::string const & msg) {
    const std::string::size_type last_gap = msg.rfind(' ');
    if (last_gap == std::string::npos) {
        return msg;
    }
    return msg.substr(last_gap + 1);
}
/// Returns the part of `msg` that precedes its first space.
///
/// @param msg message to inspect
/// @return the leading token, or the whole message when it has no space
std::string get_first_arg(std::string const & msg) {
    // find() yields npos when there is no space, and substr(0, npos) is the
    // entire string.
    return msg.substr(0, msg.find(' '));
}
/// Extracts the first two space-separated fields of `msg`.
///
/// Every space is a separator, so consecutive spaces produce empty fields.
/// Fields after the second one are ignored. The outputs are appended to, not
/// cleared.
///
/// @param msg     message to split
/// @param command receives the first field
/// @param arg1    receives the second field
void get_command3(std::string const & msg, std::string& command, std::string& arg1) {
    const std::string::size_type npos = std::string::npos;

    const std::string::size_type first_sep = msg.find(' ');
    command += msg.substr(0, first_sep);
    if (first_sep == npos) {
        return;
    }

    const std::string::size_type field_start = first_sep + 1;
    const std::string::size_type second_sep = msg.find(' ', field_start);
    const std::string::size_type field_len =
        (second_sep == npos) ? npos : second_sep - field_start;
    arg1 += msg.substr(field_start, field_len);
}
/// Registers a new computing node, either as the head of a new chain or as
/// the new tail of an existing chain.
///
/// With `node_id_parent == 0` a queue named after the child id is declared, a
/// child process is forked that exec's `comp_node`, and a new single-element
/// chain is stored in `node_id_map`. With `node_id_parent > 0` the child is
/// appended to the chain containing the parent (only if the parent is that
/// chain's tail) and a "create <child> <parent>" message is published to the
/// queue named after the chain head. In both cases a heartbeat record is
/// added to `hb`.
///
/// Prints a diagnostic and changes nothing when the parent is unknown, the
/// child id already exists, the parent is not a chain tail, or fork() fails.
/// A negative `node_id_parent` that exists in the map is silently ignored
/// (unchanged behaviour).
///
/// @param channel        AMQP channel used to declare queues and publish
/// @param node_id_map    chain head id -> ordered list of node ids in the chain
/// @param hb             heartbeat records, one per created node
/// @param node_id_child  id of the node to create
/// @param node_id_parent id of the parent node, or 0 for a new chain head
void create_computing_node(AMQP::Channel& channel,
                           std::map<int64_t, std::list<int64_t>>& node_id_map,
                           std::vector<HeartbitNode>& hb,
                           int64_t node_id_child,
                           int64_t node_id_parent = 0)
{
    if (!search_for_elem(node_id_map, node_id_parent) && node_id_parent != 0) {
        std::cout << "There is not such parrent node" << std::endl;
        return;
    }
    if (search_for_elem(node_id_map, node_id_child)) {
        std::cout << "Node with id " << node_id_child << " has been already created" << std::endl;
        return;
    }

    const std::string node_id_child_str = std::to_string(node_id_child);
    const std::string node_id_parent_str = std::to_string(node_id_parent);

    if (node_id_parent == 0) {
        channel.declareQueue(node_id_child_str.c_str());

        const pid_t pid = fork();
        if (pid == -1) {
            // No child was started, so do not register a node that does not exist.
            std::cout << "Error: fork computing_node" << std::endl;
            return;
        }
        if (pid == 0) {
            std::string comp_node = "comp_node";
            char *argv[] = {const_cast<char*>(comp_node.c_str()),
                            const_cast<char*>(comp_node.c_str()),
                            const_cast<char*>("-1"),
                            const_cast<char*>(node_id_child_str.c_str()),
                            const_cast<char*>(node_id_parent_str.c_str()),
                            nullptr};
            execv(argv[0], argv);
            // execv only returns on failure. Terminate the child right away:
            // falling through would leave a second copy of the control node
            // running the caller's code.
            std::cout << "Error: exec computing_node" << std::endl;
            _exit(1);
        }

        node_id_map.insert(std::pair<int64_t, std::list<int64_t>>(node_id_child, {node_id_child}));
        hb.push_back(HeartbitNode(node_id_child, true, std::chrono::high_resolution_clock::now()));
    } else if (node_id_parent > 0) {
        const std::string main_node_id_str = get_main_node(node_id_map, node_id_parent);
        // stoll keeps the full int64_t range of the ids (stoi would truncate).
        const int64_t main_node_id = std::stoll(main_node_id_str);

        // find() instead of operator[]: a missing key must not insert an empty
        // list, on which back() would be undefined behaviour.
        const auto chain_it = node_id_map.find(main_node_id);
        const bool parent_is_tail = chain_it != node_id_map.end()
                                    && !chain_it->second.empty()
                                    && chain_it->second.back() == node_id_parent;
        if (parent_is_tail) {
            chain_it->second.push_back(node_id_child);
            hb.push_back(HeartbitNode(node_id_child, true, std::chrono::high_resolution_clock::now()));
            const std::string msg = "create " + node_id_child_str + " " + node_id_parent_str;
            channel.publish("", main_node_id_str.c_str(), msg.c_str());
        } else {
            std::cout << "Error: it is not tail of the list" << std::endl;
        }
    }
}
/// Asks a computing node to terminate and updates the local bookkeeping.
///
/// Publishes "kill <node_id>" to the queue named after the head of the chain
/// containing the node. Then every node from `node_id` to the end of that
/// chain is marked unavailable in `hb`, and `node_id` itself is removed from
/// the chain (the nodes after it stay in the list).
///
/// Prints a diagnostic and does nothing if the node id is unknown.
///
/// @param channel     AMQP channel used to publish the kill request
/// @param node_id_map chain head id -> ordered list of node ids in the chain
/// @param hb          heartbeat records whose availability flags are updated
/// @param node_id     id of the node to kill
void kill_computing_node(AMQP::Channel& channel,
                         std::map<int64_t, std::list<int64_t>>& node_id_map,
                         std::vector<HeartbitNode>& hb,
                         int64_t node_id)
{
    if (!search_for_elem(node_id_map, node_id)) {
        std::cout << "Node with id " << node_id << " has not been created" << std::endl;
        return;
    }

    const std::string main_node_id_str = get_main_node(node_id_map, node_id);
    const std::string msg = "kill " + std::to_string(node_id);
    channel.publish("", main_node_id_str.c_str(), msg.c_str());

    // Single lookup with find(): operator[] would insert an empty chain when
    // the key is missing. stoll keeps the full int64_t range of the id.
    const auto chain_it = node_id_map.find(std::stoll(main_node_id_str));
    if (chain_it == node_id_map.end()) {
        return;
    }
    std::list<int64_t>& chain = chain_it->second;

    const auto target = std::find(chain.begin(), chain.end(), node_id);
    if (target == chain.end()) {
        return;
    }

    // The killed node and everything after it in the chain become unavailable.
    for (auto it = target; it != chain.end(); ++it) {
        for (HeartbitNode& record : hb) {
            if (record._id == *it) {
                record._available = false;
            }
        }
    }
    chain.erase(target);
}
/// Forwards a command of the form "<action> <node_id>" to the queue named
/// after the head of the chain that contains `node_id`.
///
/// Prints a diagnostic and publishes nothing if the node id is unknown.
///
/// @param channel     AMQP channel used to publish the command
/// @param node_id_map chain head id -> ordered list of node ids in the chain
/// @param node_id     id of the node the command is addressed to
/// @param action      command word placed in front of the node id
void exec_timer(AMQP::Channel& channel,
                std::map<int64_t, std::list<int64_t>>& node_id_map,
                int64_t node_id, std::string action)
{
    const bool known_node = search_for_elem(node_id_map, node_id);
    if (known_node) {
        const std::string routing_key = get_main_node(node_id_map, node_id);

        std::string payload = action;
        payload += " ";
        payload += std::to_string(node_id);

        channel.publish("", routing_key.c_str(), payload.c_str());
    } else {
        std::cout << "Node with id " << node_id << " has not been created" << std::endl;
    }
}
/// Publishes a heartbeat request "heartbit <node_id> <heartbit_time> 0" to the
/// queue named after the head of the chain that contains `node_id`.
///
/// Nothing is published when `node_id` belongs to no chain: the routing key
/// would be empty and the message could not reach any node queue. This happens
/// for nodes that were removed from `node_id_map` but still have a heartbeat
/// record.
///
/// @param channel       AMQP channel used to publish the request
/// @param node_id_map   chain head id -> ordered list of node ids in the chain
/// @param node_id       id of the node to probe
/// @param hb            heartbeat records (not used by this function)
/// @param heartbit_time heartbeat period forwarded in the message; callers
///                      in this file pass -1 while heartbeats are switched off
void heartbit(AMQP::Channel& channel,
              std::map<int64_t, std::list<int64_t>>& node_id_map,
              int64_t node_id,
              std::vector<HeartbitNode>& hb,
              int64_t heartbit_time)
{
    (void)hb;  // kept in the signature for callers

    const std::string main_node_id_str = get_main_node(node_id_map, node_id);
    if (main_node_id_str.empty()) {
        return;
    }
    const std::string msg = "heartbit " + std::to_string(node_id) + " " + std::to_string(heartbit_time) + " 0";
    channel.publish("", main_node_id_str.c_str(), msg.c_str());
}
/// Entry point of the control node.
///
/// Connects to the AMQP broker on localhost:5672, declares the control queue
/// "0" and then runs three activities:
///   * th1 - interactive menu on stdin (create / kill / exec / heartbit /
///           print_nodes / ping);
///   * th2 - while heartbeats are switched off, sends a heartbeat message with
///           period -1 to every known node once per second;
///   * the calling thread - runs the handler loop and reacts to "up ..."
///           messages arriving on queue "0".
///
/// NOTE(review): node_id_map, hb_list and channel are shared by th1, th2 and
/// the consume callback without any locking; only hb_started is made atomic
/// here. Confirm the threading model of the AMQP handler before adding locks.
int main() {
    SimplePocoHandler handler("localhost", 5672);
    AMQP::Connection connection(&handler, AMQP::Login("control_node", "qwerty"), "/");
    AMQP::Channel channel(&connection);
    channel.declareQueue("0");

    std::map<int64_t, std::list<int64_t>> node_id_map;
    std::vector<HeartbitNode> hb_list;
    // Written by th1 and read by th2, so it has to be atomic.
    std::atomic<bool> hb_started{false};

    std::thread th1([&]() {
        channel.onReady([&]() {
            std::cout << "Control node created" << std::endl;
        });
        std::this_thread::sleep_for(100ms);

        while (true) {
            std::cout << "Menu:" << std::endl;
            std::string menu_command;
            if (!(std::cin >> menu_command)) {
                // EOF or a broken stream: leave the menu instead of spinning
                // forever on failed reads.
                break;
            }

            try {
                if (menu_command == "create") {
                    std::string arg1;
                    std::string arg2;
                    std::cin >> arg1 >> arg2;
                    create_computing_node(channel, node_id_map, hb_list, std::stoll(arg1), std::stoll(arg2));
                } else if (menu_command == "kill") {
                    std::string arg1;
                    std::cin >> arg1;
                    kill_computing_node(channel, node_id_map, hb_list, std::stoll(arg1));
                } else if (menu_command == "exec") {
                    std::string arg1;
                    std::string arg2;
                    std::cin >> arg1 >> arg2;
                    std::cout << arg1 << " " << arg2 << std::endl;
                    exec_timer(channel, node_id_map, std::stoll(arg1), arg2);
                } else if (menu_command == "heartbit") {
                    std::cout << "Ok" << std::endl;
                    std::string arg1_str;
                    std::cin >> arg1_str;
                    const int64_t arg1 = std::stoll(arg1_str);
                    // A period of -1 switches heartbeats off again.
                    hb_started = (arg1 != -1);
                    for (size_t i = 0; i < hb_list.size(); ++i) {
                        heartbit(channel, node_id_map, hb_list[i]._id, hb_list, arg1);
                        hb_list[i]._start = std::chrono::high_resolution_clock::now();
                    }
                } else if (menu_command == "print_nodes") {
                    std::cout << "map:" << std::endl;
                    for (const auto& map_el : node_id_map) {
                        for (const auto list_el : map_el.second) {
                            std::cout << list_el << ' ';
                        }
                        std::cout << std::endl;
                    }
                    std::cout << "list:" << std::endl;
                    for (const auto& el : hb_list) {
                        std::cout << el._id << ' ';
                    }
                    std::cout << std::endl;
                } else if (menu_command == "ping") {
                    std::string node_id_str;
                    std::cin >> node_id_str;
                    const int64_t node_id = std::stoll(node_id_str);
                    for (const auto& el : hb_list) {
                        if (el._id == node_id) {
                            if (el._available == true) {
                                std::cout << "Ok: 1" << std::endl;
                            } else {
                                std::cout << "Ok: 0" << std::endl;
                            }
                            break;
                        }
                    }
                } else {
                    std::cout << "(" << menu_command << ")" << std::endl;
                    std::cout << "Invalid command" << std::endl;
                }
            } catch (const std::exception&) {
                // Non-numeric or out-of-range ids must not escape the thread
                // (that would call std::terminate).
                std::cout << "Invalid arguments" << std::endl;
            }
        }
    });

    std::thread th2([&]() {
        while (true) {
            // Sleep on every pass so the thread does not busy-spin while
            // heartbeats are enabled.
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            if (hb_started) {
                continue;
            }
            for (size_t i = 0; i < hb_list.size(); ++i) {
                heartbit(channel, node_id_map, hb_list[i]._id, hb_list, -1);
                hb_list[i]._start = std::chrono::high_resolution_clock::now();
            }
        }
    });

    channel.consume("0", AMQP::noack).onReceived(
        [&] (const AMQP::Message& message,
             uint64_t /*deliveryTag*/,
             bool /*redelivered*/)
        {
            // Build the string from pointer + length: the body buffer is not
            // NUL-terminated, so the const char* constructor could read past it.
            const std::string received_message(message.body(), message.bodySize());

            std::string command1 = "";
            std::string command2 = "";
            std::string msg = "";
            get_msg4(received_message, command1, command2, msg);

            if (command1 != "up") {
                return;
            }

            if (command2 == "created") {
                std::cout << "Received " << msg << std::endl;
            }
            if (command2 == "killed") {
                std::string killed_node_id_str = get_last_arg(msg);
                channel.removeQueue(killed_node_id_str);
                std::cout << "Received " << msg << std::endl;
            }
            if (command2 == "exec") {
                std::cout << "Received " << msg << std::endl;
            }
            if (command2 == "heartbit") {
                std::string node_id_str = "";
                std::string heartbit_time = "";
                std::string heartbit_timer = "";
                get_msg4(msg, node_id_str, heartbit_time, heartbit_timer);

                try {
                    const int64_t period = std::stoll(heartbit_time);
                    const int64_t reported_timer = std::stoll(heartbit_timer);
                    // The reported timer reached four heartbeat periods.
                    const bool killed = reported_timer >= 4 * period;

                    if (heartbit_time != "-1") {
                        const int64_t node_id = std::stoll(node_id_str);
                        for (size_t i = 0; i < hb_list.size(); ++i) {
                            if (hb_list[i]._id == node_id) {
                                auto end = std::chrono::high_resolution_clock::now();
                                int64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(end - hb_list[i]._start).count();
                                if ((time > period * 4 && hb_list[i]._available == true) || killed) {
                                    std::cout << "Heartbit: node " + std::to_string(hb_list[i]._id) + " is unavailable now" << std::endl;
                                    hb_list[i]._start = end;
                                    hb_list[i]._available = false;
                                    break;
                                }
                                if (hb_list[i]._available == false) {
                                    hb_list[i]._available = true;
                                    std::cout << "Heartbit: node " + std::to_string(hb_list[i]._id) + " is available now" << std::endl;
                                    break;
                                }
                            }
                        }
                        // Re-arm the heartbeat for this node.
                        heartbit(channel, node_id_map, node_id, hb_list, period);
                    }
                } catch (const std::exception&) {
                    // A malformed message from a node must not take the
                    // control node down.
                    std::cout << "Error: malformed heartbit message" << std::endl;
                }
            }
        }
    );

    handler.loop();
    th1.join();
    th2.join();
    return 0;
}
// Paste-site footer (not part of the program):
// Editor is loading...
// Leave a Comment