Emergent Leader



    One of the common techniques used in peer-to-peer systems is to
    order cluster nodes according to their ‘age’. The oldest member of
    the cluster plays the role of the coordinator for the cluster.
    The coordinator is responsible for deciding on membership changes
    as well as making decisions such as where
    Fixed Partitions should be placed
    across cluster nodes.
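
    As a minimal sketch of this ordering, assuming that a lower age
    number means an earlier join (the seed node receives age 1 later in
    this article), each node can derive the coordinator locally from the
    member list. MemberInfo and CoordinatorPicker are hypothetical names
    used only for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical member record: a lower age number means the node joined earlier.
record MemberInfo(String address, int age) {}

final class CoordinatorPicker {
    // The coordinator is the oldest member, i.e. the one with the smallest age number.
    static Optional<MemberInfo> coordinatorOf(List<MemberInfo> liveMembers) {
        return liveMembers.stream().min(Comparator.comparingInt(MemberInfo::age));
    }

    public static void main(String[] args) {
        List<MemberInfo> members = List.of(
                new MemberInfo("cyrene:8003", 3),
                new MemberInfo("athens:8001", 1),
                new MemberInfo("byzantium:8002", 2));
        System.out.println(coordinatorOf(members).map(MemberInfo::address).orElse("none"));
    }
}
```

    Because every node applies the same rule to the same list, no
    election message is needed as long as the membership lists agree.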

    To form the cluster,
    one of the cluster nodes acts as a seed node or an introducer node.
    All the cluster nodes join the cluster by contacting the seed node.

    Every cluster node is configured with the seed node address.
    When a cluster node is started, it tries to contact the seed node
    to join the cluster.

    class ClusterNode…

      MembershipService membershipService;
      public void start(Config config) {
          this.membershipService =  new MembershipService(config.getListenAddress());
          membershipService.join(config.getSeedAddress());
      }

    The seed node can be any of the cluster nodes. It is configured with its own
    address as the seed node address and is the first node that is started.
    It immediately begins accepting requests. The age of the seed node is 1.

    class MembershipService…

      Membership membership;
      public void join(InetAddressAndPort seedAddress) {
          int maxJoinAttempts = 5;
          for(int i = 0; i < maxJoinAttempts; i++){
              try {
                  joinAttempt(seedAddress);
                  return;
              } catch (Exception e) {
                  logger.info("Join attempt " + i + " from " + selfAddress + " to " + seedAddress + " failed. Retrying");
              }
          }
          throw new JoinFailedException("Unable to join the cluster after " + maxJoinAttempts + " attempts");
      }

      private void joinAttempt(InetAddressAndPort seedAddress) throws ExecutionException, TimeoutException {
          if (selfAddress.equals(seedAddress)) {
              //this node is the seed node: initialize the membership with itself.
              int membershipVersion = 1;
              int age = 1;
              updateMembership(new Membership(membershipVersion, Arrays.asList(new Member(selfAddress, age, MemberStatus.JOINED))));
              start();
              return;
          }
          long id = this.messageId++;
          CompletableFuture<JoinResponse> future = new CompletableFuture<>();
          JoinRequest message = new JoinRequest(id, selfAddress);
          pendingRequests.put(id, future);
          network.send(seedAddress, message);
          //wait up to 5 seconds for the response from the seed node.
          JoinResponse joinResponse = Uninterruptibles.getUninterruptibly(future, 5, TimeUnit.SECONDS);
          //the membership in joinResponse is then applied to this node (not shown)
      }

      private void start() {
          logger.info(selfAddress + " joined the cluster. Membership=" + membership);
      }

      private void updateMembership(Membership membership) {
          this.membership  = membership;
      }

    There can be more than one seed node. But seed nodes start accepting
    requests only after they themselves join the cluster. Also, the cluster
    remains functional if the seed node is down, but no new nodes will be able
    to join the cluster.

    Non-seed nodes then send the join request to the seed node.
    The seed node handles the join request by creating a new member record
    and assigning its age.
    It then updates its own membership list and sends messages to all the
    existing members with the new membership list.
    It then waits to make sure that a response is
    returned from every node, but will eventually return the join response
    even if a response is delayed.

    class MembershipService…

      public void handleJoinRequest(JoinRequest joinRequest) {
          handlePossibleRejoin(joinRequest);
          handleNewJoin(joinRequest);
      }

      private void handleNewJoin(JoinRequest joinRequest) {
          List<Member> existingMembers = membership.getLiveMembers();
          updateMembership(membership.addNewMember(joinRequest.from));
          ResultsCollector resultsCollector = broadcastMembershipUpdate(existingMembers);
          JoinResponse joinResponse = new JoinResponse(joinRequest.messageId, selfAddress, membership);
          //respond to the joining node once the update is acknowledged or times out.
          resultsCollector.whenComplete((response, exception) -> {
              logger.info("Sending join response from " + selfAddress + " to " + joinRequest.from);
              network.send(joinRequest.from, joinResponse);
          });
      }

    class Membership…

      public Membership addNewMember(InetAddressAndPort address) {
          var newMembership = new ArrayList<>(liveMembers);
          //the new member is younger than every live member.
          int age = youngestMemberAge() + 1;
          newMembership.add(new Member(address, age, MemberStatus.JOINED));
          return new Membership(version + 1, newMembership, failedMembers);
      }

      private int youngestMemberAge() {
          return liveMembers.stream().map(m -> m.age).max(Integer::compare).orElse(0);
      }

    If a node which was already part of the cluster is trying to rejoin
    after a crash, the failure detector state related to that member is
    cleared.

    class MembershipService…

      private void handlePossibleRejoin(JoinRequest joinRequest) {
          if (membership.isFailed(joinRequest.from)) {
              //member rejoining
              logger.info(joinRequest.from  + " rejoining the cluster. Removing it from failed list");
          }
      }

    It is then added as a new member. Each member needs to be identified
    uniquely. It can be assigned a unique identifier at startup.
    This then provides a point of reference that makes it possible to
    check if it is an existing cluster node that is rejoining.
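
    One way to picture this check is sketched below. It assumes that each
    node generates a fresh UUID when its process starts and sends it with
    the join request, so a restart at the same address shows up with a new
    identifier. RejoinDetector and JoinKind are hypothetical names, not part
    of the implementation described in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical registry that remembers which node id last joined from each address.
final class RejoinDetector {
    private final Map<String, UUID> idByAddress = new HashMap<>();

    enum JoinKind { NEW_MEMBER, SAME_INSTANCE, RESTARTED_INSTANCE }

    // Classifies a join request by comparing the presented id with the one on record,
    // and records the presented id for the next comparison.
    JoinKind classifyAndRecord(String address, UUID nodeId) {
        UUID known = idByAddress.put(address, nodeId);
        if (known == null) {
            return JoinKind.NEW_MEMBER;
        }
        return known.equals(nodeId) ? JoinKind.SAME_INSTANCE : JoinKind.RESTARTED_INSTANCE;
    }
}
```

    A coordinator using something like this could treat RESTARTED_INSTANCE
    as the rejoin case and reset any state it holds for the old instance.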

    The Membership class maintains the list of live members as well as
    failed members. Members are moved from the live list to the failed list
    if they stop sending HeartBeat, as explained in the
    failure detection section.

    class Membership…

      public class Membership {
          List<Member> liveMembers = new ArrayList<>();
          List<Member> failedMembers = new ArrayList<>();
          public boolean isFailed(InetAddressAndPort address) {
              return failedMembers.stream().anyMatch(m -> m.address.equals(address));
          }

    Sending membership updates to all the existing members

    Membership updates are sent to all the other nodes concurrently.
    The coordinator also needs to track whether all the members
    successfully received the updates.

    A common technique is to send a one-way request to all nodes
    and expect an acknowledgement message.
    The cluster nodes send acknowledgement messages to the coordinator
    to confirm receipt of the membership update.
    A ResultsCollector object can track receipt of all the
    messages asynchronously, and is notified every time
    an acknowledgement is received for a membership update.
    It completes its future once the expected
    acknowledgement messages are received.

    class MembershipService…

      private ResultsCollector broadcastMembershipUpdate(List<Member> existingMembers) {
          ResultsCollector resultsCollector = sendMembershipUpdateTo(existingMembers);
          resultsCollector.orTimeout(2, TimeUnit.SECONDS);
          return resultsCollector;
      }

      Map<Long, CompletableFuture> pendingRequests = new HashMap();
      private ResultsCollector sendMembershipUpdateTo(List<Member> existingMembers) {
          var otherMembers = otherMembers(existingMembers);
          ResultsCollector collector = new ResultsCollector(otherMembers.size());
          if (otherMembers.size() == 0) {
              //nobody to notify, so there is nothing to wait for.
              collector.complete();
              return collector;
          }
          for (Member m : otherMembers) {
              long id = this.messageId++;
              CompletableFuture<Message> future = new CompletableFuture();
              future.whenComplete((result, exception)->{
                  if (exception == null){
                      collector.ackReceived();
                  }
              });
              pendingRequests.put(id, future);
              network.send(m.address, new UpdateMembershipRequest(id, selfAddress, membership));
          }
          return collector;
      }

    class MembershipService…

      private void handleResponse(Message message) {
          completePendingRequests(message);
      }

      private void completePendingRequests(Message message) {
          CompletableFuture requestFuture = pendingRequests.get(message.messageId);
          if (requestFuture != null) {
              requestFuture.complete(message);
          }
      }

    class ResultsCollector…

      class ResultsCollector {
          int totalAcks;
          int receivedAcks;
          CompletableFuture future = new CompletableFuture();
          public ResultsCollector(int totalAcks) {
              this.totalAcks = totalAcks;
          }

          public void ackReceived() {
              receivedAcks++;
              if (receivedAcks == totalAcks) {
                  future.complete(true);
              }
          }

          public void orTimeout(int time, TimeUnit unit) {
              future.orTimeout(time, unit);
          }

          public void whenComplete(BiConsumer<? super Object, ? super Throwable> func) {
              future.whenComplete(func);
          }

          public void complete() {
              future.complete(true);
          }
      }

    To see how ResultsCollector works, consider a cluster
    with a set of nodes: let’s call them athens, byzantium and cyrene.
    athens is acting as the coordinator. When a new node, delphi,
    sends a join request to athens, athens updates the membership and sends the updateMembership request
    to byzantium and cyrene. It also creates a ResultsCollector object to track
    acknowledgements. It records each acknowledgement received
    with the ResultsCollector. When it receives acknowledgements from both
    byzantium and cyrene, it then responds to delphi.
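
    The athens, byzantium and cyrene exchange can be sketched with a small,
    self-contained collector. AckCollector is a hypothetical, thread-safe
    variant written for this illustration; it assumes acknowledgements may
    arrive on different threads, which is why it counts with an
    AtomicInteger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, thread-safe acknowledgement collector.
final class AckCollector {
    private final int expectedAcks;
    private final AtomicInteger receivedAcks = new AtomicInteger();
    private final CompletableFuture<Boolean> done = new CompletableFuture<>();

    AckCollector(int expectedAcks) {
        this.expectedAcks = expectedAcks;
        if (expectedAcks == 0) {
            done.complete(true); // nothing to wait for
        }
    }

    // Called once per acknowledgement; completes the future on the last one.
    void ackReceived() {
        if (receivedAcks.incrementAndGet() == expectedAcks) {
            done.complete(true);
        }
    }

    // Completes the future exceptionally if the acknowledgements do not arrive in time.
    AckCollector orTimeout(long time, TimeUnit unit) {
        done.orTimeout(time, unit);
        return this;
    }

    CompletableFuture<Boolean> future() {
        return done;
    }

    public static void main(String[] args) {
        AckCollector collector = new AckCollector(2).orTimeout(2, TimeUnit.SECONDS);
        collector.future().whenComplete((ok, error) ->
                System.out.println(error == null ? "respond to delphi" : "timed out, respond anyway"));
        collector.ackReceived(); // acknowledgement from byzantium
        collector.ackReceived(); // acknowledgement from cyrene
    }
}
```

    Responding in both the success and the timeout branch matches the
    behaviour described earlier: the join response is eventually returned
    even if an acknowledgement is delayed.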

    Frameworks like Akka
    use Gossip Dissemination and Gossip Convergence
    to track whether updates have reached all cluster nodes.

    An example scenario

    Consider another three nodes.
    Again, we’ll call them athens, byzantium and cyrene.
    athens acts as the seed node; the other two nodes are configured
    with athens as their seed node.

    When athens starts, it detects that it is itself the seed node.
    It immediately initializes the membership list and starts
    accepting requests.

    When byzantium starts, it sends a join request to athens.
    Note that even if byzantium starts before athens, it will keep
    trying to send join requests until it can connect to athens.
    athens finally adds byzantium to the membership list and sends the
    updated membership list to byzantium. Once byzantium receives
    the response from athens, it can start accepting requests.

    With all-to-all heartbeating, byzantium starts sending heartbeats
    to athens, and athens sends heartbeats to byzantium.

    cyrene starts next. It sends a join request to athens.
    athens updates the membership list and sends the updated membership
    list to byzantium. It then sends the join response with
    the membership list to cyrene.

    With all-to-all heartbeating, cyrene, athens and byzantium
    all send heartbeats to each other.

    Handling missing membership updates

    It is possible that some cluster nodes miss membership updates.
    There are two solutions to handle this problem.

    If all members are sending heartbeats to all other members,
    the membership version number can be sent as part of the heartbeat.
    The cluster node that handles the heartbeat can
    then ask for the latest membership.
    Frameworks like Akka, which use Gossip Dissemination,
    track convergence of the gossiped state.

    class MembershipService…

      private void handleHeartbeatMessage(HeartbeatMessage message) {
          //the sender is behind if its membership version is lower than the coordinator's.
          if (isCoordinator() && message.getMembershipVersion() < this.membership.getVersion()) {
              membership.getMember(message.from).ifPresent(member -> {
                  logger.info("Membership version in " + selfAddress + "=" + this.membership.version + " and in " + message.from + "=" + message.getMembershipVersion());
                  logger.info("Sending membership update from " + selfAddress + " to " + message.from);
                  sendMembershipUpdateTo(Arrays.asList(member));
              });
          }
      }

    In the above example, if byzantium misses the membership update
    from athens, it will be detected when byzantium sends the heartbeat
    to athens. athens can then send the latest membership to byzantium.

    Alternatively, each cluster node can check the latest membership list periodically,
    say every second, with other cluster nodes.
    If any node figures out that its member list is outdated,
    it can then ask for the latest membership list so it can update it.
    To be able to compare membership lists, generally
    a version number is maintained and incremented every time
    there is a change.
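
    A minimal sketch of such a versioned list follows. VersionedMembership
    is a hypothetical type for illustration; it assumes members are
    identified by plain address strings and that every change produces a new
    immutable value with the version bumped by one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical immutable view of a membership list with its version number.
record VersionedMembership(int version, List<String> members) {

    // Returns a copy with the new member added and the version incremented.
    VersionedMembership withMember(String address) {
        List<String> updated = new ArrayList<>(members);
        updated.add(address);
        return new VersionedMembership(version + 1, List.copyOf(updated));
    }

    // A node's list is stale if a peer advertises a higher version than its own.
    boolean isStaleComparedTo(int peerVersion) {
        return version < peerVersion;
    }
}
```

    Comparing two integers is enough to decide who should ask whom for the
    latest list; the lists themselves only travel when a node is behind.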

    Failure Detection

    Each cluster node also runs a failure detector to check if
    heartbeats are missing from any of the cluster nodes.
    In a simple case, all cluster nodes send heartbeats to all the other nodes.
    But only the coordinator marks nodes as failed and
    communicates the updated membership list to all the other nodes.
    This makes sure that nodes do not each decide unilaterally whether
    some other nodes have failed. Hazelcast is an example
    of this implementation.
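
    A timeout-based detector of the kind described above can be sketched as
    follows. SimpleTimeoutDetector is a hypothetical class, not the
    TimeoutBasedFailureDetector used in this article's listings; it assumes
    a node counts as alive while its last heartbeat is no older than the
    timeout, and it takes the clock as a parameter so the behaviour can be
    checked without sleeping.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical detector: a node is alive if its last heartbeat is recent enough.
final class SimpleTimeoutDetector<T> {
    private final Map<T, Long> lastHeartbeatNanos = new ConcurrentHashMap<>();
    private final long timeoutNanos;
    private final LongSupplier clock; // injected nanosecond clock, e.g. System::nanoTime

    SimpleTimeoutDetector(Duration timeout, LongSupplier nanoClock) {
        this.timeoutNanos = timeout.toNanos();
        this.clock = nanoClock;
    }

    // Records the arrival time of a heartbeat from the given node.
    void heartbeatReceived(T node) {
        lastHeartbeatNanos.put(node, clock.getAsLong());
    }

    boolean isMonitoring(T node) {
        return lastHeartbeatNanos.containsKey(node);
    }

    // True only if a heartbeat was seen within the timeout window.
    boolean isAlive(T node) {
        Long last = lastHeartbeatNanos.get(node);
        return last != null && clock.getAsLong() - last <= timeoutNanos;
    }

    // Stops tracking a node, for example after it has been marked as failed.
    void remove(T node) {
        lastHeartbeatNanos.remove(node);
    }
}
```

    In a running node the clock argument would typically be
    System::nanoTime, and isAlive would be polled by a scheduled task.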

    class MembershipService…

      private boolean isCoordinator() {
          Member coordinator = membership.getCoordinator();
          return coordinator.address.equals(selfAddress);
      }

      TimeoutBasedFailureDetector<InetAddressAndPort> failureDetector
              = new TimeoutBasedFailureDetector<InetAddressAndPort>(Duration.ofSeconds(2));

      private void checkFailedMembers(List<Member> members) {
          if (isCoordinator()) {
              removeFailedMembers();
          } else {
              //if the failed members include the coordinator, check if this node is the next coordinator.
              claimLeadershipIfNeeded(members);
          }
      }

      void removeFailedMembers() {
          List<Member> failedMembers = checkAndGetFailedMembers(membership.getLiveMembers());
          if (failedMembers.isEmpty()) {
              return;
          }
          //mark the members as failed and tell the remaining live members.
          updateMembership(membership.failed(failedMembers));
          sendMembershipUpdateTo(membership.getLiveMembers());
      }

    Avoiding all-to-all heartbeating

    All-to-all heartbeating is not feasible in large clusters.
    Typically each node will receive heartbeats from
    only a few other nodes. If a failure is detected,
    it is broadcast to all the other nodes
    including the coordinator.

    For example, in Akka a node ring is formed
    by sorting network addresses and each cluster node sends
    heartbeats to only a few cluster nodes.
    Ignite arranges all the nodes in the cluster
    in a ring and each node sends heartbeats only to the node next
    to it.
    Hazelcast uses all-to-all heartbeating.
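
    The ring idea can be illustrated in general terms as follows. This is
    not how Akka or Ignite are implemented; it is a sketch that assumes
    addresses are plain strings, that the ring is their natural sort order,
    and that each node heartbeats the next few nodes clockwise.
    HeartbeatRing and heartbeatTargets are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class HeartbeatRing {
    // Returns the next `count` nodes after `self` in the sorted ring of addresses.
    static List<String> heartbeatTargets(List<String> addresses, String self, int count) {
        List<String> ring = new ArrayList<>(addresses);
        Collections.sort(ring);
        int selfIndex = ring.indexOf(self);
        if (selfIndex < 0) {
            throw new IllegalArgumentException(self + " is not in the ring");
        }
        List<String> targets = new ArrayList<>();
        int limit = Math.min(count, ring.size() - 1); // never target ourselves
        for (int i = 1; i <= limit; i++) {
            targets.add(ring.get((selfIndex + i) % ring.size())); // wrap around the ring
        }
        return targets;
    }
}
```

    With count set to 1 this degenerates to the single-successor ring;
    larger values trade more heartbeat traffic for faster detection when a
    neighbour is itself down.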

    Any membership changes, whether from nodes being added or
    from node failures, need to be broadcast to all the other
    cluster nodes. A node can connect to every other node to
    send the required information.
    Gossip Dissemination can also be used
    to broadcast this information.

    Split Brain Situation

    Even though a single coordinator node decides when to
    mark other nodes as down, there is no explicit leader election
    happening to select which node acts as the coordinator.
    Every cluster node expects a heartbeat from the existing
    coordinator node; if it doesn’t get a heartbeat in time,
    it can then claim to be the coordinator and remove the existing
    coordinator from the member list.

    class MembershipService…

      private void claimLeadershipIfNeeded(List<Member> members) {
          List<Member> failedMembers = checkAndGetFailedMembers(members);
          if (!failedMembers.isEmpty() && isOlderThanAll(failedMembers)) {
              var newMembership = membership.failed(failedMembers);
              updateMembership(newMembership);
              sendMembershipUpdateTo(newMembership.getLiveMembers());
          }
      }

      private boolean isOlderThanAll(List<Member> failedMembers) {
          return failedMembers.stream().allMatch(m -> m.age < thisMember().age);
      }

      private List<Member> checkAndGetFailedMembers(List<Member> members) {
          //a member has failed if it is monitored and no longer reported alive.
          List<Member> failedMembers = members
                  .stream()
                  .filter(member -> !member.address.equals(selfAddress) && failureDetector.isMonitoring(member.address) && !failureDetector.isAlive(member.address))
                  .map(member -> new Member(member.address, member.age, member.status)).collect(Collectors.toList());
          failedMembers.forEach(member -> {
              failureDetector.remove(member.address);
              logger.info(selfAddress + " marking " + member.address + " as DOWN");
          });
          return failedMembers;
      }

    This can create a situation where two or more subgroups
    form in an existing cluster, each considering the others
    to have failed. This is called the split-brain problem.

    Consider a five-node cluster: athens, byzantium, cyrene, delphi and euphesus.
    If athens receives heartbeats from delphi and euphesus, but
    stops getting heartbeats from byzantium and cyrene, it marks
    both byzantium and cyrene as failed.

    byzantium and cyrene can still send heartbeats to each other,
    but stop receiving heartbeats from athens, delphi and euphesus.
    byzantium, being the second oldest member of the cluster,
    then becomes the coordinator.
    So two separate clusters are formed, one with athens as
    the coordinator and the other with byzantium as the coordinator.

    Handling split brain

    One common way to handle the split brain issue is to
    check whether there are enough members to handle any
    client request, and reject the request if there
    are not enough live members. For example,
    Hazelcast allows you to configure a
    minimum cluster size to execute any client request.

    public void handleClientRequest(Request request) {
        if (!hasMinimumRequiredSize()) {
            throw new NotEnoughMembersException("Requires minimum 3 members to serve the request");
        }
    }

    private boolean hasMinimumRequiredSize() {
        //at least 3 live members, matching the message above.
        return membership.getLiveMembers().size() >= 3;
    }

    The part which has the majority of the nodes
    continues to operate, but as explained in the Hazelcast
    documentation, there will always be a
    time window
    in which this protection has yet to come into effect.
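
    For the five-node example, the majority rule can be written as a
    one-line check. This sketch assumes the intended total cluster size is
    known from configuration, which this article does not specify;
    QuorumCheck and hasMajority are hypothetical names.

```java
final class QuorumCheck {
    // True if liveMembers is a strict majority of the configured cluster size.
    static boolean hasMajority(int liveMembers, int clusterSize) {
        return liveMembers > clusterSize / 2;
    }

    public static void main(String[] args) {
        // athens, delphi and euphesus form a majority of five; byzantium and cyrene do not.
        System.out.println("athens side serves requests: " + hasMajority(3, 5));
        System.out.println("byzantium side serves requests: " + hasMajority(2, 5));
    }
}
```

    Using a strict majority means that an even split, such as two against
    two in a four-node cluster, leaves neither side serving requests.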

    The problem can be avoided if cluster nodes are
    not marked as down unless it is guaranteed that they
    won’t cause split brain.
    For example, Akka recommends
    that you don’t have nodes
    marked as down
    through the failure detector; you can instead use its
    split brain resolver.

    Recovering from split brain

    The coordinator runs a periodic job to check if it
    can connect to the failed nodes.
    If a connection can be established, it sends a special
    message indicating that it wants to trigger a
    split brain merge.

    If the receiving node is the coordinator of the subcluster,
    it will check to see if the cluster that is initiating
    the request is part of the minority group. If it is,
    it will send a merge request. The coordinator of the minority group,
    which receives the merge request, will then execute
    the merge request on all the nodes in the minority subgroup.

    class MembershipService…

      splitbrainCheckTask = taskScheduler.scheduleWithFixedDelay(() -> {
                  searchOtherClusterGroups();
              },
              1, 1, TimeUnit.SECONDS);

    class MembershipService…

      private void searchOtherClusterGroups() {
          if (membership.getFailedMembers().isEmpty()) {
              return;
          }
          List<Member> allMembers = new ArrayList<>();
          if (isCoordinator()) {
              //probe every failed member; it may belong to another subgroup.
              for (Member member : membership.getFailedMembers()) {
                  logger.info("Sending SplitBrainJoinRequest to " + member.address);
                  network.send(member.address, new SplitBrainJoinRequest(messageId++, this.selfAddress, membership.version, membership.getLiveMembers().size()));
              }
          }
      }

    If the receiving node is the coordinator of the majority subgroup, it asks the
    sending coordinator node to merge with itself.

    class MembershipService…

      private void handleSplitBrainJoinMessage(SplitBrainJoinRequest splitBrainJoinRequest) {
          logger.info(selfAddress + " Handling SplitBrainJoinRequest from " + splitBrainJoinRequest.from);
          if (!membership.isFailed(splitBrainJoinRequest.from)) {
              return;
          }
          if (!isCoordinator()) {
              return;
          }
          if(splitBrainJoinRequest.getMemberCount() < membership.getLiveMembers().size()) {
              //requesting node should join this cluster.
              logger.info(selfAddress + " Requesting " + splitBrainJoinRequest.from + " to rejoin the cluster");
              network.send(splitBrainJoinRequest.from, new SplitBrainMergeMessage(splitBrainJoinRequest.messageId, selfAddress));
          } else {
              //we need to join the other cluster
              mergeWithOtherCluster(splitBrainJoinRequest.from);
          }
      }

      private void mergeWithOtherCluster(InetAddressAndPort otherClusterCoordinator) {
          askAllLiveMembersToMergeWith(otherClusterCoordinator);
          handleMerge(new MergeMessage(messageId++, selfAddress, otherClusterCoordinator)); //initiate merge on this node.
      }

      private void askAllLiveMembersToMergeWith(InetAddressAndPort mergeToAddress) {
          List<Member> liveMembers = membership.getLiveMembers();
          for (Member m : liveMembers) {
              network.send(m.address, new MergeMessage(messageId++, selfAddress, mergeToAddress));
          }
      }

    In the example discussed in the above section, when athens
    can communicate with byzantium, it will ask byzantium to merge
    with itself.

    The coordinator of the smaller subgroup
    then asks all the cluster nodes
    inside its group to trigger a merge.
    The merge operation shuts down the cluster nodes and rejoins
    them to the coordinator of the larger group.

    class MembershipService…

      private void handleMerge(MergeMessage mergeMessage) {
          logger.info(selfAddress + " Merging with " + mergeMessage.getMergeToAddress());
          //join the cluster again through the other cluster's coordinator
          taskScheduler.execute(()-> {
              join(mergeMessage.getMergeToAddress());
          });
      }

    In the example above, byzantium and cyrene shut down and
    rejoin athens to form a full cluster again.
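
    The size comparison that drives the merge can be isolated into a small
    pure function. MergeDecision and Action are hypothetical names; the
    sketch follows the same comparison as the handler shown earlier, so a
    tie falls into the "merge into the requester" branch. A real system
    would probably need an explicit tie-breaker for equally sized
    subgroups, which this article does not describe.

```java
final class MergeDecision {
    enum Action { ASK_REQUESTER_TO_MERGE, MERGE_INTO_REQUESTER }

    // The smaller subgroup merges into the larger one; ties go to the requester.
    static Action decide(int requesterLiveCount, int ownLiveCount) {
        return requesterLiveCount < ownLiveCount
                ? Action.ASK_REQUESTER_TO_MERGE
                : Action.MERGE_INTO_REQUESTER;
    }

    public static void main(String[] args) {
        // byzantium (2 live members) probes athens (3 live members).
        System.out.println("athens decides: " + decide(2, 3));
        // athens (3 live members) probes byzantium (2 live members).
        System.out.println("byzantium decides: " + decide(3, 2));
    }
}
```

    Both probes lead to the same outcome: the byzantium subgroup rejoins
    through athens.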

    Comparison with Leader and Followers

    It is useful to compare this pattern with that of
    Leader and Followers. The leader-follower
    setup, as used by patterns like Consistent Core,
    does not function unless the leader is selected
    by running an election. This guarantees that a
    Quorum of cluster nodes have
    an agreement about who the leader is. In the worst case
    scenario, if an agreement isn’t reached, the system will
    be unavailable to process any requests.
    In other words, it prefers consistency over availability.

    The emergent leader, on the other hand, will always
    have some cluster node acting as a leader for processing
    client requests. In this case, availability is preferred
    over consistency.

