如何实现基于Jedis+ZK的分布式序列号生成器

本篇内容主要讲解“如何实现基于Jedis+ZK的分布式序列号生成器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何实现基于Jedis+ZK的分布式序列号生成器”吧!

创新互联公司坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站设计制作、成都网站制作、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的大竹网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

部分源码参考Jedis实现分布式锁博客:

package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;


/**
 * arch-seq 唯一code 获取客户端
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * 生成默认KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位
     *
     * @param
     * @return
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * 生成默认KEY连续的UUID,共total个
     *
     * @param total - 连续多少个
     * @return
     */
    public static long[] getSEQ(long total) {
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, (int) total);
    }

    /**
     * 生成指定KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位
     *
     * @param seqName
     * @return
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * 生成指定KEY连续的UUID,共total个
     *
     * @param seqName
     * @param total
     * @return
     */
    public static long[] getSEQ(String seqName, long total) {
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, (int) total);
    }


    private static long[] getValueArray(long value, int total) {
        int n = total;
        long[] ret = new long[n];
        do {
            ret[n - 1] = value--;
        } while (--n > 0);
        return ret;
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis版本SEQ(有序SEQ)
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
@Slf4j
public class RedisSEQ extends StreamCloseAble {

    //默认的REDIS SEQ初始化状态器KEY
    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
    //默认的REDIS SEQ初始化状态器VAL
    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
    private static final String _DEFAULT_SEQ_INIT_READY = "ready";
    //SEQ初始化容器状态
    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;

    //默认REDIS SEQ序列号的名称
    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";

    //本地模式自增ID槽
    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);

    static {
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            //if REDIS宕机或第一次:创建初始化状态成功后,初始化redis keys(该方法可以恢复上次redis宕机数据)
            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//抢到REDIS初始化锁,并将其标记为pending状态
                try {
                    RedisSEQTimer.getInstance().removeNotUsedKeys();
                    RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,从ZK上读取初始数据
                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,标记为ready状态
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    //初始化arch.seq REDIS数据异常,有可能是ZK相关问题,也有可能是REDIS问题,请排查
                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
                }
            }
            //else{...} 没抢到REDIS初始化锁的话:不作任何处理
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
        } finally {
            close(jedisConn);
        }
    }


    public static Long getSEQ() {
        return getSEQ(_DEFAULT_SEQ_NAME, 1);
    }

    public static Long getSEQ(long total) {
        return getSEQ(_DEFAULT_SEQ_NAME, total);
    }

    public static Long getSEQ(String seqName, long total) {
        Long result = null;
        JedisConfig.JedisConn jedisConn = null;
        try {
            //获取redis连接
            jedisConn = JedisConfig.getInstance().getConn();
            //获得REDIS初始化状态不成功
            if (!tryInitReady(jedisConn)) {
                //arch.seq By REDIS版本不能正常初始化,请检查REDIS服务。
                throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
            }
            //开启分布式锁
            //if (jedisConn.tryLock(seqName, 1000, 2000)) {
            try {
                String day = RedisSEQTimer.getInstance().getDayFormat();
                String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
                result = Long.parseLong(day + incrVal);
            } catch (Exception e) {
                e.printStackTrace();
                log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.");
                Thread.sleep(randTime());
                result = getSEQ(seqName, total);
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
            //redis生成失败,返回本地ID:15位纳秒+1位自然数轮询
            //在获取【自增序列号:{},序列号分布式锁:{}】时发生了异常,系统返回了本地生成的自增序列号,不影响系统使用,但请管理员尽快协查!
            log.error("An exception occurred while acquiring self-incremental sequence number '{}', " +
                    "sequence number distributed lock '{}',The system returns the locally generated self-incremental " +
                    "sequence number, which does not affect the use of the system, but the administrator should check " +
                    "it as soon as possible.", seqName, seqName + "_LOCK");
            result = xUUID();
        } finally {
            //切记,一定要释放分布式锁(注:释放锁的同时jedisConn会自动释放connection,无需再次CLOSE)
            if (jedisConn != null) {
                //jedisConn.unLock(seqName);
                jedisConn.close();
            }
            if (log.isDebugEnabled()) {
                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
            }
        }
        return result;
        //arch.seq发生了不可预测的异常,请联系架构部处理!
        //throw new RuntimeException("arch.seq发生了不可预测的异常,请联系架构部处理!");
    }

    private static String getStackTrace() {
        StringBuilder result = new StringBuilder();
        StackTraceElement[] element = Thread.currentThread().getStackTrace();
        for (int i = 0; i < element.length; i++) {
            result.append("\t").append(element[i]).append("\n");
        }
        return result.toString();
    }

    private static long randTime() {
        return new Random().nextInt(50) + 50;
    }

    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
        int times = 0;
        for (; times < 3; times++) {
            if (getSEQInitReady(jedisConn)) {
                break;
            }
            Thread.sleep(100);
        }
        return times < 3;
    }

    /**
     * 获得SEQ初始化状态
     *
     * @param jedisConn
     * @return
     */
    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
        if (!_DEFAULT_SEQ_INIT_STATUS) {
            synchronized (RedisSEQ.class) {
                if (!_DEFAULT_SEQ_INIT_STATUS) {
                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
                }
            }
        }
        return _DEFAULT_SEQ_INIT_STATUS;
    }

    /**
     * 获得REDIS自增序列号最新值,并同步更新到ZK备份数据节点守护线程中
     *
     * @param jedisConn
     * @param day
     * @param seqName
     * @param total
     * @return
     */
    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
        String key = seqName + "_" + day;
        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
        if (incrVal > 9999999999L) {
            throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);
        }
        //塞到要更新的ZK队列中
        RedisSEQTimer.getInstance().push(key, incrVal);
        return incrVal;
    }

    /**
     * 单机模式生成UUID
     *
     * @return
     */
    private static Long xUUID() {
        int rand = _LOCAL_INCR.incrementAndGet() % 10;
        String result = System.nanoTime() + "" + rand;
        return Long.parseLong(result);
    }

}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


