温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

kafka high-level consumer 多线程访

发布时间:2020-06-09 04:07:32 来源:网络 阅读:713 作者:jndsdsoft 栏目:移动开发

在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

 

 

Java代码  kafka high-level consumer 多线程访

  1.  def hasNext(): Boolean = {  

  2.     if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常  

  3.       throw new IllegalStateException("Iterator is in failed state")  

  4.     state match {  

  5.       case DONE => false  

  6.       case READY => true  

  7.       case _ => maybeComputeNext()  

  8.     }  

  9.   }  

  10.   

  11.   

  12.   def maybeComputeNext(): Boolean = {  

  13.     state = FAILED              //重置了状态  

  14.     nextItem = Some(makeNext())          

  15.     if(state == DONE) {  

  16.       false  

  17.     } else {  

  18.       state = READY  

  19.       true  

  20.     }  

  21.   }  

  22.   下载

  23.   

  24. protected def makeNext(): MessageAndMetadata[K, V] = {  

  25.     var currentDataChunk: FetchedDataChunk = null  

  26.     // if we don't have an iterator, get one  

  27.     var localCurrent = current.get()  

  28.     if(localCurrent == null || !localCurrent.hasNext) {  

  29.       if (consumerTimeoutMs < 0)  

  30.         currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞  

  31.   

  32.       else {  

  33.         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)  

  34.         if (currentDataChunk == null) {  

  35.           // reset state to make the iterator re-iterable  

  36.           resetState()  

  37.           throw new ConsumerTimeoutException  

  38.         }  

  39.       }  

  40. //省略部分代码  

  41. }  


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI