今天就跟大家聊聊有关java中怎么利用mongodb实现分布式锁,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
通过线程安全findAndModify 实现锁
实现
定义锁存储对象:
/**
* mongodb 分布式锁
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
@Id
private String id;
private long expireAt;
private String token;
}
定义Lock API:
public interface LockService {
String acquire(String key, long expiration);
boolean release(String key, String token);
boolean refresh(String key, String token, long expiration);
}
获取锁:
@Override
public String acquire(String key, long expiration) {
Query query = Query.query(Criteria.where("_id").is(key));
String token = this.generateToken();
Update update = new Update()
.setOnInsert("_id", key)
.setOnInsert("expireAt", System.currentTimeMillis() + expiration)
.setOnInsert("token", token);
FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
.returnNew(true);
LockDocument doc = mongoTemplate.findAndModify(query, update, options,
LockDocument.class);
boolean locked = doc.getToken() != null && doc.getToken().equals(token);
// 如果已过期
if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
DeleteResult deleted = this.mongoTemplate.remove(
Query.query(Criteria.where("_id").is(key)
.and("token").is(doc.getToken())
.and("expireAt").is(doc.getExpireAt())),
LockDocument.class);
if (deleted.getDeletedCount() >= 1) {
// 成功释放锁, 再次尝试获取锁
return this.acquire(key, expiration);
}
}
log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
key, token, locked);
return locked ? token : null;
}
原理:
先尝试upsert锁对象,如果成功且token一致,说明拿到锁
否则加锁失败
如果未拿到锁,但是锁已过期,尝试删除锁
如果删除成功,再次尝试拿锁
如果失败,说明锁可能已经续期了
释放和续期锁:
@Override
public boolean release(String key, String token) {
Query query = Query.query(Criteria.where("_id").is(key)
.and("token").is(token));
DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
boolean released = deleted.getDeletedCount() == 1;
if (released) {
log.debug("Remove query successfully affected 1 record for key {} with token {}",
key, token);
} else if (deleted.getDeletedCount() > 0) {
log.error("Unexpected result from release for key {} with token {}, released {}",
key, token, deleted);
} else {
log.error("Remove query did not affect any records for key {} with token {}",
key, token);
}
return released;
}
@Override
public boolean refresh(String key, String token,
long expiration) {
Query query = Query.query(Criteria.where("_id").is(key)
.and("token").is(token));
Update update = Update.update("expireAt",
System.currentTimeMillis() + expiration);
UpdateResult updated =
mongoTemplate.updateFirst(query, update, LockDocument.class);
final boolean refreshed = updated.getModifiedCount() == 1;
if (refreshed) {
log.debug("Refresh query successfully affected 1 record for key {} " +
"with token {}", key, token);
} else if (updated.getModifiedCount() > 0) {
log.error("Unexpected result from refresh for key {} with token {}, " +
"released {}", key, token, updated);
} else {
log.warn("Refresh query did not affect any records for key {} with token {}. " +
"This is possible when refresh interval fires for the final time " +
"after the lock has been released",
key, token);
}
return refreshed;
}
private LockService lockService;
private void tryAcquireLockAndSchedule() {
while (!this.stopSchedule) {
// 尝试拿锁
this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
if (this.token != null) {
// 拿到锁
} else {
// 等待LOCK_EXPIRATION, 再次尝试
Thread.sleep(LOCK_EXPIRATION);
}
}
}
先尝试拿锁,如果获取到token,说明拿锁成功
否则可以sleep一段时间后再拿锁
看完上述内容,你们对java中怎么利用mongodb实现分布式锁有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。