扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
成都创新互联是少有的网站设计、做网站、营销型企业网站、小程序制作、手机APP,开发、制作、设计、买链接、推广优化一站式服务网络公司,2013年开创至今,坚持透明化,价格低,无套路经营理念。让网页惊喜每一位访客多年来深受用户好评
为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装 |
Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象。 1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现 1.1 connectString:连接串 1.2 retryPolicy:重试连接策略。有四种实现,分别是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed 1.3sessionTimeoutMs:会话超时时间,默认为60000ms 1.4connectionTimeoutMs连接超时时间,默认为15000ms
注意对于retryPolicy策略通过一个接口来让用户自定义实现 |
2.1创建连接
/** 重试策略: 初始时间为1s, 重试10次 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
/** 通过工厂创建连接 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build();
/** 开启连接 */ cf.start(); |
2.2 新增节点
/** * 新增节点:指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 * 1.creatingParentsIfNeeded() 递归创建父目录 * 2.withMode() 节点类型(持久|临时) * 3.forPath() 指定路径 */ cf.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/super/c1", "c1内容".getBytes()); |
2.3 删除节点
/** * 删除节点 * 1.deletingChildrenIfNeeded() 递归删除 * 2.guaranteed() 确保节点被删除 * 3. withVersion(int version) //特定版本号 */ cf.delete().deletingChildrenIfNeeded().forPath("/super"); |
2.4 读取和修改数据
/** * 读取和修改数据 : getData()和setData() */ cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes());
/** 读取节点内容 */ String c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("c2_data-->"+c2_data);
/** 修改节点内容 */ cf.setData().forPath("/super/c2", "修改c2的内容".getBytes()); String update_c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("update_c2_data-->"+update_c2_data); |
2.5 绑定回调函数
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void proce***esult(CuratorFramework cf, CuratorEvent event) throws Exception { System.out.println("code-->" + event.getResultCode()); System.out.println("type-->" + event.getType()); System.out.println("线程为-->" + Thread.currentThread().getName()); } }, pool).forPath("/super/c3", "c2的内容".getBytes());
System.out.println("主线程-->" + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE); |
2.6 读取子节点和判断节点是否存在
/** * 读取子节点的方法: getChildren() * 判断节点是否存在: checkExists() */ List for (String p: list) { System.out.println(p); }
//如果为null标识不存在 Stat stat = cf.checkExists().forPath("/super/c4"); System.out.println(stat); |
如果要使用类似Wather的监听功能Curator必须依赖一个jar包,Maven依赖 有了这个依赖包,使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里主要有两种监听方式 NodeCacheListener:监听节点的新增、修改操作 PathChildrenCacheListener:监听子节点的新增、修改、删除操作 |
4.1 分布式锁
在分布式场景中,为了保证数据的一致性,经常在程序运行的某一个点需要进行同步操作(java提供了synchronized或者Reentrantlock实现)比如看一个小示例,这个示例出现分布式不同步的问题 比如:之前是在高并发下访问一个程序,现在则是在高并发下访问多个服务器节点(分布式)
使用Curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,之前实现的时候遇到过,这里强烈推荐使用Curator分布式锁 public class Lock2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超时时间 */ private static final int SESSION_TIMEOUT = 5000; //MS static int count = 10; public static CuratorFramework createCuratorFramework(){ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); return cf; } public static void main(String[] args) throws Exception { final CountDownLatch countDown = new CountDownLatch(1); for (int i =0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { CuratorFramework cf = createCuratorFramework(); cf.start(); //锁对象 client 锁节点 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); try { countDown.await(); lock.acquire(); //获得锁 number(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release();//释放锁 } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start();; } Thread.sleep(2000); countDown.countDown(); } public static void number() { count--; System.out.println(Thread.currentThread().getName() + "-->" + count); } } |
4.2 分布式计数器功能
一说到分布式计数器,可能脑海里想到AtomicInteger(原子累加)这种经典方式,如果针对一个JVM的场景当然没问题,但是现在是在分布式场景下,就需要利用Curator框架的DistributedAtomicInteger了 public class CuratorAtomicInteger { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超时时间 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception { CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); //使用DistributedAtomicInteger DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000)); //atomicInteger.increment(); atomicInteger.add(1); AtomicValue |
4.3 Barrier
4.3.1 DistributedDoubleBarrier
分布式Barrier 类DistributedDoubleBarrier: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点同时开始,中间谁先运行完毕,谁后运行完毕不关心,但是最终一定是一块退出运行的
public class CuratorBarrier { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超时时间 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 实例化5个客户端对象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + " 已准备好!"); barrier.enter(); System.out.println("同时开始运行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println("运行完毕..."); barrier.leave(); System.out.println("同时退出运行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } } } |
4.3.2 DistributedBarrier
分布式Barrier 类DistributedBarrier: 它会阻塞所有节点上的等待进程(所有节点进入待执行状态),直到“某一个人吹哨”说开始执行, 然后所有的节点同时开始 public class CuratorBarrier2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超时时间 */ private static final int SESSION_TIMEOUT = 5000; //MS static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 实例化5个客户端对象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/superBarrier"); System.out.println(Thread.currentThread().getName() + " 设置barrier"); barrier.setBarrier(); //设置 barrier.waitOnBarrier(); //等待 System.out.println("开始执行程序..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } Thread.sleep(5000); barrier.removeBarrier(); //释放 } } |
Curator内部实现的几种重试策略: 1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加. 2.RetryNTimes:指定最大重试次数的重试策略 3.RetryOneTime:仅重试一次 4.RetryUntilElapsed:一直重试直到达到规定的时间 |
5.1 ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
参数说明 1.baseSleepTimeMs 初始sleep时间 2.maxRetries 最大重试次数 3.maxSleepMs 最大重试时间 |
5.2 RetryNTimes
RetryNTimes(int n, int sleepMsBetweenRetries) 参数说明 1.n 最大重试次数 2.sleepMsBetweenRetries 每次重试的间隔时间 |
5.3 RetryOneTime
RetryOneTime(int sleepMsBetweenRetry)
参数说明 1.sleepMsBetweenRetry为重试间隔的时间 |
5.4 RetryUntilElapsed
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
参数说明 1.maxElapsedTimeMs 最大重试时间 2.sleepMsBetweenRetries 每次重试的间隔时间 |
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流