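本文介绍如何使用 Jedis 实现带自动续期功能的 Redis 分布式锁。下面依次给出四个类的完整源码:JedisConfig(Redis 连接封装,兼容单机、哨兵和集群三种模式)、DistributedLock(加锁与解锁入口)、ContinuationOfLifeTask(锁续期任务)以及 Constants(配置常量),希望通过这些代码你能有所收获。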
package com.xxx.arch.seq.client.redis;
import java.io.Closeable;
import java.util.*;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.*;
import com.xxx.arch.seq.constant.Constants;
/**
 * Process-wide holder for the Redis client, covering three deployment modes.
 *
 * <p>The mode is derived from the shape of the configured node list: a single
 * {@code host:port} selects a plain connection pool, nodes joined by {@code >>}
 * select a Sentinel-managed pool, and nodes joined by {@code ,} select Redis Cluster.
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
public class JedisConfig {
    /** Mode value: one standalone node behind a {@link JedisPool}. */
    private static final int MODE_SINGLE = 1;
    /** Mode value: Sentinel-managed master behind a {@link JedisSentinelPool}. */
    private static final int MODE_SENTINEL = 2;
    /** Mode value: Redis Cluster through a shared {@link JedisCluster}. */
    private static final int MODE_CLUSTER = 3;

    private static final String SENTINEL_SEPARATOR = ">>";
    private static final String CLUSTER_SEPARATOR = ",";

    private static final String HOST_KEY = "arch.seq.redis.host";
    private static final String PASSWORD_KEY = "arch.seq.redis.password";
    private static final String SENTINEL_MASTER_KEY = "arch.seq.redis.sentinel.master";

    private static volatile JedisConfig redisConfig;

    // Current mode: 1 standalone, 2 sentinel, 3 cluster
    private final int singleton;
    // Connection pool used in standalone mode
    private JedisPool jedisPool;
    // Connection pool used in sentinel mode
    private JedisSentinelPool sentinelPool;
    // Shared client used in cluster mode
    private JedisCluster jedisCluster;

    /**
     * Reads the connection settings from {@link Constants}, detects the mode and
     * creates the matching pool or cluster client.
     *
     * @throws RuntimeException if the node list is missing, blank, or still the
     *     unresolved {@code ${arch.seq.redis.host}} placeholder
     * @throws IllegalArgumentException if a node is not of the form {@code host:port}
     */
    private JedisConfig() {
        Properties redisProp = new Properties();
        redisProp.setProperty(HOST_KEY, Constants.ARCH_SEQ_REDIS_NODES);
        redisProp.setProperty(PASSWORD_KEY, Constants.ARCH_SEQ_REDIS_PASSWORD);
        redisProp.setProperty(SENTINEL_MASTER_KEY, Constants.ARCH_SEQ_REDIS_SENTINEL_MASTER);

        String hostConf = redisProp.getProperty(HOST_KEY);
        // A blank value would otherwise surface later as an ArrayIndexOutOfBoundsException.
        if (hostConf == null || hostConf.trim().isEmpty()) {
            throw new RuntimeException("get redis configuration error");
        }
        if ("${arch.seq.redis.host}".equals(hostConf)) {
            throw new RuntimeException("please check occ var \"arch.seq.redis.host\"");
        }

        singleton = detectMode(hostConf);
        switch (singleton) {
            case MODE_SINGLE:
                initJedisPool(redisProp);
                break;
            case MODE_SENTINEL:
                initJedisSentinel(redisProp);
                break;
            default:
                initJedisCluster(redisProp);
                break;
        }
    }

    /** Maps the node-list syntax to a mode; the sentinel separator wins over the comma. */
    private static int detectMode(String hostConf) {
        if (hostConf.contains(SENTINEL_SEPARATOR)) {
            return MODE_SENTINEL;
        }
        if (hostConf.contains(CLUSTER_SEPARATOR)) {
            return MODE_CLUSTER;
        }
        return MODE_SINGLE;
    }

    /**
     * Parses one {@code host:port} entry.
     *
     * @throws IllegalArgumentException if the entry has no {@code :port} part or an empty host
     * @throws NumberFormatException if the port is not a number
     */
    private static HostAndPort parseNode(String node) {
        String[] parts = node.trim().split(":");
        if (parts.length < 2 || parts[0].isEmpty()) {
            throw new IllegalArgumentException(
                    "invalid redis node \"" + node + "\", expected host:port");
        }
        return new HostAndPort(parts[0], Integer.parseInt(parts[1].trim()));
    }

    /** Creates the standalone pool; the timeout argument is passed as 0. */
    private void initJedisPool(Properties redisProp) {
        HostAndPort node = parseNode(redisProp.getProperty(HOST_KEY));
        this.jedisPool = new JedisPool(new JedisPoolConfig(), node.getHost(), node.getPort(),
                0, redisProp.getProperty(PASSWORD_KEY));
    }

    /** Creates the cluster client from the comma-separated node list. */
    private void initJedisCluster(Properties redisProp) {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String hc : redisProp.getProperty(HOST_KEY).split(CLUSTER_SEPARATOR)) {
            nodes.add(parseNode(hc));
        }
        jedisCluster = new JedisCluster(nodes, 0, 0, 4,
                redisProp.getProperty(PASSWORD_KEY), new GenericObjectPoolConfig());
    }

    /**
     * Creates the sentinel pool from the {@code >>}-separated sentinel list.
     *
     * <p>No connection is borrowed here: every caller obtains its own through
     * {@link #getConn()}, so nothing is held outside the pool.
     */
    private void initJedisSentinel(Properties redisProp) {
        Set<String> sentinels = new HashSet<>();
        for (String hc : redisProp.getProperty(HOST_KEY).split(SENTINEL_SEPARATOR)) {
            sentinels.add(parseNode(hc).toString());
        }
        sentinelPool = new JedisSentinelPool(redisProp.getProperty(SENTINEL_MASTER_KEY),
                sentinels, redisProp.getProperty(PASSWORD_KEY));
    }

    /**
     * Returns the lazily created singleton.
     *
     * @return the shared configuration instance
     */
    public static JedisConfig getInstance() {
        if (redisConfig == null) {
            synchronized (JedisConfig.class) {
                if (redisConfig == null) {
                    redisConfig = new JedisConfig();
                }
            }
        }
        return redisConfig;
    }

    /**
     * Wraps a client for the current mode.
     *
     * <p>In standalone and sentinel mode a connection is borrowed from the pool, so
     * the returned wrapper must be closed to give it back. In cluster mode the shared
     * {@link JedisCluster} is wrapped and closing the wrapper does nothing.
     *
     * @return a new wrapper, or {@code null} if the mode is none of the three known values
     */
    public JedisConn getConn() {
        switch (singleton) {
            case MODE_SINGLE:
                return new JedisConn(jedisPool.getResource());
            case MODE_SENTINEL:
                return new JedisConn(sentinelPool.getResource());
            case MODE_CLUSTER:
                return new JedisConn(jedisCluster);
            default:
                return null;
        }
    }

    /**
     * Thin wrapper around a Redis client that works for both a single connection
     * and a cluster client, exposing common commands plus lock/unlock helpers.
     */
    public static class JedisConn implements Closeable {
        private static final String LOCK_SUCCESS = "OK";
        // NX: only set when the key does not exist yet
        private static final String SET_IF_NOT_EXIST = "NX";
        // PX: expiry given in milliseconds (EX would be seconds)
        private static final String SET_WITH_EXPIRE_TIME = "PX";
        private static final Long RELEASE_SUCCESS = 1L;
        // Deletes the key only when it still holds the caller's request id.
        private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        private final JedisCommands invoke;

        /**
         * @param invoke the client to delegate to
         */
        public JedisConn(JedisCommands invoke) {
            this.invoke = invoke;
        }

        /**
         * Sets a value only if the key does not exist.
         *
         * @param key the key
         * @param value the value
         * @return 1 on success, 0 if the key already existed
         */
        public Long setnx(String key, String value) {
            return invoke.setnx(key, value);
        }

        /**
         * Reads a value.
         *
         * @param key the key
         * @return the result of the underlying {@code get} call
         */
        public String get(String key) {
            return invoke.get(key);
        }

        /**
         * Writes a value.
         *
         * @param key the key
         * @param value the value
         * @return the reply of the underlying {@code set} call
         */
        public String set(String key, String value) {
            return invoke.set(key, value);
        }

        /**
         * Writes a value and returns the previous one.
         *
         * @param key the key
         * @param value the new value
         * @return the value held before the update
         */
        public String getSet(String key, String value) {
            return invoke.getSet(key, value);
        }

        /**
         * Deletes a key.
         *
         * @param key the key
         */
        public void del(String key) {
            invoke.del(key);
        }

        /**
         * Increments a counter by one.
         *
         * @param key the key
         * @return the value after the increment
         */
        public Long incr(String key) {
            return invoke.incr(key);
        }

        /**
         * Increments a counter by the given amount.
         *
         * @param key the key
         * @param total the amount to add
         * @return the value after the increment
         */
        public Long incr(String key, long total) {
            return invoke.incrBy(key, total);
        }

        /**
         * Sets the time to live of a key (issued as {@code pexpire}).
         *
         * @param key the key
         * @param expireTime time to live in milliseconds
         * @return the reply of the underlying {@code pexpire} call
         */
        public Long expire(String key, long expireTime) {
            return invoke.pexpire(key, expireTime);
        }

        /**
         * Tries to take the lock with a single {@code SET key value NX PX ttl}.
         *
         * @param lockKey the lock key
         * @param requestId identifier of the requester, stored as the lock value
         * @param expireTime lock time to live in milliseconds
         * @return {@code true} if the lock was acquired
         */
        public boolean tryLock(String lockKey, String requestId, long expireTime) {
            String result = invoke.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            return LOCK_SUCCESS.equals(result);
        }

        /**
         * Releases the lock if it is still held by {@code requestId}.
         *
         * @param lockKey the lock key
         * @param requestId identifier of the requester that took the lock
         * @return {@code true} if the key was deleted
         */
        public boolean unLock(String lockKey, String requestId) {
            Object result = evalScript(UNLOCK_SCRIPT,
                    Collections.singletonList(lockKey), Collections.singletonList(requestId));
            return RELEASE_SUCCESS.equals(result);
        }

        /**
         * Runs a Lua script on whichever client type is wrapped.
         *
         * @throws UnsupportedOperationException if the wrapped client is neither a
         *     {@link Jedis} nor a {@link JedisCluster}
         */
        private Object evalScript(String script, List<String> keys, List<String> args) {
            if (invoke instanceof Jedis) {
                return ((Jedis) invoke).eval(script, keys, args);
            }
            if (invoke instanceof JedisCluster) {
                return ((JedisCluster) invoke).eval(script, keys, args);
            }
            throw new UnsupportedOperationException(
                    "script evaluation is not supported by " + invoke.getClass().getName());
        }

        /**
         * Closes the wrapped {@link Jedis} connection; a cluster client is shared and
         * therefore left open.
         */
        @Override
        public void close() {
            if (invoke instanceof Jedis) {
                ((Jedis) invoke).close();
            }
        }
    }
}
package com.xxx.arch.seq.core;
import com.xxx.arch.seq.client.redis.JedisConfig;
import com.xxx.arch.seq.task.ContinuationOfLifeTask;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Redis-based distributed lock with background lease renewal.
 *
 * <p>Every acquired lock gets a {@link ContinuationOfLifeTask} placed in a delay
 * queue; a single background worker takes due tasks and lets them extend the lock.
 */
@Slf4j
public final class DistributedLock {
    // Delay queue of renewal tasks, ordered by when each renewal is due
    private static final DelayQueue<ContinuationOfLifeTask> QUEUE = new DelayQueue<>();
    // Renewal tasks indexed by lockKey + requestId so that release can cancel them
    private static final Map<String, ContinuationOfLifeTask> CACHE = new ConcurrentHashMap<>();
    // Worker that extends lock lifetimes. It is a daemon thread: the loop never ends,
    // so a non-daemon thread would keep the JVM from exiting.
    private static final ExecutorService CONTINUATION_OF_LIFE_EXECUTOR =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread worker = new Thread(runnable, "distributed-lock-renewal");
                worker.setDaemon(true);
                return worker;
            });
    private static final long TIMEOUT = 1000;
    // Upper bound on the number of queued renewal tasks
    private static final int SIZE = 5000;

    static {
        CONTINUATION_OF_LIFE_EXECUTOR.execute(DistributedLock::renewalLoop);
    }

    private DistributedLock(){}

    /** Body of the renewal worker: waits for the next due task and processes it, forever. */
    private static void renewalLoop() {
        while (true) {
            ContinuationOfLifeTask task;
            try {
                task = QUEUE.take();
            } catch (InterruptedException ignored) {
                // The worker has no shutdown path, so an interrupt is deliberately
                // ignored and the loop keeps serving renewals.
                continue;
            }
            process(task);
        }
    }

    /** Renews one due task and either re-queues it or drops it from the cache. */
    private static void process(ContinuationOfLifeTask task) {
        long nowTime = System.currentTimeMillis();
        // Released, expired or exhausted tasks are simply forgotten.
        if (!task.isActive() || task.isDiscarded(nowTime)) {
            CACHE.remove(task.getId());
            return;
        }
        if (!task.isExecute(nowTime)) {
            // A task that is merely not due yet keeps its cache entry (normally
            // does not happen, since the queue only hands out due tasks).
            if (nowTime >= task.getEndTime()) {
                CACHE.remove(task.getId());
            }
            return;
        }
        task.execute();
        // Re-queue only while further renewals are still allowed.
        if (task.isActive() && task.checkCount()) {
            QUEUE.add(task);
        } else {
            CACHE.remove(task.getId());
        }
    }

    /**
     * Acquires a distributed lock and schedules its renewal.
     *
     * <p>The renewal task is built before Redis is touched, so invalid arguments are
     * rejected before any lock is taken. The lock is created with the task's own
     * lease length, which keeps the Redis TTL and the renewal schedule in step
     * ({@link ContinuationOfLifeTask#build} raises leases below 1000 ms to 1000 ms).
     *
     * @param lockKey key of the lock, must be globally unique
     * @param requestId unique id of this request, e.g. a UUID
     * @param expireTime maximum time the lock is held per lease, in milliseconds
     * @param flagCount number of times the lock may be renewed
     * @return {@code true} if the lock was acquired; {@code false} if it is held by
     *     someone else or too many renewal tasks are already queued
     * @throws IllegalArgumentException if {@code lockKey} or {@code requestId} is blank
     */
    public static boolean getDistributeLock(String lockKey, String requestId, long expireTime,int flagCount) {
        // Refuse early when the renewal queue is full; no connection is needed for that.
        if (QUEUE.size() >= SIZE) {
            return false;
        }
        ContinuationOfLifeTask task = ContinuationOfLifeTask.build(lockKey, requestId, expireTime, flagCount);
        try (JedisConfig.JedisConn conn = JedisConfig.getInstance().getConn()) {
            if (!conn.tryLock(lockKey, requestId, task.getExpireTime())) {
                return false;
            }
        }
        // Without a queued renewal task the lock must not be kept.
        if (!QUEUE.offer(task, TIMEOUT, TimeUnit.MILLISECONDS)) {
            releaseDistributeLock(lockKey, requestId);
            return false;
        }
        CACHE.put(lockKey + requestId, task);
        return true;
    }

    /**
     * Acquires a distributed lock that is renewed up to 3 times.
     *
     * @param lockKey key of the lock, must be globally unique
     * @param requestId unique id of this request, e.g. a UUID
     * @param expireTime maximum time the lock is held per lease, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    public static boolean getDefaultDistributeLock(String lockKey, String requestId, long expireTime) {
        return getDistributeLock(lockKey, requestId, expireTime, 3);
    }

    /**
     * Acquires a long-lived lock: a 10 second lease renewed up to 8640 times
     * (intended as roughly 24 hours). The caller must release it explicitly.
     *
     * @param lockKey key of the lock, must be globally unique
     * @param requestId unique id of this request
     * @return {@code true} if the lock was acquired
     */
    public static boolean getPermanentDistributedLock(String lockKey, String requestId){
        return getDistributeLock(lockKey, requestId, 10000, 6 * 60 * 24);
    }

    /**
     * Cancels the renewal task and releases the lock in Redis.
     *
     * @param lockKey key of the lock, must be globally unique
     * @param requestId unique id used when the lock was acquired
     * @return {@code true} if the lock was deleted in Redis
     */
    public static boolean releaseDistributeLock(String lockKey, String requestId) {
        // Stop renewing first so the worker cannot extend a lock that is being released.
        ContinuationOfLifeTask task = CACHE.remove(lockKey + requestId);
        if (task != null) {
            task.setActive(false);
            QUEUE.remove(task);
        }
        try (JedisConfig.JedisConn conn = JedisConfig.getInstance().getConn()) {
            return conn.unLock(lockKey, requestId);
        }
    }
}
package com.xxx.arch.seq.task;
import com.xxx.arch.seq.client.redis.JedisConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 续命任务类
*/
@Slf4j
public class ContinuationOfLifeTask implements Delayed {
private String id;
//结束时间 即为需要续命的时间
private long endTime;
//是否还存活
private volatile boolean active;
//锁的key
private String lockKey;
//锁超时时间
private long timeout;
//锁的持续时间
private long expireTime;
//锁的续命次数 -1 代表无限
private int flagCount;
//续命次数统计 count 不能大于 flagCount
private int count;
private ContinuationOfLifeTask(String id, String lockKey, long expireTime, long endTime, long timeout, int flagCount) {
this.id = id;
this.lockKey = lockKey;
this.expireTime = expireTime;
this.endTime = endTime;
this.timeout = timeout;
this.flagCount = flagCount;
this.active = true;
this.count = 0;
}
public void execute() {
//该续命任务是否还存活
if (active) {
JedisConfig.JedisConn conn = null;
// 当前次数是否小于指定续命次数
// 当前时间是否大于结束时间
if (flagCount > count) {
//重试次数
int retryCount = 0;
// 当前时间是否大于过期时间
while (System.currentTimeMillis() >= endTime && retryCount < 3) {
try {
// 续命延期锁的过期时间
(conn = JedisConfig.getInstance().getConn()).expire(lockKey, expireTime);
long expiration = expireTime / 10;
//保证最少提前100毫秒
timeout = System.currentTimeMillis() + expireTime;
//更新结束时间
endTime = timeout - (expiration > 100 ? expiration : 100);
//增加执行次数
count++;
if (log.isDebugEnabled()) {
log.debug("【续命】锁关键字:{},续期:{}毫秒,计数:{}", lockKey, expireTime, count);
}
break;
} catch (Exception e) {
try {
log.error(e.getMessage(), e);
retryCount++;
Thread.sleep(100L);
} catch (InterruptedException ie) {
log.error(e.getMessage(), e);
}
} finally {
if (conn != null) {
conn.close();
}
}
}
}
}
}
/**
* 是否可以执行 必须是活跃且执行次数没有到最大值
* 且时间没有过期的任务才能执行
*
* @return
*/
public boolean isExecute(long nowTime) {
return nowTime >= endTime && nowTime <= timeout && flagCount >= count;
}
/**
* 是否丢弃
*
* @return
*/
public boolean isDiscarded(long nowTime) {
return nowTime > timeout || flagCount <= count;
}
public boolean checkCount() {
return count < flagCount;
}
public static final ContinuationOfLifeTask build(String lockKey, String requestId, long expireTime, int flagCount) {
if (StringUtils.isAnyBlank(lockKey, requestId)) {
throw new IllegalArgumentException("lockKey Can't be blank !");
}
//校验入参如果锁定时间低于 1000 毫秒 延长到 1000 毫秒
if (expireTime < 1000) {
expireTime = 1000;
}
//校验 锁的续命次数 如果小于 -1 则默认等于3
if (flagCount < -1) {
flagCount = 3;
}
long expiration = expireTime / 10;
//保证最少提前100毫秒
long timeout = System.currentTimeMillis() + expireTime;
long endTime = timeout - (expiration > 500 ? expiration : 500);
return new ContinuationOfLifeTask(lockKey + requestId, lockKey, expireTime, endTime, timeout, flagCount);
}
public long getEndTime() {
return endTime;
}
public ContinuationOfLifeTask setEndTime(long endTime) {
this.endTime = endTime;
return this;
}
public boolean isActive() {
return active;
}
public ContinuationOfLifeTask setActive(boolean active) {
this.active = active;
return this;
}
public String getLockKey() {
return lockKey;
}
public ContinuationOfLifeTask setLockKey(String lockKey) {
this.lockKey = lockKey;
return this;
}
public long getExpireTime() {
return expireTime;
}
public ContinuationOfLifeTask setExpireTime(long expireTime) {
this.expireTime = expireTime;
return this;
}
public int getFlagCount() {
return flagCount;
}
public ContinuationOfLifeTask setFlagCount(int flagCount) {
this.flagCount = flagCount;
return this;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((endTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}
package com.xxx.arch.seq.constant;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.apache.commons.lang3.StringUtils;
public class Constants {
    // Apollo namespace of the shared ZooKeeper cluster configuration
    public static final String ZK_NAME_SPACE = "33.zk";
    // Apollo namespace of the Redis settings
    public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence-redis";
    // public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence";

    // Values below are read once, when this class is initialised; each falls back to "".
    public static final String ARCH_SEQ_ZOOKEEPER_CONNECT_STRING = getConfig(ZK_NAME_SPACE,"zk.address", "");
    public static final String ARCH_SEQ_REDIS_NODES = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.nodes", "");
    public static final String ARCH_SEQ_REDIS_SENTINEL_MASTER = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.sentinel.master", "");
    public static final String ARCH_SEQ_REDIS_PASSWORD = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.common.key", "");

    /**
     * Looks up one property in an Apollo namespace.
     *
     * @param nameSpace namespace to read from
     * @param key property key
     * @param defultValue value handed to Apollo as the fallback for the key
     * @return the property value, or an empty string when {@code nameSpace} is blank
     */
    public static String getConfig(String nameSpace,String key,String defultValue){
        if (!StringUtils.isBlank(nameSpace)) {
            return ConfigService.getConfig(nameSpace).getProperty(key, defultValue);
        }
        return "";
    }
}
以上就是使用 Jedis 实现分布式锁的完整代码,希望对你有所帮助。如果还想学习更多技能或丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4095038/blog/5013709