温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka简单客户端编程的示例分析

发布时间:2021-08-05 14:38:25 来源:亿速云 阅读:139 作者:小新 栏目:编程语言

这篇文章将为大家详细讲解有关Kafka简单客户端编程的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka; 
 
/** 
 * 配置项 
 * @author liuyazhuang 
 * 
 */ 
public class Config { 
  
 /** 
  * 话题 
  */ 
 public static final String TOPIC = "wordcount"; 
 /** 
  * 线程数 
  */ 
 public static final Integer THREADS = 1; 
}

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka; 
 
import java.util.Properties; 
 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
 
/** 
 * 生产者实例 
 * @author liuyazhuang 
 * 
 */ 
public class ProducerDemo { 
 public static void main(String[] args) throws Exception { 
  Properties props = new Properties(); 
  props.put("zk.connect", "192.168.209.121:2181"); 
  props.put("metadata.broker.list","192.168.209.121:9092"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
  ProducerConfig config = new ProducerConfig(props); 
  Producer<String, String> producer = new Producer<String, String>(config); 
 
  // 发送业务消息 
  // 读取文件 读取内存数据库 读socket端口 
  for (int i = 1; i <= 100; i++) { 
   Thread.sleep(500); 
   producer.send(new KeyedMessage<String, String>(Config.TOPIC, 
     "this number ===>>> " + i)); 
  } 
 
 } 
}

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka; 
 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 
 
/** 
 * 消费者实例 
 * @author liuyazhuang 
 * 
 */ 
public class ConsumerDemo { 
  
 
 public static void main(String[] args) { 
   
  Properties props = new Properties(); 
  props.put("zookeeper.connect", "192.168.209.121:2181"); 
  props.put("group.id", "1111"); 
  props.put("auto.offset.reset", "smallest"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
 
  ConsumerConfig config = new ConsumerConfig(props); 
  ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); 
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
  topicCountMap.put(Config.TOPIC, Config.THREADS); 
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); 
   
  for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ 
   new Thread(new Runnable() { 
    @Override 
    public void run() { 
     for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ 
      String msg = new String(mm.message()); 
      System.out.println(msg); 
     } 
    } 
    
   }).start(); 
   
  } 
 } 
}

四、运行实例

首先,运行消费者类ConsumerDemo
运行结果如下:

Kafka简单客户端编程的示例分析

没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:

Kafka简单客户端编程的示例分析

打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。

关于“Kafka简单客户端编程的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI