SyncRequestProcessor,该处理器将请求存入磁盘,其将请求批量的存入磁盘以提高效率,请求在写入磁盘之前是不会被转发到下个处理器的。
SyncRequestProcessor维护了ZooKeeperServer实例,其用于获取ZooKeeper的数据库和其他信息;维护了一个处理请求的队列,其用于存放请求;维护了一个处理快照的线程,用于处理快照;维护了一个running标识,标识SyncRequestProcessor是否在运行;同时还维护了一个等待被刷新到磁盘的请求队列。
// Zookeeper服务器
private final ZooKeeperServer zks;
// 请求队列
private final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
// 下个处理器
private final RequestProcessor nextProcessor;
// 快照处理线程
private Thread snapInProcess = null;
// 是否在运行中
volatile private boolean running;
/**
* Transactions that have been written and are waiting to be flushed to
* disk. Basically this is the list of SyncItems whose callbacks will be
* invoked after flush returns successfully.
*/
// 等待被刷新到磁盘的请求队列
private final LinkedList<Request> toFlush = new LinkedList<Request>();
// 随机数生成器
private final Random r = new Random();
/**
* The number of log entries to log before starting a snapshot
*/
// 快照个数
private static int snapCount = ZooKeeperServer.getSnapCount();
// 结束请求标识
private final Request requestOfDeath = Request.requestOfDeath;
构造函数首先会调用父类的构造函数,然后根据构造函数参数给类的属性赋值,其中会确定下个处理器,并会设置该处理器正在运行的标识。
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
running = true;
}
@Override
public void run() {
try {
// 写日志数量初始化为0
int logCount = 0;
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
// 防止集群中所有机器在同一时刻进行数据快照,对是否进行数据快照增加随机因素
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
// 没有需要刷新到磁盘的请求
if (toFlush.isEmpty()) {
// 从请求队列中取出一个请求,若queuedRequests队列为空会阻塞
si = queuedRequests.take();
} else {
// 从请求队列中取出一个请求,若queuedRequests队列为空,则返回空,不会阻塞
si = queuedRequests.poll();
// 取出的请求为空
if (si == null) {
// 刷新数据磁盘
flush(toFlush);
continue;
}
}
// 在关闭处理器之后,会添加requestOfDeath请求到queuedRequests队列,表示关闭后不再处理请求
if (si == requestOfDeath) {
break;
}
// 请求不为空,处理请求
if (si != null) {
// track the number of records written to the log
// 将写请求添加至事务日志文件 FileTxnSnapLog.append(si)
if (zks.getZKDatabase().append(si)) {
// 日志写入,logCount加1
logCount++;
//确定是否需要进行数据快照
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
// 滚动日志,从当前日志文件滚到下一个日志文件,不是回滚
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) { // 正在进行快照
LOG.warn("Too busy to snap, skipping");
} else {
// 创建线程来处理快照
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
// 进行快照
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
// 开始快照线程处理
snapInProcess.start();
}
// 重置为0
logCount = 0;
}
} else if (toFlush.isEmpty()) {// 读请求会走到这里,查看此时toFlush是否为空,如果为空,说明近段时间读多写少,直接响应
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
// 下个处理器开始处理请求
nextProcessor.proce***equest(si);
// 处理器是Flushable的,刷新数据到磁盘
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// 将请求添加至被刷新至磁盘队列
toFlush.add(si);
if (toFlush.size() > 1000) {// 队列大小大于1000,直接刷新到磁盘
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
flush将toFlush队列中的请求刷新到磁盘中。
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
// 提交事务至ZK数据库
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
// 从队列移除请求
Request i = toFlush.remove();
// 下个处理器开始处理请求
if (nextProcessor != null) {
nextProcessor.proce***equest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
函数用于关闭SyncRequestProcessor处理器,其首先会在queuedRequests队列中添加一个结束请求requestOfDeath,然后再判断SyncRequestProcessor是否还在运行,若是,则会等待其结束;之后判断toFlush队列是否为空,若不为空,则刷新到磁盘中
public void shutdown() {
LOG.info("Shutting down");
// 添加结束请求请求至队列
queuedRequests.add(requestOfDeath);
try {
// 还在运行
if(running){
this.join();// 等待该线程终止
}
if (!toFlush.isEmpty()) {// 队列不为空,刷新到磁盘
flush(toFlush);
}
} catch(InterruptedException e) {
LOG.warn("Interrupted while wating for " + this + " to finish");
} catch (IOException e) {
LOG.warn("Got IO exception during shutdown");
} catch (RequestProcessorException e) {
LOG.warn("Got request processor exception during shutdown");
}
if (nextProcessor != null) {
nextProcessor.shutdown();
}
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。