public class RedisSEQTimer extends StreamCloseAble {
    public static final String DAY_FORMAT_PATTERN = "yyMMdd";

    public static volatile RedisSEQTimer redisSEQTimer;

    private final ConcurrentHashMap REDIS_INCR_MAP = new ConcurrentHashMap<>();

    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();

    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";

    //zk节点最大值每次递增数
    private long _REDIS_MAXVALUE_INIT = 10_000L;

    private Timer _TIMER = new Timer(true);

    //是否处于清理状态
    private volatile boolean _CLEAN_STATUS;

    //清理key
    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";

    private RedisSEQTimer() {
        super();
        //启动zk巡查服务
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                checkAndConfigure();
            }
        }, new Date(), 1 * 60 * 1000);

        //每天定时清理垃圾数据
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                removeNotUsedKeys();
            }
        }, getFirstTime(), 24 * 60 * 60 * 1000);
    }


    public static RedisSEQTimer getInstance() {
        if (redisSEQTimer == null) {
            synchronized (RedisSEQTimer.class) {
                if (redisSEQTimer == null) {
                    redisSEQTimer = new RedisSEQTimer();
                }
            }
        }
        return redisSEQTimer;
    }

    /**
     * 定期更新ZK节点
     */
    private synchronized void checkAndConfigure() {
        if (_CLEAN_STATUS) {
            return;
        }
        if (REDIS_INCR_MAP.isEmpty()) {
            return;
        }
        String endDay = "_" + getDayFormat();
        List notTodayKeys = new ArrayList<>();
        Set> entrySet = REDIS_INCR_MAP.entrySet();
        for (Map.Entry entry : entrySet) {
            //不是今天的key不作处理
            if (!entry.getKey().endsWith(endDay)) {
                notTodayKeys.add(entry.getKey());
                return;
            }
            //将最新的值写到zk节点上 节点格式:/KEY_yyMMdd
            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
            if (_ZK_CLIENT.exists(zkNode)) {
                _ZK_CLIENT.writeData(zkNode, entry.getValue());
            } else {
                try {
                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());
                } catch (RuntimeException e) {
                    //not to write log ,it's will be retry in next time.
                }
            }
        }
        ;
        if (!notTodayKeys.isEmpty()) {
            for (String key : notTodayKeys) {
                REDIS_INCR_MAP.remove(key);
            }
        }
    }

    /**
     * 删除不再使用的KEY(包含redis和zk节点)
     */
    public synchronized void removeNotUsedKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        _CLEAN_STATUS = true;
        JedisConfig.JedisConn jedisConn = null;
        String requestId = UUID.randomUUID().toString();
        boolean tryLock = false;
        try {
            List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);

            //保留两天。考虑到多个机器的时间可能不一致,如果在刚过零点删除了昨天的sequence,另一台机器可能还需要使用它,则会出现id重复
            Date now = new Date();
            Date yesterday = DateUtils.addDays(now, -1);
            List keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));

            if (list != null && !list.isEmpty()) {
                jedisConn = JedisConfig.getInstance().getConn();
                if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {
                    JedisConfig.JedisConn finalJedisConn = jedisConn;
                    for (String node : list) {
                        String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length());
                        if (!keepDays.contains(dayPart)) {
                            REDIS_INCR_MAP.remove(node);
                            finalJedisConn.del(node);
                            removeZkNode(node);
                        }
                    }
                }
            }
        } finally {
            _CLEAN_STATUS = false;
            if (jedisConn != null) {
                if (tryLock) {
                    jedisConn.unLock(_REMOVE_KEY, requestId);
                }
                jedisConn.close();
            }
        }
    }

    /**
     * 移除ZK节点
     *
     * @param node
     */
    private void removeZkNode(String node) {
        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
        if (_ZK_CLIENT.exists(path)) {
            try {
                _ZK_CLIENT.delete(path);
            } catch (Exception e) {
            }
        }
    }


    /**
     * 获得每天定时任务的执行时间
     *
     * @return
     */
    private Date getFirstTime() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.HOUR_OF_DAY, 24); // 24点  可以更改时间
        calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分钟 随机
        calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒  随机
        return calendar.getTime();
    }

    /**
     * 获得区间随机整数
     *
     * @param exclude - 最大数,exclude
     * @param from    - 最小数,include
     * @return
     */
    private int getRandNum(int exclude, int from) {
        return new Random().nextInt(exclude) + from;
    }


    /**
     * 将某天的KEY塞到相应队列
     *
     * @param key - 业务KEY key_yyMMdd
     * @param val - 值
     * @return 是否成功
     */
    public synchronized void push(String key, Long val) {
        REDIS_INCR_MAP.put(key, val);
    }

    public String getDayFormat() {
        return getDayFormat(new Date());
    }

    public String getDayFormat(Date date) {
        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
    }

    /**
     * 初始化redis keys
     */
    public void initRedisKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
        if (list != null && !list.isEmpty()) {
            Long zkVal;
            JedisConfig.JedisConn jedisConn = null;
            for (int i = 0; i < list.size(); i++) {
                zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i));
                if (zkVal != null) {
                    String requestId = UUID.randomUUID().toString();
                    boolean tryLock = false;
                    try {
                        jedisConn = JedisConfig.getInstance().getConn();
                        //获得锁才更新,没获得锁就放弃更新
                        if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) {
                            jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
                        }
                    } finally {
                        if (jedisConn != null) {
                            if (tryLock) {
                                jedisConn.unLock(list.get(i), requestId);
                            }
                            jedisConn.close();
                        }
                    }
                }
            }
        }
    }


}
package com.xxx.arch.seq.client.tool;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Collections;
import java.util.List;


