本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
⌨️ 引入依赖
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>{project.version}</version></dependency>
⌨️ 使用 Kafka Schema
>>> 添加生产者代码
String topic = "persistent://public/default/test";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());Producer<Integer, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));}producer.close();
>>> 添加消费者代码
String topic = "persistent://public/default/test";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());@SuppressWarnings("resource")Consumer<Integer, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));while (true) { ConsumerRecords<Integer, String> records = consumer.poll(100); records.forEach(record -> { log.info("Received record: {}", record); }); // Commit last offset consumer.commitSync();}
⌨️ 使用 Pulsar Schema
@Data@ToString@EqualsAndHashCodepublic class Foo { @Nullable private String field1; @Nullable private String field2; private int field3;}@Data@ToString@EqualsAndHashCodepublic class Bar { private boolean field1;}
>>> 生产者端代码
String topic = "persistent://public/default/test-avro";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());Bar bar = new Bar();bar.setField1(true);Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar)); log.info("Message {} sent successfully", i);}producer.close();
>>> 消费者端代码
String topic = "persistent://public/default/test-avro";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());Bar bar = new Bar();bar.setField1(true);Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);@SuppressWarnings("resource")Consumer<Foo, Bar> consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema);consumer.subscribe(Arrays.asList(topic));while (true) { ConsumerRecords<Foo, Bar> records = consumer.poll(100); records.forEach(record -> { log.info("Received record: {}", record); }); // Commit last offset consumer.commitSync();}
以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/apachepulsar/blog/4466692