snaplock: snaplock.cpp Source File

snaplock  1.7.14
The multi-computer lock service.
snaplock.cpp
Go to the documentation of this file.
1 /*
2  * Text:
3  * snaplock/src/snaplock.cpp
4  *
5  * Description:
6  * A daemon to synchronize processes between any number of computers
7  * by blocking all of them but one.
8  *
9  * License:
10  * Copyright (c) 2016-2019 Made to Order Software Corp. All Rights Reserved
11  *
14  *
15  * Permission is hereby granted, free of charge, to any person obtaining a
16  * copy of this software and associated documentation files (the
17  * "Software"), to deal in the Software without restriction, including
18  * without limitation the rights to use, copy, modify, merge, publish,
19  * distribute, sublicense, and/or sell copies of the Software, and to
20  * permit persons to whom the Software is furnished to do so, subject to
21  * the following conditions:
22  *
23  * The above copyright notice and this permission notice shall be included
24  * in all copies or substantial portions of the Software.
25  *
26  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
27  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
28  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
29  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
30  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
31  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
32  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33  */
34 
35 
36 // self
37 //
38 #include "snaplock.h"
39 
40 
41 // snapmanager lib
42 //
43 #include "version.h"
44 
45 
46 // snapwebsites lib
47 //
48 #include <snapwebsites/log.h>
49 #include <snapwebsites/qstring_stream.h>
50 #include <snapwebsites/dbutils.h>
51 #include <snapwebsites/process.h>
52 #include <snapwebsites/snap_string_list.h>
53 
54 
55 // snapdev lib
56 //
57 #include <snapdev/tokenize_string.h>
58 
59 
60 // advgetopt lib
61 //
62 #include <advgetopt/advgetopt.h>
63 
64 
65 // C++ lib
66 //
67 #include <algorithm>
68 #include <iostream>
69 #include <sstream>
70 
71 
72 // openssl lib
73 //
74 #include <openssl/rand.h>
75 
76 
77 // last include
78 //
79 #include <snapdev/poison.h>
80 
81 
82 
117 namespace snaplock
118 {
119 
120 
121 namespace
122 {
123 
124 
125 
126 advgetopt::option const g_options[] =
127 {
128  {
129  'c',
130  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_REQUIRED | advgetopt::GETOPT_FLAG_SHOW_USAGE_ON_ERROR,
131  "config",
132  nullptr,
133  "Path to snaplock and other configuration files.",
134  nullptr
135  },
136  {
137  '\0',
138  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
139  "debug",
140  nullptr,
141  "Start the snaplock daemon in debug mode.",
142  nullptr
143  },
144  {
145  '\0',
146  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
147  "debug-lock-messages",
148  nullptr,
149  "Log all the lock messages received by snaplock.",
150  nullptr
151  },
152  {
153  '\0',
154  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
155  "list",
156  nullptr,
157  "List existing tickets and exits.",
158  nullptr
159  },
160  {
161  'l',
162  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_REQUIRED,
163  "logfile",
164  nullptr,
165  "Full path to the snaplock logfile.",
166  nullptr
167  },
168  {
169  'n',
170  advgetopt::GETOPT_FLAG_COMMAND_LINE | advgetopt::GETOPT_FLAG_ENVIRONMENT_VARIABLE | advgetopt::GETOPT_FLAG_FLAG,
171  "nolog",
172  nullptr,
173  "Only output to the console, not a log file.",
174  nullptr
175  },
176  {
177  '\0',
178  advgetopt::GETOPT_FLAG_END,
179  nullptr,
180  nullptr,
181  nullptr,
182  nullptr
183  }
184 };
185 
186 
187 
188 
189 // until we have C++20 remove warnings this way
190 #pragma GCC diagnostic push
191 #pragma GCC diagnostic ignored "-Wpedantic"
192 advgetopt::options_environment const g_options_environment =
193 {
194  .f_project_name = "snapwebsites",
195  .f_options = g_options,
196  .f_options_files_directory = nullptr,
197  .f_environment_variable_name = "SNAPLOCK_OPTIONS",
198  .f_configuration_files = nullptr,
199  .f_configuration_filename = nullptr,
200  .f_configuration_directories = nullptr,
201  .f_environment_flags = advgetopt::GETOPT_ENVIRONMENT_FLAG_PROCESS_SYSTEM_PARAMETERS,
202  .f_help_header = "Usage: %p [-<opt>]\n"
203  "where -<opt> is one or more of:",
204  .f_help_footer = "%c",
205  .f_version = SNAPLOCK_VERSION_STRING,
206  .f_license = "GNU GPL v2",
207  .f_copyright = "Copyright (c) 2013-"
208  BOOST_PP_STRINGIZE(UTC_BUILD_YEAR)
209  " by Made to Order Software Corporation -- All Rights Reserved",
210  //.f_build_date = __DATE__,
211  //.f_build_time = __TIME__
212 };
213 #pragma GCC diagnostic pop
214 
215 
216 }
217 // no name namespace
218 
219 
220 
221 
227 snap::dispatcher<snaplock>::dispatcher_match::vector_t const snaplock::g_snaplock_service_messages =
228 {
229  {
230  "ABSOLUTELY"
231  , &snaplock::msg_absolutely
232  },
233  {
234  "ACTIVATELOCK"
235  , &snaplock::msg_activate_lock
236  },
237  {
238  "ADDTICKET"
239  , &snaplock::msg_add_ticket
240  },
241  {
242  "CLUSTERUP"
243  , &snaplock::msg_cluster_up
244  },
245  {
246  "CLUSTERDOWN"
247  , &snaplock::msg_cluster_down
248  },
249  {
250  "DISCONNECTED"
251  , &snaplock::msg_server_gone
252  },
253  {
254  "DROPTICKET"
255  , &snaplock::msg_drop_ticket
256  },
257  {
258  "GETMAXTICKET"
259  , &snaplock::msg_get_max_ticket
260  },
261  {
262  "HANGUP"
263  , &snaplock::msg_server_gone
264  },
265  {
266  "LOCK"
267  , &snaplock::msg_lock
268  },
269  {
270  "LOCKACTIVATED"
271  , &snaplock::msg_lock_activated
272  },
273  {
274  "LOCKENTERED"
275  , &snaplock::msg_lock_entered
276  },
277  {
278  "LOCKENTERING"
279  , &snaplock::msg_lock_entering
280  },
281  {
282  "LOCKEXITING"
283  , &snaplock::msg_lock_exiting
284  },
285  {
286  "LOCKFAILED"
287  , &snaplock::msg_lock_failed
288  },
289  {
290  "LOCKLEADERS"
291  , &snaplock::msg_lock_leaders
292  },
293  {
294  "LOCKSTARTED"
295  , &snaplock::msg_lock_started
296  },
297  {
298  "LOCKSTATUS"
299  , &snaplock::msg_lock_status
300  },
301  {
302  "LOCKTICKETS"
303  , &snaplock::msg_lock_tickets
304  },
305  {
306  "LISTTICKETS"
307  , &snaplock::msg_list_tickets
308  },
309  {
310  "MAXTICKET"
311  , &snaplock::msg_max_ticket
312  },
313  {
314  "STATUS"
315  , &snaplock::msg_status
316  },
317  {
318  "TICKETADDED"
319  , &snaplock::msg_ticket_added
320  },
321  {
322  "TICKETREADY"
323  , &snaplock::msg_ticket_ready
324  },
325  {
326  "UNLOCK"
327  , &snaplock::msg_unlock
328  }
329 };
330 
331 
332 
333 
334 
335 
336 
338 {
339  // used for a remote computer, we'll eventually get a set_id() which
340  // defines the necessary computer parameters
341 }
342 
343 
344 snaplock::computer_t::computer_t(QString const & name, uint8_t priority)
345  : f_self(true)
346  , f_priority(priority)
347  , f_pid(getpid())
348  , f_name(name)
349 {
350  RAND_bytes(reinterpret_cast<unsigned char *>(&f_random_id), sizeof(f_random_id));
351 
352  snap::snap_config config("snapcommunicator");
353  f_ip_address = config["listen"];
354 }
355 
356 
357 bool snaplock::computer_t::is_self() const
358 {
359  return f_self;
360 }
361 
362 
363 void snaplock::computer_t::set_connected(bool connected)
364 {
365  f_connected = connected;
366 }
367 
368 
369 bool snaplock::computer_t::get_connected() const
370 {
371  return f_connected;
372 }
373 
374 
375 bool snaplock::computer_t::set_id(QString const & id)
376 {
377  if(f_priority != PRIORITY_UNDEFINED)
378  {
379  throw snaplock_exception_content_invalid_usage("computer_t::set_id() can't be called more than once or on this snaplock computer");
380  }
381 
382  snap::snap_string_list parts(id.split('|'));
383  if(parts.size() != 5)
384  {
385  // do not throw in case something changes we do not want snaplock to
386  // "crash" over and over again
387  //
388  SNAP_LOG_ERROR("received a computer id which does not have exactly 5 parts.");
389  return false;
390  }
391 
392  // base is VERY IMPORTANT for this one as we save priorities below ten
393  // as 0n (01 to 09) so the sort works as expected
394  //
395  bool ok(false);
396  f_priority = parts[0].toLong(&ok, 10);
397  if(!ok
398  || f_priority < PRIORITY_USER_MIN
399  || f_priority > PRIORITY_MAX)
400  {
401  SNAP_LOG_ERROR("priority is limited to a number between 0 and 15 inclusive.");
402  return false;
403  }
404 
405  f_random_id = parts[1].toULong(&ok, 10);
406 
407  f_ip_address = parts[2];
408  if(f_ip_address.isEmpty())
409  {
410  SNAP_LOG_ERROR("the process IP cannot be an empty string.");
411  return false;
412  }
413 
414  f_pid = parts[3].toLong(&ok, 10);
415  if(!ok || f_pid < 1 || f_pid > snap::process::get_pid_max())
416  {
417  SNAP_LOG_ERROR("a process identifier is 15 bits so ")(f_pid)(" does not look valid (0 is also not accepted).");
418  return false;
419  }
420 
421  f_name = parts[4];
422  if(f_name.isEmpty())
423  {
424  SNAP_LOG_ERROR("the server name in the lockid cannot be empty.");
425  return false;
426  }
427 
428  f_id = id;
429 
430  return true;
431 }
432 
433 
434 snaplock::computer_t::priority_t snaplock::computer_t::get_priority() const
435 {
436  return f_priority;
437 }
438 
439 
440 void snaplock::computer_t::set_start_time(time_t start_time)
441 {
442  f_start_time = start_time;
443 }
444 
445 
446 time_t snaplock::computer_t::get_start_time() const
447 {
448  return f_start_time;
449 }
450 
451 
452 QString const & snaplock::computer_t::get_name() const
453 {
454  return f_name;
455 }
456 
457 
458 QString const & snaplock::computer_t::get_id() const
459 {
460  if(f_id.isEmpty())
461  {
462  if(f_priority == PRIORITY_UNDEFINED)
463  {
464  throw snaplock_exception_content_invalid_usage("computer_t::get_id() can't be called when the priority is not defined");
465  }
466  if(f_ip_address.isEmpty())
467  {
468  throw snaplock_exception_content_invalid_usage("computer_t::get_id() can't be called when the address is empty");
469  }
470  if(f_pid == 0)
471  {
472  throw snaplock_exception_content_invalid_usage("computer_t::get_id() can't be called when the pid is not defined");
473  }
474 
475  f_id = QString("%1|%2|%3|%4|%5")
476  .arg(f_priority, 2, 10, QChar('0'))
477  .arg(f_random_id)
478  .arg(f_ip_address)
479  .arg(f_pid)
480  .arg(f_name);
481  }
482 
483  return f_id;
484 }
485 
486 
487 QString const & snaplock::computer_t::get_ip_address() const
488 {
489  return f_ip_address;
490 }
491 
492 
493 
494 
495 
496 
497 
498 
499 
500 
501 
502 
503 
504 
596 snaplock::snaplock(int argc, char * argv[])
597  : dispatcher(this, g_snaplock_service_messages)
598  , f_opt(g_options_environment, argc, argv)
599  , f_config("snaplock")
600 {
601  add_snap_communicator_commands();
602 
603  // read the configuration file
604  //
605  if(f_opt.is_defined("config"))
606  {
607  f_config.set_configuration_path(f_opt.get_string("config"));
608  }
609 
610  // --debug
611  f_debug = f_opt.is_defined("debug");
612 
613  // --debug-lock-messages
614  f_debug_lock_messages = f_opt.is_defined("debug-lock-messages") // command line
615  || f_config.has_parameter("debug_lock_messages"); // .conf file
616 
617  // set message trace mode if debug-lock-messages is defined
618  //
619  if(f_debug_lock_messages)
620  {
621  set_trace();
622  }
623 
624  // get the server name using the library function
625  //
626  // TODO: if the name of the server is changed, we should reboot, but
627  // to the minimum we need to restart snaplock (among other daemons)
628  // remember that snapmanager.cgi gives you that option
629  //
630  f_server_name = QString::fromUtf8(snap::server::get_server_name().c_str());
631 #ifdef _DEBUG
632  // to debug multiple snaplock on the same server each instance needs to
633  // have a different server name
634  //
635  if(f_config.has_parameter("server_name"))
636  {
637  f_server_name = f_config["server_name"];
638  }
639 #endif
640 
641  // local_listen=... -- from snapcommnicator.conf
642  //
643  tcp_client_server::get_addr_port(QString::fromUtf8(f_config("snapcommunicator", "local_listen").c_str()), f_communicator_addr, f_communicator_port, "tcp");
644 
645  // setup the logger: --nolog, --logfile, or config file log_config
646  //
647  if(f_opt.is_defined("nolog"))
648  {
649  snap::logging::configure_console();
650  }
651  else if(f_opt.is_defined("logfile"))
652  {
653  snap::logging::configure_logfile(QString::fromUtf8(f_opt.get_string("logfile").c_str()));
654  }
655  else
656  {
657  if(f_config.has_parameter("log_config"))
658  {
659  // use .conf definition when available
660  //
661  f_log_conf = f_config["log_config"];
662  }
663  snap::logging::configure_conffile(f_log_conf);
664  }
665 
666  if(f_debug)
667  {
668  // Force the logger level to DEBUG
669  // (unless already lower)
670  //
671  snap::logging::reduce_log_output_level( snap::logging::log_level_t::LOG_LEVEL_DEBUG );
672  }
673 
674 #ifdef _DEBUG
675  // for test purposes (i.e. to run any number of snaplock on a single
676  // computer) we allow the administrator to change the name of the
677  // server, but only in a debug version
678  //
679  if(f_config.has_parameter("service_name"))
680  {
681  f_service_name = f_config["service_name"];
682  }
683 #endif
684 
685  int64_t priority = computer_t::PRIORITY_DEFAULT;
686  if(f_opt.is_defined("candidate-priority"))
687  {
688  std::string const candidate_priority(f_opt.get_string("candidate-priority"));
689  if(candidate_priority == "off")
690  {
691  priority = computer_t::PRIORITY_OFF;
692  }
693  else
694  {
695  priority = f_opt.get_long("candidate-priority"
696  , 0
697  , computer_t::PRIORITY_USER_MIN
698  , computer_t::PRIORITY_MAX);
699  }
700  }
701  else if(f_config.has_parameter("candidate_priority"))
702  {
703  QString const candidate_priority(f_config["candidate_priority"]);
704  if(candidate_priority == "off")
705  {
706  // a priority 15 means that this computer is not a candidate
707  // at all (useful for nodes that get dynamically added
708  // and removed--i.e. avoid re-election each time that happens.)
709  //
710  priority = computer_t::PRIORITY_OFF;
711  }
712  else
713  {
714  bool ok(false);
715  priority = candidate_priority.toLong(&ok, 10);
716  if(!ok)
717  {
718  SNAP_LOG_FATAL("invalid candidate_priority, a valid decimal number was expected instead of \"")(candidate_priority)("\".");
719  exit(1);
720  }
721  if(priority < computer_t::PRIORITY_USER_MIN
722  || priority > computer_t::PRIORITY_MAX)
723  {
724  SNAP_LOG_FATAL("candidate_priority must be between 1 and 15, \"")(candidate_priority)("\" is not valid.");
725  exit(1);
726  }
727  }
728  }
729 
730  // make sure there are no standalone parameters
731  //
732  if(f_opt.is_defined("--"))
733  {
734  SNAP_LOG_FATAL("unexpected parameters found on snaplock daemon command line.");
735  std::cerr << "error: unexpected parameter found on snaplock daemon command line." << std::endl;
736  std::cerr << f_opt.usage(advgetopt::GETOPT_FLAG_SHOW_USAGE_ON_ERROR);
737  exit(1);
738  snap::NOTREACHED();
739  }
740 
741  f_start_time = time(nullptr);
742 
743  // add ourselves to the list of computers
744  //
745  // mark ourselves as connected, obviously
746  //
747  // as a side effect: it generates our identifier
748  //
749  f_computers[f_server_name] = std::make_shared<computer_t>(f_server_name, priority);
750  f_computers[f_server_name]->set_start_time(f_start_time);
751  f_computers[f_server_name]->set_connected(true);
752  f_my_id = f_computers[f_server_name]->get_id();
753  f_my_ip_address = f_computers[f_server_name]->get_ip_address();
754 }
755 
756 
762 snaplock::~snaplock()
763 {
764 }
765 
766 
773 void snaplock::run()
774 {
775  // Stop on these signals, log them, then terminate.
776  //
777  signal( SIGSEGV, snaplock::sighandler );
778  signal( SIGBUS, snaplock::sighandler );
779  signal( SIGFPE, snaplock::sighandler );
780  signal( SIGILL, snaplock::sighandler );
781  signal( SIGTERM, snaplock::sighandler );
782  signal( SIGINT, snaplock::sighandler );
783  signal( SIGQUIT, snaplock::sighandler );
784 
785  // Continue, but let us know by adding one line to the logs
786  //
787  signal( SIGPIPE, snaplock::sigloghandler );
788 
789  // ignore console signals
790  //
791  signal( SIGTSTP, SIG_IGN );
792  signal( SIGTTIN, SIG_IGN );
793  signal( SIGTTOU, SIG_IGN );
794 
795  // initialize the communicator and its connections
796  //
797  f_communicator = snap::snap_communicator::instance();
798 
799  // capture Ctrl-C (SIGINT)
800  //
801  f_interrupt.reset(new snaplock_interrupt(this));
802  f_communicator->add_connection(f_interrupt);
803 
804  // timer so we can timeout locks
805  //
806  f_timer.reset(new snaplock_timer(this));
807  f_communicator->add_connection(f_timer);
808 
809  // capture SIGUSR1 to print out information
810  //
811  f_info.reset(new snaplock_info(this));
812  f_communicator->add_connection(f_info);
813 
814  // capture SIGUSR2 to print out information
815  //
816  f_debug_info.reset(new snaplock_debug_info(this));
817  f_communicator->add_connection(f_debug_info);
818 
819  // create a messenger to communicate with the Snap Communicator process
820  // and other services as required
821  //
822  if(f_opt.is_defined("list"))
823  {
824  snap::logging::set_log_output_level(snap::logging::log_level_t::LOG_LEVEL_ERROR);
825 
826  // in this case create a snaplock_tool() which means most messages
827  // are not going to function; and once ready, it will execute the
828  // function specified on the command line such as --list
829  //
830  f_service_name = "snaplocktool";
831  f_messenger.reset(new snaplock_tool(this, f_communicator_addr.toUtf8().data(), f_communicator_port));
832  }
833  else
834  {
835  SNAP_LOG_INFO("--------------------------------- snaplock started.");
836 
837  f_messenger.reset(new snaplock_messenger(this, f_communicator_addr.toUtf8().data(), f_communicator_port));
838  f_messenger->set_dispatcher(shared_from_this());
839  }
840  f_communicator->add_connection(f_messenger);
841 
842  // now run our listening loop
843  //
844  f_communicator->run();
845 }
846 
847 
862 void snaplock::sighandler(int sig)
863 {
864  QString signame;
865  bool show_stack(true);
866  switch(sig)
867  {
868  case SIGSEGV:
869  signame = "SIGSEGV";
870  break;
871 
872  case SIGBUS:
873  signame = "SIGBUS";
874  break;
875 
876  case SIGFPE:
877  signame = "SIGFPE";
878  break;
879 
880  case SIGILL:
881  signame = "SIGILL";
882  break;
883 
884  case SIGTERM:
885  signame = "SIGTERM";
886  show_stack = false;
887  break;
888 
889  case SIGINT:
890  signame = "SIGINT";
891  show_stack = false;
892  break;
893 
894  case SIGQUIT:
895  signame = "SIGQUIT";
896  show_stack = false;
897  break;
898 
899  default:
900  signame = "UNKNOWN";
901  break;
902 
903  }
904 
905  if(show_stack)
906  {
907  snap::snap_exception_base::output_stack_trace();
908  }
909 
910  SNAP_LOG_FATAL("Fatal signal caught: ")(signame);
911 
912  // Exit with error status
913  //
914  ::exit(1);
915  snap::NOTREACHED();
916 }
917 
918 
919 void snaplock::sigloghandler(int sig)
920 {
921  std::string signame;
922 
923  switch(sig)
924  {
925  case SIGPIPE:
926  signame = "SIGPIPE";
927  break;
928 
929  default:
930  signame = "UNKNOWN";
931  break;
932 
933  }
934 
935  SNAP_LOG_WARNING("POSIX signal caught: ")(signame);
936 
937  // in this case we return because we want the process to continue
938  //
939  return;
940 }
941 
942 
943 
944 
945 
958 bool snaplock::send_message(snap::snap_communicator_message const & message, bool cache)
959 {
960  return f_messenger->send_message(message, cache);
961 }
962 
963 
973 int snaplock::get_computer_count() const
974 {
975  return f_computers.size();
976 }
977 
978 
1003 int snaplock::quorum() const
1004 {
1005  return f_computers.size() / 2 + 1;
1006 }
1007 
1008 
1019 QString const & snaplock::get_server_name() const
1020 {
1021  return f_server_name;
1022 }
1023 
1024 
1036 bool snaplock::is_ready() const
1037 {
1038  // without at least one leader we are definitely not ready
1039  //
1040  if(f_leaders.empty())
1041  {
1042  SNAP_LOG_TRACE("not considered ready: no leaders.");
1043  return false;
1044  }
1045 
1046  // enough leaders for that cluster?
1047  //
1048  // we consider that having at least 2 leaders is valid because locks
1049  // will still work, an election should be happening when we lose a
1050  // leader fixing that temporary state
1051  //
1052  // the test below allows for the case where we have a single computer
1053  // too (i.e. "one neighbor")
1054  //
1055  // notice how not having received the CLUSTERUP would be taken in
1056  // account here since f_neighbors_count will still be 0 in that case
1057  // (however, the previous empty() test already take that in account)
1058  //
1059  if(f_leaders.size() == 1
1060  && f_neighbors_count != 1)
1061  {
1062  SNAP_LOG_TRACE("not considered ready: no enough leaders for this cluster.");
1063  return false;
1064  }
1065 
1066  // the election_status() function verifies that the quorum is
1067  // attained, but it can change if the cluster grows or shrinks
1068  // so we have to check here again as the lock system becomes
1069  // "unready" when the quorum is lost; see that other function
1070  // for additional info
1071 
1072  // this one probably looks complicated...
1073  //
1074  // if our quorum is 1 or 2 then we need a number of computers
1075  // equal to the total number of computers (i.e. a CLUSTERCOMPLETE
1076  // status which we compute here)
1077  //
1078  if(f_neighbors_quorum < 3
1079  && f_computers.size() < f_neighbors_count)
1080  {
1081  SNAP_LOG_TRACE("not considered ready: quorum changed, re-election expected soon.");
1082  return false;
1083  }
1084 
1085  // the neighbors count & quorum can change over time so
1086  // we have to verify that the number of computers is
1087  // still acceptable here
1088  //
1089  if(f_computers.size() < f_neighbors_quorum)
1090  {
1091  SNAP_LOG_TRACE("not considered ready: quorum lost, re-election expected soon.");
1092  return false;
1093  }
1094 
1095  // are all leaders connected to us?
1096  //
1097  for(auto const & l : f_leaders)
1098  {
1099  if(!l->get_connected())
1100  {
1101  SNAP_LOG_TRACE("not considered ready: no direct connection with leader: \"")
1102  (l->get_name())
1103  ("\".");
1104 
1105  // attempt resending a LOCKSTARTED because it could be that it
1106  // did not work quite right and the snaplock daemons are not
1107  // going to ever talk with each others otherwise
1108  //
1109  // we also make sure we do not send the message too many times,
1110  // in five seconds it should be resolved...
1111  //
1112  time_t const now(time(nullptr));
1113  if(now > f_pace_lockstarted)
1114  {
1115  // pause for 5 to 6 seconds in case this happens a lot
1116  //
1117  f_pace_lockstarted = now + 5;
1118 
1119  // only send it to that specific server snaplock daemon
1120  //
1121  snap::snap_communicator_message temporary_message;
1122  temporary_message.set_sent_from_server(l->get_name());
1123  temporary_message.set_sent_from_service("snaplock");
1124  const_cast<snaplock *>(this)->send_lockstarted(&temporary_message);
1125  }
1126 
1127  return false;
1128  }
1129  }
1130 
1131  // it looks like we are ready
1132  //
1133  return true;
1134 }
1135 
1136 
1163 snaplock::computer_t::pointer_t snaplock::is_leader(QString id) const
1164 {
1165  if(id.isEmpty())
1166  {
1167  id = f_my_id;
1168  }
1169 
1170  for(auto const & l : f_leaders)
1171  {
1172  if(l->get_id() == id)
1173  {
1174  return l;
1175  }
1176  }
1177 
1178  return computer_t::pointer_t();
1179 }
1180 
1181 
1182 snaplock::computer_t::pointer_t snaplock::get_leader_a() const
1183 {
1184 #ifdef _DEBUG
1185  if(!is_leader())
1186  {
1187  throw snaplock_exception_content_invalid_usage("snaplock::get_leader_a(): only a leader can call this function.");
1188  }
1189 #endif
1190 
1191  switch(f_leaders.size())
1192  {
1193  case 0:
1194  default:
1195  throw snaplock_exception_content_invalid_usage("snaplock::get_leader_a(): call this function only when leaders were elected.");
1196 
1197  case 1:
1198  return computer_t::pointer_t(nullptr);
1199 
1200  case 2:
1201  case 3:
1202  return f_leaders[f_leaders[0]->is_self() ? 1 : 0];
1203 
1204  }
1205 }
1206 
1207 
1208 snaplock::computer_t::pointer_t snaplock::get_leader_b() const
1209 {
1210 #ifdef _DEBUG
1211  if(!is_leader())
1212  {
1213  throw snaplock_exception_content_invalid_usage("snaplock::get_leader_b(): only a leader can call this function.");
1214  }
1215 #endif
1216 
1217  switch(f_leaders.size())
1218  {
1219  case 0:
1220  default:
1221  throw snaplock_exception_content_invalid_usage("snaplock::get_leader_b(): call this function only when leaders were elected.");
1222 
1223  case 1:
1224  case 2: // we have a leader A but no leader B when we have only 2 leaders
1225  return computer_t::pointer_t(nullptr);
1226 
1227  case 3:
1228  return f_leaders[f_leaders[2]->is_self() ? 1 : 2];
1229 
1230  }
1231 }
1232 
1233 
1234 
1243 void snaplock::info()
1244 {
1245  SNAP_LOG_INFO("++++++++ SNAPLOCK INFO ++++++++");
1246  SNAP_LOG_INFO("My leader ID: ")(f_my_id);
1247  SNAP_LOG_INFO("My IP address: ")(f_my_ip_address);
1248  SNAP_LOG_INFO("Total number of computers: ")(f_neighbors_count)(" (quorum: ")(f_neighbors_quorum)(", leaders: ")(f_leaders.size())(")");
1249  SNAP_LOG_INFO("Known computers: ")(f_computers.size());
1250  for(auto const & c : f_computers)
1251  {
1252  auto const it(std::find_if(
1253  f_leaders.begin()
1254  , f_leaders.end()
1255  , [&c](auto const & l)
1256  {
1257  return c.second == l;
1258  }));
1259  QString leader;
1260  if(it != f_leaders.end())
1261  {
1262  leader = QString(" (LEADER #%1)").arg(it - f_leaders.begin());
1263  }
1264  SNAP_LOG_INFO(" -- Computer Name: ")(c.second->get_name())(leader);
1265  SNAP_LOG_INFO(" -- Computer ID: ")(c.second->get_id());
1266  SNAP_LOG_INFO(" -- Computer IP Address: ")(c.second->get_ip_address());
1267  }
1268 
1269 }
1270 
1271 
1272 void snaplock::debug_info()
1273 {
1274 #ifdef _DEBUG
1275 SNAP_LOG_TRACE("++++ serialized tickets in debug_info(): ")(serialized_tickets().replace("\n", " --- "));
1276  //if(f_computers.size() != 100)
1277  //{
1278  // SNAP_LOG_INFO("++++ COMPUTER ")(f_communicator_port)(" is not fully connected to all computers?");
1279  //}
1280  //if(f_leaders.size() != 3)
1281  //{
1282  // SNAP_LOG_INFO("++++ COMPUTER ")(f_communicator_port)(" does not (yet?) have 3 leaders");
1283  //}
1284  //else
1285  //{
1286  // SNAP_LOG_INFO(" -- ")(f_leaders[0]->get_name())(" + ")(f_leaders[1]->get_name())(" + ")(f_leaders[2]->get_name());
1287  //}
1288 #else
1289  SNAP_LOG_INFO("this version of snaplock is not a debug version. The debug_info() function does nothing in this version.");
1290 #endif
1291 }
1292 
1293 
1294 
1295 
1304 void snaplock::msg_list_tickets(snap::snap_communicator_message & message)
1305 {
1306  QString ticketlist;
1307  for(auto const & obj_ticket : f_tickets)
1308  {
1309  for(auto const & key_ticket : obj_ticket.second)
1310  {
1311  QString const & obj_name(key_ticket.second->get_object_name());
1312  QString const & key(key_ticket.second->get_entering_key());
1313  snaplock_ticket::ticket_id_t const ticket_id(key_ticket.second->get_ticket_number());
1314  time_t const lock_timeout(key_ticket.second->get_lock_timeout());
1315 
1316  QString timeout_msg;
1317  if(lock_timeout == 0)
1318  {
1319  time_t const obtention_timeout(key_ticket.second->get_obtention_timeout());
1320  timeout_msg = QString("obtention %1 %2")
1321  .arg(snap::snap_child::date_to_string(obtention_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_SHORT))
1322  .arg(snap::snap_child::date_to_string(obtention_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_TIME));
1323  }
1324  else
1325  {
1326  timeout_msg = QString("timeout %1 %2")
1327  .arg(snap::snap_child::date_to_string(lock_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_SHORT))
1328  .arg(snap::snap_child::date_to_string(lock_timeout * 1000000LL, snap::snap_child::date_format_t::DATE_FORMAT_TIME));
1329  }
1330 
1331  QString const msg(QString("ticket id: %1 object name: \"%2\" key: %3 %4\n")
1332  .arg(ticket_id)
1333  .arg(obj_name)
1334  .arg(key)
1335  .arg(timeout_msg));
1336  ticketlist += msg;
1337  }
1338  }
1339  snap::snap_communicator_message list_message;
1340  list_message.set_command("TICKETLIST");
1341  list_message.reply_to(message);
1342  list_message.add_parameter("list", ticketlist);
1343  send_message(list_message);
1344 }
1345 
1346 
1355 void snaplock::ready(snap::snap_communicator_message & message)
1356 {
1357  snap::NOTUSED(message);
1358 
1359  snap::snap_communicator_message clusterstatus_message;
1360  clusterstatus_message.set_command("CLUSTERSTATUS");
1361  clusterstatus_message.set_service("snapcommunicator");
1362  send_message(clusterstatus_message);
1363 }
1364 
1365 
1366 void snaplock::msg_cluster_up(snap::snap_communicator_message & message)
1367 {
1368  f_neighbors_count = message.get_integer_parameter("neighbors_count");
1369  f_neighbors_quorum = f_neighbors_count / 2 + 1;
1370 
1371  SNAP_LOG_INFO("cluster is up with ")
1372  (f_neighbors_count)
1373  (" neightbors, attempt an election then check for leaders by sending a LOCKSTARTED message.");
1374 
1375  election_status();
1376 
1377  send_lockstarted(nullptr);
1378 }
1379 
1380 
1381 void snaplock::msg_cluster_down(snap::snap_communicator_message & message)
1382 {
1383  snap::NOTUSED(message);
1384 
1385  // there is nothing to do here, when the cluster comes back up the
1386  // snapcommunicator will automatically send us a signal about it
1387 
1388  SNAP_LOG_INFO("cluster is down, canceling existing locks and we have to refuse any further lock requests for a while.");
1389 
1390  // in this case we just cannot keep the leaders
1391  //
1392  f_leaders.clear();
1393 
1394  // in case services listen to the NOLOCK, let them know it's gone
1395  //
1396  check_lock_status();
1397 
1398  // we do not call the lockgone() because the HANGUP will be sent
1399  // if required so we do not have to do that twice
1400 }
1401 
1402 
1403 void snaplock::election_status()
1404 {
1405  // we already have election results?
1406  //
1407  if(!f_leaders.empty())
1408  {
1409  // the results may have been "temperred" with (i.e. one of
1410  // the leaders was lost)
1411  //
1412  if(f_leaders.size() == 3
1413  || (f_neighbors_count < 3 && f_leaders.size() == f_neighbors_count))
1414  {
1415  // this could have changed since we may get the list of
1416  // leaders with some of those leaders set to "disabled"
1417  //
1418  check_lock_status();
1419  return;
1420  }
1421  }
1422 
1423  // neighbors count is 0 until we receive a very first CLUSTERUP
1424  // (note that it does not go back to zero on CLUSTERDOWN, however,
1425  // the quorum as checked in the next if() is never going to be
1426  // reached if the cluster is down.)
1427  //
1428  if(f_neighbors_count == 0)
1429  {
1430  return;
1431  }
1432 
1433  // this one probably looks complicated...
1434  //
1435  // if our quorum is 1 or 2 then we need a number of computers
1436  // equal to the total number of computers (i.e. a CLUSTERCOMPLETE
1437  // status which we compute here)
1438  //
1439  if(f_neighbors_quorum < 3
1440  && f_computers.size() < f_neighbors_count)
1441  {
1442  return;
1443  }
1444 
1445  // since the neighbors count & quorum never go back to zero (on a
1446  // CLUSTERDOWN) we have to verify that the number of computers is
1447  // acceptable here
1448  //
1449  // Note: further we will not count computers marked disabled, which
1450  // is done below when sorting by ID, however, that does not
1451  // prevent the quorum to be attained, even with disabled
1452  // computers
1453  //
1454  if(f_computers.size() < f_neighbors_quorum)
1455  {
1456  return;
1457  }
1458 
1459  // to proceed with an election we must have the smallest IP address
1460  // (it is not absolutely required, but that way we avoid many
1461  // consensus problems, in effect we have one "temporary-leader" that ends
1462  // up telling us who the final three leaders are.)
1463  //
1464  for(auto & c : f_computers)
1465  {
1466  // Note: the test fails when we compare to ourselves so we do not
1467  // need any special case
1468  //
1469  if(c.second->get_ip_address() < f_my_ip_address)
1470  {
1471  return;
1472  }
1473  }
1474 
1475  // to select the leaders sort them by identifier and take the first
1476  // three (i.e. lower priority, random, IP, pid.)
1477  //
1478  int off(0);
1479  computer_t::map_t sort_by_id;
1480  for(auto c : f_computers)
1481  {
1482  // ignore nodes with a priority of 15 (i.e. OFF)
1483  //
1484  if(c.second->get_priority() != computer_t::PRIORITY_OFF)
1485  {
1486  QString id(c.second->get_id());
1487 
1488  // is this computer a leader?
1489  //
1490  auto it(std::find(f_leaders.begin(), f_leaders.end(), c.second));
1491  if(it != f_leaders.end())
1492  {
1493  // leaders have a priority of 00
1494  //
1495  id[0] = '0';
1496  id[1] = '0';
1497  }
1498 
1499  sort_by_id[id] = c.second;
1500  }
1501  else
1502  {
1503  ++off;
1504  }
1505  }
1506 
1507  if(f_computers.size() <= 3)
1508  {
1509  if(off != 0)
1510  {
1511  SNAP_LOG_FATAL(
1512  "you cannot have any computer turned OFF when you"
1513  " have three or less computers total in your cluster."
1514  " The elections cannot be completed in these"
1515  " circumstances.");
1516  return;
1517  }
1518  }
1519  else if(f_computers.size() - off < 3)
1520  {
1521  SNAP_LOG_FATAL("you have a total of ")
1522  (f_computers.size())
1523  (" computers in your cluster. You turned off ")
1524  (off)
1525  (" of them, which means less than three are left"
1526  " as candidates for leadership which is not enough."
1527  " You can have a maximum of ")
1528  (f_computers.size() - 3)
1529  (" that are turned off on this cluster.");
1530  return;
1531  }
1532 
1533  if(sort_by_id.size() < 3
1534  && sort_by_id.size() != f_computers.size())
1535  {
1536  return;
1537  }
1538 
1539 //std::cerr << f_communicator_port << " is conducting an election:\n";
1540 //for(auto s : sort_by_id)
1541 //{
1542 //std::cerr << " " << s.second->get_name() << " " << s.first << "\n";
1543 //}
1544 
1545  // the first three are the new leaders
1546  //
1547  snap::snap_communicator_message lockleaders_message;
1548  lockleaders_message.set_command("LOCKLEADERS");
1549  lockleaders_message.set_service("*");
1550  f_leaders.clear();
1551  f_election_date = snap::snap_child::get_current_date();
1552  lockleaders_message.add_parameter("election_date", f_election_date);
1553  auto leader(sort_by_id.begin());
1554  size_t const max(std::min(static_cast<computer_t::map_t::size_type>(3), sort_by_id.size()));
1555  for(size_t idx(0); idx < max; ++idx, ++leader)
1556  {
1557  lockleaders_message.add_parameter(QString("leader%1").arg(idx), leader->second->get_id());
1558  f_leaders.push_back(leader->second);
1559  }
1560  send_message(lockleaders_message);
1561 
1562 SNAP_LOG_WARNING("election status = add leader(s)... ")(f_computers.size())(" comps and ")(f_leaders.size())(" leaders");
1563 
1564  // when the election succeeded we may have to send LOCK messages
1565  // assuming some were cached and did not yet time out
1566  //
1567  check_lock_status();
1568 }
1569 
1570 
1571 void snaplock::check_lock_status()
1572 {
1573  bool const ready(is_ready());
1574  QString const current_status(ready ? "LOCKREADY" : "NOLOCK");
1575 
1576  if(f_lock_status != current_status)
1577  {
1578  f_lock_status = current_status;
1579 
1580  snap::snap_communicator_message status_message;
1581  status_message.set_command(current_status);
1582  status_message.set_service(".");
1583  status_message.add_parameter("cache", "no");
1584  send_message(status_message);
1585 
1586  if(ready
1587  && !f_message_cache.empty())
1588  {
1589  // we still have a cache of locks that can now be processed
1590  //
1591  // note:
1592  // although msg_lock() could re-add some of those messages
1593  // in the f_message_cache vector, it should not since it
1594  // calls the same is_read() function which we know returns
1595  // true and therefore no cache is required
1596  //
1598  cache.swap(f_message_cache);
1599  for(auto mc : cache)
1600  {
1601  msg_lock(mc.f_message);
1602  }
1603  }
1604  }
1605 }
1606 
1607 
1608 void snaplock::send_lockstarted(snap::snap_communicator_message const * message)
1609 {
1610  // tell other snaplock instances that are already listening that
1611  // we are ready; this way we can calculate the number of computers
1612  // available in our network and use that to calculate the QUORUM
1613  //
1614  snap::snap_communicator_message lockstarted_message;
1615  lockstarted_message.set_command("LOCKSTARTED");
1616  if(message == nullptr)
1617  {
1618  lockstarted_message.set_service("*");
1619 
1620  // unfortunately, the following does NOT work as expected...
1621  // (i.e. the following ends up sending the message to ourselves only
1622  // and does not forward to any remote communicators.)
1623  //
1624  //lockstarted_message.set_server("*");
1625  //lockstarted_message.set_service("snaplock");
1626  }
1627  else
1628  {
1629  lockstarted_message.reply_to(*message);
1630  }
1631 
1632  // our info: server name and id
1633  //
1634  lockstarted_message.add_parameter("server_name", f_server_name);
1635  lockstarted_message.add_parameter("lockid", f_my_id);
1636  lockstarted_message.add_parameter("starttime", f_start_time);
1637 
1638  // include the leaders if present
1639  //
1640  if(!f_leaders.empty())
1641  {
1642  lockstarted_message.add_parameter("election_date", f_election_date);
1643  for(size_t idx(0); idx < f_leaders.size(); ++idx)
1644  {
1645  lockstarted_message.add_parameter(QString("leader%1").arg(idx), f_leaders[idx]->get_id());
1646  }
1647  }
1648 
1649  send_message(lockstarted_message);
1650 }
1651 
1652 
1653 void snaplock::msg_lock_leaders(snap::snap_communicator_message & message)
1654 {
1655  f_election_date = message.get_integer_parameter("election_date");
1656 
1657  // save the new leaders in our own list
1658  //
1659  f_leaders.clear();
1660  for(int idx(0); idx < 3; ++idx)
1661  {
1662  QString const param_name(QString("leader%1").arg(idx));
1663  if(message.has_parameter(param_name))
1664  {
1665  computer_t::pointer_t leader(std::make_shared<computer_t>());
1666  QString const lockid(message.get_parameter(param_name));
1667  if(leader->set_id(lockid))
1668  {
1669  computer_t::map_t::iterator exists(f_computers.find(leader->get_name()));
1670  if(exists != f_computers.end())
1671  {
1672  // it already exists, use our existing instance
1673  //
1674  f_leaders.push_back(exists->second);
1675  }
1676  else
1677  {
1678  // we do not yet know of that computer, even though
1679  // it is a leader! (i.e. we are not yet aware that
1680  // somehow we are connected to it)
1681  //
1682  leader->set_connected(false);
1683  f_computers[leader->get_name()] = leader;
1684 
1685  f_leaders.push_back(leader);
1686  }
1687  }
1688  }
1689  }
1690 
1691  if(!f_leaders.empty())
1692  {
1693  synchronize_leaders();
1694 
1695  // set the round-robin position to a random value
1696  //
1697  // note: I know the result is likely skewed, c will be set to
1698  // a number between 0 and 255 and modulo 3 means that you get
1699  // one extra zero (255 % 3 == 0); however, there are 85 times
1700  // 3 in 255 so it probably won't be noticeable.
1701  //
1702  uint8_t c;
1703  RAND_bytes(reinterpret_cast<unsigned char *>(&c), sizeof(c));
1704  f_next_leader = c % f_leaders.size();
1705  }
1706 
1707  // the is_ready() function depends on having f_leaders defined
1708  // and when that happens we may need to empty our cache
1709  //
1710  check_lock_status();
1711 }
1712 
1713 
1724 void snaplock::msg_lock_started(snap::snap_communicator_message & message)
1725 {
1726  // get the server name (that other server telling us it is ready)
1727  //
1728  QString const server_name(message.get_parameter("server_name"));
1729  if(server_name.isEmpty())
1730  {
1731  // name missing
1732  //
1733  throw snap::snap_communicator_invalid_message("snaplock::msg_lockstarted(): Invalid server name (empty).");
1734  }
1735 
1736  // I do not think we would even message ourselves, but in case it happens
1737  // the rest of the function does not support that case well
1738  //
1739  if(server_name == f_server_name)
1740  {
1741  return;
1742  }
1743 
1744  time_t const start_time(message.get_integer_parameter("starttime"));
1745 
1746  computer_t::map_t::iterator it(f_computers.find(server_name));
1747  bool new_computer(it == f_computers.end());
1748  if(new_computer)
1749  {
1750  // create a computer instance so we know it exists
1751  //
1752  computer_t::pointer_t computer(std::make_shared<computer_t>());
1753 
1754  // fill the fields from the "lockid" parameter
1755  //
1756  if(!computer->set_id(message.get_parameter("lockid")))
1757  {
1758  // this is not a valid identifier, ignore altogether
1759  //
1760  return;
1761  }
1762  computer->set_start_time(start_time);
1763 
1764  f_computers[computer->get_name()] = computer;
1765  }
1766  else
1767  {
1768  if(!it->second->get_connected())
1769  {
1770  // we heard of this computer (because it is/was a leader) but
1771  // we had not yet received a LOCKSTARTED message from it; so here
1772  // we consider it a new computer and will reply to the LOCKSTARTED
1773  //
1774  new_computer = true;
1775  it->second->set_connected(true);
1776  }
1777 
1778  if(it->second->get_start_time() != start_time)
1779  {
1780  // when the start time changes that means snaplock
1781  // restarted which can happen without snapcommunicator
1782  // restarting so here we would not know about the feat
1783  // without this parameter and in this case it is very
1784  // much the same as a new computer so send it a
1785  // LOCKSTARTED message back!
1786  //
1787  new_computer = true;
1788  it->second->set_start_time(start_time);
1789  }
1790  }
1791 
1792  // keep the newest election results
1793  //
1794  if(message.has_parameter("election_date"))
1795  {
1796  int64_t const election_date(message.get_integer_parameter("election_date"));
1797  if(election_date > f_election_date)
1798  {
1799  f_election_date = election_date;
1800  f_leaders.clear();
1801  }
1802  }
1803 
1804  bool const set_my_leaders(f_leaders.empty());
1805  if(set_my_leaders)
1806  {
1807  for(int idx(0); idx < 3; ++idx)
1808  {
1809  QString const param_name(QString("leader%1").arg(idx));
1810  if(message.has_parameter(param_name))
1811  {
1812  computer_t::pointer_t leader(std::make_shared<computer_t>());
1813  QString const lockid(message.get_parameter(param_name));
1814  if(leader->set_id(lockid))
1815  {
1816  computer_t::map_t::iterator exists(f_computers.find(leader->get_name()));
1817  if(exists != f_computers.end())
1818  {
1819  // it already exists, use our existing instance
1820  //
1821  f_leaders.push_back(exists->second);
1822  }
1823  else
1824  {
1825  // we do not yet know of that computer, even though
1826  // it is a leader! (i.e. we are not yet aware that
1827  // somehow we are connected to it)
1828  //
1829  leader->set_connected(false);
1830  f_computers[leader->get_name()] = leader;
1831 
1832  f_leaders.push_back(leader);
1833  }
1834  }
1835  }
1836  }
1837  }
1838 
1839  election_status();
1840 
1841  if(new_computer)
1842  {
1843  // send a reply if that was a new computer
1844  //
1845  send_lockstarted(&message);
1846  }
1847 }
1848 
1849 
1862 void snaplock::msg_lock_status(snap::snap_communicator_message & message)
1863 {
1864  snap::snap_communicator_message status_message;
1865  status_message.set_command(is_ready() ? "LOCKREADY" : "NOLOCK");
1866  status_message.reply_to(message);
1867  status_message.add_parameter("cache", "no");
1868  send_message(status_message);
1869 }
1870 
1871 
1889 void snaplock::msg_lock_tickets(snap::snap_communicator_message & message)
1890 {
1891  QString const tickets(message.get_parameter("tickets"));
1892 
1893  // we have one ticket per line, so we first split per line and then
1894  // work on one line at a time
1895  //
1896  snap::snap_string_list const lines(tickets.split('\n'));
1897  for(auto const & l : lines)
1898  {
1900  snap::snap_string_list const vars(l.split('|'));
1901  auto object_name_value(std::find_if(
1902  vars.begin()
1903  , vars.end()
1904  , [](QString const & vv)
1905  {
1906  return vv.startsWith("object_name=");
1907  }));
1908  if(object_name_value != vars.end())
1909  {
1910  auto entering_key_value(std::find_if(
1911  vars.begin()
1912  , vars.end()
1913  , [](QString const & vv)
1914  {
1915  return vv.startsWith("entering_key=");
1916  }));
1917  if(entering_key_value != vars.end())
1918  {
1919  // extract the values which start after the '=' sign
1920  //
1921  QString const object_name(object_name_value->mid(12));
1922  QString const entering_key(entering_key_value->mid(13));
1923 
1924  auto entering_ticket(f_entering_tickets.find(object_name));
1925  if(entering_ticket != f_entering_tickets.end())
1926  {
1927  auto key_ticket(entering_ticket->second.find(entering_key));
1928  if(key_ticket != entering_ticket->second.end())
1929  {
1930  ticket = key_ticket->second;
1931  }
1932  }
1933  if(ticket == nullptr)
1934  {
1935  auto obj_ticket(f_tickets.find(object_name));
1936  if(obj_ticket != f_tickets.end())
1937  {
1938  auto key_ticket(std::find_if(
1939  obj_ticket->second.begin()
1940  , obj_ticket->second.end()
1941  , [&entering_key](auto const & t)
1942  {
1943  return t.second->get_entering_key() == entering_key;
1944  }));
1945  if(key_ticket != obj_ticket->second.end())
1946  {
1947  ticket = key_ticket->second;
1948  }
1949  }
1950  }
1951 
1952  // ticket exists? if not create a new one
1953  //
1954  bool const new_ticket(ticket == nullptr);
1955  if(new_ticket)
1956  {
1957  // creaet a new ticket, some of the parameters are there just
1958  // because they are required; they will be replaced by the
1959  // unserialize call...
1960  //
1961  ticket = std::make_shared<snaplock_ticket>(
1962  this
1963  , f_messenger
1964  , object_name
1965  , entering_key
1966  , snap::snap_lock::SNAP_LOCK_DEFAULT_TIMEOUT + time(nullptr)
1967  , snap::snap_lock::SNAP_LOCK_DEFAULT_TIMEOUT
1968  , f_server_name
1969  , "snaplock");
1970  }
1971 
1972  ticket->unserialize(l);
1973 
1974  // do a couple of additional sanity tests to
1975  // make sure that we want to keep new tickets
1976  //
1977  // first make sure it is marked as "locked"
1978  //
1979  // second check that the owner is a leader that
1980  // exists (the sender uses a LOCK message for
1981  // locks that are not yet locked or require
1982  // a new owner)
1983  //
1984  if(new_ticket
1985  && ticket->is_locked())
1986  {
1987  auto li(std::find_if(
1988  f_leaders.begin()
1989  , f_leaders.end()
1990  , [&ticket](auto const & c)
1991  {
1992  return ticket->get_owner() == c->get_name();
1993  }));
1994  if(li != f_leaders.end())
1995  {
1996  f_tickets[object_name][ticket->get_ticket_key()] = ticket;
1997  }
1998  }
1999  }
2000  }
2001  }
2002 }
2003 
2004 
2014 void snaplock::msg_status(snap::snap_communicator_message & message)
2015 {
2016  // check the service name, it has to be one that means it is a remote
2017  // connection with another snapcommunicator
2018  //
2019  QString const service(message.get_parameter("service"));
2020  if(service == "remote connection" // remote host connected to us
2021  || service == "remote communicator connection") // we connected to remote host
2022  {
2023  // check what the status is now: "up" or "down"
2024  //
2025  QString const status(message.get_parameter("status"));
2026  if(status == "up")
2027  {
2028  // we already broadcast a LOCKSTARTED from CLUSTERUP
2029  // and that's enough
2030  //
2031  }
2032  else
2033  {
2034  // host is down, remove from our list of hosts
2035  //
2036  msg_server_gone(message);
2037  }
2038  }
2039 }
2040 
2041 
2055 void snaplock::msg_server_gone(snap::snap_communicator_message & message)
2056 {
2057  // was it a snaplock service at least?
2058  //
2059  QString const server_name(message.get_parameter("server_name"));
2060  if(server_name.isEmpty()
2061  || server_name == f_server_name)
2062  {
2063  // we never want to remove ourselves?!
2064  //
2065  return;
2066  }
2067 
2068  // is "server_name" known?
2069  //
2070  auto it(f_computers.find(server_name));
2071  if(it == f_computers.end())
2072  {
2073  // no computer found, nothing else to do here
2074  //
2075  return;
2076  }
2077 
2078  // got it, remove it
2079  //
2080  f_computers.erase(it);
2081 
2082  // is that computer a leader?
2083  //
2084  auto li(std::find(
2085  f_leaders.begin()
2086  , f_leaders.end()
2087  , it->second));
2088  if(li != f_leaders.end())
2089  {
2090  f_leaders.erase(li);
2091 
2092  // elect another computer in case the one we just erased was a leader
2093  //
2094  // (of course, no elections occur unless we are the computer with the
2095  // smallest IP address)
2096  //
2097  election_status();
2098 
2099  // if too many leaders were dropped, we may go back to the NOLOCK status
2100  //
2101  // we only send a NOLOCK if the election could not re-assign another
2102  // computer as the missing leader(s)
2103  //
2104  check_lock_status();
2105  }
2106 }
2107 
2108 
2126 void snaplock::stop(bool quitting)
2127 {
2128  if(f_messenger != nullptr)
2129  {
2130  if(quitting || !f_messenger->is_connected())
2131  {
2132  // turn off that connection now, we cannot UNREGISTER since
2133  // we are not connected to snapcommunicator
2134  //
2135  f_communicator->remove_connection(f_messenger);
2136  f_messenger.reset();
2137  }
2138  else
2139  {
2140  f_messenger->mark_done();
2141 
2142  // unregister if we are still connected to the messenger
2143  // and Snap! Communicator is not already quitting
2144  //
2145  snap::snap_communicator_message cmd;
2146  cmd.set_command("UNREGISTER");
2147  cmd.add_parameter("service", f_service_name);
2148  send_message(cmd);
2149  }
2150  }
2151 
2152  if(f_communicator != nullptr)
2153  {
2154  f_communicator->remove_connection(f_interrupt);
2155  f_interrupt.reset();
2156 
2157  f_communicator->remove_connection(f_info);
2158  f_info.reset();
2159 
2160  f_communicator->remove_connection(f_debug_info);
2161  f_debug_info.reset();
2162 
2163  f_communicator->remove_connection(f_timer);
2164  f_timer.reset();
2165  }
2166 }
2167 
2168 
2197 void snaplock::get_parameters(snap::snap_communicator_message const & message, QString * object_name, pid_t * client_pid, time_t * timeout, QString * key, QString * source)
2198 {
2199  // get the "object name" (what we are locking)
2200  // in Snap, the object name is often a URI plus the action we are performing
2201  //
2202  if(object_name != nullptr)
2203  {
2204  *object_name = message.get_parameter("object_name");
2205  if(object_name->isEmpty())
2206  {
2207  // name missing
2208  //
2209  throw snap::snap_communicator_invalid_message("snaplock::get_parameters(): Invalid object name. We cannot lock the empty string.");
2210  }
2211  }
2212 
2213  // get the pid (process identifier) of the process that is
2214  // requesting the lock; this is important to be able to distinguish
2215  // multiple processes on the same computer requesting a lock
2216  //
2217  if(client_pid != nullptr)
2218  {
2219  *client_pid = message.get_integer_parameter("pid");
2220  if(*client_pid < 1)
2221  {
2222  // invalid pid
2223  //
2224  throw snap::snap_communicator_invalid_message(QString("snaplock::get_parameters(): Invalid pid specified for a lock (%1). It must be a positive decimal number.").arg(message.get_parameter("pid")).toUtf8().data());
2225  }
2226  }
2227 
2228  // get the time limit we will wait up to before we decide we
2229  // cannot obtain that lock
2230  //
2231  if(timeout != nullptr)
2232  {
2233  if(message.has_parameter("timeout"))
2234  {
2235  // this timeout may already be out of date in which case
2236  // the lock immediately fails
2237  //
2238  *timeout = message.get_integer_parameter("timeout");
2239  }
2240  else
2241  {
2242  *timeout = time(nullptr) + DEFAULT_TIMEOUT;
2243  }
2244  }
2245 
2246  // get the key of a ticket or entering object
2247  //
2248  if(key != nullptr)
2249  {
2250  *key = message.get_parameter("key");
2251  if(key->isEmpty())
2252  {
2253  // key missing
2254  //
2255  throw snap::snap_communicator_invalid_message("snaplock::get_parameters(): A key cannot be an empty string.");
2256  }
2257  }
2258 
2259  // get the key of a ticket or entering object
2260  //
2261  if(source != nullptr)
2262  {
2263  *source = message.get_parameter("source");
2264  if(source->isEmpty())
2265  {
2266  // source missing
2267  //
2268  throw snap::snap_communicator_invalid_message("snaplock::get_parameters(): A source cannot be an empty string.");
2269  }
2270  }
2271 }
2272 
2273 
2285 void snaplock::msg_absolutely(snap::snap_communicator_message & message)
2286 {
2287  QString const serial(message.get_parameter("serial"));
2288  snap::snap_string_list const segments(serial.split('/'));
2289 
2290  if(segments[0] == "relock")
2291  {
2292  // check serial as defined in msg_lock()
2293  // alive_message.add_parameter("serial", QString("relock/%1/%2").arg(object_name).arg(entering_key));
2294  //
2295  if(segments.size() != 4)
2296  {
2297  SNAP_LOG_WARNING("ABSOLUTELY reply has an invalid relock serial parameters \"")
2298  (serial)
2299  ("\" was expected to have exactly 4 segments.");
2300 
2301  snap::snap_communicator_message lock_failed_message;
2302  lock_failed_message.set_command("LOCKFAILED");
2303  lock_failed_message.reply_to(message);
2304  lock_failed_message.add_parameter("object_name", "unknown");
2305  lock_failed_message.add_parameter("error", "invalid");
2306  send_message(lock_failed_message);
2307 
2308  return;
2309  }
2310 
2311  // notice how the split() re-split the entering key
2312  //
2313  QString const object_name(segments[1]);
2314  QString const server_name(segments[2]);
2315  QString const client_pid(segments[3]);
2316 
2317  auto entering_ticket(f_entering_tickets.find(object_name));
2318  if(entering_ticket != f_entering_tickets.end())
2319  {
2320  QString const entering_key(QString("%1/%2").arg(server_name).arg(client_pid));
2321  auto key_ticket(entering_ticket->second.find(entering_key));
2322  if(key_ticket != entering_ticket->second.end())
2323  {
2324  // remove the alive timeout
2325  //
2326  key_ticket->second->set_alive_timeout(0);
2327 
2328  // got it! start the bakery algorithm
2329  //
2330  key_ticket->second->entering();
2331  }
2332  }
2333  }
2334 
2335  // ignore other messages
2336 }
2337 
2338 
2377 void snaplock::msg_lock(snap::snap_communicator_message & message)
2378 {
2379  QString object_name;
2380  pid_t client_pid(0);
2381  time_t timeout(0);
2382  get_parameters(message, &object_name, &client_pid, &timeout, nullptr, nullptr);
2383 
2384  // do some cleanup as well
2385  //
2386  cleanup();
2387 
2388  // if we are a leader, create an entering key
2389  //
2390  QString const server_name(message.has_parameter("lock_proxy_server_name")
2391  ? message.get_parameter("lock_proxy_server_name")
2392  : message.get_sent_from_server());
2393 
2394  QString const service_name(message.has_parameter("lock_proxy_service_name")
2395  ? message.get_parameter("lock_proxy_service_name")
2396  : message.get_sent_from_service());
2397 
2398  QString const entering_key(QString("%1/%2").arg(server_name).arg(client_pid));
2399 
2400  if(timeout <= time(nullptr))
2401  {
2402  SNAP_LOG_WARNING("Lock on \"")
2403  (object_name)
2404  ("\" / \"")
2405  (client_pid)
2406  ("\" timed out before we could start the locking process.");
2407 
2408  snap::snap_communicator_message lock_failed_message;
2409  lock_failed_message.set_command("LOCKFAILED");
2410  lock_failed_message.reply_to(message);
2411  lock_failed_message.add_parameter("object_name", object_name);
2412  lock_failed_message.add_parameter("key", entering_key);
2413  lock_failed_message.add_parameter("error", "timedout");
2414  send_message(lock_failed_message);
2415 
2416  return;
2417  }
2418 
2419  snap::snap_lock::timeout_t const duration(message.get_integer_parameter("duration"));
2420  if(duration < snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2421  {
2422  // invalid duration, minimum is 3
2423  //
2424  SNAP_LOG_ERROR(duration)
2425  (" is an invalid duration, the minimum accepted is ")
2426  (snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2427  (".");
2428 
2429  snap::snap_communicator_message lock_failed_message;
2430  lock_failed_message.set_command("LOCKFAILED");
2431  lock_failed_message.reply_to(message);
2432  lock_failed_message.add_parameter("object_name", object_name);
2433  lock_failed_message.add_parameter("key", entering_key);
2434  lock_failed_message.add_parameter("error", "invalid");
2435  send_message(lock_failed_message);
2436 
2437  return;
2438  }
2439 
2440  snap::snap_lock::timeout_t unlock_duration(snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT);
2441  if(message.has_parameter("unlock_duration"))
2442  {
2443  unlock_duration = message.get_integer_parameter("unlock_duration");
2444  if(unlock_duration < snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2445  {
2446  // invalid duration, minimum is 60
2447  //
2448  SNAP_LOG_ERROR(unlock_duration)
2449  (" is an invalid unlock duration, the minimum accepted is ")
2450  (snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2451  (".");
2452 
2453  snap::snap_communicator_message lock_failed_message;
2454  lock_failed_message.set_command("LOCKFAILED");
2455  lock_failed_message.reply_to(message);
2456  lock_failed_message.add_parameter("object_name", object_name);
2457  lock_failed_message.add_parameter("key", entering_key);
2458  lock_failed_message.add_parameter("error", "invalid");
2459  send_message(lock_failed_message);
2460 
2461  return;
2462  }
2463  }
2464 
2465  if(!is_ready())
2466  {
2467  SNAP_LOG_TRACE("caching LOCK message for \"")
2468  (object_name)
2469  ("\" as the snaplock system is not yet considered ready.");
2470 
2471  message_cache const mc
2472  {
2473  timeout,
2474  message
2475  };
2476  f_message_cache.push_back(mc);
2477 
2478  // make sure the cache gets cleaned up if the message times out
2479  //
2480  int64_t const timeout_date(f_messenger->get_timeout_date());
2481  if(timeout_date == -1
2482  || timeout_date > timeout)
2483  {
2484  f_timer->set_timeout_date(timeout);
2485  }
2486  return;
2487  }
2488 
2489  if(is_leader() == nullptr)
2490  {
2491  // we are not a leader, we need to forward the message to one
2492  // of the leaders instead
2493  //
2494  forward_message_to_leader(message);
2495  return;
2496  }
2497 
2498  // make sure there is not a ticket with the same name already defined
2499  //
2500  auto entering_ticket(f_entering_tickets.find(object_name));
2501  if(entering_ticket != f_entering_tickets.end())
2502  {
2503  auto key_ticket(entering_ticket->second.find(entering_key));
2504  if(key_ticket != entering_ticket->second.end())
2505  {
2506  // if this is a re-LOCK, then it may be a legitimate duplicate
2507  // in which case we do not want to generate a LOCKFAILED error
2508  //
2509  if(message.has_parameter("serial"))
2510  {
2511  snaplock_ticket::serial_t const serial(message.get_integer_parameter("serial"));
2512  if(key_ticket->second->get_serial() == serial)
2513  {
2514  // legitimate double request from leaders
2515  // (this happens when a leader dies and we have to restart
2516  // a lock negotiation)
2517  //
2518  return;
2519  }
2520  }
2521 
2522  // the object already exists... do not allow duplicates
2523  //
2524  SNAP_LOG_ERROR("an entering ticket has the same object name \"")
2525  (object_name)
2526  ("\" and entering key \"")
2527  (entering_key)
2528  ("\".");
2529 
2530  snap::snap_communicator_message lock_failed_message;
2531  lock_failed_message.set_command("LOCKFAILED");
2532  lock_failed_message.reply_to(message);
2533  lock_failed_message.add_parameter("object_name", object_name);
2534  lock_failed_message.add_parameter("key", entering_key);
2535  lock_failed_message.add_parameter("error", "duplicate");
2536  send_message(lock_failed_message);
2537 
2538  return;
2539  }
2540  }
2541 
2542  // make sure there is not a ticket with the same name already defined
2543  //
2544  // (this is is really important so we can actually properly UNLOCK an
2545  // existing lock since we use the same search and if two entries were
2546  // to be the same we could not know which to unlock; there are a few
2547  // other places where such a search is used actually...)
2548  //
2549  auto obj_ticket(f_tickets.find(object_name));
2550  if(obj_ticket != f_tickets.end())
2551  {
2552  auto key_ticket(std::find_if(
2553  obj_ticket->second.begin()
2554  , obj_ticket->second.end()
2555  , [&entering_key](auto const & t)
2556  {
2557  return t.second->get_entering_key() == entering_key;
2558  }));
2559  if(key_ticket != obj_ticket->second.end())
2560  {
2561  // there is already a ticket with this name/entering key
2562  //
2563  SNAP_LOG_ERROR("a ticket has the same object name \"")
2564  (object_name)
2565  ("\" and entering key \"")
2566  (entering_key)
2567  ("\".");
2568 
2569  snap::snap_communicator_message lock_failed_message;
2570  lock_failed_message.set_command("LOCKFAILED");
2571  lock_failed_message.reply_to(message);
2572  lock_failed_message.add_parameter("object_name", object_name);
2573  lock_failed_message.add_parameter("key", entering_key);
2574  lock_failed_message.add_parameter("error", "duplicate");
2575  send_message(lock_failed_message);
2576 
2577  return;
2578  }
2579  }
2580 
2581  snaplock_ticket::pointer_t ticket(std::make_shared<snaplock_ticket>(
2582  this
2583  , f_messenger
2584  , object_name
2585  , entering_key
2586  , timeout
2587  , duration
2588  , server_name
2589  , service_name));
2590 
2591  f_entering_tickets[object_name][entering_key] = ticket;
2592 
2593  // finish up ticket initialization
2594  //
2595  ticket->set_unlock_duration(unlock_duration);
2596 
2597  // general a serial number for that ticket
2598  //
2599  f_ticket_serial = (f_ticket_serial + 1) & 0x00FFFFFF;
2600  if(f_leaders[0]->get_id() != f_my_id)
2601  {
2602  if(f_leaders.size() >= 2
2603  && f_leaders[1]->get_id() != f_my_id)
2604  {
2605  f_ticket_serial |= 1 << 24;
2606  }
2607  else if(f_leaders.size() >= 3
2608  && f_leaders[2]->get_id() != f_my_id)
2609  {
2610  f_ticket_serial |= 2 << 24;
2611  }
2612  }
2613  ticket->set_serial(f_ticket_serial);
2614 
2615  if(message.has_parameter("serial"))
2616  {
2617  // if we have a "serial" number in that message, we lost a leader
2618  // and when that happens we are not unlikely to have lost the
2619  // client that requested the LOCK, send an ALIVE message to make
2620  // sure that the client still exists before entering the ticket
2621  //
2622  ticket->set_alive_timeout(5 + time(nullptr));
2623 
2624  snap::snap_communicator_message alive_message;
2625  alive_message.set_command("ALIVE");
2626  alive_message.set_server(server_name);
2627  alive_message.set_service(service_name);
2628  alive_message.add_parameter("serial", QString("relock/%1/%2").arg(object_name).arg(entering_key));
2629  alive_message.add_parameter("timestamp", time(nullptr));
2630  send_message(alive_message);
2631  }
2632  else
2633  {
2634  // act on the new ticket
2635  //
2636  ticket->entering();
2637  }
2638 
2639  // the list of tickets changed, make sure we update timeout timer
2640  //
2641  cleanup();
2642 }
2643 
2644 
2653 void snaplock::msg_unlock(snap::snap_communicator_message & message)
2654 {
2655  if(!is_ready())
2656  {
2657  SNAP_LOG_ERROR("received an UNLOCK when snaplock is not ready to receive LOCK messages.");
2658  return;
2659  }
2660 
2661  if(is_leader() == nullptr)
2662  {
2663  // we are not a leader, we need to forward to a leader to handle
2664  // the message properly
2665  //
2666  forward_message_to_leader(message);
2667  return;
2668  }
2669 
2670  QString object_name;
2671  pid_t client_pid(0);
2672  get_parameters(message, &object_name, &client_pid, nullptr, nullptr, nullptr);
2673 
2674  // if the ticket still exists, send the UNLOCKED and then erase it
2675  //
2676  auto obj_ticket(f_tickets.find(object_name));
2677  if(obj_ticket != f_tickets.end())
2678  {
2679  QString const server_name(message.has_parameter("lock_proxy_server_name")
2680  ? message.get_parameter("lock_proxy_server_name")
2681  : message.get_sent_from_server());
2682 
2683  //QString const service_name(message.has_parameter("lock_proxy_service_name")
2684  // ? message.get_parameter("lock_proxy_service_name")
2685  // : message.get_sent_from_service());
2686 
2687  QString const entering_key(QString("%1/%2").arg(server_name).arg(client_pid));
2688  auto key_ticket(std::find_if(
2689  obj_ticket->second.begin()
2690  , obj_ticket->second.end()
2691  , [&entering_key](auto const & t)
2692  {
2693  return t.second->get_entering_key() == entering_key;
2694  }));
2695  if(key_ticket != obj_ticket->second.end())
2696  {
2697  // this function will send a DROPTICKET to the other leaders
2698  // and the UNLOCKED to the source (unless we already sent the
2699  // UNLOCKED which gets sent at most once.)
2700  //
2701  key_ticket->second->drop_ticket();
2702 
2703  obj_ticket->second.erase(key_ticket);
2704  if(obj_ticket->second.empty())
2705  {
2706  // we are done with this one!
2707  //
2708  f_tickets.erase(obj_ticket);
2709  }
2710  }
2711 else SNAP_LOG_WARNING("and we could not find that key in that object's map...");
2712  }
2713 
2714  // reset the timeout with the other locks
2715  //
2716  cleanup();
2717 }
2718 
2719 
2726 void snaplock::msg_lock_entering(snap::snap_communicator_message & message)
2727 {
2728  QString object_name;
2729  time_t timeout(0);
2730  QString key;
2731  QString source;
2732  get_parameters(message, &object_name, nullptr, &timeout, &key, &source);
2733 
2734  // the server_name and client_pid never include a slash so using
2735  // such as separators is safe
2736  //
2737  if(timeout > time(nullptr)) // lock still in the future?
2738  {
2739  if(is_ready()) // still have leaders?
2740  {
2741  // the entering is just a flag (i.e. entering[i] = true)
2742  // in our case the existance of a ticket is enough to know
2743  // that we entered
2744  //
2745  bool allocate(true);
2746  auto const obj_ticket(f_entering_tickets.find(object_name));
2747  if(obj_ticket != f_entering_tickets.end())
2748  {
2749  auto const key_ticket(obj_ticket->second.find(key));
2750  allocate = key_ticket == obj_ticket->second.end();
2751  }
2752  if(allocate)
2753  {
2754  // ticket does not exist, so create it now
2755  // (note: ticket should only exist on originator)
2756  //
2757  int32_t const duration(message.get_integer_parameter("duration"));
2758  if(duration < snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2759  {
2760  // invalid duration, minimum is 3
2761  //
2762  SNAP_LOG_ERROR(duration)
2763  (" is an invalid duration, the minimum accepted is ")
2764  (snap::snap_lock::SNAP_LOCK_MINIMUM_TIMEOUT)
2765  (".");
2766 
2767  snap::snap_communicator_message lock_failed_message;
2768  lock_failed_message.set_command("LOCKFAILED");
2769  lock_failed_message.reply_to(message);
2770  lock_failed_message.add_parameter("object_name", object_name);
2771  lock_failed_message.add_parameter("key", key);
2772  lock_failed_message.add_parameter("error", "invalid");
2773  send_message(lock_failed_message);
2774 
2775  return;
2776  }
2777 
2778  int32_t unlock_duration(snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT);
2779  if(message.has_parameter("unlock_duration"))
2780  {
2781  unlock_duration = message.get_integer_parameter("unlock_duration");
2782  if(unlock_duration != snap::snap_lock::SNAP_UNLOCK_USES_LOCK_TIMEOUT
2783  && unlock_duration < snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2784  {
2785  // invalid duration, minimum is 60
2786  //
2787  SNAP_LOG_ERROR(duration)
2788  (" is an invalid unlock duration, the minimum accepted is ")
2789  (snap::snap_lock::SNAP_UNLOCK_MINIMUM_TIMEOUT)
2790  (".");
2791 
2792  snap::snap_communicator_message lock_failed_message;
2793  lock_failed_message.set_command("LOCKFAILED");
2794  lock_failed_message.reply_to(message);
2795  lock_failed_message.add_parameter("object_name", object_name);
2796  lock_failed_message.add_parameter("key", key);
2797  lock_failed_message.add_parameter("error", "invalid");
2798  send_message(lock_failed_message);
2799 
2800  return;
2801  }
2802  }
2803 
2804  // we have to know where this message comes from
2805  //
2806  snap::snap_string_list const source_segments(source.split("/"));
2807  if(source_segments.size() != 2)
2808  {
2809  SNAP_LOG_ERROR("Invalid number of parameters in source (found ")
2810  (source_segments.size())
2811  (", expected 2.)");
2812 
2813  snap::snap_communicator_message lock_failed_message;
2814  lock_failed_message.set_command("LOCKFAILED");
2815  lock_failed_message.reply_to(message);
2816  lock_failed_message.add_parameter("object_name", object_name);
2817  lock_failed_message.add_parameter("key", key);
2818  lock_failed_message.add_parameter("error", "invalid");
2819  send_message(lock_failed_message);
2820 
2821  return;
2822  }
2823 
2824  snaplock_ticket::pointer_t ticket(std::make_shared<snaplock_ticket>(
2825  this
2826  , f_messenger
2827  , object_name
2828  , key
2829  , timeout
2830  , duration
2831  , source_segments[0]
2832  , source_segments[1]));
2833 
2834  f_entering_tickets[object_name][key] = ticket;
2835 
2836  // finish up on ticket initialization
2837  //
2838  ticket->set_owner(message.get_sent_from_server());
2839  ticket->set_unlock_duration(unlock_duration);
2840  ticket->set_serial(message.get_integer_parameter("serial"));
2841  }
2842 
2843  snap::snap_communicator_message reply;
2844  reply.set_command("LOCKENTERED");
2845  reply.reply_to(message);
2846  reply.add_parameter("object_name", object_name);
2847  reply.add_parameter("key", key);
2848  send_message(reply);
2849  }
2850  else
2851  {
2852  SNAP_LOG_DEBUG("received LOCKENTERING while we are thinking we are not ready.");
2853  }
2854  }
2855 
2856  cleanup();
2857 }
2858 
2859 
2870 void snaplock::msg_lock_entered(snap::snap_communicator_message & message)
2871 {
2872  QString object_name;
2873  QString key;
2874  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
2875 
2876  auto const obj_entering_ticket(f_entering_tickets.find(object_name));
2877  if(obj_entering_ticket != f_entering_tickets.end())
2878  {
2879  auto const key_entering_ticket(obj_entering_ticket->second.find(key));
2880  if(key_entering_ticket != obj_entering_ticket->second.end())
2881  {
2882  key_entering_ticket->second->entered();
2883  }
2884  }
2885 }
2886 
2887 
2894 void snaplock::msg_lock_exiting(snap::snap_communicator_message & message)
2895 {
2896  QString object_name;
2897  QString key;
2898  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
2899 
2900  // when exiting we just remove the entry with that key
2901  //
2902  auto const obj_entering(f_entering_tickets.find(object_name));
2903  if(obj_entering != f_entering_tickets.end())
2904  {
2905  auto const key_entering(obj_entering->second.find(key));
2906  if(key_entering != obj_entering->second.end())
2907  {
2908  obj_entering->second.erase(key_entering);
2909 
2910  // we also want to remove it from the ticket f_entering
2911  // map if it is there (older ones are there!)
2912  //
2913  bool run_activation(false);
2914  auto const obj_ticket(f_tickets.find(object_name));
2915  if(obj_ticket != f_tickets.end())
2916  {
2917  for(auto const & key_ticket : obj_ticket->second)
2918  {
2919  key_ticket.second->remove_entering(key);
2920  run_activation = true;
2921  }
2922  }
2923  if(run_activation)
2924  {
2925  // try to activate the lock right now since it could
2926  // very well be the only ticket and that is exactly
2927  // when it is viewed as active!
2928  //
2929  // Note: this is from my old version, if I am correct
2930  // it cannot happen anymore because (1) this is
2931  // not the owner so the activation would not
2932  // take anyway and (2) the ticket is not going
2933  // to be marked as being ready at this point
2934  // (that happens later)
2935  //
2936  // XXX we probably should remove this statement
2937  // and the run_activation flag which would
2938  // then be useless
2939  //
2940  activate_first_lock(object_name);
2941  }
2942 
2943  if(obj_entering->second.empty())
2944  {
2945  f_entering_tickets.erase(obj_entering);
2946  }
2947  }
2948  }
2949 
2950  // the list of tickets is not unlikely changed so we need to make
2951  // a call to cleanup to make sure the timer is reset appropriately
2952  //
2953  cleanup();
2954 }
2955 
2956 
2972 void snaplock::msg_drop_ticket(snap::snap_communicator_message & message)
2973 {
2974  QString object_name;
2975  QString key;
2976  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
2977 
2978  snap::snap_string_list const segments(key.split('/'));
2979 
2980  // drop the regular ticket
2981  //
2982  // if we have only 2 segments, then there is no corresponding ticket
2983  // since tickets are added only once we have a ticket_id
2984  //
2985  QString entering_key;
2986  if(segments.size() == 3)
2987  {
2988  auto obj_ticket(f_tickets.find(object_name));
2989  if(obj_ticket != f_tickets.end())
2990  {
2991  auto key_ticket(obj_ticket->second.find(key));
2992  if(key_ticket != obj_ticket->second.end())
2993  {
2994  obj_ticket->second.erase(key_ticket);
2995  }
2996 
2997  if(obj_ticket->second.empty())
2998  {
2999  f_tickets.erase(obj_ticket);
3000  }
3001 
3002  // one ticket was erased, another may be first now
3003  //
3004  activate_first_lock(object_name);
3005  }
3006 
3007  // we received the ticket_id in the message, so
3008  // we have to regenerate the entering_key without
3009  // the ticket_id (which is the first element)
3010  //
3011  entering_key = QString("%1/%2").arg(segments[1]).arg(segments[2]);
3012  }
3013  else
3014  {
3015  // we received the entering_key in the message, use as is
3016  //
3017  entering_key = key;
3018  }
3019 
3020  // drop the entering ticket
3021  //
3022  auto obj_entering_ticket(f_entering_tickets.find(object_name));
3023  if(obj_entering_ticket != f_entering_tickets.end())
3024  {
3025  auto key_entering_ticket(obj_entering_ticket->second.find(entering_key));
3026  if(key_entering_ticket != obj_entering_ticket->second.end())
3027  {
3028  obj_entering_ticket->second.erase(key_entering_ticket);
3029  }
3030 
3031  if(obj_entering_ticket->second.empty())
3032  {
3033  f_entering_tickets.erase(obj_entering_ticket);
3034  }
3035  }
3036 
3037  // the list of tickets is not unlikely changed so we need to make
3038  // a call to cleanup to make sure the timer is reset appropriately
3039  //
3040  cleanup();
3041 }
3042 
3043 
3052 void snaplock::msg_get_max_ticket(snap::snap_communicator_message & message)
3053 {
3054  QString object_name;
3055  QString key;
3056  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3057 
3058  // remove any f_tickets that timed out by now because these should
3059  // not be taken in account in the max. computation
3060  //
3061  cleanup();
3062 
3063  snaplock_ticket::ticket_id_t last_ticket(get_last_ticket(object_name));
3064 
3065  snap::snap_communicator_message reply;
3066  reply.set_command("MAXTICKET");
3067  reply.reply_to(message);
3068  reply.add_parameter("object_name", object_name);
3069  reply.add_parameter("key", key);
3070  reply.add_parameter("ticket_id", last_ticket);
3071  send_message(reply);
3072 }
3073 
3074 
3085 void snaplock::msg_max_ticket(snap::snap_communicator_message & message)
3086 {
3087  QString object_name;
3088  QString key;
3089  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3090 
3091  // the MAXTICKET is an answer that has to go in a still un-added ticket
3092  //
3093  auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3094  if(obj_entering_ticket != f_entering_tickets.end())
3095  {
3096  auto const key_entering_ticket(obj_entering_ticket->second.find(key));
3097  if(key_entering_ticket != obj_entering_ticket->second.end())
3098  {
3099  key_entering_ticket->second->max_ticket(message.get_integer_parameter("ticket_id"));
3100  }
3101  }
3102 }
3103 
3104 
3120 void snaplock::msg_add_ticket(snap::snap_communicator_message & message)
3121 {
3122  QString object_name;
3123  QString key;
3124  time_t timeout;
3125  get_parameters(message, &object_name, nullptr, &timeout, &key, nullptr);
3126 
3127 #ifdef _DEBUG
3128  {
3129  auto const obj_ticket(f_tickets.find(object_name));
3130  if(obj_ticket != f_tickets.end())
3131  {
3132  auto const key_ticket(obj_ticket->second.find(key));
3133  if(key_ticket != obj_ticket->second.end())
3134  {
3135  // this ticket exists on this system
3136  //
3137  throw std::logic_error("snaplock::add_ticket() ticket already exists");
3138  }
3139  }
3140  }
3141 #endif
3142 
3143  // the client_pid parameter is part of the key (3rd segment)
3144  //
3145  snap::snap_string_list const segments(key.split('/'));
3146  if(segments.size() != 3)
3147  {
3148  SNAP_LOG_ERROR("Expected exactly 3 segments in \"")
3149  (key)
3150  ("\" to add a ticket.");
3151 
3152  snap::snap_communicator_message lock_failed_message;
3153  lock_failed_message.set_command("LOCKFAILED");
3154  lock_failed_message.reply_to(message);
3155  lock_failed_message.add_parameter("object_name", object_name);
3156  lock_failed_message.add_parameter("key", key);
3157  lock_failed_message.add_parameter("error", "invalid");
3158  send_message(lock_failed_message);
3159 
3160  return;
3161  }
3162 
3163  bool ok(false);
3164  uint32_t const number(segments[0].toUInt(&ok, 16));
3165  if(!ok)
3166  {
3167  SNAP_LOG_ERROR("somehow ticket number \"")
3168  (segments[0])
3169  ("\" is not a valid hexadecimal number");
3170 
3171  snap::snap_communicator_message lock_failed_message;
3172  lock_failed_message.set_command("LOCKFAILED");
3173  lock_failed_message.reply_to(message);
3174  lock_failed_message.add_parameter("object_name", object_name);
3175  lock_failed_message.add_parameter("key", key);
3176  lock_failed_message.add_parameter("error", "invalid");
3177  send_message(lock_failed_message);
3178 
3179  return;
3180  }
3181 
3182  // by now all existing snaplock instances should already have
3183  // an entering ticket for that one ticket
3184  //
3185  auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3186  if(obj_entering_ticket == f_entering_tickets.end())
3187  {
3188  SNAP_LOG_ERROR("Expected entering ticket object for \"")
3189  (object_name)
3190  ("\" not found when adding a ticket.");
3191 
3192  snap::snap_communicator_message lock_failed_message;
3193  lock_failed_message.set_command("LOCKFAILED");
3194  lock_failed_message.reply_to(message);
3195  lock_failed_message.add_parameter("object_name", object_name);
3196  lock_failed_message.add_parameter("key", key);
3197  lock_failed_message.add_parameter("error", "invalid");
3198  send_message(lock_failed_message);
3199 
3200  return;
3201  }
3202 
3203  // the key we need to search is not the new ticket key but the
3204  // entering key, build it from the segments
3205  //
3206  QString const entering_key(QString("%1/%2").arg(segments[1]).arg(segments[2]));
3207  auto const key_entering_ticket(obj_entering_ticket->second.find(entering_key));
3208  if(key_entering_ticket == obj_entering_ticket->second.end())
3209  {
3210  SNAP_LOG_ERROR("Expected entering ticket key for \"")
3211  (object_name)
3212  ("\" not found when adding a ticket.");
3213 
3214  snap::snap_communicator_message lock_failed_message;
3215  lock_failed_message.set_command("LOCKFAILED");
3216  lock_failed_message.reply_to(message);
3217  lock_failed_message.add_parameter("object_name", object_name);
3218  lock_failed_message.add_parameter("key", key);
3219  lock_failed_message.add_parameter("error", "invalid");
3220  send_message(lock_failed_message);
3221 
3222  return;
3223  }
3224 
3225  // make it an official ticket now
3226  //
3227  // this should happen on all snaplock other than the one that
3228  // first received the LOCK message
3229  //
3230  set_ticket(object_name, key, key_entering_ticket->second);
3231 
3232  // WARNING: the set_ticket_number() function has the same side
3233  // effects as the add_ticket() function without the
3234  // send_message() call
3235  //
3236  f_tickets[object_name][key]->set_ticket_number(number);
3237 
3238  snap::snap_communicator_message ticket_added_message;
3239  ticket_added_message.set_command("TICKETADDED");
3240  ticket_added_message.reply_to(message);
3241  ticket_added_message.add_parameter("object_name", object_name);
3242  ticket_added_message.add_parameter("key", key);
3243  send_message(ticket_added_message);
3244 }
3245 
3246 
3254 void snaplock::msg_ticket_added(snap::snap_communicator_message & message)
3255 {
3256  QString object_name;
3257  QString key;
3258  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3259 
3260  auto const obj_ticket(f_tickets.find(object_name));
3261  if(obj_ticket != f_tickets.end())
3262  {
3263  auto const key_ticket(obj_ticket->second.find(key));
3264  if(key_ticket != obj_ticket->second.end())
3265  {
3266  // this ticket exists on this system
3267  //
3268  auto const obj_entering_ticket(f_entering_tickets.find(object_name));
3269  if(obj_entering_ticket == f_entering_tickets.end())
3270  {
3271  // this happens all the time because the entering ticket
3272  // gets removed on the first TICKETADDED we receive so
3273  // on the second one we get here...
3274  //
3275  SNAP_LOG_TRACE("called with object \"")
3276  (object_name)
3277  ("\" not present in f_entering_ticket (key: \"")
3278  (key)
3279  ("\".)");
3280  return;
3281  }
3282  key_ticket->second->ticket_added(obj_entering_ticket->second);
3283  }
3284  else
3285  {
3286  SNAP_LOG_DEBUG("found object \"")
3287  (object_name)
3288  ("\" but could not find a ticket with key \"")
3289  (key)
3290  ("\"...");
3291  }
3292  }
3293  else
3294  {
3295  SNAP_LOG_DEBUG("object \"")
3296  (object_name)
3297  ("\" not found.");
3298  }
3299 }
3300 
3301 
3309 void snaplock::msg_ticket_ready(snap::snap_communicator_message & message)
3310 {
3311  QString object_name;
3312  QString key;
3313  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3314 
3315  auto obj_ticket(f_tickets.find(object_name));
3316  if(obj_ticket != f_tickets.end())
3317  {
3318  auto key_ticket(obj_ticket->second.find(key));
3319  if(key_ticket != obj_ticket->second.end())
3320  {
3321  // we can mark this ticket as activated
3322  //
3323  key_ticket->second->set_ready();
3324  }
3325  }
3326 }
3327 
3328 
3343 void snaplock::msg_activate_lock(snap::snap_communicator_message & message)
3344 {
3345  QString object_name;
3346  QString key;
3347  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3348 
3349  QString first_key("no-key");
3350 
3351  auto ticket(find_first_lock(object_name));
3352  if(ticket != nullptr)
3353  {
3354  // found it!
3355  //
3356  first_key = ticket->get_ticket_key();
3357 
3358  if(key == first_key)
3359  {
3360  // we can mark this ticket as activated
3361  //
3362  ticket->lock_activated();
3363  }
3364  }
3365 
3366  // always reply, if we could not find the key, then we returned 'no-key'
3367  // as the key parameter
3368  //
3369  snap::snap_communicator_message lock_activated_message;
3370  lock_activated_message.set_command("LOCKACTIVATED");
3371  lock_activated_message.reply_to(message);
3372  lock_activated_message.add_parameter("object_name", object_name);
3373  lock_activated_message.add_parameter("key", key);
3374  lock_activated_message.add_parameter("other_key", first_key);
3375  send_message(lock_activated_message);
3376 
3377  // the list of tickets is not unlikely changed so we need to make
3378  // a call to cleanup to make sure the timer is reset appropriately
3379  //
3380  cleanup();
3381 }
3382 
3383 
3392 void snaplock::msg_lock_activated(snap::snap_communicator_message & message)
3393 {
3394  QString object_name;
3395  QString key;
3396  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3397 
3398  QString const & other_key(message.get_parameter("other_key"));
3399  if(other_key == key)
3400  {
3401  auto obj_ticket(f_tickets.find(object_name));
3402  if(obj_ticket != f_tickets.end())
3403  {
3404  auto key_ticket(obj_ticket->second.find(key));
3405  if(key_ticket != obj_ticket->second.end())
3406  {
3407  // that key is still here!
3408  // time to activate
3409  //
3410  key_ticket->second->lock_activated();
3411  }
3412  }
3413  }
3414 }
3415 
3416 
3442 void snaplock::msg_lock_failed(snap::snap_communicator_message & message)
3443 {
3444  QString object_name;
3445  QString key;
3446  get_parameters(message, &object_name, nullptr, nullptr, &key, nullptr);
3447 
3448  QString forward_server;
3449  QString forward_service;
3450 
3451  // remove f_entering_tickets entries if we find matches there
3452  //
3453  auto obj_entering(f_entering_tickets.find(object_name));
3454  if(obj_entering != f_entering_tickets.end())
3455  {
3456  auto key_entering(obj_entering->second.find(key));
3457  if(key_entering != obj_entering->second.end())
3458  {
3459  forward_server = key_entering->second->get_server_name();
3460  forward_service = key_entering->second->get_service_name();
3461 
3462  obj_entering->second.erase(key_entering);
3463  }
3464 
3465  if(obj_entering->second.empty())
3466  {
3467  obj_entering = f_entering_tickets.erase(obj_entering);
3468  }
3469  else
3470  {
3471  ++obj_entering;
3472  }
3473  }
3474 
3475  // remove any f_tickets entries if we find matches there
3476  //
3477  auto obj_ticket(f_tickets.find(object_name));
3478  if(obj_ticket != f_tickets.end())
3479  {
3480  bool try_activate(false);
3481  auto key_ticket(obj_ticket->second.find(key));
3482  if(key_ticket == obj_ticket->second.end())
3483  {
3484  key_ticket = std::find_if(
3485  obj_ticket->second.begin()
3486  , obj_ticket->second.end()
3487  , [&key](auto const & t)
3488  {
3489  return t.second->get_entering_key() == key;
3490  });
3491  }
3492  if(key_ticket != obj_ticket->second.end())
3493  {
3494  // Note: if we already found it in the f_entering_tickets then
3495  // the server and service names are going to be exactly
3496  // the same so there is no need to test that here
3497  //
3498  forward_server = key_ticket->second->get_server_name();
3499  forward_service = key_ticket->second->get_service_name();
3500 
3501  obj_ticket->second.erase(key_ticket);
3502  try_activate = true;
3503  }
3504 
3505  if(obj_ticket->second.empty())
3506  {
3507  obj_ticket = f_tickets.erase(obj_ticket);
3508  }
3509  else
3510  {
3511  if(try_activate)
3512  {
3513  // something was erased, a new ticket may be first
3514  //
3515  activate_first_lock(obj_ticket->first);
3516  }
3517 
3518  ++obj_ticket;
3519  }
3520  }
3521 
3522  if(!forward_server.isEmpty()
3523  && !forward_service.isEmpty())
3524  {
3525  // we deleted an entry, forward the message to the service
3526  // that requested that lock
3527  //
3528  message.set_server(forward_server);
3529  message.set_service(forward_service);
3530  send_message(message);
3531  }
3532 
3533  // the list of tickets is not unlikely changed so we need to make
3534  // a call to cleanup to make sure the timer is reset appropriately
3535  //
3536  cleanup();
3537 }
3538 
3539 
3564 void snaplock::activate_first_lock(QString const & object_name)
3565 {
3566  auto ticket(find_first_lock(object_name));
3567 
3568  if(ticket != nullptr)
3569  {
3570  // there is what we think is the first ticket
3571  // that should be actived now; we need to share
3572  // with the other 2 leaders to make sure of that
3573  //
3574  ticket->activate_lock();
3575  }
3576 }
3577 
3578 
3579 snaplock_ticket::pointer_t snaplock::find_first_lock(QString const & object_name)
3580 {
3581  snaplock_ticket::pointer_t first_ticket;
3582  auto const obj_ticket(f_tickets.find(object_name));
3583 
3584  if(obj_ticket != f_tickets.end())
3585  {
3586  // loop through making sure that we activate a ticket only
3587  // if the obtention date was not already reached; if that
3588  // date was reached before we had the time to activate the
3589  // lock, then the client should have abandonned the lock
3590  // request anyway...
3591  //
3592  // (this is already done in the cleanup(), but a couple of
3593  // other functions may call the activate_first_lock()
3594  // function!)
3595  //
3596  for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3597  {
3598  if(key_ticket->second->timed_out())
3599  {
3600  // that ticket timed out, send an UNLOCK or LOCKFAILED
3601  // message and get rid of it
3602  //
3603  key_ticket->second->lock_failed();
3604  if(key_ticket->second->timed_out())
3605  {
3606  // still timed out, remove it
3607  //
3608  key_ticket = obj_ticket->second.erase(key_ticket);
3609  }
3610  }
3611  else
3612  {
3613  if(first_ticket == nullptr)
3614  {
3615  first_ticket = key_ticket->second;
3616  }
3617  ++key_ticket;
3618  }
3619  }
3620 
3621  if(obj_ticket->second.empty())
3622  {
3623  // it is empty now, get rid of that set of tickets
3624  //
3625  f_tickets.erase(obj_ticket);
3626  }
3627  }
3628 
3629  return first_ticket;
3630 }
3631 
3632 
3655 void snaplock::synchronize_leaders()
3656 {
3657  // there is nothing to do if we are by ourselves because we cannot
3658  // gain any type of concensus unless we are expected to be the only
3659  // one in which case there is no synchronization requirements anyway
3660  //
3661  if(f_leaders.size() <= 1)
3662  {
3663  return;
3664  }
3665 
3666  // only leaders can synchronize each others
3667  // (other snaplocks do not have any tickets to synchronize)
3668  //
3669  if(!is_leader())
3670  {
3671  return;
3672  }
3673 
3674  // determine whether we are leader #0 or not, if zero, then we
3675  // call msg_lock() directly, otherwise we do a send_message()
3676  //
3677  bool const leader0(f_leaders[0]->get_id() == f_my_id);
3678 
3679  // a vector of messages for which we have to call msg_lock()
3680  //
3681  snap::snap_communicator_message::vector_t local_locks;
3682 
3683  // if entering a ticket is definitely not locked, although it
3684  // could be ready (one step away from being locked!) we still
3685  // restart the whole process with the new leaders if such
3686  // exist
3687  //
3688  // Note: of course we restart the process only if the owner
3689  // was that one leader that disappeared, not if the
3690  // ticket is owned by a remaining leader
3691  //
3692  for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); ++obj_entering)
3693  {
3694  for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
3695  {
3696  QString const owner_name(key_entering->second->get_owner());
3697  auto key_leader(std::find_if(
3698  f_leaders.begin()
3699  , f_leaders.end()
3700  , [&owner_name](auto const & l)
3701  {
3702  return l->get_name() == owner_name;
3703  }));
3704  if(key_leader == f_leaders.end())
3705  {
3706  // give new ownership to leader[0]
3707  //
3708  snap::snap_communicator_message lock_message;
3709  lock_message.set_command("LOCK");
3710  lock_message.set_server(f_leaders[0]->get_name());
3711  lock_message.set_service("snaplock");
3712  lock_message.set_sent_from_server(key_entering->second->get_server_name());
3713  lock_message.set_sent_from_service(key_entering->second->get_service_name());
3714  lock_message.add_parameter("object_name", key_entering->second->get_object_name());
3715  lock_message.add_parameter("pid", key_entering->second->get_client_pid());
3716  lock_message.add_parameter("timeout", key_entering->second->get_obtention_timeout());
3717  lock_message.add_parameter("duration", key_entering->second->get_lock_duration());
3718  lock_message.add_parameter("unlock_duration", key_entering->second->get_unlock_duration());
3719  if(leader0)
3720  {
3721  // we are leader #0 so directly call msg_lock()
3722  //
3723  // first we remove the entry otherwise we get a duplicate
3724  // error since we try to readd the same ticket
3725  //
3726  key_entering = obj_entering->second.erase(key_entering);
3727  local_locks.push_back(lock_message);
3728  }
3729  else
3730  {
3731  // we are not leader #0, so send the message to it
3732  //
3733  ++key_entering;
3734  lock_message.add_parameter("serial", key_entering->second->get_serial());
3735  send_message(lock_message);
3736  }
3737  }
3738  else
3739  {
3740  ++key_entering;
3741  }
3742  }
3743  }
3744 
3745  // a ticket may still be unlocked in which case we want to
3746  // restart the lock process as if still entering
3747  //
3748  // if locked, a ticket is assigned leader0 as its new owner so
3749  // further work on that ticket works as expected
3750  //
3751  QString serialized;
3752  for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); ++obj_ticket)
3753  {
3754  for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3755  {
3756  QString const owner_name(key_ticket->second->get_owner());
3757  auto key_leader(std::find_if(
3758  f_leaders.begin()
3759  , f_leaders.end()
3760  , [&owner_name](auto const & l)
3761  {
3762  return l->get_name() == owner_name;
3763  }));
3764  if(key_ticket->second->is_locked())
3765  {
3766  // if ticket was locked by the leader that disappeared, we
3767  // transfer ownership to the new leader #0
3768  //
3769  if(key_leader == f_leaders.end())
3770  {
3771  key_ticket->second->set_owner(f_leaders[0]->get_name());
3772  }
3773 
3774  // and send that ticket to the other leaders to make sure
3775  // they all agree on its current state
3776  //
3777  serialized += key_ticket->second->serialize();
3778  serialized += QChar('\n');
3779 
3780  ++key_ticket;
3781  }
3782  else
3783  {
3784  // it was not locked yet, restart the LOCK process from
3785  // the very beginning
3786  //
3787  if(key_leader == f_leaders.end())
3788  {
3789  // give new ownership to leader[0]
3790  //
3791  snap::snap_communicator_message lock_message;
3792  lock_message.set_command("LOCK");
3793  lock_message.set_server(f_leaders[0]->get_name());
3794  lock_message.set_service("snaplock");
3795  lock_message.set_sent_from_server(key_ticket->second->get_server_name());
3796  lock_message.set_sent_from_service(key_ticket->second->get_service_name());
3797  lock_message.add_parameter("object_name", key_ticket->second->get_object_name());
3798  lock_message.add_parameter("pid", key_ticket->second->get_client_pid());
3799  lock_message.add_parameter("timeout", key_ticket->second->get_obtention_timeout());
3800  lock_message.add_parameter("duration", key_ticket->second->get_lock_duration());
3801  lock_message.add_parameter("unlock_duration", key_ticket->second->get_unlock_duration());
3802  if(leader0)
3803  {
3804  // we are leader #0 so directly call msg_lock()
3805  //
3806  key_ticket = obj_ticket->second.erase(key_ticket);
3807  local_locks.push_back(lock_message);
3808  }
3809  else
3810  {
3811  // we are not leader #0, so send the message to it
3812  //
3813  ++key_ticket;
3814  lock_message.add_parameter("serial", key_ticket->second->get_serial());
3815  send_message(lock_message);
3816  }
3817  }
3818  else
3819  {
3820  ++key_ticket;
3821  }
3822  }
3823  }
3824  }
3825 
3826  // we send those after the loops above because the msg_lock() is
3827  // not unlikely to change the f_entering_tickets map and looping
3828  // through it when another function is going to modify it is not
3829  // wise
3830  //
3831  for(auto lm : local_locks)
3832  {
3833  msg_lock(lm);
3834  }
3835 
3836  // send LOCKTICKETS if there is serialized ticket data
3837  //
3838  if(!serialized.isEmpty())
3839  {
3840  snap::snap_communicator_message lock_tickets_message;
3841  lock_tickets_message.set_command("LOCKTICKETS");
3842  lock_tickets_message.set_service("snaplock");
3843  lock_tickets_message.add_parameter("tickets", serialized);
3844 
3845  auto const la(get_leader_a());
3846  if(la != nullptr)
3847  {
3848  lock_tickets_message.set_server(la->get_name());
3849  send_message(lock_tickets_message);
3850 
3851  auto const lb(get_leader_b());
3852  if(lb != nullptr)
3853  {
3854  lock_tickets_message.set_server(lb->get_name());
3855  send_message(lock_tickets_message);
3856  }
3857  }
3858  }
3859 }
3860 
3861 
3875 void snaplock::forward_message_to_leader(snap::snap_communicator_message & message)
3876 {
3877  // we are not a leader, we work as a proxy by forwarding the
3878  // message to a leader, we add our trail so the LOCKED and
3879  // other messages can be proxied back
3880  //
3881  // Note: using the get_sent_from_server() means that we may not
3882  // even see the return message, it may be proxied to another
3883  // server directly or through another route
3884  //
3885  message.set_service("snaplock");
3886  message.add_parameter("lock_proxy_server_name", message.get_sent_from_server());
3887  message.add_parameter("lock_proxy_service_name", message.get_sent_from_service());
3888 
3889  f_next_leader = (f_next_leader + 1) % f_leaders.size();
3890  message.set_server(f_leaders[f_next_leader]->get_name());
3891 
3892  send_message(message);
3893 }
3894 
3895 
3903 void snaplock::cleanup()
3904 {
3905  time_t next_timeout(std::numeric_limits<time_t>::max());
3906 
3907  // when we receive LOCK requests before we have leaders elected, they
3908  // get added to our cache, so do some cache clean up when not empty
3909  //
3910  for(auto c(f_message_cache.begin()); c != f_message_cache.end(); )
3911  {
3912  if(c->f_timeout <= time(nullptr))
3913  {
3914  QString object_name;
3915  pid_t client_pid(0);
3916  time_t timeout(0);
3917  get_parameters(c->f_message, &object_name, &client_pid, &timeout, nullptr, nullptr);
3918 
3919  SNAP_LOG_WARNING("Lock on \"")(object_name)("\" / \"")(client_pid)("\" timed out before leaders were known.");
3920 
3921  QString const server_name(c->f_message.has_parameter("lock_proxy_server_name")
3922  ? c->f_message.get_parameter("lock_proxy_server_name")
3923  : c->f_message.get_sent_from_server());
3924  QString const entering_key(QString("%1/%2").arg(server_name).arg(client_pid));
3925 
3926  snap::snap_communicator_message lock_failed_message;
3927  lock_failed_message.set_command("LOCKFAILED");
3928  lock_failed_message.reply_to(c->f_message);
3929  lock_failed_message.add_parameter("object_name", object_name);
3930  lock_failed_message.add_parameter("key", entering_key);
3931  lock_failed_message.add_parameter("error", "timedout");
3932  send_message(lock_failed_message);
3933 
3934  c = f_message_cache.erase(c);
3935  }
3936  else
3937  {
3938  if(c->f_timeout < next_timeout)
3939  {
3940  next_timeout = c->f_timeout;
3941  }
3942  ++c;
3943  }
3944  }
3945 
3946  // remove any f_tickets that timed out
3947  //
3948  for(auto obj_ticket(f_tickets.begin()); obj_ticket != f_tickets.end(); )
3949  {
3950  bool try_activate(false);
3951  for(auto key_ticket(obj_ticket->second.begin()); key_ticket != obj_ticket->second.end(); )
3952  {
3953  if(key_ticket->second->timed_out())
3954  {
3955  key_ticket->second->lock_failed();
3956  if(key_ticket->second->timed_out())
3957  {
3958  // still timed out, remove it
3959  //
3960  key_ticket = obj_ticket->second.erase(key_ticket);
3961  try_activate = true;
3962  }
3963  }
3964  else
3965  {
3966  if(key_ticket->second->get_current_timeout() < next_timeout)
3967  {
3968  next_timeout = key_ticket->second->get_current_timeout();
3969  }
3970  ++key_ticket;
3971  }
3972  }
3973 
3974  if(obj_ticket->second.empty())
3975  {
3976  obj_ticket = f_tickets.erase(obj_ticket);
3977  }
3978  else
3979  {
3980  if(try_activate)
3981  {
3982  // something was erased, a new ticket may be first
3983  //
3984  activate_first_lock(obj_ticket->first);
3985  }
3986 
3987  ++obj_ticket;
3988  }
3989  }
3990 
3991  // remove any f_entering_tickets that timed out
3992  //
3993  for(auto obj_entering(f_entering_tickets.begin()); obj_entering != f_entering_tickets.end(); )
3994  {
3995  for(auto key_entering(obj_entering->second.begin()); key_entering != obj_entering->second.end(); )
3996  {
3997  if(key_entering->second->timed_out())
3998  {
3999  key_entering->second->lock_failed();
4000  if(key_entering->second->timed_out())
4001  {
4002  // still timed out, remove it
4003  //
4004  key_entering = obj_entering->second.erase(key_entering);
4005  }
4006  }
4007  else
4008  {
4009  if(key_entering->second->get_current_timeout() < next_timeout)
4010  {
4011  next_timeout = key_entering->second->get_current_timeout();
4012  }
4013  ++key_entering;
4014  }
4015  }
4016 
4017  if(obj_entering->second.empty())
4018  {
4019  obj_entering = f_entering_tickets.erase(obj_entering);
4020  }
4021  else
4022  {
4023  ++obj_entering;
4024  }
4025  }
4026 
4027  // got a new timeout?
4028  //
4029  if(next_timeout != std::numeric_limits<time_t>::max())
4030  {
4031  // out timeout is in seconds, snap_communicator expects
4032  // micro seconds so multiply by 1 million
4033  //
4034  // we add +1 to the second to avoid looping like crazy
4035  // if we timeout just around the "wrong" time
4036  //
4037  f_timer->set_timeout_date((next_timeout + 1) * 1000000LL);
4038  }
4039  else
4040  {
4041  f_timer->set_timeout_date(-1);
4042  }
4043 }
4044 
4045 
4064 snaplock_ticket::ticket_id_t snaplock::get_last_ticket(QString const & object_name)
4065 {
4067 
4068  // Note: There is no need to check the f_entering_tickets list
4069  // since that one does not yet have any ticket number assigned
4070  // and thus the maximum there would return 0 every time
4071  //
4072  auto obj_ticket(f_tickets.find(object_name));
4073  if(obj_ticket != f_tickets.end())
4074  {
4075  // note:
4076  // the std::max_element() algorithm would require many more
4077  // get_ticket_number() when our loop uses one per ticket max.
4078  //
4079  for(auto key_ticket : obj_ticket->second)
4080  {
4081  snaplock_ticket::ticket_id_t const ticket_number(key_ticket.second->get_ticket_number());
4082  if(ticket_number > last_ticket)
4083  {
4084  last_ticket = ticket_number;
4085  }
4086  }
4087  }
4088 
4089  return last_ticket;
4090 }
4091 
4092 
4103 void snaplock::set_ticket(QString const & object_name, QString const & key, snaplock_ticket::pointer_t ticket)
4104 {
4105  f_tickets[object_name][key] = ticket;
4106 }
4107 
4108 
4120 snaplock_ticket::key_map_t const snaplock::get_entering_tickets(QString const & object_name)
4121 {
4122  auto const it(f_entering_tickets.find(object_name));
4123  if(it == f_entering_tickets.end())
4124  {
4125  return snaplock_ticket::key_map_t();
4126  }
4127 
4128  return it->second;
4129 }
4130 
4131 
4139 void snaplock::lock_exiting(snap::snap_communicator_message & message)
4140 {
4141  msg_lock_exiting(message);
4142 }
4143 
4144 
4145 
4153 void snaplock::tool_message(snap::snap_communicator_message const & message)
4154 {
4155  SNAP_LOG_TRACE("tool received message [")(message.to_message())("] for ")(f_server_name);
4156 
4157  QString const command(message.get_command());
4158 
4159  switch(command[0].unicode())
4160  {
4161  case 'H':
4162  if(command == "HELP")
4163  {
4164  // Snap! Communicator is asking us about the commands that we support
4165  //
4166  snap::snap_communicator_message reply;
4167  reply.set_command("COMMANDS");
4168 
4169  // list of commands understood by service
4170  // (many are considered to be internal commands... users
4171  // should look at the LOCK and UNLOCK messages only)
4172  //
4173  reply.add_parameter("list", "CLUSTERDOWN,CLUSTERUP,HELP,QUITTING,READY,STOP,TICKETLIST,UNKNOWN");
4174 
4175  send_message(reply);
4176  return;
4177  }
4178  break;
4179 
4180  case 'Q':
4181  if(command == "QUITTING")
4182  {
4183  // If we received the QUITTING command, then somehow we sent
4184  // a message to Snap! Communicator, which is already in the
4185  // process of quitting... we should get a STOP too, but we
4186  // can just quit ASAP too
4187  //
4188  stop(true);
4189  return;
4190  }
4191  break;
4192 
4193  case 'R':
4194  if(command == "READY")
4195  {
4196  if(f_opt.is_defined("list"))
4197  {
4198  snap::snap_communicator_message list_message;
4199  list_message.set_command("LISTTICKETS");
4200  list_message.set_service("snaplock");
4201  list_message.set_server(f_server_name);
4202  list_message.add_parameter("cache", "no");
4203  list_message.add_parameter("transmission_report", "failure");
4204  send_message(list_message);
4205  }
4206  return;
4207  }
4208  break;
4209 
4210  case 'S':
4211  if(command == "STOP")
4212  {
4213  // Someone is asking us to leave
4214  //
4215  stop(false);
4216  return;
4217  }
4218  break;
4219 
4220  case 'T':
4221  if(command == "TICKETLIST")
4222  {
4223  // received the answer to our LISTTICKETS request
4224  //
4225  ticket_list(message);
4226  stop(false);
4227  return;
4228  }
4229  else if(command == "TRANSMISSIONREPORT")
4230  {
4231  QString const status(message.get_parameter("status"));
4232  if(status == "failed")
4233  {
4234  SNAP_LOG_ERROR("the transmission of our TICKLIST message failed to travel to a snaplock service");
4235  stop(false);
4236  }
4237  return;
4238  }
4239  break;
4240 
4241  case 'U':
4242  if(command == "UNKNOWN")
4243  {
4244  // we sent a command that Snap! Communicator did not understand
4245  //
4246  SNAP_LOG_ERROR("we sent unknown command \"")(message.get_parameter("command"))("\" and probably did not get the expected result (2).");
4247  return;
4248  }
4249  break;
4250 
4251  }
4252 
4253  // unknown commands get reported and process goes on
4254  //
4255  SNAP_LOG_ERROR("unsupported command \"")(command)("\" was received on the connection with Snap! Communicator.");
4256  {
4257  snap::snap_communicator_message reply;
4258  reply.set_command("UNKNOWN");
4259  reply.add_parameter("command", command);
4260  send_message(reply);
4261  }
4262 
4263  return;
4264 }
4265 
4266 
4271 void snaplock::ticket_list(snap::snap_communicator_message const & message)
4272 {
4273  QString const list(message.get_parameter("list"));
4274 
4275  // add newlines for people who have TRACE mode would otherwise have
4276  // a hard time to find the actual list
4277  //
4278  if(list.isEmpty())
4279  {
4280  // TODO: add a --quiet command line option
4281  //
4282  std::cout << std::endl << "...no locks found..." << std::endl;
4283  }
4284  else
4285  {
4286  std::cout << std::endl << list << std::endl;
4287  }
4288 }
4289 
4290 
4291 QString snaplock::serialized_tickets()
4292 {
4293  QString result;
4294 
4295  for(auto const & obj_ticket : f_tickets)
4296  {
4297  for(auto const & key_ticket : obj_ticket.second)
4298  {
4299  result += key_ticket.second->serialize();
4300  result += QChar('\n');
4301  }
4302  }
4303 
4304  return result;
4305 }
4306 
4307 
4308 
4309 }
4310 // snaplock namespace
4311 // vim: ts=4 sw=4 et
static ticket_id_t const NO_TICKET
Definition: snaplock.h:215
advgetopt::option const g_options[]
Definition: snaplock.cpp:126
std::shared_ptr< computer_t > pointer_t
Definition: snaplock.h:330
#define SNAPLOCK_VERSION_STRING
Definition: version.h.in:29
std::vector< message_cache > vector_t
Definition: snaplock.h:421
snaplock(int argc, char *argv[])
Initializes a snaplock object.
Definition: snaplock.cpp:596
Handle the locks timeout.
Definition: snaplock.h:90
Handle snaplock command line commands.
Definition: snaplock.h:176
advgetopt::options_environment const g_options_environment
Definition: snaplock.cpp:192
std::shared_ptr< snaplock_ticket > pointer_t
Definition: snaplock.h:207
std::map< QString, pointer_t > map_t
Definition: snaplock.h:331
Handle the SIGINT Unix signal.
Definition: snaplock.h:69
Handle the SIGUSR2 Unix signal.
Definition: snaplock.h:132
Handle the SIGUSR1 Unix signal.
Definition: snaplock.h:111
std::map< QString, pointer_t > key_map_t
Definition: snaplock.h:209
Handle messages from the Snap Communicator server.
Definition: snaplock.h:153

This document is part of the Snap! Websites Project.

Copyright by Made to Order Software Corp.

Syndicate content

Snap! Websites
An Open Source CMS System in C++

Contact Us Directly