@Slf4j
public class ZkClient {

    private CuratorFramework client;

    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(serverList)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }


    public boolean exists(String path) {
        try {
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            return false;
        }
    }

    public void writeData(String path, Long value) {
        try {
            client.setData().forPath(path, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public void createPersistent(String zkNode, Long value) {
        try {
            client.create().forPath(zkNode, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public List getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return Collections.emptyList();
    }

    public Long readData(String path) {
        try {
            byte[] data = client.getData().forPath(path);
            return Long.parseLong(new String(data));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    public void delete(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZkClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);

    private static volatile ZkClient zkClient = null;

    public static ZkClient getZkClient() {
        if (zkClient == null) {
            synchronized (ZkClientUtil.class) {
                if (zkClient == null) {
                    initZkClient();
                }
            }
        }
        return zkClient;
    }

    private static void initZkClient() {
        try {
            String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + serverList + "]");
            }
            if (serverList == null || serverList.trim().isEmpty()) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            } else {
                zkClient = new ZkClient(serverList, 15000, 60000);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {
    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * 关闭输入输出流
     *
     * @param closeAbles
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null || closeAbles.length <= 0) {
            return;
        }
        for (Closeable closeAble : closeAbles) {
            if (closeAble != null) {
                try {
                    closeAble.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}

到此,相信大家对“如何实现基于Jedis+ZK的分布式序列号生成器”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


网站名称:如何实现基于Jedis+ZK的分布式序列号生成器
标题URL:http://csdahua.cn/article/igjpgj.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流