本篇内容主要讲解“如何实现基于Jedis+ZK的分布式序列号生成器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何实现基于Jedis+ZK的分布式序列号生成器”吧!
部分源码参考Jedis实现分布式锁博客:
package com.xxx.arch.seq.utlis;
import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;
/**
* arch-seq 唯一code 获取客户端
*
* @author jdkleo
*/
@Slf4j
public class SEQUtil {
/**
* 生成默认KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位
*
* @param
* @return
*/
public static long getSEQ() {
return RedisSEQ.getSEQ();
}
/**
* 生成默认KEY连续的UUID,共total个
*
* @param total - 连续多少个
* @return
*/
public static long[] getSEQ(long total) {
long value = RedisSEQ.getSEQ(total);
return getValueArray(value, (int) total);
}
/**
* 生成指定KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位
*
* @param seqName
* @return
*/
public static long getSEQ(String seqName) {
return RedisSEQ.getSEQ(seqName, 1);
}
/**
* 生成指定KEY连续的UUID,共total个
*
* @param seqName
* @param total
* @return
*/
public static long[] getSEQ(String seqName, long total) {
long value = RedisSEQ.getSEQ(seqName, total);
return getValueArray(value, (int) total);
}
private static long[] getValueArray(long value, int total) {
int n = total;
long[] ret = new long[n];
do {
ret[n - 1] = value--;
} while (--n > 0);
return ret;
}
}
package com.xxx.arch.seq.client.redis;
import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Redis版本SEQ(有序SEQ)
*
* @author zhangyang
* @createDate 2019-01-22
* @since 2.x
*/
@Slf4j
public class RedisSEQ extends StreamCloseAble {
//默认的REDIS SEQ初始化状态器KEY
private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
//默认的REDIS SEQ初始化状态器VAL
private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
private static final String _DEFAULT_SEQ_INIT_READY = "ready";
//SEQ初始化容器状态
private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;
//默认REDIS SEQ序列号的名称
private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";
//本地模式自增ID槽
private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);
static {
JedisConfig.JedisConn jedisConn = null;
try {
jedisConn = JedisConfig.getInstance().getConn();
//if REDIS宕机或第一次:创建初始化状态成功后,初始化redis keys(该方法可以恢复上次redis宕机数据)
if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//抢到REDIS初始化锁,并将其标记为pending状态
try {
RedisSEQTimer.getInstance().removeNotUsedKeys();
RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,从ZK上读取初始数据
jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,标记为ready状态
} catch (Exception e) {
log.error(e.getMessage(), e);
//初始化arch.seq REDIS数据异常,有可能是ZK相关问题,也有可能是REDIS问题,请排查
log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
}
}
//else{...} 没抢到REDIS初始化锁的话:不作任何处理
} catch (Exception e) {
log.error(e.getMessage(), e);
log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
} finally {
close(jedisConn);
}
}
public static Long getSEQ() {
return getSEQ(_DEFAULT_SEQ_NAME, 1);
}
public static Long getSEQ(long total) {
return getSEQ(_DEFAULT_SEQ_NAME, total);
}
public static Long getSEQ(String seqName, long total) {
Long result = null;
JedisConfig.JedisConn jedisConn = null;
try {
//获取redis连接
jedisConn = JedisConfig.getInstance().getConn();
//获得REDIS初始化状态不成功
if (!tryInitReady(jedisConn)) {
//arch.seq By REDIS版本不能正常初始化,请检查REDIS服务。
throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
}
//开启分布式锁
//if (jedisConn.tryLock(seqName, 1000, 2000)) {
try {
String day = RedisSEQTimer.getInstance().getDayFormat();
String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
result = Long.parseLong(day + incrVal);
} catch (Exception e) {
e.printStackTrace();
log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.");
Thread.sleep(randTime());
result = getSEQ(seqName, total);
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
//redis生成失败,返回本地ID:15位纳秒+1位自然数轮询
//在获取【自增序列号:{},序列号分布式锁:{}】时发生了异常,系统返回了本地生成的自增序列号,不影响系统使用,但请管理员尽快协查!
log.error("An exception occurred while acquiring self-incremental sequence number '{}', " +
"sequence number distributed lock '{}',The system returns the locally generated self-incremental " +
"sequence number, which does not affect the use of the system, but the administrator should check " +
"it as soon as possible.", seqName, seqName + "_LOCK");
result = xUUID();
} finally {
//切记,一定要释放分布式锁(注:释放锁的同时jedisConn会自动释放connection,无需再次CLOSE)
if (jedisConn != null) {
//jedisConn.unLock(seqName);
jedisConn.close();
}
if (log.isDebugEnabled()) {
log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
}
}
return result;
//arch.seq发生了不可预测的异常,请联系架构部处理!
//throw new RuntimeException("arch.seq发生了不可预测的异常,请联系架构部处理!");
}
private static String getStackTrace() {
StringBuilder result = new StringBuilder();
StackTraceElement[] element = Thread.currentThread().getStackTrace();
for (int i = 0; i < element.length; i++) {
result.append("\t").append(element[i]).append("\n");
}
return result.toString();
}
private static long randTime() {
return new Random().nextInt(50) + 50;
}
private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
int times = 0;
for (; times < 3; times++) {
if (getSEQInitReady(jedisConn)) {
break;
}
Thread.sleep(100);
}
return times < 3;
}
/**
* 获得SEQ初始化状态
*
* @param jedisConn
* @return
*/
private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
if (!_DEFAULT_SEQ_INIT_STATUS) {
synchronized (RedisSEQ.class) {
if (!_DEFAULT_SEQ_INIT_STATUS) {
_DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
}
}
}
return _DEFAULT_SEQ_INIT_STATUS;
}
/**
* 获得REDIS自增序列号最新值,并同步更新到ZK备份数据节点守护线程中
*
* @param jedisConn
* @param day
* @param seqName
* @param total
* @return
*/
private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
String key = seqName + "_" + day;
Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
if (incrVal > 9999999999L) {
throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);
}
//塞到要更新的ZK队列中
RedisSEQTimer.getInstance().push(key, incrVal);
return incrVal;
}
/**
* 单机模式生成UUID
*
* @return
*/
private static Long xUUID() {
int rand = _LOCAL_INCR.incrementAndGet() % 10;
String result = System.nanoTime() + "" + rand;
return Long.parseLong(result);
}
}
package com.xxx.arch.seq.client.redis;
import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class RedisSEQTimer extends StreamCloseAble {
public static final String DAY_FORMAT_PATTERN = "yyMMdd";
public static volatile RedisSEQTimer redisSEQTimer;
private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>();
private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();
private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";
//zk节点最大值每次递增数
private long _REDIS_MAXVALUE_INIT = 10_000L;
private Timer _TIMER = new Timer(true);
//是否处于清理状态
private volatile boolean _CLEAN_STATUS;
//清理key
private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";
private RedisSEQTimer() {
super();
//启动zk巡查服务
_TIMER.schedule(new TimerTask() {
@Override
public void run() {
checkAndConfigure();
}
}, new Date(), 1 * 60 * 1000);
//每天定时清理垃圾数据
_TIMER.schedule(new TimerTask() {
@Override
public void run() {
removeNotUsedKeys();
}
}, getFirstTime(), 24 * 60 * 60 * 1000);
}
public static RedisSEQTimer getInstance() {
if (redisSEQTimer == null) {
synchronized (RedisSEQTimer.class) {
if (redisSEQTimer == null) {
redisSEQTimer = new RedisSEQTimer();
}
}
}
return redisSEQTimer;
}
/**
* 定期更新ZK节点
*/
private synchronized void checkAndConfigure() {
if (_CLEAN_STATUS) {
return;
}
if (REDIS_INCR_MAP.isEmpty()) {
return;
}
String endDay = "_" + getDayFormat();
List<String> notTodayKeys = new ArrayList<>();
Set<Map.Entry<String, Long>> entrySet = REDIS_INCR_MAP.entrySet();
for (Map.Entry<String, Long> entry : entrySet) {
//不是今天的key不作处理
if (!entry.getKey().endsWith(endDay)) {
notTodayKeys.add(entry.getKey());
return;
}
//将最新的值写到zk节点上 节点格式:<ARCH_SEQ前缀>/KEY_yyMMdd
String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
if (_ZK_CLIENT.exists(zkNode)) {
_ZK_CLIENT.writeData(zkNode, entry.getValue());
} else {
try {
_ZK_CLIENT.createPersistent(zkNode, entry.getValue());
} catch (RuntimeException e) {
//not to write log ,it's will be retry in next time.
}
}
}
;
if (!notTodayKeys.isEmpty()) {
for (String key : notTodayKeys) {
REDIS_INCR_MAP.remove(key);
}
}
}
/**
* 删除不再使用的KEY(包含redis和zk节点)
*/
public synchronized void removeNotUsedKeys() {
if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
return;
}
_CLEAN_STATUS = true;
JedisConfig.JedisConn jedisConn = null;
String requestId = UUID.randomUUID().toString();
boolean tryLock = false;
try {
List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
//保留两天。考虑到多个机器的时间可能不一致,如果在刚过零点删除了昨天的sequence,另一台机器可能还需要使用它,则会出现id重复
Date now = new Date();
Date yesterday = DateUtils.addDays(now, -1);
List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));
if (list != null && !list.isEmpty()) {
jedisConn = JedisConfig.getInstance().getConn();
if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {
JedisConfig.JedisConn finalJedisConn = jedisConn;
for (String node : list) {
String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length());
if (!keepDays.contains(dayPart)) {
REDIS_INCR_MAP.remove(node);
finalJedisConn.del(node);
removeZkNode(node);
}
}
}
}
} finally {
_CLEAN_STATUS = false;
if (jedisConn != null) {
if (tryLock) {
jedisConn.unLock(_REMOVE_KEY, requestId);
}
jedisConn.close();
}
}
}
/**
* 移除ZK节点
*
* @param node
*/
private void removeZkNode(String node) {
String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
if (_ZK_CLIENT.exists(path)) {
try {
_ZK_CLIENT.delete(path);
} catch (Exception e) {
}
}
}
/**
* 获得每天定时任务的执行时间
*
* @return
*/
private Date getFirstTime() {
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY, 24); // 24点 可以更改时间
calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分钟 随机
calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒 随机
return calendar.getTime();
}
/**
* 获得区间随机整数
*
* @param exclude - 最大数,exclude
* @param from - 最小数,include
* @return
*/
private int getRandNum(int exclude, int from) {
return new Random().nextInt(exclude) + from;
}
/**
* 将某天的KEY塞到相应队列
*
* @param key - 业务KEY key_yyMMdd
* @param val - 值
* @return 是否成功
*/
public synchronized void push(String key, Long val) {
REDIS_INCR_MAP.put(key, val);
}
public String getDayFormat() {
return getDayFormat(new Date());
}
public String getDayFormat(Date date) {
return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
}
/**
* 初始化redis keys
*/
public void initRedisKeys() {
if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
return;
}
List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
if (list != null && !list.isEmpty()) {
Long zkVal;
JedisConfig.JedisConn jedisConn = null;
for (int i = 0; i < list.size(); i++) {
zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i));
if (zkVal != null) {
String requestId = UUID.randomUUID().toString();
boolean tryLock = false;
try {
jedisConn = JedisConfig.getInstance().getConn();
//获得锁才更新,没获得锁就放弃更新
if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) {
jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
}
} finally {
if (jedisConn != null) {
if (tryLock) {
jedisConn.unLock(list.get(i), requestId);
}
jedisConn.close();
}
}
}
}
}
}
}
package com.xxx.arch.seq.client.tool;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ZkClient {
private CuratorFramework client;
public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(serverList)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(retryPolicy)
.build();
client.start();
}
public boolean exists(String path) {
try {
return client.checkExists().forPath(path) != null;
} catch (Exception e) {
return false;
}
}
public void writeData(String path, Long value) {
try {
client.setData().forPath(path, value.toString().getBytes());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
public void createPersistent(String zkNode, Long value) {
try {
client.create().forPath(zkNode, value.toString().getBytes());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
public List<String> getChildren(String path) {
try {
return client.getChildren().forPath(path);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return Collections.emptyList();
}
public Long readData(String path) {
try {
byte[] data = client.getData().forPath(path);
return Long.parseLong(new String(data));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
public void delete(String path) {
try {
client.delete().forPath(path);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
package com.xxx.arch.seq.client.zk;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkClientUtil {
private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);
private static volatile ZkClient zkClient = null;
public static ZkClient getZkClient() {
if (zkClient == null) {
synchronized (ZkClientUtil.class) {
if (zkClient == null) {
initZkClient();
}
}
}
return zkClient;
}
private static void initZkClient() {
try {
String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
if (logger.isInfoEnabled()) {
logger.info("zk cluster[" + serverList + "]");
}
if (serverList == null || serverList.trim().isEmpty()) {
throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
} else {
zkClient = new ZkClient(serverList, 15000, 60000);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
package com.xxx.arch.seq.client.tool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
/**
* Created by zhangyang on 2016/5/31.
*/
public class StreamCloseAble {
private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);
/**
* 关闭输入输出流
*
* @param closeAbles
*/
public static void close(Closeable... closeAbles) {
if (closeAbles == null || closeAbles.length <= 0) {
return;
}
for (Closeable closeAble : closeAbles) {
if (closeAble != null) {
try {
closeAble.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}
到此,相信大家对“如何实现基于Jedis+ZK的分布式序列号生成器”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4095038/blog/5013738