博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Curator leader 选举(一)
阅读量:4970 次
发布时间:2019-06-12

本文共 4812 字,大约阅读时间需要 16 分钟。

要想使用Leader选举功能,需要添加recipes包,可以在maven中添加如下依赖:

<dependency>

<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.9.0</version>
</dependency>

 

当然了,由于recipes需要使用framework,所以你肯定还要添加如下依赖:

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.9.0</version>
</dependency>

 

 

最后,为了简化测试也为了便于学习,可以添加test依赖:

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.9.0</version>
</dependency>

 

LeaderLatch使用流程

  recipes包里面提供了Leader选举实现,Spark中的master选举使用的就是reciples包里面的LeaderLatch,使用他们可以极大的简化代码,使你将注意力更多的放在核心业务逻辑上。Leader选举的实现在org.apache.curator.framework.recipes.leader包中,这个包提供了两组Leader选举:

  1.LeaderLatch,LeaderLatchListener

  2.LeaderSelector,LeaderSelectorListener,LeaderSelectorListenerAdapter

这两组类都可以实现Leader选举,spark 使用的是第一种。再这篇文章里,只介绍第一种。

 

第一组使用起来非常简单,使用思路大致如下:假设你有3个节点,姑且叫做node0,node1,node2。你需要为每一个node创建一个CuratorFramework,LeaderLatch,LeaderLatchListener,如下:

 

node0:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

node1:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

node2:

  1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();

  2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()

 

你首先要创建CuratorFramework,然后并启动它,一个CuratorFramework就是一个ZooKeeper客户端。然后创建LeaderLatch,并制定刚才创建的CuratorFramework和一个leaderPath,leaderPath是一个ZooKeepe路径,node0,node1,node2中的leaderPath必须一致。创建好LeaderLatch之后,需要为他注册一个LeaderLatchListener回掉,如果某个node成为leader,那么会调用这个node的LeaderLatchListener的isLeader(),因此你可以在这里写自己的业务逻辑。最后,调用LeaderLatch的start(),这个LeaderLatch将参加选举了。

 

可以参考如下代码:

import java.util.ArrayList;import java.util.List;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.leader.LeaderLatch;import org.apache.curator.framework.recipes.leader.LeaderLatchListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;public class LeaderDemo {    public static void main(String[]args) throws Exception{        List
leaders=new ArrayList
(); List
clients=new ArrayList
(); TestingServer server=new TestingServer(); try{ for(int i=0;i<10;i++){ CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(20000,3)); clients.add(client); LeaderLatch leader=new LeaderLatch(client,"/francis/leader"); leader.addListener(new LeaderLatchListener(){ @Override public void isLeader() { // TODO Auto-generated method stub System.out.println("I am Leader"); } @Override public void notLeader() { // TODO Auto-generated method stub System.out.println("I am not Leader"); }}); leaders.add(leader); client.start(); leader.start(); } Thread.sleep(Integer.MAX_VALUE); }finally{ for(CuratorFramework client:clients){ CloseableUtils.closeQuietly(client); } for(LeaderLatch leader:leaders){ CloseableUtils.closeQuietly(leader); } CloseableUtils.closeQuietly(server); } Thread.sleep(Integer.MAX_VALUE); }}

 

LeaderLatch和LeaderLatchListener方法介绍

LeaderLatch提供了如下方法:

 start()/close():启动/停止LeaderLatch

 addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener

 hasLeadership():如果LeaderLatch是Leader,那么返回true,否则false。

 getLeader():

 await:等待Leaderlatch成为Leader。

 

LeaderLatchListener提供了如下方法:

  isLeader():当LeaderLatch的hasLeaderShip()从false到true后,就会调用isLeader(),表明这个LeaderLatch成为leader了。

  notLeader():当LeaderLatch的hahLeaderShip从true到false后,就会调用notLeader(),表明这个LeaderLatch不再是leader了。

 

LeaderLatch在Master-Slave中的应用

在一个典型的master-slave场景下。你可以在isLeader中做如下处理:

  1.每一个master类都有一个state属性,初始值为standby.

  2.在isLeader中,从持久话引擎中读取要恢复的数据到一个临时的内存缓存中

  3.将这个master的state修改为recovering

  4.通知所有worker将其内部的master修改为当前master。

  5.将临时内存缓存中的数据恢复到master内部。

  6.将master状态修改为alive,然后这个master就可以对外服务了。

注意第5步,由于将持久话引擎中的数据添加到了master内部的内存中,所以需要确保之多恢复一次语义。

 

转载于:https://www.cnblogs.com/francisYoung/p/5464789.html

你可能感兴趣的文章
上周热点回顾(8.18-8.24)
查看>>
Feature toggle
查看>>
day02
查看>>
我是怎么招聘程序员的
查看>>
gvim 配置Pydiction
查看>>
Linux安装指定mysql版本
查看>>
Exception in thread "main" java.lang.ClassNotFoundException: 解决方法
查看>>
移动应用(手机应用)开发IM聊天程序解决方案
查看>>
[转载] K3漏油器全紫铜替换原硅胶垫教程。标准姿势
查看>>
python set
查看>>
VC中使用ADO操作数据库的方法
查看>>
如何判断域名是否被微信拦截 被已经被微信封了的的域名网址如何在微信中正常打开...
查看>>
分布式锁的三种实现方式
查看>>
AJAX原生JS代码
查看>>
ThinkPHP提示错误
查看>>
poj 2109 pow函数也能这么用?p的开n次方
查看>>
Oracle database link
查看>>
清北学堂2017NOIP冬令营入学测试P4749 F’s problem(f)
查看>>
POJ 1840 Eqs HASH
查看>>
python调用shell小技巧
查看>>