开发者

zookeeper的Leader选举机制源码解析

开发者 https://www.devze.com 2023-04-01 10:36 出处:网络 作者: 京东物流
目录zookeeper01Leader选举机制02Leader选举集群配置03Leader选举流程3.1 Leader选举采用多层队列架构04解析代码入口类05选举流程代码解析06选举核心逻辑07总结zookeeper
目录
  • zookeeper
  • 01Leader选举机制
  • 02Leader选举集群配置
  • 03Leader选举流程
    • 3.1 Leader选举采用多层队列架构
  • 04解析代码入口类
    • 05选举流程代码解析
      • 06选举核心逻辑
        • 07总结

          zookeeper

          一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。

          01Leader选举机制

          Leader选举机制采用半数选举算法。

          每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

          02Leader选举集群配置

          • 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg
          • 修改zoo.cfg文件,修改值如下:
          【plain】
          zoo1.cfg文件内容:
          dataDir=/export/data/zookeeper-1
          clientPort=2181
          server.1=127.0.0.1:2001:3001
          server.2=127.0.0.1:2002:3002:participant
          server.3=127.0.0.1:2003:3003:participant
          server.4=127.0.0.1:2004:3004:observer
          zoo2.cfg文件内容:
          dataDir=/export/data/zookeeper-2
          clientPort=2182
          server.1=127.0.0.1:2001:3001
          server.2=127.0.0.1:2002:3002:participant
          server.3=127.0.0.1:2003:3003:participant
          server.4=127.0.0.1:2004:3004:observer
          zoo3.cfg文件内容:
          dataDir=/export/data/zookeeper-3
          clientPort=2183
          server.1=127.0.0.1:2001:3001
          server.2=127.0.0.1:2002:3002:participant
          server.3=127.0.0.1:2003:3003:participant
          server.4=127.0.0.1:2004:3004:observer
          zoo4.cfg文件内容:
          dataDir=/export/data/zookeeper-4
          clientPort=2184
          server.1=127.0.0.1:2001:3001
          server.2=127.0.0.1:2002:3002:participant
          server.3=127.0.0.1:2003:3003:participant
          server.4=127.0.0.1:2004:3004:observer
          
          • server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识
          • participant默认参与选举标识,可不写. observer不参与选举

          4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

          • 启动三个zookeeper实例:
          • bin/zkServer.sh start conf/zoo1.cfg
          • bin/zkServer.sh start conf/zoo2.cfg
          • bin/zkServer.sh start conf/zoo3.cfg
          • 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。

          03Leader选举流程

          zookeeper的Leader选举机制源码解析

          图1 第一轮到第二轮投票流程

          前提:

          设定票据数据格式vote(sid,zxid,epoch)

          • sid是Server ID每台服务的唯一标识,是myid文件内容;
          • zxid是数据事务id号;
          • epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

          按照顺序启动sid=1,sid=2节点

          第一轮投票:

          • sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;
          • sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;
          • sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);
          • sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,开发者_C教程0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;
          • 第一轮投票选举结束。

          第二轮投票:

          • sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;
          • sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;
          • sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;
          • sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。

          这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

          3.1 Leader选举采用多层队列架构

          zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。

          zookeeper的Leader选举机制源码解析

          图2 多层队列上下关系交互流程图

          04解析代码入口类

          通过查看zkServer.sh文件内容找到服务启动类:

          org.apache.zookeeper.server.quorum.QuorumPeerMain

          05选举流程代码解析

          • 加载配置文件QuorumPeerConfig.parse(path);

          针对 Leader选举关键配置信息如下:

          • 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。
            • 设置peerType当前应用是否参与选举
          • new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.
          【Java】
          public QuorumMaj(Properties props) throws ConfigException {
                  for (Entry<Object, Object> entry : props.entrySet()) {
                      String key = entry.getKey().toString();
                      String value = entry.getValue().toString();
                      //读取集群配置文件中的server.开头的应用实例配置信息
                      if (key.startsWith("server.")) {
                          int dot = key.indexOf('.');
                          long sid = Long.parseLong(key.substring(dot + 1));
                          QuorumServer qs = new QuorumServer(sid, value);
                          allMembers.put(Long.valueOf(sid), qs);
                          if (qs.type == LearnerType.PARTICIPANT)
          //应用实例绑定的角色为PARTICIPANT意为参与选举
                              votingMembers.put(Long.valueOf(sid), qs);
                          else {
                              //观察者成员
                              observingMembers.put(Long.valueOf(sid), qs);
                          }
                      } else if (key.equals("version")) {
                          version = Long.parseLong(value, 16);
                      }
                  }
                  //过半基数
                  half = votingMembers.size() / 2;
              }
          

          QuorumPeerMain.runFromConfig(config) 启动服务;

          QuorumPeer.startLeaderElection() 开启选举服务;

          • 设置当前选票new Vote(sid,zxid,epoch)
          【plain】
          synchronized public void startLeaderElection(){
          try {
                     if (getPeerState() == ServerState.LOOKING) {
                         //首轮:当前节点默认投票对象为自己
                         currentVote = new Vote(myid, getLastLoggedzxid(), getCurrentEpoch());
                     }
                 } catch(IOException e) {
                     RuntimeException re = new RuntimeException(e.getMessage());
                     re.setStackTrace(e.getStackTrace());
                     throw re;
                 }
          //........
          }
          
          • 创建选举管理类:QuorumCnxnManager;
          • 初始化recvQueue<Message(sid,ByteBuffer)>接收投票队列(第二层传输队列);
          • 初始化queueSendMap<sid,queue>按sid发送投票队列(第二层传输队列);
          • 初始化senderWorkerMap<sid,Sendworker>发送投票工作线程容器,表示着与sid投票编程客栈节点已连接;
          • 初始化选举监听线程类QuorumCnxnManager.Listener。
          【Java】
          //QuorumPeer.createCnxnManager()
          public QuorumCnxManager(QuorumPeer self,
                                  final long mySid,
                                  Map&lt;Long,QuorumPeer.QuorumServer&gt; view,
                                  QuorumAuthServer authServer,
                                  QuorumAuthLearner authLearner,
                                  int socketTimeout,
                                  boolean listenOnAllIPs,
                                  int quorumCnxnThreadsSize,
                                  boolean quorumSaslAuthEnabled) {
              //接收投票队列(第二层传输队列)
              this.recvQueue = new ArrayblockingQueue&lt;Message&gt;(RECV_CAPACITY);
              //按sid发送投票队列(第二层传输队列)
              this.queueSendMap = new ConcurrentHashMap&lt;Long, ArrayBlockingQueue&lt;ByteBuffer&gt;&gt;();
              //发送投票工作线程容器,表示着与sid投票节点已连接 
              this.senderWorkerMap = new ConcurrentHashMap&lt;Long, SendWorker&gt;();
              this.lastMessageSent = new ConcurrentHashMap&lt;Long, ByteBuffer&gt;();
              String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
              if(cnxToValue != null){
                  this.cnxTO = Integer.parseInt(cnxToValue);
              }
              this.self = self;
              this.mySid = mySid;
              this.socketTimeout = socketTimeout;
              this.view = view;
              this.listenOnAllIPs = listenOnAllIPs;
              initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
                      quorumSaslAuthEnabled);
              // Starts listener thread that waits for connection requests 
              //创建选举监听线程 接收选举投票请求
              listener = new Listener();
              listener.setName("QuorumPeerListener");
          }
          //QuorumPeer.createElectionAlgorithm
          protected Election createElectionAlgorithm(int electionAlgorithm){
              Election le=null;
              //TODO: use a factory rather than a switch
              switch (electionAlgorithm) {
              case 0:
                  le = new LeaderElection(this);
                  break;
              case 1:
                  le = new AuthFastLeaderElection(this);
                  break;
              case 2:
                  le = new AuthFastLeaderElection(this, true);
                  break;
              case 3:
                  qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
                  QuorumCnxManager.Listener listener = qcm.listener;
                  if(listener != null){
                      listener.start();//启动选举监听线程
                      FastLeaderElection fle = new FastLeaderElection(this, qcm);
                      fle.start();
                      le = fle;
                  } else {
                      LOG.error("Null listener when initializing cnx manager");
                  }
                  break;
              default:
                  assert false;
              }
          return le;}
          
          • 开启选举监听线程QuorumCnxnManager.Listener;
          • 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap<sid,SendWorker>;
          • sid>self.sid才可以连接过来。
          【Java】
          //上面的listener.start()执行后,选择此方法
          public void run() {
              int numRetries = 0;
              InetSocketAddress addr;
              Socket client = null;
              while((!shutdown) && (numRetries < 3)){
                  try {
                      ss = new ServerSocket();
                      ss.setReuseAddress(true);
                      if (self.getQuorumListenOnAllIPs()) {
                          int port = self.getElectionAddress().getPort();
                          addr = new InetSocketAddress(port);
                      } else {
                          // Resolve hostname for this server in case the
                          // underlying ip address has changed.
                          self.recreateSocketAddresses(self.getId());
                          addr = self.getElectionAddress();
                      }
                      LOG.info("My election bind port: " + addr.toString());
                      setName(addr.toString());
                      ss.bind(addr);
                      while (!shutdown) {
                          client = ss.accept();
                          setSockOpts(client);
                          LOG.info("Received connection request "
                                  + client.getRemoteSocketAddress());
                          // Receive and handle the connection request
                          // asynchronously if the quorum sasl authentication is
                          // enabled. This is required because sasl server
                          // authentication process may take few seconds to finish,
                          // this may delay next peer connection requests.
                          if (quorumSaslAuthEnabled) {
                              receiveConnectionAsync(client);
                          } else {
          //接收连接信息
                              receiveConnection(client);
                          }
                          numRetries = 0;
                      }
                  } catch (IOException e) {
                      if (shutdown) {
                          break;
                      }
                      LOG.error("Exception while listening", e);
                      numRetries++;
                      try {
                          ss.close();
                          Thread.sleep(1000);
                      } catch (IOException ie) {
                          LOG.error("Error closing server socket", ie);
                      } catch (InterruptedException ie) {
                          LOG.error("Interrupted while sleeping. " +
                              "Ignoring exception", ie);
                      }
                      closeSocket(client);
                  }
              }
              LOG.info("Leaving listener编程");
              if (!shutdown) {
                  LOG.error("As I'm leaving the listener thread, "
                          + "I won't be able to participate in leader "
                          + "election any longer: "
                          + self.getElectionAddress());
              } else if (ss != null) {
                  // Clean up for shutdown.
                  try {
                      ss.close();
                  } catch (IOException ie) {
                      // Don't log an error for shutdown.
                      LOG.debug("Error closing server socket", ie);
                  }
              }
          }
          //代码执行路径:receiveConnection()->handleConnection(...)
          private void handleConnection(Socket sock, DataInputStream din)
                      throws IOException {
          //...省略
               if (sid < self.getId()) {
                      /*
                       * This replica might still believe that the connection to sid is
                       * up, so we have to shut down the workers before trying to open a
                       * new connection.
                       */
                      SendWorker sw = senderWorkerMap.get(sid);
                      if (sw != null) {
                          sw.finish();
                      }
                      /*
                       * Now we start a new connection
                       */
                      LOG.debug("Create new connection to server: {}", sid);
                      closeSocket(sock);
                      if (electionAddr != null) {
                          connectOne(sid, electionAddr);
                      } else {
                          connectOne(sid);
                      }
                  } else { // Otherwise start worker threads to receive data.
                      SendWorker sw = new SendWorker(sock, sid);
                      RecvWorker rw = new RecvWorker(sock, din, sid, sw);
                      sw.setRecv(rw);
                      SendWorker vsw = senderWorkerMap.get(sid);
                      if (vsw != null) {
                          vsw.finish();
                      }
            //存储连接信息<sid,SendWorker>
                      senderWorkerMap.put(sid, sw);
                      queueSendMap.putIfAbsent(sid,
                              new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
                      sw.start();
                      rw.start();
               }
          }
          
          • 创建FastLeaderElection快速选举服务;
          • 初始选票发送队列sendqueue(第一层队列)
          • 初始选票接收队列recvqueue(第一层队列)
          • 创建线程WorkerSender
          • 创建线程WorkerReceiver
          【Java】
          //FastLeaderElection.starter
          private void starter(QuorumPeer self, QuorumCnxManager manager) {
              this.self = self;
              proposedLeader = -1;
              proposedZxid = -1;
              //发送队列sendqueue(第一层队列)
              sendqueue = new LinkedBlockingQueue<ToSend>();
              //接收队列recvqueue(第一层队列)
              recvqueue = new LinkedBlockingQueue<Notification>();
              this.messenger = new Messenger(manager);
          }
          //new Messenger(manager)
          Messenger(QuorumCnxManager manager) {
              //创建线程WorkerSender
              this.ws = new Whttp://www.devze.comorkerSender(manager);
              this.wsThread = new Thread(this.ws,
                      "WorkerSender[myid=" + self.getId() + "]");
              this.wsThread.setDaemon(true);
              //创建线程WorkerReceiver
              this.wr = new WorkerReceiver(manager);
              this.wrThread = new Thread(this.wr,
                      "WorkerReceiver[myid=" + self.getId() + "]");
              this.wrThread.setDaemon(true);
          }
          
          • 开启WorkerSender和WorkerReceiver线程。

          WorkerSender线程自旋获取sendqueue第一层队列元素

          • sendqueue队列元素内容为相关选票信息详见ToSend类;
          • 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;
          • 不相同将sendqueue队列元素转储到queueSendMap<sid,queue>第二层传输队列中。
          【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{
          //...
            public void run() {
              while (!stop) {
                  try {
                      ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                      if(m == null) continue;
            //将投票信息发送出去
                      process(m);
                  } catch (InterruptedException e) {
                      break;
                  }
              }
              LOG.info("WorkerSender is down");
            }
          }
          //QuorumCnxManager#toSend
          public void toSend(Long sid, ByteBuffer b) {
              /*
               * If sending message to myself, then simply enqueue it (loopbachttp://www.devze.comk).
               */
              if (this.mySid == sid) {
                   b.position(0);
                   addToRecvQueue(new Message(b.duplicate(), sid));
                  /*
                   * Otherwise send to the corresponding thread to send.
                   */
              } else {
                   /*
                    * Start a new connection if doesn't have one already.
                    */
                   ArrayBlockingQueue&lt;ByteBuffer&gt; bq = new ArrayBlockingQueue&lt;ByteBuffer&gt;(
                      SEND_CAPACITY);
                   ArrayBlockingQueue&lt;ByteBuffer&gt; oldq = queueSendMap.putIfAbsent(sid, bq);
                   //转储到queueSendMap&lt;sid,queue&gt;第二层传输队列中
                   if (oldq != null) {
                       addToSendQueue(oldq, b);
                   } else {
                       addToSendQueue(bq, b);
                   }
                   connectOne(sid);     
              }
          }
          

          WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。

          【Java】
          //WorkerReceiver
          public void run() {
              Message response;
              while (!stop) {
                // Sleeps on receive
                try {
                    //自旋获取recvQueue第二层传输队列元素
                    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                    if(response == null) continue;
                    // The current protocol and two previous generations all send at least 28 bytes
                    if (response.buffer.capacity() &lt; 28) {
                        LOG.error("Got a short response: " + response.buffer.capacity());
                        continue;
                    }
                    //...
            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                   //第二层传输队列元素转存到recvqueue第一层队列中
                   recvqueue.offer(n);
                   //...
                }
              }
          //...
          }
          

          06选举核心逻辑

          • 启动线程QuorumPeer

          开始Leader选举投票makeLEStrategy().lookForLeader();

          sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。

          【plain】
          //QuorunPeer.run
          //...
          try {
             reconfigFlagClear();
              if (shuttingDownLE) {
                 shuttingDownLE = false;
                 startLeaderElection();
                 }
              //makeLEStrategy().lookForLeader() 发送投票
              setCurrentVote(makeLEStrategy().lookForLeader());
          } catch (Exception e) {
              LOG.warn("Unexpected exception", e);
              setPeerState(ServerState.LOOKING);
          }  
          //...
          //FastLeaderElection.lookLeader
          public Vote lookForLeader() throws InterruptedException {
          //...
            //向其他应用发送投票
          sendNotifications();
          //...
          }
          private void sendNotifications() {
              //获取应用节点
              for (long sid : self.getCurrentAndNextConfigVoters()) {
                  QuorumVerifier qv = self.getQuorumVerifier();
                  ToSend notmsg = new ToSend(ToSend.mType.notification,
                          proposedLeader,
                          proposedZxid,
                          logicalclock.get(),
                          QuorumPeer.ServerState.LOOKING,
                          sid,
                          proposedEpoch, qv.toString().getBytes());
                  if(LOG.isDebugEnabled()){
                      LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                            Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                            " (n.round), " + sid + " (recipient), " + self.getId() +
                            " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
                  }
                  //储存投票信息
                  sendqueue.offer(notmsg);
              }
          }
          class WorkerSender extends ZooKeeperThread {
              //...
              public void run() {
              while (!stop) {
                  try {
          //提取已储存的投票信息
                      ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                      if(m == null) continue;
                      process(m);
                  } catch (InterruptedException e) {
                      break;
                  }
              }
              LOG.info("WorkerSender is down");
            }
          //...
          }
          

          自旋recvqueue队列元素获取投票过来的选票信息:

          【Java】
          public Vote lookForLeader() throws InterrjsuptedException {
          //...
          /*
           * Loop in which we exchange notifications until we find a leader
           */
          while ((self.getPeerState() == ServerState.LOOKING) &amp;&amp;
                  (!stop)){
              /*
               * Remove next notification from queue, times out after 2 times
               * the termination time
               */
              //提取投递过来的选票信息
              Notification n = recvqueue.poll(notTimeout,
                      TimeUnit.MILLISECONDS);
          /*
           * Sends more notifications if haven't received enough.
           * Otherwise processes new notification.
           */
          if(n == null){
              if(manager.haveDelivered()){
                  //已全部连接成功,并且前一轮投票都完成,需要再次发起投票
                  sendNotifications();
              } else {
                  //如果未收到选票信息,manager.contentAll()自动连接其它socket节点
                  manager.connectAll();
              }
              /*
               * Exponential backoff
               */
              int tmpTimeOut = notTimeout*2;
              notTimeout = (tmpTimeOut &lt; maxNotificationInterval?
                      tmpTimeOut : maxNotificationInterval);
              LOG.info("Notification time out: " + notTimeout);
                   }
               //....
              }
            //...
          }
          
          【Java】
          //manager.connectAll()-&gt;connectOne(sid)-&gt;initiateConnection(...)-&gt;startConnection(...)
          private boolean startConnection(Socket sock, Long sid)
                  throws IOException {
              DataOutputStream dout = null;
              DataInputStream din = null;
              try {
                  // Use BufferedOutputStream to reduce the number of IP packets. This is
                  // important for x-DC scenarIOS.
                  BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
                  dout = new DataOutputStream(buf);
                  // Sending id and challenge
                  // represents protocol version (in other words - message type)
                  dout.writeLong(PROTOCOL_VERSION);
                  dout.writeLong(self.getId());
                  String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
                  byte[] addr_bytes = addr.getBytes();
                  dout.writeInt(addr_bytes.length);
                  dout.write(addr_bytes);
                  dout.flush();
                  din = new DataInputStream(
                          new BufferedInputStream(sock.getInputStream()));
              } catch (IOException e) {
                  LOG.warn("Ignoring exception reading or writing challenge: ", e);
                  closeSocket(sock);
                  return false;
              }
              // authenticate learner
              QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
              if (qps != null) {
                  // TODO - investigate why reconfig makes qps null.
                  authLearner.authenticate(sock, qps.hostname);
              }
              // If lost the challenge, then drop the new connection
              //保证集群中所有节点之间只有一个通道连接
              if (sid &gt; self.getId()) {
                  LOG.info("Have smaller server identifier, so dropping the " +
                          "connection: (" + sid + ", " + self.getId() + ")");
                  closeSocket(sock);
                  // Otherwise proceed with the connection
              } else {
                  SendWorker sw = new SendWorker(sock, sid);
                  RecvWorker rw = new RecvWorker(sock, din, sid, sw);
                  sw.setRecv(rw);
                  SendWorker vsw = senderWorkerMap.get(sid);
                  if(vsw != null)
                      vsw.finish();
                  senderWorkerMap.put(sid, sw);
                  queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue&lt;ByteBuffer&gt;(
                          SEND_CAPACITY));
                  sw.start();
                  rw.start();
                  return true;
              }
              return false;
          }
          

          如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap<sid,SendWorker>中。对应第2步中的sid<self.sid逻辑,保证集群中所有节点之间只有一个通道连接。

          zookeeper的Leader选举机制源码解析

          节点之间连接方式

          【Java】
          public Vote lookForLeader() throws InterruptedException {
          //...
              if (n.electionEpoch > logicalclock.get()) {
                  //当前选举周期小于选票周期,重置recvset选票池
                  //大于当前周期更新当前选票信息,再次发送投票
                  logicalclock.set(n.electionEpoch);
                  recvset.clear();
                  if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                          getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                      updateProposal(n.leader, n.zxid, n.peerEpoch);
                  } else {
                      updateProposal(getInitId(),
                              getInitLastLoggedZxid(),
                              getPeerEpoch());
                  }
                  sendNotifications();
              } else if (n.electionEpoch < logicalclock.get()) {
                  if(LOG.isDebugEnabled()){
                      LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                              + Long.toHexString(n.electionEpoch)
                              + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                  }
                  break;
              } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                      proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期
                  //接收的选票与当前选票PK成功后,替换当前选票
                  updateProposal(n.leader, n.zxid, n.peerEpoch);
                  sendNotifications();
              }
          //...
          }
          

          在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:

          • 判断当前选票和接收过来的选票周期是否一致
          • 大于当前周期更新当前选票信息,再次发送投票
          • 周期相等:当前选票信息和接收的选票信息进行PK
          【Java】
          //接收的选票与当前选票PK
          protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
                  LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                          Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
                  if(self.getQuorumVerifier().getWeight(newId) == 0){
                      return false;
                  }
                  /*
                   * We return true if one of the following three cases hold:
                   * 1- New epoch is higher
                   * 2- New epoch is the same as current epoch, but new zxid is higher
                   * 3- New epoch is the same as current epoch, new zxid is the same
                   *  as current zxid, but server id is higher.
                   */
                  return ((newEpoch > curEpoch) ||
                          ((newEpoch == curEpoch) &&
                          ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));
            }
          

          在上述代码中的totalOrderPredicate方法逻辑如下:

          • 竞选周期大于当前周期为true
          • 竞选周期相等,竞选zxid大于当前zxid为true
          • 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true
          • 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。
          【Java】
          public Vote lookForLeader() throws InterruptedException {
          //...
             //存储节点对应的选票信息
              // key:选票来源sid  value:选票推举的Leader sid
              recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
              //半数选举开始
              if (termPredicate(recvset,
                      new Vote(proposedLeader, proposedZxid,
                              logicalclock.get(), proposedEpoch))) {
                  // Verify if there is any change in the proposed leader
                  while((n = recvqueue.poll(finalizeWait,
                          TimeUnit.MILLISECONDS)) != null){
                      if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                              proposedLeader, proposedZxid, proposedEpoch)){
                          recvqueue.put(n);
                          break;
                      }
                  }
                  /*WorkerSender
                   * This predicate is true once we don't read any new
                   * relevant message from the reception queue
                   */
                  if (n == null) {
                      //已选举出leader 更新当前节点是否为leader 
                      self.setPeerState((proposedLeader == self.getId()) ?
                              ServerState.LEADING: learningState());
                      Vote endVote = new Vote(proposedLeader,
                              proposedZxid, proposedEpoch);
                      leaveInstance(endVote);
                      return endVote;
                  }
              }
          //...
          }
          /**
               * Termination predicate. Given a set of votes, determines if have
               * sufficient to declare the end of the election round.
               *
               * @param votes
               *            Set of votes
               * @param vote
               *            Identifier of the vote received last  PK后的选票
               */
          private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
              SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
              voteSet.addQuorumVerifier(self.getQuorumVerifier());
              if (self.getLastSeenQuorumVerifier() != null
                      && self.getLastSeenQuorumVerifier().getVersion() > self
                              .getQuorumVerifier().getVersion()) {
                  voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
              }
              /*
               * First make the views consistent. Sometimes peers will have different
               * zxids for a server depending on timing.
               */
              //votes 来源于recvset 存储各个节点推举出来的选票信息
              for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
          //选举出的sid和其它节点选择的sid相同存储到voteSet变量中。
                  if (vote.equals(entry.getValue())) {
          //保存推举出来的sid
                      voteSet.addAck(entry.getKey());
                  }
              }
              //判断选举出来的选票数量是否过半
              return voteSet.hasAllQuorums();
          }
          //QuorumMaj#containsQuorum
          public boolean containsQuorum(Set<Long> ackSet) {
              return (ackSet.size() > half);
             }
          

          在上述代码中:recvset是存储每个sid推举的选票信息。

          第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

          第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

          最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

          • setPeerState更新当前节点角色;
          • proposedLeader选举出来的sid和自己sid相等,设置为Leader;
          • 上述条件不相等,设置为Follower或Observing;
          • 更新currentVote当前选票为Leader的选票vote(2,0,1)。

          07总结

          通过对Leader选举源码的解析,可以了解到:

          • 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。
          • 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。
          • 为BIO在多线程技术上的实践带来了宝贵的经验。

          以上就是zookeeper的Leader选举机制源码解析的详细内容,更多关于zookeeper Leader选举的资料请关注我们其它相关文章!

          0

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          关注公众号