在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。
Java代码
def hasNext(): Boolean = {
if(state == FAILED) //处于FAILED状态时,另外线程访问会直接异常
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
case _ => maybeComputeNext()
}
}
def maybeComputeNext(): Boolean = {
state = FAILED //重置了状态
nextItem = Some(makeNext())
if(state == DONE) {
false
} else {
state = READY
true
}
}
下载
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take //channel是BlockingQueue这里会阻塞
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
//省略部分代码
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。