48 #include <snapwebsites/log.h> 49 #include <snapwebsites/qstring_stream.h> 50 #include <snapwebsites/dbutils.h> 51 #include <snapwebsites/process.h> 52 #include <snapwebsites/snap_string_list.h> 57 #include <snapdev/tokenize_string.h> 62 #include <advgetopt/advgetopt.h> 74 #include <openssl/rand.h> 79 #include <snapdev/poison.h> 130 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_REQUIRED | advgetopt::GETOPT_FLAG_SHOW_USAGE_ON_ERROR,
133 "Path to snaplock and other configuration files.",
138 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
141 "Start the snaplock daemon in debug mode.",
146 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
147 "debug-lock-messages",
149 "Log all the lock messages received by snaplock.",
154 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
157 "List existing tickets and exits.",
162 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_REQUIRED,
165 "Full path to the snaplock logfile.",
170 advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
173 "Only output to the console, not a log file.",
178 advgetopt::GETOPT_FLAG_END,
190 #pragma GCC diagnostic push 191 #pragma GCC diagnostic ignored "-Wpedantic" 194 .f_project_name =
"snapwebsites",
196 .f_options_files_directory =
nullptr,
197 .f_environment_variable_name =
"SNAPLOCK_OPTIONS",
198 .f_configuration_files =
nullptr,
199 .f_configuration_filename =
nullptr,
200 .f_configuration_directories =
nullptr,
201 .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
202 .f_help_header =
"Usage: %p [-<opt>]\n" 203 "where -<opt> is one or more of:",
204 .f_help_footer =
"%c",
206 .f_license =
"GNU GPL v2",
207 .f_copyright =
"Copyright (c) 2013-" 208 BOOST_PP_STRINGIZE(UTC_BUILD_YEAR)
209 " by Made to Order Software Corporation -- All Rights Reserved",
213 #pragma GCC diagnostic pop 227 snap::dispatcher<snaplock>::dispatcher_match::vector_t
const snaplock::g_snaplock_service_messages =
231 , &snaplock::msg_absolutely
235 , &snaplock::msg_activate_lock
239 , &snaplock::msg_add_ticket
243 , &snaplock::msg_cluster_up
247 , &snaplock::msg_cluster_down
251 , &snaplock::msg_server_gone
255 , &snaplock::msg_drop_ticket
259 , &snaplock::msg_get_max_ticket
263 , &snaplock::msg_server_gone
267 , &snaplock::msg_lock
271 , &snaplock::msg_lock_activated
275 , &snaplock::msg_lock_entered
279 , &snaplock::msg_lock_entering
283 , &snaplock::msg_lock_exiting
287 , &snaplock::msg_lock_failed
291 , &snaplock::msg_lock_leaders
295 , &snaplock::msg_lock_started
299 , &snaplock::msg_lock_status
303 , &snaplock::msg_lock_tickets
307 , &snaplock::msg_list_tickets
311 , &snaplock::msg_max_ticket
315 , &snaplock::msg_status
319 , &snaplock::msg_ticket_added
323 , &snaplock::msg_ticket_ready
327 , &snaplock::msg_unlock
344 snaplock::computer_t::computer_t(QString
const & name, uint8_t priority)
346 , f_priority(priority)
350 RAND_bytes(reinterpret_cast<unsigned char *>(&f_random_id),
sizeof(f_random_id));
352 snap::snap_config config(
"snapcommunicator");
353 f_ip_address = config[
"listen"];
357 bool snaplock::computer_t::is_self()
const 363 void snaplock::computer_t::set_connected(
bool connected)
365 f_connected = connected;
369 bool snaplock::computer_t::get_connected()
const 375 bool snaplock::computer_t::set_id(QString
const &
id)
377 if(f_priority != PRIORITY_UNDEFINED)
382 snap::snap_string_list parts(
id.split(
'|'));
383 if(parts.size() != 5)
388 SNAP_LOG_ERROR(
"received a computer id which does not have exactly 5 parts.");
396 f_priority = parts[0].toLong(&ok, 10);
398 || f_priority < PRIORITY_USER_MIN
399 || f_priority > PRIORITY_MAX)
401 SNAP_LOG_ERROR(
"priority is limited to a number between 0 and 15 inclusive.");
405 f_random_id = parts[1].toULong(&ok, 10);
407 f_ip_address = parts[2];
408 if(f_ip_address.isEmpty())
410 SNAP_LOG_ERROR(
"the process IP cannot be an empty string.");
414 f_pid = parts[3].toLong(&ok, 10);
415 if(!ok || f_pid < 1 || f_pid > snap::process::get_pid_max())
417 SNAP_LOG_ERROR(
"a process identifier is 15 bits so ")(f_pid)(
" does not look valid (0 is also not accepted).");
424 SNAP_LOG_ERROR(
"the server name in the lockid cannot be empty.");
440 void snaplock::computer_t::set_start_time(time_t start_time)
442 f_start_time = start_time;
446 time_t snaplock::computer_t::get_start_time()
const 452 QString
const & snaplock::computer_t::get_name()
const 458 QString
const & snaplock::computer_t::get_id()
const 462 if(f_priority == PRIORITY_UNDEFINED)
466 if(f_ip_address.isEmpty())
475 f_id = QString(
"%1|%2|%3|%4|%5")
476 .arg(f_priority, 2, 10, QChar(
'0'))
487 QString
const & snaplock::computer_t::get_ip_address()
const 597 : dispatcher(this, g_snaplock_service_messages)
599 , f_config(
"snaplock")
601 add_snap_communicator_commands();
605 if(f_opt.is_defined(
"config"))
607 f_config.set_configuration_path(f_opt.get_string(
"config"));
611 f_debug = f_opt.is_defined(
"debug");
614 f_debug_lock_messages = f_opt.is_defined(
"debug-lock-messages")
615 || f_config.has_parameter(
"debug_lock_messages");
619 if(f_debug_lock_messages)
630 f_server_name = QString::fromUtf8(snap::server::get_server_name().c_str());
635 if(f_config.has_parameter(
"server_name"))
637 f_server_name = f_config[
"server_name"];
643 tcp_client_server::get_addr_port(QString::fromUtf8(f_config(
"snapcommunicator",
"local_listen").c_str()), f_communicator_addr, f_communicator_port,
"tcp");
647 if(f_opt.is_defined(
"nolog"))
649 snap::logging::configure_console();
651 else if(f_opt.is_defined(
"logfile"))
653 snap::logging::configure_logfile(QString::fromUtf8(f_opt.get_string(
"logfile").c_str()));
657 if(f_config.has_parameter(
"log_config"))
661 f_log_conf = f_config[
"log_config"];
663 snap::logging::configure_conffile(f_log_conf);
671 snap::logging::reduce_log_output_level( snap::logging::log_level_t::LOG_LEVEL_DEBUG );
679 if(f_config.has_parameter(
"service_name"))
681 f_service_name = f_config[
"service_name"];
685 int64_t priority = computer_t::PRIORITY_DEFAULT;
686 if(f_opt.is_defined(
"candidate-priority"))
688 std::string
const candidate_priority(f_opt.get_string(
"candidate-priority"));
689 if(candidate_priority ==
"off")
691 priority = computer_t::PRIORITY_OFF;
695 priority = f_opt.get_long(
"candidate-priority" 697 , computer_t::PRIORITY_USER_MIN
698 , computer_t::PRIORITY_MAX);
701 else if(f_config.has_parameter(
"candidate_priority"))
703 QString
const candidate_priority(f_config[
"candidate_priority"]);
704 if(candidate_priority ==
"off")
710 priority = computer_t::PRIORITY_OFF;
715 priority = candidate_priority.toLong(&ok, 10);
718 SNAP_LOG_FATAL(
"invalid candidate_priority, a valid decimal number was expected instead of \"")(candidate_priority)(
"\".");
721 if(priority < computer_t::PRIORITY_USER_MIN
722 || priority > computer_t::PRIORITY_MAX)
724 SNAP_LOG_FATAL(
"candidate_priority must be between 1 and 15, \"")(candidate_priority)(
"\" is not valid.");
732 if(f_opt.is_defined(
"--"))
734 SNAP_LOG_FATAL(
"unexpected parameters found on snaplock daemon command line.");
735 std::cerr <<
"error: unexpected parameter found on snaplock daemon command line." << std::endl;
736 std::cerr << f_opt.usage(advgetopt::GETOPT_FLAG_SHOW_USAGE_ON_ERROR);
741 f_start_time = time(
nullptr);
749 f_computers[f_server_name] = std::make_shared<computer_t>(f_server_name, priority);
750 f_computers[f_server_name]->set_start_time(f_start_time);
751 f_computers[f_server_name]->set_connected(
true);
752 f_my_id = f_computers[f_server_name]->get_id();
753 f_my_ip_address = f_computers[f_server_name]->get_ip_address();
762 snaplock::~snaplock()
777 signal( SIGSEGV, snaplock::sighandler );
778 signal( SIGBUS, snaplock::sighandler );
779 signal( SIGFPE, snaplock::sighandler );
780 signal( SIGILL, snaplock::sighandler );
781 signal( SIGTERM, snaplock::sighandler );
782 signal( SIGINT, snaplock::sighandler );
783 signal( SIGQUIT, snaplock::sighandler );
787 signal( SIGPIPE, snaplock::sigloghandler );
791 signal( SIGTSTP, SIG_IGN );
792 signal( SIGTTIN, SIG_IGN );
793 signal( SIGTTOU, SIG_IGN );
797 f_communicator = snap::snap_communicator::instance();
802 f_communicator->add_connection(f_interrupt);
807 f_communicator->add_connection(f_timer);
812 f_communicator->add_connection(f_info);
817 f_communicator->add_connection(f_debug_info);
822 if(f_opt.is_defined(
"list"))
824 snap::logging::set_log_output_level(snap::logging::log_level_t::LOG_LEVEL_ERROR);
830 f_service_name =
"snaplocktool";
831 f_messenger.reset(
new snaplock_tool(
this, f_communicator_addr.toUtf8().data(), f_communicator_port));
835 SNAP_LOG_INFO(
"--------------------------------- snaplock started.");
837 f_messenger.reset(
new snaplock_messenger(
this, f_communicator_addr.toUtf8().data(), f_communicator_port));
838 f_messenger->set_dispatcher(shared_from_this());
840 f_communicator->add_connection(f_messenger);
844 f_communicator->run();
862 void snaplock::sighandler(
int sig)
865 bool show_stack(
true);
907 snap::snap_exception_base::output_stack_trace();
910 SNAP_LOG_FATAL(
"Fatal signal caught: ")(signame);
919 void snaplock::sigloghandler(
int sig)
935 SNAP_LOG_WARNING(
"POSIX signal caught: ")(signame);
958 bool snaplock::send_message(snap::snap_communicator_message
const & message,
bool cache)
960 return f_messenger->send_message(message, cache);
973 int snaplock::get_computer_count()
const 975 return f_computers.size();
1003 int snaplock::quorum()
const 1005 return f_computers.size() / 2 + 1;
1019 QString
const & snaplock::get_server_name()
const 1021 return f_server_name;
1036 bool snaplock::is_ready()
const 1040 if(f_leaders.empty())
1042 SNAP_LOG_TRACE(
"not considered ready: no leaders.");
1059 if(f_leaders.size() == 1
1060 && f_neighbors_count != 1)
1062 SNAP_LOG_TRACE(
"not considered ready: no enough leaders for this cluster.");
1078 if(f_neighbors_quorum < 3
1079 && f_computers.size() < f_neighbors_count)
1081 SNAP_LOG_TRACE(
"not considered ready: quorum changed, re-election expected soon.");
1089 if(f_computers.size() < f_neighbors_quorum)
1091 SNAP_LOG_TRACE(
"not considered ready: quorum lost, re-election expected soon.");
1097 for(
auto const & l : f_leaders)
1099 if(!l->get_connected())
1101 SNAP_LOG_TRACE(
"not considered ready: no direct connection with leader: \"")
1112 time_t
const now(time(
nullptr));
1113 if(now > f_pace_lockstarted)
1117 f_pace_lockstarted = now + 5;
1121 snap::snap_communicator_message temporary_message;
1122 temporary_message.set_sent_from_server(l->get_name());
1123 temporary_message.set_sent_from_service(
"snaplock");
1124 const_cast<snaplock *
>(
this)->send_lockstarted(&temporary_message);
1170 for(
auto const & l : f_leaders)
1172 if(l->get_id() == id)
1191 switch(f_leaders.size())
1202 return f_leaders[f_leaders[0]->is_self() ? 1 : 0];
1217 switch(f_leaders.size())
1228 return f_leaders[f_leaders[2]->is_self() ? 1 : 2];
1243 void snaplock::info()
1245 SNAP_LOG_INFO(
"++++++++ SNAPLOCK INFO ++++++++");
1246 SNAP_LOG_INFO(
"My leader ID: ")(f_my_id);
1247 SNAP_LOG_INFO(
"My IP address: ")(f_my_ip_address);
1248 SNAP_LOG_INFO(
"Total number of computers: ")(f_neighbors_count)(
" (quorum: ")(f_neighbors_quorum)(
", leaders: ")(f_leaders.size())(
")");
1249 SNAP_LOG_INFO(
"Known computers: ")(f_computers.size());
1250 for(
auto const & c : f_computers)
1252 auto const it(std::find_if(
1255 , [&c](
auto const & l)
1257 return c.second == l;
1260 if(it != f_leaders.end())
1262 leader = QString(
" (LEADER #%1)").arg(it - f_leaders.begin());
1264 SNAP_LOG_INFO(
" -- Computer Name: ")(c.second->get_name())(leader);
1265 SNAP_LOG_INFO(
" -- Computer ID: ")(c.second->get_id());
1266 SNAP_LOG_INFO(
" -- Computer IP Address: ")(c.second->get_ip_address());
1272 void snaplock::debug_info()
1275 SNAP_LOG_TRACE(
"++++ serialized tickets in debug_info(): ")(serialized_tickets().replace(
"\n",
" --- "));
1289 SNAP_LOG_INFO(
"this version of snaplock is not a debug version. The debug_info() function does nothing in this version.");
1304 void snaplock::msg_list_tickets(snap::snap_communicator_message & message)
1307 for(
auto const & obj_ticket : f_tickets)
1309 for(
auto const & key_ticket : obj_ticket.second)
1311 QString
const & obj_name(key_ticket.second->get_object_name());
1312 QString
const & key(key_ticket.second->get_entering_key());
1314 time_t
const lock_timeout(key_ticket.second->get_lock_timeout());
1316 QString timeout_msg;
1317 if(lock_timeout == 0)
1319 time_t
const obtention_timeout(key_ticket.second->get_obtention_timeout());
1320 timeout_msg = QString(
"obtention %1 %2")
1321 .arg(snap::snap_child::date_to_string(obtention_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_SHORT))
1322 .arg(snap::snap_child::date_to_string(obtention_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_TIME));
1326 timeout_msg = QString(
"timeout %1 %2")
1327 .arg(snap::snap_child::date_to_string(lock_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_SHORT))
1328 .arg(snap::snap_child::date_to_string(lock_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_TIME));
1331 QString
const msg(QString(
"ticket id: %1 object name: \"%2\" key: %3 %4\n")
1339 snap::snap_communicator_message list_message;
1340 list_message.set_command(
"TICKETLIST");
1341 list_message.reply_to(message);
1342 list_message.add_parameter(
"list", ticketlist);
1343 send_message(list_message);
1355 void snaplock::ready(snap::snap_communicator_message & message)
1357 snap::NOTUSED(message);
1359 snap::snap_communicator_message clusterstatus_message;
1360 clusterstatus_message.set_command(
"CLUSTERSTATUS");
1361 clusterstatus_message.set_service(
"snapcommunicator");
1362 send_message(clusterstatus_message);
1366 void snaplock::msg_cluster_up(snap::snap_communicator_message & message)
1368 f_neighbors_count = message.get_integer_parameter(
"neighbors_count");
1369 f_neighbors_quorum = f_neighbors_count / 2 + 1;
1371 SNAP_LOG_INFO(
"cluster is up with ")
1373 (
" neightbors, attempt an election then check for leaders by sending a LOCKSTARTED message.");
1377 send_lockstarted(
nullptr);
1381 void snaplock::msg_cluster_down(snap::snap_communicator_message & message)
1383 snap::NOTUSED(message);
1388 SNAP_LOG_INFO(
"cluster is down, canceling existing locks and we have to refuse any further lock requests for a while.");
1396 check_lock_status();
1403 void snaplock::election_status()
1407 if(!f_leaders.empty())
1412 if(f_leaders.size() == 3
1413 || (f_neighbors_count < 3 && f_leaders.size() == f_neighbors_count))
1418 check_lock_status();
1428 if(f_neighbors_count == 0)
1439 if(f_neighbors_quorum < 3
1440 && f_computers.size() < f_neighbors_count)
1454 if(f_computers.size() < f_neighbors_quorum)
1464 for(
auto & c : f_computers)
1469 if(c.second->get_ip_address() < f_my_ip_address)
1480 for(
auto c : f_computers)
1484 if(c.second->get_priority() != computer_t::PRIORITY_OFF)
1486 QString id(c.second->get_id());
1490 auto it(std::find(f_leaders.begin(), f_leaders.end(), c.second));
1491 if(it != f_leaders.end())
1499 sort_by_id[id] = c.second;
1507 if(f_computers.size() <= 3)
1512 "you cannot have any computer turned OFF when you" 1513 " have three or less computers total in your cluster." 1514 " The elections cannot be completed in these" 1519 else if(f_computers.size() - off < 3)
1521 SNAP_LOG_FATAL(
"you have a total of ")
1522 (f_computers.size())
1523 (
" computers in your cluster. You turned off ")
1525 (
" of them, which means less than three are left" 1526 " as candidates for leadership which is not enough." 1527 " You can have a maximum of ")
1528 (f_computers.size() - 3)
1529 (
" that are turned off on this cluster.");
1533 if(sort_by_id.size() < 3
1534 && sort_by_id.size() != f_computers.size())
1547 snap::snap_communicator_message lockleaders_message;
1548 lockleaders_message.set_command(
"LOCKLEADERS");
1549 lockleaders_message.set_service(
"*");
1551 f_election_date = snap::snap_child::get_current_date();
1552 lockleaders_message.add_parameter(
"election_date", f_election_date);
1553 auto leader(sort_by_id.begin());
1554 size_t const max(std::min(static_cast<computer_t::map_t::size_type>(3), sort_by_id.size()));
1555 for(
size_t idx(0); idx < max; ++idx, ++leader)
1557 lockleaders_message.add_parameter(QString(
"leader%1").arg(idx), leader->second->get_id());
1558 f_leaders.push_back(leader->second);
1560 send_message(lockleaders_message);
1562 SNAP_LOG_WARNING(
"election status = add leader(s)... ")(f_computers.size())(
" comps and ")(f_leaders.size())(
" leaders");
1567 check_lock_status();
1571 void snaplock::check_lock_status()
1573 bool const ready(is_ready());
1574 QString
const current_status(ready ?
"LOCKREADY" :
"NOLOCK");
1576 if(f_lock_status != current_status)
1578 f_lock_status = current_status;
1580 snap::snap_communicator_message status_message;
1581 status_message.set_command(current_status);
1582 status_message.set_service(
".");
1583 status_message.add_parameter(
"cache",
"no");
1584 send_message(status_message);
1587 && !f_message_cache.empty())
1598 cache.swap(f_message_cache);
1599 for(
auto mc : cache)
1601 msg_lock(mc.f_message);
1608 void snaplock::send_lockstarted(snap::snap_communicator_message
const * message)
1614 snap::snap_communicator_message lockstarted_message;
1615 lockstarted_message.set_command(
"LOCKSTARTED");
1616 if(message ==
nullptr)
1618 lockstarted_message.set_service(
"*");
1629 lockstarted_message.reply_to(*message);
1634 lockstarted_message.add_parameter(
"server_name", f_server_name);
1635 lockstarted_message.add_parameter(
"lockid", f_my_id);
1636 lockstarted_message.add_parameter(
"starttime", f_start_time);
1640 if(!f_leaders.empty())
1642 lockstarted_message.add_parameter(
"election_date", f_election_date);
1643 for(
size_t idx(0); idx < f_leaders.size(); ++idx)
1645 lockstarted_message.add_parameter(QString(
"leader%1").arg(idx), f_leaders[idx]->get_id());
1649 send_message(lockstarted_message);
1653 void snaplock::msg_lock_leaders(snap::snap_communicator_message & message)
1655 f_election_date = message.get_integer_parameter(
"election_date");
1660 for(
int idx(0); idx < 3; ++idx)
1662 QString
const param_name(QString(
"leader%1").arg(idx));
1663 if(message.has_parameter(param_name))
1666 QString
const lockid(message.get_parameter(param_name));
1667 if(leader->set_id(lockid))
1669 computer_t::map_t::iterator exists(f_computers.find(leader->get_name()));
1670 if(exists != f_computers.end())
1674 f_leaders.push_back(exists->second);
1682 leader->set_connected(
false);
1683 f_computers[leader->get_name()] = leader;
1685 f_leaders.push_back(leader);
1691 if(!f_leaders.empty())
1693 synchronize_leaders();
1703 RAND_bytes(reinterpret_cast<unsigned char *>(&c),
sizeof(c));
1704 f_next_leader = c % f_leaders.size();
1710 check_lock_status();
1724 void snaplock::msg_lock_started(snap::snap_communicator_message & message)
1728 QString
const server_name(message.get_parameter(
"server_name"));
1729 if(server_name.isEmpty())
1733 throw snap::snap_communicator_invalid_message(
"snaplock::msg_lockstarted(): Invalid server name (empty).");
1739 if(server_name == f_server_name)
1744 time_t
const start_time(message.get_integer_parameter(
"starttime"));
1746 computer_t::map_t::iterator it(f_computers.find(server_name));
1747 bool new_computer(it == f_computers.end());
1756 if(!computer->set_id(message.get_parameter(
"lockid")))
1762 computer->set_start_time(start_time);
1764 f_computers[computer->get_name()] = computer;
1768 if(!it->second->get_connected())
1774 new_computer =
true;
1775 it->second->set_connected(
true);
1778 if(it->second->get_start_time() != start_time)
1787 new_computer =
true;
1788 it->second->set_start_time(start_time);
1794 if(message.has_parameter(
"election_date"))
1796 int64_t
const election_date(message.get_integer_parameter(
"election_date"));
1797 if(election_date > f_election_date)
1799 f_election_date = election_date;
1804 bool const set_my_leaders(f_leaders.empty());
1807 for(
int idx(0); idx < 3; ++idx)
1809 QString
const param_name(QString(
"leader%1").arg(idx));
1810 if(message.has_parameter(param_name))
1813 QString
const lockid(message.get_parameter(param_name));
1814 if(leader->set_id(lockid))
1816 computer_t::map_t::iterator exists(f_computers.find(leader->get_name()));
1817 if(exists != f_computers.end())
1821 f_leaders.push_back(exists->second);
1829 leader->set_connected(
false);
1830 f_computers[leader->get_name()] = leader;
1832 f_leaders.push_back(leader);
1845 send_lockstarted(&message);
1862 void snaplock::msg_lock_status(snap::snap_communicator_message & message)
1864 snap::snap_communicator_message status_message;
1865 status_message.set_command(is_ready() ?
"LOCKREADY" :
"NOLOCK");
1866 status_message.reply_to(message);
1867 status_message.add_parameter(
"cache",
"no");
1868 send_message(status_message);
1889 void snaplock::msg_lock_tickets(snap::snap_communicator_message & message)
1891 QString
const tickets(message.get_parameter(
"tickets"));
1896 snap::snap_string_list
const lines(tickets.split(
'\n'));
1897 for(
auto const & l : lines)
1900 snap::snap_string_list
const vars(l.split(
'|'));
1901 auto object_name_value(std::find_if(
1904 , [](QString
const & vv)
1906 return vv.startsWith(
"object_name=");
1908 if(object_name_value != vars.end())
1910 auto entering_key_value(std::find_if(
1913 , [](QString
const & vv)
1915 return vv.startsWith(
"entering_key=");
1917 if(entering_key_value != vars.end())
1921 QString
const object_name(object_name_value->mid(12));
1922 QString
const entering_key(entering_key_value->mid(13));
1924 auto entering_ticket(f_entering_tickets.find(object_name));
1925 if(entering_ticket != f_entering_tickets.end())
1927 auto key_ticket(entering_ticket->second.find(entering_key));
1928 if(key_ticket != entering_ticket->second.end())
1930 ticket = key_ticket->second;
1933 if(ticket ==
nullptr)
1935 auto obj_ticket(f_tickets.find(object_name));
1936 if(obj_ticket != f_tickets.end())
1938 auto key_ticket(std::find_if(
1939 obj_ticket->second.begin()
1940 , obj_ticket->second.end()
1941 , [&entering_key](
auto const & t)
1943 return t.second->get_entering_key() == entering_key;
1945 if(key_ticket != obj_ticket->second.end())
1947 ticket = key_ticket->second;
1954 bool const new_ticket(ticket ==
nullptr);
1961 ticket = std::make_shared<snaplock_ticket>(
1966 , snap::snap_lock::SNAP_LOCK_DEFAULT_TIMEOUT + time(
nullptr)
1967 , snap::snap_lock::SNAP_LOCK_DEFAULT_TIMEOUT
1972 ticket->unserialize(l);
1985 && ticket->is_locked())
1987 auto li(std::find_if(
1990 , [&ticket](
auto const & c)
1992 return ticket->get_owner() == c->get_name();
1994 if(li != f_leaders.end())
1996 f_tickets[object_name][ticket->get_ticket_key()] = ticket;
2014 void snaplock::msg_status(snap::snap_communicator_message & message)
2019 QString
const service(message.get_parameter(
"service"));
2020 if(service ==
"remote connection" 2021 || service ==
"remote communicator connection")
2025 QString
const status(message.get_parameter(
"status"));
2036 msg_server_gone(message);
2055 void snaplock::msg_server_gone(snap::snap_communicator_message & message)
2059 QString
const server_name(message.get_parameter(
"server_name"));
2060 if(server_name.isEmpty()
2061 || server_name == f_server_name)
2070 auto it(f_computers.find(server_name));
2071 if(it == f_computers.end())
2080 f_computers.erase(it);
2088 if(li != f_leaders.end())
2090 f_leaders.erase(li);
2104 check_lock_status();
2126 void snaplock::stop(
bool quitting)
2128 if(f_messenger !=
nullptr)
2130 if(quitting || !f_messenger->is_connected())
2135 f_communicator->remove_connection(f_messenger);
2136 f_messenger.reset();
2140 f_messenger->mark_done();
2145 snap::snap_communicator_message cmd;
2146 cmd.set_command(
"UNREGISTER");
2147 cmd.add_parameter(
"service", f_service_name);
2152 if(f_communicator !=
nullptr)
2154 f_communicator->remove_connection(f_interrupt);
2155 f_interrupt.reset();
2157 f_communicator->remove_connection(f_info);
2160 f_communicator->remove_connection(f_debug_info);
2161 f_debug_info.reset();
2163 f_communicator->remove_connection(f_timer);
2197 void snaplock::get_parameters(snap::snap_communicator_message
const & message, QString * object_name, pid_t * client_pid, time_t * timeout, QString * key, QString * source)
2202 if(object_name !=
nullptr)
2204 *object_name = message.get_parameter(
"object_name");
2205 if(object_name->isEmpty())
2209 throw snap::snap_communicator_invalid_message(
"snaplock::get_parameters(): Invalid object name. We cannot lock the empty string.");
2217 if(client_pid !=
nullptr)
2219 *client_pid = message.get_integer_parameter(
"pid");
2224 throw snap::snap_communicator_invalid_message(QString(
"snaplock::get_parameters(): Invalid pid specified for a lock (%1). It must be a positive decimal number.").arg(message.get_parameter(
"pid")).toUtf8().data());
2231 if(timeout !=
nullptr)
2233 if(message.has_parameter(
"timeout"))
2238 *timeout = message.get_integer_parameter(
"timeout");
2242 *timeout = time(
nullptr) + DEFAULT_TIMEOUT;
2250 *key = message.get_parameter(
"key");
2255 throw snap::snap_communicator_invalid_message(
"snaplock::get_parameters(): A key cannot be an empty string.");
2261 if(source !=
nullptr)
2263 *source = message.get_parameter(
"source");
2264 if(source->isEmpty())
2268 throw snap::snap_communicator_invalid_message(
"snaplock::get_parameters(): A source cannot be an empty string.");
2285 void snaplock::msg_absolutely(snap::snap_communicator_message & message)
2287 QString
const serial(message.get_parameter(
"serial"));
2288 snap::snap_string_list
const segments(serial.split(
'/'));
2290 if(segments[0] ==
"relock")
2295 if(segments.size() != 4)
2297 SNAP_LOG_WARNING(
"ABSOLUTELY reply has an invalid relock serial parameters \"")
2299 (
"\" was expected to have exactly 4 segments.");
2301 snap::snap_communicator_message lock_failed_message;
2302 lock_failed_message.set_command(
"LOCKFAILED");
2303 lock_failed_message.reply_to(message);
2304 lock_failed_message.add_parameter(
"object_name",
"unknown");
2305 lock_failed_message.add_parameter(
"error",
"invalid");
2306 send_message(lock_failed_message);
2313 QString
const object_name(segments[1]);
2314 QString
const server_name(segments[2]);
2315 QString
const client_pid(segments[3]);
2317 auto entering_ticket(f_entering_tickets.find(object_name));
2318 if(entering_ticket != f_entering_tickets.end())
2320 QString
const entering_key(QString(
"%1/%2").arg(server_name).arg(client_pid));
2321 auto key_ticket(entering_ticket->second.find(entering_key));
2322 if(key_ticket != entering_ticket->second.end())
2326 key_ticket->second->set_alive_timeout(0);
2330 key_ticket->second->entering();
2377 void snaplock::msg_lock(snap::snap_communicator_message & message)
2379 QString object_name;
2380 pid_t client_pid(0);
2382 get_parameters(message, &object_name, &client_pid, &timeout,
nullptr,
nullptr);
2390 QString
const server_name(message.has_parameter(
"lock_proxy_server_name")
2391 ? message.get_parameter(
"lock_proxy_server_name")
2392 : message.get_sent_from_server());
2394 QString
const service_name(message.has_parameter(
"lock_proxy_service_name")
2395 ? message.get_parameter(
"lock_proxy_service_name")
2396 : message.get_sent_from_service());
2398 QString
const entering_key(QString(
"%1/%2").arg(server_name).arg(client_pid));
2400 if(timeout <= time(
nullptr))
2402 SNAP_LOG_WARNING(
"Lock on \"")
2406 (
"\" timed out before we could start the locking process.");
2408 snap::snap_communicator_message lock_failed_message;
2409 lock_failed_message.set_command(
"LOCKFAILED");
2410 lock_failed_message.reply_to(message);
2411 lock_failed_message.add_parameter(
"object_name", object_name);
2412 lock_failed_message.add_parameter(
"key", entering_key);
2413 lock_failed_message.add_parameter(
"error",
"timedout");
2414 send_message(lock_failed_message);
2419 snap::snap_lock::timeout_t
const duration(message.get_integer_parameter(
"duration"));
2420 if(duration < snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2424 SNAP_LOG_ERROR(duration)
2425 (
" is an invalid duration, the minimum accepted is ")
2426 (snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2429 snap::snap_communicator_message lock_failed_message;
2430 lock_failed_message.set_command(
"LOCKFAILED");
2431 lock_failed_message.reply_to(message);
2432 lock_failed_message.add_parameter(
"object_name", object_name);
2433 lock_failed_message.add_parameter(
"key", entering_key);
2434 lock_failed_message.add_parameter(
"error",
"invalid");
2435 send_message(lock_failed_message);
2440 snap::snap_lock::timeout_t unlock_duration(snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT);
2441 if(message.has_parameter(
"unlock_duration"))
2443 unlock_duration = message.get_integer_parameter(
"unlock_duration");
2444 if(unlock_duration < snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2448 SNAP_LOG_ERROR(unlock_duration)
2449 (
" is an invalid unlock duration, the minimum accepted is ")
2450 (snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2453 snap::snap_communicator_message lock_failed_message;
2454 lock_failed_message.set_command(
"LOCKFAILED");
2455 lock_failed_message.reply_to(message);
2456 lock_failed_message.add_parameter(
"object_name", object_name);
2457 lock_failed_message.add_parameter(
"key", entering_key);
2458 lock_failed_message.add_parameter(
"error",
"invalid");
2459 send_message(lock_failed_message);
2467 SNAP_LOG_TRACE(
"caching LOCK message for \"")
2469 (
"\" as the snaplock system is not yet considered ready.");
2476 f_message_cache.push_back(mc);
2480 int64_t
const timeout_date(f_messenger->get_timeout_date());
2481 if(timeout_date == -1
2482 || timeout_date > timeout)
2484 f_timer->set_timeout_date(timeout);
2489 if(is_leader() ==
nullptr)
2494 forward_message_to_leader(message);
2500 auto entering_ticket(f_entering_tickets.find(object_name));
2501 if(entering_ticket != f_entering_tickets.end())
2503 auto key_ticket(entering_ticket->second.find(entering_key));
2504 if(key_ticket != entering_ticket->second.end())
2509 if(message.has_parameter(
"serial"))
2512 if(key_ticket->second->get_serial() == serial)
2524 SNAP_LOG_ERROR(
"an entering ticket has the same object name \"")
2526 (
"\" and entering key \"")
2530 snap::snap_communicator_message lock_failed_message;
2531 lock_failed_message.set_command(
"LOCKFAILED");
2532 lock_failed_message.reply_to(message);
2533 lock_failed_message.add_parameter(
"object_name", object_name);
2534 lock_failed_message.add_parameter(
"key", entering_key);
2535 lock_failed_message.add_parameter(
"error",
"duplicate");
2536 send_message(lock_failed_message);
2549 auto obj_ticket(f_tickets.find(object_name));
2550 if(obj_ticket != f_tickets.end())
2552 auto key_ticket(std::find_if(
2553 obj_ticket->second.begin()
2554 , obj_ticket->second.end()
2555 , [&entering_key](
auto const & t)
2557 return t.second->get_entering_key() == entering_key;
2559 if(key_ticket != obj_ticket->second.end())
2563 SNAP_LOG_ERROR(
"a ticket has the same object name \"")
2565 (
"\" and entering key \"")
2569 snap::snap_communicator_message lock_failed_message;
2570 lock_failed_message.set_command(
"LOCKFAILED");
2571 lock_failed_message.reply_to(message);
2572 lock_failed_message.add_parameter(
"object_name", object_name);
2573 lock_failed_message.add_parameter(
"key", entering_key);
2574 lock_failed_message.add_parameter(
"error",
"duplicate");
2575 send_message(lock_failed_message);
2591 f_entering_tickets[object_name][entering_key] = ticket;
2595 ticket->set_unlock_duration(unlock_duration);
2599 f_ticket_serial = (f_ticket_serial + 1) & 0x00FFFFFF;
2600 if(f_leaders[0]->get_id() != f_my_id)
2602 if(f_leaders.size() >= 2
2603 && f_leaders[1]->get_id() != f_my_id)
2605 f_ticket_serial |= 1 << 24;
2607 else if(f_leaders.size() >= 3
2608 && f_leaders[2]->get_id() != f_my_id)
2610 f_ticket_serial |= 2 << 24;
2613 ticket->set_serial(f_ticket_serial);
2615 if(message.has_parameter(
"serial"))
2622 ticket->set_alive_timeout(5 + time(
nullptr));
2624 snap::snap_communicator_message alive_message;
2625 alive_message.set_command(
"ALIVE");
2626 alive_message.set_server(server_name);
2627 alive_message.set_service(service_name);
2628 alive_message.add_parameter(
"serial", QString(
"relock/%1/%2").arg(object_name).arg(entering_key));
2629 alive_message.add_parameter(
"timestamp", time(
nullptr));
2630 send_message(alive_message);
2653 void snaplock::msg_unlock(snap::snap_communicator_message & message)
2657 SNAP_LOG_ERROR(
"received an UNLOCK when snaplock is not ready to receive LOCK messages.");
2661 if(is_leader() ==
nullptr)
2666 forward_message_to_leader(message);
2670 QString object_name;
2671 pid_t client_pid(0);
2672 get_parameters(message, &object_name, &client_pid,
nullptr,
nullptr,
nullptr);
2676 auto obj_ticket(f_tickets.find(object_name));
2677 if(obj_ticket != f_tickets.end())
2679 QString
const server_name(message.has_parameter(
"lock_proxy_server_name")
2680 ? message.get_parameter(
"lock_proxy_server_name")
2681 : message.get_sent_from_server());
2687 QString
const entering_key(QString(
"%1/%2").arg(server_name).arg(client_pid));
2688 auto key_ticket(std::find_if(
2689 obj_ticket->second.begin()
2690 , obj_ticket->second.end()
2691 , [&entering_key](
auto const & t)
2693 return t.second->get_entering_key() == entering_key;
2695 if(key_ticket != obj_ticket->second.end())
2701 key_ticket->second->drop_ticket();
2703 obj_ticket->second.erase(key_ticket);
2704 if(obj_ticket->second.empty())
2708 f_tickets.erase(obj_ticket);
2711 else SNAP_LOG_WARNING(
"and we could not find that key in that object's map...");
2726 void snaplock::msg_lock_entering(snap::snap_communicator_message & message)
2728 QString object_name;
2732 get_parameters(message, &object_name,
nullptr, &timeout, &key, &source);
2737 if(timeout > time(
nullptr))
2745 bool allocate(
true);
2746 auto const obj_ticket(f_entering_tickets.find(object_name));
2747 if(obj_ticket != f_entering_tickets.end())
2749 auto const key_ticket(obj_ticket->second.find(key));
2750 allocate = key_ticket == obj_ticket->second.end();
2757 int32_t
const duration(message.get_integer_parameter(
"duration"));
2758 if(duration < snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2762 SNAP_LOG_ERROR(duration)
2763 (
" is an invalid duration, the minimum accepted is ")
2764 (snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2767 snap::snap_communicator_message lock_failed_message;
2768 lock_failed_message.set_command(
"LOCKFAILED");
2769 lock_failed_message.reply_to(message);
2770 lock_failed_message.add_parameter(
"object_name", object_name);
2771 lock_failed_message.add_parameter(
"key", key);
2772 lock_failed_message.add_parameter(
"error",
"invalid");
2773 send_message(lock_failed_message);
2778 int32_t unlock_duration(snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT);
2779 if(message.has_parameter(
"unlock_duration"))
2781 unlock_duration = message.get_integer_parameter(
"unlock_duration");
2782 if(unlock_duration != snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT
2783 && unlock_duration < snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2787 SNAP_LOG_ERROR(duration)
2788 (
" is an invalid unlock duration, the minimum accepted is ")
2789 (snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2792 snap::snap_communicator_message lock_failed_message;
2793 lock_failed_message.set_command(
"LOCKFAILED");
2794 lock_failed_message.reply_to(message);
2795 lock_failed_message.add_parameter(
"object_name", object_name);
2796 lock_failed_message.add_parameter(
"key", key);
2797 lock_failed_message.add_parameter(
"error",
"invalid");
2798 send_message(lock_failed_message);
2806 snap::snap_string_list
const source_segments(source.split(
"/"));
2807 if(source_segments.size() != 2)
2809 SNAP_LOG_ERROR(
"Invalid number of parameters in source (found ")
2810 (source_segments.size())
2813 snap::snap_communicator_message lock_failed_message;
2814 lock_failed_message.set_command(
"LOCKFAILED");
2815 lock_failed_message.reply_to(message);
2816 lock_failed_message.add_parameter(
"object_name", object_name);
2817 lock_failed_message.add_parameter(
"key", key);
2818 lock_failed_message.add_parameter(
"error",
"invalid");
2819 send_message(lock_failed_message);
2831 , source_segments[0]
2832 , source_segments[1]));
2834 f_entering_tickets[object_name][key] = ticket;
2838 ticket->set_owner(message.get_sent_from_server());
2839 ticket->set_unlock_duration(unlock_duration);
2840 ticket->set_serial(message.get_integer_parameter(
"serial"));
2843 snap::snap_communicator_message reply;
2844 reply.set_command(
"LOCKENTERED");
2845 reply.reply_to(message);
2846 reply.add_parameter(
"object_name", object_name);
2847 reply.add_parameter(
"key", key);
2848 send_message(reply);
2852 SNAP_LOG_DEBUG(
"received LOCKENTERING while we are thinking we are not ready.");
2870 void snaplock::msg_lock_entered(snap::snap_communicator_message & message)
2872 QString object_name;
2874 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
2876 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
2877 if(obj_entering_ticket != f_entering_tickets.end())
2879 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
2880 if(key_entering_ticket != obj_entering_ticket->second.end())
2882 key_entering_ticket->second->entered();
2894 void snaplock::msg_lock_exiting(snap::snap_communicator_message & message)
2896 QString object_name;
2898 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
2902 auto const obj_entering(f_entering_tickets.find(object_name));
2903 if(obj_entering != f_entering_tickets.end())
2905 auto const key_entering(obj_entering->second.find(key));
2906 if(key_entering != obj_entering->second.end())
2908 obj_entering->second.erase(key_entering);
2913 bool run_activation(
false);
2914 auto const obj_ticket(f_tickets.find(object_name));
2915 if(obj_ticket != f_tickets.end())
2917 for(
auto const & key_ticket : obj_ticket->second)
2919 key_ticket.second->remove_entering(key);
2920 run_activation =
true;
2940 activate_first_lock(object_name);
2943 if(obj_entering->second.empty())
2945 f_entering_tickets.erase(obj_entering);
2972 void snaplock::msg_drop_ticket(snap::snap_communicator_message & message)
2974 QString object_name;
2976 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
2978 snap::snap_string_list
const segments(key.split(
'/'));
2985 QString entering_key;
2986 if(segments.size() == 3)
2988 auto obj_ticket(f_tickets.find(object_name));
2989 if(obj_ticket != f_tickets.end())
2991 auto key_ticket(obj_ticket->second.find(key));
2992 if(key_ticket != obj_ticket->second.end())
2994 obj_ticket->second.erase(key_ticket);
2997 if(obj_ticket->second.empty())
2999 f_tickets.erase(obj_ticket);
3004 activate_first_lock(object_name);
3011 entering_key = QString(
"%1/%2").arg(segments[1]).arg(segments[2]);
3022 auto obj_entering_ticket(f_entering_tickets.find(object_name));
3023 if(obj_entering_ticket != f_entering_tickets.end())
3025 auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
3026 if(key_entering_ticket != obj_entering_ticket->second.end())
3028 obj_entering_ticket->second.erase(key_entering_ticket);
3031 if(obj_entering_ticket->second.empty())
3033 f_entering_tickets.erase(obj_entering_ticket);
3052 void snaplock::msg_get_max_ticket(snap::snap_communicator_message & message)
3054 QString object_name;
3056 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3065 snap::snap_communicator_message reply;
3066 reply.set_command(
"MAXTICKET");
3067 reply.reply_to(message);
3068 reply.add_parameter(
"object_name", object_name);
3069 reply.add_parameter(
"key", key);
3070 reply.add_parameter(
"ticket_id", last_ticket);
3071 send_message(reply);
3085 void snaplock::msg_max_ticket(snap::snap_communicator_message & message)
3087 QString object_name;
3089 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3093 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3094 if(obj_entering_ticket != f_entering_tickets.end())
3096 auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3097 if(key_entering_ticket != obj_entering_ticket->second.end())
3099 key_entering_ticket->second->max_ticket(message.get_integer_parameter(
"ticket_id"));
3120 void snaplock::msg_add_ticket(snap::snap_communicator_message & message)
3122 QString object_name;
3125 get_parameters(message, &object_name,
nullptr, &timeout, &key,
nullptr);
3129 auto const obj_ticket(f_tickets.find(object_name));
3130 if(obj_ticket != f_tickets.end())
3132 auto const key_ticket(obj_ticket->second.find(key));
3133 if(key_ticket != obj_ticket->second.end())
3137 throw std::logic_error(
"snaplock::add_ticket() ticket already exists");
3145 snap::snap_string_list
const segments(key.split(
'/'));
3146 if(segments.size() != 3)
3148 SNAP_LOG_ERROR(
"Expected exactly 3 segments in \"")
3150 (
"\" to add a ticket.");
3152 snap::snap_communicator_message lock_failed_message;
3153 lock_failed_message.set_command(
"LOCKFAILED");
3154 lock_failed_message.reply_to(message);
3155 lock_failed_message.add_parameter(
"object_name", object_name);
3156 lock_failed_message.add_parameter(
"key", key);
3157 lock_failed_message.add_parameter(
"error",
"invalid");
3158 send_message(lock_failed_message);
3164 uint32_t
const number(segments[0].toUInt(&ok, 16));
3167 SNAP_LOG_ERROR(
"somehow ticket number \"")
3169 (
"\" is not a valid hexadecimal number");
3171 snap::snap_communicator_message lock_failed_message;
3172 lock_failed_message.set_command(
"LOCKFAILED");
3173 lock_failed_message.reply_to(message);
3174 lock_failed_message.add_parameter(
"object_name", object_name);
3175 lock_failed_message.add_parameter(
"key", key);
3176 lock_failed_message.add_parameter(
"error",
"invalid");
3177 send_message(lock_failed_message);
3185 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3186 if(obj_entering_ticket == f_entering_tickets.end())
3188 SNAP_LOG_ERROR(
"Expected entering ticket object for \"")
3190 (
"\" not found when adding a ticket.");
3192 snap::snap_communicator_message lock_failed_message;
3193 lock_failed_message.set_command(
"LOCKFAILED");
3194 lock_failed_message.reply_to(message);
3195 lock_failed_message.add_parameter(
"object_name", object_name);
3196 lock_failed_message.add_parameter(
"key", key);
3197 lock_failed_message.add_parameter(
"error",
"invalid");
3198 send_message(lock_failed_message);
3206 QString
const entering_key(QString(
"%1/%2").arg(segments[1]).arg(segments[2]));
3207 auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
3208 if(key_entering_ticket == obj_entering_ticket->second.end())
3210 SNAP_LOG_ERROR(
"Expected entering ticket key for \"")
3212 (
"\" not found when adding a ticket.");
3214 snap::snap_communicator_message lock_failed_message;
3215 lock_failed_message.set_command(
"LOCKFAILED");
3216 lock_failed_message.reply_to(message);
3217 lock_failed_message.add_parameter(
"object_name", object_name);
3218 lock_failed_message.add_parameter(
"key", key);
3219 lock_failed_message.add_parameter(
"error",
"invalid");
3220 send_message(lock_failed_message);
3230 set_ticket(object_name, key, key_entering_ticket->second);
3236 f_tickets[object_name][key]->set_ticket_number(number);
3238 snap::snap_communicator_message ticket_added_message;
3239 ticket_added_message.set_command(
"TICKETADDED");
3240 ticket_added_message.reply_to(message);
3241 ticket_added_message.add_parameter(
"object_name", object_name);
3242 ticket_added_message.add_parameter(
"key", key);
3243 send_message(ticket_added_message);
3254 void snaplock::msg_ticket_added(snap::snap_communicator_message & message)
3256 QString object_name;
3258 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3260 auto const obj_ticket(f_tickets.find(object_name));
3261 if(obj_ticket != f_tickets.end())
3263 auto const key_ticket(obj_ticket->second.find(key));
3264 if(key_ticket != obj_ticket->second.end())
3268 auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3269 if(obj_entering_ticket == f_entering_tickets.end())
3275 SNAP_LOG_TRACE(
"called with object \"")
3277 (
"\" not present in f_entering_ticket (key: \"")
3282 key_ticket->second->ticket_added(obj_entering_ticket->second);
3286 SNAP_LOG_DEBUG(
"found object \"")
3288 (
"\" but could not find a ticket with key \"")
3295 SNAP_LOG_DEBUG(
"object \"")
3309 void snaplock::msg_ticket_ready(snap::snap_communicator_message & message)
3311 QString object_name;
3313 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3315 auto obj_ticket(f_tickets.find(object_name));
3316 if(obj_ticket != f_tickets.end())
3318 auto key_ticket(obj_ticket->second.find(key));
3319 if(key_ticket != obj_ticket->second.end())
3323 key_ticket->second->set_ready();
3343 void snaplock::msg_activate_lock(snap::snap_communicator_message & message)
3345 QString object_name;
3347 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3349 QString first_key(
"no-key");
3351 auto ticket(find_first_lock(object_name));
3352 if(ticket !=
nullptr)
3356 first_key = ticket->get_ticket_key();
3358 if(key == first_key)
3362 ticket->lock_activated();
3369 snap::snap_communicator_message lock_activated_message;
3370 lock_activated_message.set_command(
"LOCKACTIVATED");
3371 lock_activated_message.reply_to(message);
3372 lock_activated_message.add_parameter(
"object_name", object_name);
3373 lock_activated_message.add_parameter(
"key", key);
3374 lock_activated_message.add_parameter(
"other_key", first_key);
3375 send_message(lock_activated_message);
3392 void snaplock::msg_lock_activated(snap::snap_communicator_message & message)
3394 QString object_name;
3396 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3398 QString
const & other_key(message.get_parameter(
"other_key"));
3399 if(other_key == key)
3401 auto obj_ticket(f_tickets.find(object_name));
3402 if(obj_ticket != f_tickets.end())
3404 auto key_ticket(obj_ticket->second.find(key));
3405 if(key_ticket != obj_ticket->second.end())
3410 key_ticket->second->lock_activated();
3442 void snaplock::msg_lock_failed(snap::snap_communicator_message & message)
3444 QString object_name;
3446 get_parameters(message, &object_name,
nullptr,
nullptr, &key,
nullptr);
3448 QString forward_server;
3449 QString forward_service;
3453 auto obj_entering(f_entering_tickets.find(object_name));
3454 if(obj_entering != f_entering_tickets.end())
3456 auto key_entering(obj_entering->second.find(key));
3457 if(key_entering != obj_entering->second.end())
3459 forward_server = key_entering->second->get_server_name();
3460 forward_service = key_entering->second->get_service_name();
3462 obj_entering->second.erase(key_entering);
3465 if(obj_entering->second.empty())
3467 obj_entering = f_entering_tickets.erase(obj_entering);
3477 auto obj_ticket(f_tickets.find(object_name));
3478 if(obj_ticket != f_tickets.end())
3480 bool try_activate(
false);
3481 auto key_ticket(obj_ticket->second.find(key));
3482 if(key_ticket == obj_ticket->second.end())
3484 key_ticket = std::find_if(
3485 obj_ticket->second.begin()
3486 , obj_ticket->second.end()
3487 , [&key](
auto const & t)
3489 return t.second->get_entering_key() == key;
3492 if(key_ticket != obj_ticket->second.end())
3498 forward_server = key_ticket->second->get_server_name();
3499 forward_service = key_ticket->second->get_service_name();
3501 obj_ticket->second.erase(key_ticket);
3502 try_activate =
true;
3505 if(obj_ticket->second.empty())
3507 obj_ticket = f_tickets.erase(obj_ticket);
3515 activate_first_lock(obj_ticket->first);
3522 if(!forward_server.isEmpty()
3523 && !forward_service.isEmpty())
3528 message.set_server(forward_server);
3529 message.set_service(forward_service);
3530 send_message(message);
3564 void snaplock::activate_first_lock(QString
const & object_name)
3566 auto ticket(find_first_lock(object_name));
3568 if(ticket !=
nullptr)
3574 ticket->activate_lock();
3582 auto const obj_ticket(f_tickets.find(object_name));
3584 if(obj_ticket != f_tickets.end())
3596 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3598 if(key_ticket->second->timed_out())
3603 key_ticket->second->lock_failed();
3604 if(key_ticket->second->timed_out())
3608 key_ticket = obj_ticket->second.erase(key_ticket);
3613 if(first_ticket ==
nullptr)
3615 first_ticket = key_ticket->second;
3621 if(obj_ticket->second.empty())
3625 f_tickets.erase(obj_ticket);
3629 return first_ticket;
3655 void snaplock::synchronize_leaders()
3661 if(f_leaders.size() <= 1)
3677 bool const leader0(f_leaders[0]->get_id() == f_my_id);
3681 snap::snap_communicator_message::vector_t local_locks;
3692 for(
auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); ++obj_entering)
3694 for(
auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
3696 QString
const owner_name(key_entering->second->get_owner());
3697 auto key_leader(std::find_if(
3700 , [&owner_name](
auto const & l)
3702 return l->get_name() == owner_name;
3704 if(key_leader == f_leaders.end())
3708 snap::snap_communicator_message lock_message;
3709 lock_message.set_command(
"LOCK");
3710 lock_message.set_server(f_leaders[0]->get_name());
3711 lock_message.set_service(
"snaplock");
3712 lock_message.set_sent_from_server(key_entering->second->get_server_name());
3713 lock_message.set_sent_from_service(key_entering->second->get_service_name());
3714 lock_message.add_parameter(
"object_name", key_entering->second->get_object_name());
3715 lock_message.add_parameter(
"pid", key_entering->second->get_client_pid());
3716 lock_message.add_parameter(
"timeout", key_entering->second->get_obtention_timeout());
3717 lock_message.add_parameter(
"duration", key_entering->second->get_lock_duration());
3718 lock_message.add_parameter(
"unlock_duration", key_entering->second->get_unlock_duration());
3726 key_entering = obj_entering->second.erase(key_entering);
3727 local_locks.push_back(lock_message);
3734 lock_message.add_parameter(
"serial", key_entering->second->get_serial());
3735 send_message(lock_message);
3752 for(
auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); ++obj_ticket)
3754 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3756 QString
const owner_name(key_ticket->second->get_owner());
3757 auto key_leader(std::find_if(
3760 , [&owner_name](
auto const & l)
3762 return l->get_name() == owner_name;
3764 if(key_ticket->second->is_locked())
3769 if(key_leader == f_leaders.end())
3771 key_ticket->second->set_owner(f_leaders[0]->get_name());
3777 serialized += key_ticket->second->serialize();
3778 serialized += QChar(
'\n');
3787 if(key_leader == f_leaders.end())
3791 snap::snap_communicator_message lock_message;
3792 lock_message.set_command(
"LOCK");
3793 lock_message.set_server(f_leaders[0]->get_name());
3794 lock_message.set_service(
"snaplock");
3795 lock_message.set_sent_from_server(key_ticket->second->get_server_name());
3796 lock_message.set_sent_from_service(key_ticket->second->get_service_name());
3797 lock_message.add_parameter(
"object_name", key_ticket->second->get_object_name());
3798 lock_message.add_parameter(
"pid", key_ticket->second->get_client_pid());
3799 lock_message.add_parameter(
"timeout", key_ticket->second->get_obtention_timeout());
3800 lock_message.add_parameter(
"duration", key_ticket->second->get_lock_duration());
3801 lock_message.add_parameter(
"unlock_duration", key_ticket->second->get_unlock_duration());
3806 key_ticket = obj_ticket->second.erase(key_ticket);
3807 local_locks.push_back(lock_message);
3814 lock_message.add_parameter(
"serial", key_ticket->second->get_serial());
3815 send_message(lock_message);
3831 for(
auto lm : local_locks)
3838 if(!serialized.isEmpty())
3840 snap::snap_communicator_message lock_tickets_message;
3841 lock_tickets_message.set_command(
"LOCKTICKETS");
3842 lock_tickets_message.set_service(
"snaplock");
3843 lock_tickets_message.add_parameter(
"tickets", serialized);
3845 auto const la(get_leader_a());
3848 lock_tickets_message.set_server(la->get_name());
3849 send_message(lock_tickets_message);
3851 auto const lb(get_leader_b());
3854 lock_tickets_message.set_server(lb->get_name());
3855 send_message(lock_tickets_message);
3875 void snaplock::forward_message_to_leader(snap::snap_communicator_message & message)
3885 message.set_service(
"snaplock");
3886 message.add_parameter(
"lock_proxy_server_name", message.get_sent_from_server());
3887 message.add_parameter(
"lock_proxy_service_name", message.get_sent_from_service());
3889 f_next_leader = (f_next_leader + 1) % f_leaders.size();
3890 message.set_server(f_leaders[f_next_leader]->get_name());
3892 send_message(message);
3903 void snaplock::cleanup()
3905 time_t next_timeout(std::numeric_limits<time_t>::max());
3910 for(
auto c(f_message_cache.begin()); c != f_message_cache.end(); )
3912 if(c->f_timeout <= time(
nullptr))
3914 QString object_name;
3915 pid_t client_pid(0);
3917 get_parameters(c->f_message, &object_name, &client_pid, &timeout,
nullptr,
nullptr);
3919 SNAP_LOG_WARNING(
"Lock on \"")(object_name)(
"\" / \"")(client_pid)(
"\" timed out before leaders were known.");
3921 QString
const server_name(c->f_message.has_parameter(
"lock_proxy_server_name")
3922 ? c->f_message.get_parameter(
"lock_proxy_server_name")
3923 : c->f_message.get_sent_from_server());
3924 QString
const entering_key(QString(
"%1/%2").arg(server_name).arg(client_pid));
3926 snap::snap_communicator_message lock_failed_message;
3927 lock_failed_message.set_command(
"LOCKFAILED");
3928 lock_failed_message.reply_to(c->f_message);
3929 lock_failed_message.add_parameter(
"object_name", object_name);
3930 lock_failed_message.add_parameter(
"key", entering_key);
3931 lock_failed_message.add_parameter(
"error",
"timedout");
3932 send_message(lock_failed_message);
3934 c = f_message_cache.erase(c);
3938 if(c->f_timeout < next_timeout)
3940 next_timeout = c->f_timeout;
3948 for(
auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); )
3950 bool try_activate(
false);
3951 for(
auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3953 if(key_ticket->second->timed_out())
3955 key_ticket->second->lock_failed();
3956 if(key_ticket->second->timed_out())
3960 key_ticket = obj_ticket->second.erase(key_ticket);
3961 try_activate =
true;
3966 if(key_ticket->second->get_current_timeout() < next_timeout)
3968 next_timeout = key_ticket->second->get_current_timeout();
3974 if(obj_ticket->second.empty())
3976 obj_ticket = f_tickets.erase(obj_ticket);
3984 activate_first_lock(obj_ticket->first);
3993 for(
auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); )
3995 for(
auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
3997 if(key_entering->second->timed_out())
3999 key_entering->second->lock_failed();
4000 if(key_entering->second->timed_out())
4004 key_entering = obj_entering->second.erase(key_entering);
4009 if(key_entering->second->get_current_timeout() < next_timeout)
4011 next_timeout = key_entering->second->get_current_timeout();
4017 if(obj_entering->second.empty())
4019 obj_entering = f_entering_tickets.erase(obj_entering);
4029 if(next_timeout != std::numeric_limits<time_t>::max())
4037 f_timer->set_timeout_date((next_timeout + 1) * 1000000LL);
4041 f_timer->set_timeout_date(-1);
4072 auto obj_ticket(f_tickets.find(object_name));
4073 if(obj_ticket != f_tickets.end())
4079 for(
auto key_ticket : obj_ticket->second)
4082 if(ticket_number > last_ticket)
4084 last_ticket = ticket_number;
4105 f_tickets[object_name][key] = ticket;
4122 auto const it(f_entering_tickets.find(object_name));
4123 if(it == f_entering_tickets.end())
4139 void snaplock::lock_exiting(snap::snap_communicator_message & message)
4141 msg_lock_exiting(message);
4153 void snaplock::tool_message(snap::snap_communicator_message
const & message)
4155 SNAP_LOG_TRACE(
"tool received message [")(message.to_message())(
"] for ")(f_server_name);
4157 QString
const command(message.get_command());
4159 switch(command[0].unicode())
4162 if(command ==
"HELP")
4166 snap::snap_communicator_message reply;
4167 reply.set_command(
"COMMANDS");
4173 reply.add_parameter(
"list",
"CLUSTERDOWN,CLUSTERUP,HELP,QUITTING,READY,STOP,TICKETLIST,UNKNOWN");
4175 send_message(reply);
4181 if(command ==
"QUITTING")
4194 if(command ==
"READY")
4196 if(f_opt.is_defined(
"list"))
4198 snap::snap_communicator_message list_message;
4199 list_message.set_command(
"LISTTICKETS");
4200 list_message.set_service(
"snaplock");
4201 list_message.set_server(f_server_name);
4202 list_message.add_parameter(
"cache",
"no");
4203 list_message.add_parameter(
"transmission_report",
"failure");
4204 send_message(list_message);
4211 if(command ==
"STOP")
4221 if(command ==
"TICKETLIST")
4225 ticket_list(message);
4229 else if(command ==
"TRANSMISSIONREPORT")
4231 QString
const status(message.get_parameter(
"status"));
4232 if(status ==
"failed")
4234 SNAP_LOG_ERROR(
"the transmission of our TICKLIST message failed to travel to a snaplock service");
4242 if(command ==
"UNKNOWN")
4246 SNAP_LOG_ERROR(
"we sent unknown command \"")(message.get_parameter(
"command"))(
"\" and probably did not get the expected result (2).");
4255 SNAP_LOG_ERROR(
"unsupported command \"")(command)(
"\" was received on the connection with Snap! Communicator.");
4257 snap::snap_communicator_message reply;
4258 reply.set_command(
"UNKNOWN");
4259 reply.add_parameter(
"command", command);
4260 send_message(reply);
4271 void snaplock::ticket_list(snap::snap_communicator_message
const & message)
4273 QString
const list(message.get_parameter(
"list"));
4282 std::cout << std::endl <<
"...no locks found..." << std::endl;
4286 std::cout << std::endl << list << std::endl;
4291 QString snaplock::serialized_tickets()
4295 for(
auto const & obj_ticket : f_tickets)
4297 for(
auto const & key_ticket : obj_ticket.second)
4299 result += key_ticket.second->serialize();
4300 result += QChar(
'\n');
static ticket_id_t const NO_TICKET
advgetopt::option const g_options[]
std::shared_ptr< computer_t > pointer_t
#define SNAPLOCK_VERSION_STRING
std::vector< message_cache > vector_t
snaplock(int argc, char *argv[])
Initializes a snaplock object.
Handle the locks timeout.
advgetopt::options_environment const g_options_environment
std::shared_ptr< snaplock_ticket > pointer_t
std::map< QString, pointer_t > map_t
Handle the SIGINT Unix signal.
Handle the SIGUSR2 Unix signal.
Handle the SIGUSR1 Unix signal.
std::map< QString, pointer_t > key_map_t
Handle messages from the Snap Communicator server.