温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka Java客户端代码的示例分析

发布时间:2021-10-29 18:00:34 来源:亿速云 阅读:193 作者:柒染 栏目:编程语言

这篇文章将为大家详细讲解有关Kafka Java客户端代码的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

kafka是一种高吞吐量的分布式发布订阅消息系统

kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)

当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。

高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

测试环境

kafka_2.10-0.8.1.1 3个节点做的集群

zookeeper-3.4.5 一个实例节点

代码示例

消息生产者代码示例

import java.util.Collections;  import java.util.Date;  import java.util.Properties;  import java.util.Random;     import kafka.javaapi.producer.Producer;  import kafka.producer.KeyedMessage;  import kafka.producer.ProducerConfig;     /**   * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example   * @author Fung   *   */ public class ProducerDemo {      public static void main(String[] args) {          Random rnd = new Random();          int events=100;             // 设置配置属性          Properties props = new Properties();          props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");          props.put("serializer.class", "kafka.serializer.StringEncoder");          // key.serializer.class默认为serializer.class          props.put("key.serializer.class", "kafka.serializer.StringEncoder");          // 可选配置,如果不配置,则使用默认的partitioner          props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");          // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失          // 值为0,1,-1,可以参考          // http://kafka.apache.org/08/configuration.html          props.put("request.required.acks", "1");          ProducerConfig config = new ProducerConfig(props);             // 创建producer          Producer<String, String> producer = new Producer<String, String>(config);          // 产生并发送消息          long start=System.currentTimeMillis();          for (long i = 0; i < events; i++) {              long runtime = new Date().getTime();              String ip = "192.168.2." + i;//rnd.nextInt(255);              String msg = runtime + ",www.example.com," + ip;              //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0              KeyedMessage<String, String> data = new KeyedMessage<String, String>(                      "page_visits", ip, msg);              producer.send(data);          }          System.out.println("耗时:" + (System.currentTimeMillis() - start));          // 关闭producer          producer.close();      }  }

消息消费者代码示例

import java.util.HashMap;  import java.util.List;  import java.util.Map;  import java.util.Properties;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;     import kafka.consumer.Consumer;  import kafka.consumer.ConsumerConfig;  import kafka.consumer.KafkaStream;  import kafka.javaapi.consumer.ConsumerConnector;     /**   * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example   *    * @author Fung   *   */ public class ConsumerDemo {      private final ConsumerConnector consumer;      private final String topic;      private ExecutorService executor;         public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {          consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));          this.topic = a_topic;      }         public void shutdown() {          if (consumer != null)              consumer.shutdown();          if (executor != null)              executor.shutdown();      }         public void run(int numThreads) {          Map<String, Integer> topicCountMap = new HashMap<String, Integer>();          topicCountMap.put(topic, new Integer(numThreads));          Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer                  .createMessageStreams(topicCountMap);          List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);             // now launch all the threads          executor = Executors.newFixedThreadPool(numThreads);             // now create an object to consume the messages          //          int threadNumber = 0;          for (final KafkaStream stream : streams) {              executor.submit(new ConsumerMsgTask(stream, threadNumber));              threadNumber++;          }      }         private static ConsumerConfig createConsumerConfig(String a_zookeeper,              String a_groupId) {          Properties props = new Properties();          props.put("zookeeper.connect", a_zookeeper);          props.put("group.id", a_groupId);          props.put("zookeeper.session.timeout.ms", "400");          props.put("zookeeper.sync.time.ms", "200");          props.put("auto.commit.interval.ms", "1000");             return new ConsumerConfig(props);      }         public static void main(String[] arg) {          String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };          String zooKeeper = args[0];          String groupId = args[1];          String topic = args[2];          int threads = Integer.parseInt(args[3]);             ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);          demo.run(threads);             try {              Thread.sleep(10000);          } catch (InterruptedException ie) {             }          demo.shutdown();      }  }

消息处理类

import kafka.consumer.ConsumerIterator;  import kafka.consumer.KafkaStream;     public class ConsumerMsgTask implements Runnable {      private KafkaStream m_stream;      private int m_threadNumber;         public ConsumerMsgTask(KafkaStream stream, int threadNumber) {          m_threadNumber = threadNumber;          m_stream = stream;      }         public void run() {          ConsumerIterator<byte[], byte[]> it = m_stream.iterator();          while (it.hasNext())              System.out.println("Thread " + m_threadNumber + ": "                     + new String(it.next().message()));          System.out.println("Shutting down Thread: " + m_threadNumber);      }  }

Partitioner类示例

import kafka.producer.Partitioner;  import kafka.utils.VerifiableProperties;     public class PartitionerDemo implements Partitioner {      public PartitionerDemo(VerifiableProperties props) {         }         @Override     public int partition(Object obj, int numPartitions) {          int partition = 0;          if (obj instanceof String) {              String key=(String)obj;              int offset = key.lastIndexOf('.');              if (offset > 0) {                  partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;              }          }else{              partition = obj.toString().length() % numPartitions;          }                     return partition;      }     }

关于Kafka Java客户端代码的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI