本篇内容主要讲解“如何实现基于Jedis+ZK的分布式序列号生成器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何实现基于Jedis+ZK的分布式序列号生成器”吧!
以下部分源码参考了“Jedis实现分布式锁”一文:
package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;

/**
 * Client-side entry point for obtaining unique sequence codes from arch-seq.
 *
 * <p>Every method delegates to {@link RedisSEQ}. A generated id is the current day formatted as
 * {@code yyMMdd} (6 digits) followed by a zero-padded 10-digit counter, 16 digits in total.
 *
 * <p>NOTE(review): when Redis is unavailable {@link RedisSEQ} returns a locally generated id
 * instead of throwing. For the batch variants the returned array is then derived from that
 * local id, so its elements were not reserved in Redis - confirm callers can tolerate this.
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * Generates a single id under the default sequence key.
     *
     * @return the generated id
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * Generates {@code total} consecutive ids under the default sequence key.
     *
     * @param total how many consecutive ids to reserve; must be in {@code [1, Integer.MAX_VALUE]}
     * @return the reserved ids in ascending order
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(long total) {
        // Validate before touching Redis so an invalid request never consumes sequence numbers.
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, count);
    }

    /**
     * Generates a single id under the given sequence key.
     *
     * @param seqName name of the sequence
     * @return the generated id
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * Generates {@code total} consecutive ids under the given sequence key.
     *
     * @param seqName name of the sequence
     * @param total   how many consecutive ids to reserve; must be in {@code [1, Integer.MAX_VALUE]}
     * @return the reserved ids in ascending order
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(String seqName, long total) {
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, count);
    }

    /**
     * Ensures the requested batch size fits in an array and is at least one.
     *
     * <p>Previously a total of 0 caused an {@code ArrayIndexOutOfBoundsException}, a negative total
     * a {@code NegativeArraySizeException}, and a total above {@code Integer.MAX_VALUE} was silently
     * truncated by the int cast - all after Redis had already been incremented.
     */
    private static int checkTotal(long total) {
        if (total < 1 || total > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "total must be between 1 and " + Integer.MAX_VALUE + ", but was " + total);
        }
        return (int) total;
    }

    /**
     * Expands the last id of a reserved range into the full ascending range.
     *
     * @param value the highest id of the range (the value returned by the increment)
     * @param total number of ids in the range, at least 1
     * @return {@code [value - total + 1, ..., value]}
     */
    private static long[] getValueArray(long value, int total) {
        long[] ret = new long[total];
        long first = value - total + 1;
        for (int i = 0; i < total; i++) {
            ret[i] = first + i;
        }
        return ret;
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis-backed ordered sequence generator.
 *
 * <p>An id is the current day ({@code yyMMdd}) followed by a zero-padded 10-digit counter that is
 * incremented in Redis under the key {@code <seqName>_<yyMMdd>}. Every new counter value is also
 * handed to {@link RedisSEQTimer}, which periodically writes it to ZooKeeper.
 *
 * <p>If anything fails, {@link #getSEQ(String, long)} does not throw: it logs the failure and
 * returns a locally generated id instead.
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
@Slf4j
public class RedisSEQ extends StreamCloseAble {

    // Redis key holding the initialization state of the sequence data.
    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
    // Values stored under the initialization state key.
    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
    private static final String _DEFAULT_SEQ_INIT_READY = "ready";
    // Cached "Redis reports ready" flag; once true it is never re-checked against Redis.
    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;
    // Name of the default sequence.
    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";
    // Rotating counter used for the last digit of locally generated fallback ids.
    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);
    // Largest counter value that still fits into the 10-digit suffix.
    private static final long _MAX_INCR_VALUE = 9999999999L;
    // Upper bound on increment attempts per request before falling back to a local id.
    private static final int _MAX_ATTEMPTS = 3;
    // Shared source for the retry back-off jitter.
    private static final Random _RANDOM = new Random();

    static {
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            // Whoever wins the SETNX (first start, or first start after Redis lost its data)
            // rebuilds the Redis keys from the values backed up in ZooKeeper.
            // NOTE(review): if the winner dies between writing "pending" and "ready", the key
            // stays "pending" and no later process re-initializes - confirm how this is recovered.
            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {
                try {
                    RedisSEQTimer.getInstance().removeNotUsedKeys();
                    RedisSEQTimer.getInstance().initRedisKeys();
                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
                    // Release the marker so that another process can retry the initialization.
                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
                }
            }
            // Processes that lost the SETNX do nothing here.
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
        } finally {
            close(jedisConn);
        }
    }

    /**
     * Generates one id under the default sequence name.
     *
     * @return the generated id
     */
    public static Long getSEQ() {
        return getSEQ(_DEFAULT_SEQ_NAME, 1);
    }

    /**
     * Reserves {@code total} consecutive ids under the default sequence name.
     *
     * @param total number of ids to reserve
     * @return the last (highest) id of the reserved range
     */
    public static Long getSEQ(long total) {
        return getSEQ(_DEFAULT_SEQ_NAME, total);
    }

    /**
     * Reserves {@code total} consecutive ids under the given sequence name.
     *
     * <p>This method never throws. On any failure it logs the error and returns an id produced
     * locally by {@link #xUUID()}.
     *
     * @param seqName name of the sequence
     * @param total   number of ids to reserve; values of 1 or less reserve a single id
     * @return the last (highest) id of the reserved range, or a locally generated id on failure
     */
    public static Long getSEQ(String seqName, long total) {
        Long result = null;
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            if (!tryInitReady(jedisConn)) {
                throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
            }
            result = nextSeqWithRetry(jedisConn, seqName, total);
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller; it is otherwise lost in this catch-all.
                Thread.currentThread().interrupt();
            }
            log.error(e.getMessage(), e);
            log.error("An exception occurred while acquiring self-incremental sequence number '{}', "
                    + "sequence number distributed lock '{}',The system returns the locally generated self-incremental "
                    + "sequence number, which does not affect the use of the system, but the administrator should check "
                    + "it as soon as possible.", seqName, seqName + "_LOCK");
            result = xUUID();
        } finally {
            if (jedisConn != null) {
                jedisConn.close();
            }
            if (log.isDebugEnabled()) {
                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
            }
        }
        return result;
    }

    /**
     * Increments the counter and builds the id, retrying a bounded number of times.
     *
     * <p>The previous implementation retried by calling {@code getSEQ} recursively without a limit,
     * and every recursion level took another connection while still holding its own. The retries
     * now reuse the caller's connection and stop after {@link #_MAX_ATTEMPTS} attempts.
     *
     * @throws InterruptedException if interrupted while backing off between attempts
     */
    private static Long nextSeqWithRetry(JedisConfig.JedisConn jedisConn, String seqName, long total)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                String day = RedisSEQTimer.getInstance().getDayFormat();
                String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
                return Long.parseLong(day + incrVal);
            } catch (IllegalStateException e) {
                // Counter overflow: retrying would only consume more sequence numbers.
                throw e;
            } catch (RuntimeException e) {
                if (attempt >= _MAX_ATTEMPTS) {
                    throw e;
                }
                log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.", e);
                Thread.sleep(randTime());
            }
        }
    }

    /**
     * Renders the current thread's stack trace, one tab-indented frame per line.
     */
    private static String getStackTrace() {
        StringBuilder result = new StringBuilder();
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            result.append("\t").append(frame).append("\n");
        }
        return result.toString();
    }

    /**
     * Returns a random back-off between 50 and 99 milliseconds.
     */
    private static long randTime() {
        return _RANDOM.nextInt(50) + 50;
    }

    /**
     * Polls the initialization state up to three times, sleeping 100 ms after each miss.
     *
     * @return {@code true} as soon as Redis reports the ready state, {@code false} otherwise
     */
    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
        for (int attempt = 0; attempt < 3; attempt++) {
            if (getSEQInitReady(jedisConn)) {
                return true;
            }
            Thread.sleep(100);
        }
        return false;
    }

    /**
     * Reads the initialization state, caching a positive answer in a static flag.
     *
     * @param jedisConn connection used to read the state key
     * @return {@code true} if the state key holds the ready value
     */
    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
        if (!_DEFAULT_SEQ_INIT_STATUS) {
            synchronized (RedisSEQ.class) {
                if (!_DEFAULT_SEQ_INIT_STATUS) {
                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
                }
            }
        }
        return _DEFAULT_SEQ_INIT_STATUS;
    }

    /**
     * Increments the Redis counter for {@code seqName} on {@code day} and queues the new value for
     * the ZooKeeper backup.
     *
     * @param jedisConn connection used for the increment
     * @param day       day part of the key, formatted {@code yyMMdd}
     * @param seqName   name of the sequence
     * @param total     increment step; values of 1 or less increment by one
     * @return the counter value after the increment
     * @throws IllegalStateException if the counter no longer fits into 10 digits
     */
    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
        String key = seqName + "_" + day;
        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
        if (incrVal > _MAX_INCR_VALUE) {
            throw new IllegalStateException("Exceed the maximum value,sequence:" + incrVal);
        }
        RedisSEQTimer.getInstance().push(key, incrVal);
        return incrVal;
    }

    /**
     * Generates a fallback id locally: {@code System.nanoTime()} followed by one rotating digit.
     *
     * @return the locally generated id
     */
    private static Long xUUID() {
        // incrementAndGet() wraps to negative values after overflow; a plain "% 10" would then yield
        // e.g. "-3", producing a string that Long.parseLong rejects. Normalize to 0..9.
        int rand = ((_LOCAL_INCR.incrementAndGet() % 10) + 10) % 10;
        String result = System.nanoTime() + "" + rand;
        return Long.parseLong(result);
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Background companion of {@code RedisSEQ}: keeps the latest counter value of every sequence key
 * in memory, writes those values to ZooKeeper once a minute, and once a day deletes keys that are
 * older than yesterday from Redis and ZooKeeper. It can also rebuild the Redis keys from the
 * values stored in ZooKeeper.
 */
public class RedisSEQTimer extends StreamCloseAble {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSEQTimer.class);

    public static final String DAY_FORMAT_PATTERN = "yyMMdd";
    public static volatile RedisSEQTimer redisSEQTimer;

    // Latest known counter value per sequence key (key format: <seqName>_<yyMMdd>).
    private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>();
    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();
    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";
    // Safety margin added to the ZooKeeper value when a Redis key is rebuilt from it.
    private final long _REDIS_MAXVALUE_INIT = 10_000L;
    private final Timer _TIMER = new Timer(true);
    // True while removeNotUsedKeys() is running.
    private volatile boolean _CLEAN_STATUS;
    // Redis lock key guarding the cleanup.
    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";

    private RedisSEQTimer() {
        super();
        // An exception escaping TimerTask.run() terminates the Timer thread and silently cancels
        // every later execution, so both tasks catch and log instead of propagating.
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    checkAndConfigure();
                } catch (Exception e) {
                    LOGGER.error("arch.seq failed to sync sequence values to ZK", e);
                }
            }
        }, new Date(), 1 * 60 * 1000);
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    removeNotUsedKeys();
                } catch (Exception e) {
                    LOGGER.error("arch.seq failed to remove unused sequence keys", e);
                }
            }
        }, getFirstTime(), 24 * 60 * 60 * 1000);
    }

    /**
     * Returns the lazily created singleton.
     */
    public static RedisSEQTimer getInstance() {
        if (redisSEQTimer == null) {
            synchronized (RedisSEQTimer.class) {
                if (redisSEQTimer == null) {
                    redisSEQTimer = new RedisSEQTimer();
                }
            }
        }
        return redisSEQTimer;
    }

    /**
     * Writes the latest value of every key of today to its ZooKeeper node
     * ({@code <namespace>/<key>}) and evicts keys of other days from the in-memory map.
     */
    private synchronized void checkAndConfigure() {
        if (_CLEAN_STATUS || REDIS_INCR_MAP.isEmpty()) {
            return;
        }
        String endDay = "_" + getDayFormat();
        List<String> notTodayKeys = new ArrayList<>();
        for (Map.Entry<String, Long> entry : REDIS_INCR_MAP.entrySet()) {
            if (!entry.getKey().endsWith(endDay)) {
                // Skip only this key. The former "return" aborted the whole run at the first stale
                // key, so the remaining keys were not synced and stale keys were never evicted.
                notTodayKeys.add(entry.getKey());
                continue;
            }
            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
            if (_ZK_CLIENT.exists(zkNode)) {
                _ZK_CLIENT.writeData(zkNode, entry.getValue());
            } else {
                try {
                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());
                } catch (RuntimeException ignored) {
                    // Intentionally not logged: the next run retries the write.
                }
            }
        }
        for (String key : notTodayKeys) {
            REDIS_INCR_MAP.remove(key);
        }
    }

    /**
     * Deletes keys older than yesterday from the in-memory map, Redis and ZooKeeper.
     *
     * <p>Two days are kept because machine clocks may differ: a host that is still on "yesterday"
     * shortly after midnight must not find its counter deleted, or ids would repeat. The cleanup
     * only runs when the Redis lock {@code ARCH_SEQ_REMOVE_KEY} can be acquired.
     */
    public synchronized void removeNotUsedKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        _CLEAN_STATUS = true;
        JedisConfig.JedisConn jedisConn = null;
        String requestId = UUID.randomUUID().toString();
        boolean tryLock = false;
        try {
            List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
            Date now = new Date();
            Date yesterday = DateUtils.addDays(now, -1);
            List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));
            if (list != null && !list.isEmpty()) {
                jedisConn = JedisConfig.getInstance().getConn();
                tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000);
                if (tryLock) {
                    int dayLength = DAY_FORMAT_PATTERN.length();
                    for (String node : list) {
                        if (node.length() < dayLength) {
                            // Not a <seqName>_<yyMMdd> node; substring() would throw on it.
                            continue;
                        }
                        String dayPart = node.substring(node.length() - dayLength);
                        if (!keepDays.contains(dayPart)) {
                            REDIS_INCR_MAP.remove(node);
                            jedisConn.del(node);
                            removeZkNode(node);
                        }
                    }
                }
            }
        } finally {
            _CLEAN_STATUS = false;
            if (jedisConn != null) {
                if (tryLock) {
                    jedisConn.unLock(_REMOVE_KEY, requestId);
                }
                jedisConn.close();
            }
        }
    }

    /**
     * Deletes the ZooKeeper node of a key if it exists; failures are logged and otherwise ignored.
     *
     * @param node child node name below the namespace
     */
    private void removeZkNode(String node) {
        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
        if (_ZK_CLIENT.exists(path)) {
            try {
                _ZK_CLIENT.delete(path);
            } catch (Exception e) {
                LOGGER.warn("arch.seq failed to delete ZK node {}", path, e);
            }
        }
    }

    /**
     * Computes the first run of the daily cleanup: hour 24 of today (which the lenient Calendar
     * rolls over to 00:xx of the next day) at a random minute in 0-5 and a random second in 0-59.
     *
     * @return time of the first cleanup run
     */
    private Date getFirstTime() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.HOUR_OF_DAY, 24);
        calendar.set(Calendar.MINUTE, getRandNum(6, 0));
        calendar.set(Calendar.SECOND, getRandNum(60, 0));
        return calendar.getTime();
    }

    /**
     * Returns a random integer in {@code [from, from + exclude)}.
     *
     * @param exclude size of the range (exclusive upper bound relative to {@code from})
     * @param from    smallest value that can be returned
     */
    private int getRandNum(int exclude, int from) {
        return new Random().nextInt(exclude) + from;
    }

    /**
     * Records the latest counter value of a key for the next ZooKeeper sync.
     *
     * <p>This method deliberately does not take the instance monitor: that monitor is held by the
     * sync and cleanup tasks while they talk to ZooKeeper and Redis, which would stall every id
     * request. The value is only moved forward, so a delayed caller cannot replace a newer value
     * with an older one.
     *
     * @param key sequence key in the form {@code <seqName>_<yyMMdd>}
     * @param val counter value after the increment
     */
    public void push(String key, Long val) {
        for (;;) {
            Long current = REDIS_INCR_MAP.putIfAbsent(key, val);
            if (current == null || current >= val) {
                return;
            }
            if (REDIS_INCR_MAP.replace(key, current, val)) {
                return;
            }
        }
    }

    /**
     * Formats the current date as {@code yyMMdd}.
     */
    public String getDayFormat() {
        return getDayFormat(new Date());
    }

    /**
     * Formats the given date as {@code yyMMdd}.
     *
     * @param date date to format
     */
    public String getDayFormat(Date date) {
        // SimpleDateFormat is not thread-safe, hence a fresh instance per call.
        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
    }

    /**
     * Rebuilds Redis keys from ZooKeeper: for every child node of the namespace with a readable
     * value, sets the Redis key of the same name to that value plus a safety margin of 10000.
     *
     * <p>NOTE(review): the lock is taken on the data key itself and the key is then overwritten
     * while the lock is held, so correctness depends on how {@code tryLock}/{@code unLock} are
     * implemented in JedisConn - confirm before changing this sequence.
     */
    public void initRedisKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
        if (list == null || list.isEmpty()) {
            return;
        }
        for (String node : list) {
            Long zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + node);
            if (zkVal == null) {
                continue;
            }
            String requestId = UUID.randomUUID().toString();
            // Declared per iteration so that a failing getConn() never leaves the finally block
            // looking at the already closed connection of the previous iteration.
            JedisConfig.JedisConn jedisConn = null;
            boolean tryLock = false;
            try {
                jedisConn = JedisConfig.getInstance().getConn();
                // Update only when the lock is obtained; otherwise skip this key.
                tryLock = jedisConn.tryLock(node, requestId, 2000);
                if (tryLock) {
                    jedisConn.set(node, String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
                }
            } finally {
                if (jedisConn != null) {
                    if (tryLock) {
                        jedisConn.unLock(node, requestId);
                    }
                    jedisConn.close();
                }
            }
        }
    }
}
package com.xxx.arch.seq.client.tool;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * Thin wrapper around a Curator client that stores {@code Long} values as decimal text.
 *
 * <p>No method propagates exceptions: failures are logged and reported through the return value
 * ({@code false}, an empty list or {@code null}) or, for the write methods, not reported at all.
 */
@Slf4j
public class ZkClient {

    private final CuratorFramework client;

    /**
     * Builds and starts a Curator client with an exponential back-off retry policy
     * (base sleep 1000 ms, at most 3 retries).
     *
     * @param serverList          ZooKeeper connect string
     * @param connectionTimeoutMs connection timeout in milliseconds
     * @param sessionTimeout      session timeout in milliseconds
     */
    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(serverList)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }

    /**
     * Checks whether a node exists.
     *
     * @param path node path
     * @return {@code true} if the node exists; {@code false} if it does not or the check failed
     */
    public boolean exists(String path) {
        try {
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            // Still reported as "absent" for callers, but no longer invisible in the logs.
            log.warn("exists check failed for path {}", path, e);
            return false;
        }
    }

    /**
     * Overwrites the data of an existing node with the decimal text of {@code value}.
     *
     * @param path  node path
     * @param value value to store
     */
    public void writeData(String path, Long value) {
        try {
            client.setData().forPath(path, toBytes(value));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Creates a node holding the decimal text of {@code value}.
     *
     * <p>Missing parent nodes are created as well. Without this, creating
     * {@code /namespace/key} fails with a NoNode error while {@code /namespace} does not exist.
     *
     * @param zkNode node path
     * @param value  initial value
     */
    public void createPersistent(String zkNode, Long value) {
        try {
            client.create().creatingParentsIfNeeded().forPath(zkNode, toBytes(value));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Lists the child node names of a path.
     *
     * @param path parent node path
     * @return the child names, or an empty list if the lookup failed
     */
    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return Collections.emptyList();
    }

    /**
     * Reads a node and parses its data as a decimal {@code Long}.
     *
     * @param path node path
     * @return the parsed value, or {@code null} if reading or parsing failed
     */
    public Long readData(String path) {
        try {
            byte[] data = client.getData().forPath(path);
            return Long.parseLong(new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Deletes a node.
     *
     * @param path node path
     */
    public void delete(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Encodes a value as decimal text with a fixed charset instead of the platform default.
     */
    private static byte[] toBytes(Long value) {
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }
}
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the shared {@link ZkClient}, creating it on first use from
 * {@code Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING}.
 */
public class ZkClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);

    private static volatile ZkClient zkClient = null;

    /**
     * Returns the shared client, attempting to create it if it does not exist yet.
     *
     * @return the shared client, or {@code null} if it could not be created; a later call
     *         attempts the creation again
     */
    public static ZkClient getZkClient() {
        if (zkClient != null) {
            return zkClient;
        }
        synchronized (ZkClientUtil.class) {
            if (zkClient == null) {
                initZkClient();
            }
            return zkClient;
        }
    }

    /**
     * Creates the client with a 15000 ms connection timeout and a 60000 ms session timeout.
     * A blank connect string or any other failure is logged and leaves the client unset.
     */
    private static void initZkClient() {
        try {
            final String connectString = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + connectString + "]");
            }
            final boolean missing = connectString == null || connectString.trim().isEmpty();
            if (missing) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            }
            zkClient = new ZkClient(connectString, 15000, 60000);
        } catch (Exception failure) {
            logger.error(failure.getMessage(), failure);
        }
    }
}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Base class offering a helper for closing resources quietly.
 *
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {

    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * Closes every given resource in order, skipping {@code null} entries. An
     * {@link IOException} raised by a resource is logged and does not stop the remaining
     * resources from being closed.
     *
     * @param closeAbles resources to close; may be {@code null} or empty
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null) {
            return;
        }
        for (int index = 0; index < closeAbles.length; index++) {
            Closeable resource = closeAbles[index];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (IOException ioFailure) {
                logger.error(ioFailure.getMessage(), ioFailure);
            }
        }
    }
}
到此,相信大家对“如何实现基于Jedis+ZK的分布式序列号生成器”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。