This article explains how to implement a Kafka Avro producer and consumer in C and Java, walking through the raw data format, the Avro schema, the C producer and consumer, the build setup, and a Java consumer.
Raw data format
Request IP / Answer IP / Domain / Record type
3183375114 3729673322 "mx.hc.spdrb.com" A
The line above is the content of the test input file; the fields are whitespace-separated.
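The two numeric columns look like IPv4 addresses packed into unsigned 32-bit integers; that is an assumption based on the sample values, since the original text does not say. A quick C check converts the first field back to dotted-quad form:

#include <stdio.h>
#include <stdint.h>

int main(void)
{
    /* Assumed interpretation: the qip/aip columns are IPv4 addresses
     * packed into a 32-bit integer, most significant octet first. */
    uint32_t qip = 3183375114U;
    printf("%u.%u.%u.%u\n",
           (qip >> 24) & 0xFF, (qip >> 16) & 0xFF,
           (qip >> 8) & 0xFF, qip & 0xFF);
    return 0;
}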
The schema is defined as follows:
{
"type":"record",
"name":"data",
"fields":
[
{"name":"qip","type":"long"},
{"name":"aip","type":"long"},
{"name":"domain","type":"string"},
{"name":"type","type":"string"}
]
}
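With this schema, Avro's binary encoding writes each record as its four fields back to back with no per-record header: the two long fields are zig-zag varint encoded and the strings are length-prefixed. That framing is why the consumers below can decode records straight out of the raw message buffer. Here is a minimal sketch of the long encoding, written from the Avro specification rather than taken from the article:

#include <stdio.h>
#include <stdint.h>

/* Avro "long": zig-zag map to unsigned, then 7 bits per byte,
 * high bit set on every byte except the last. */
static int encode_long(int64_t v, uint8_t *out)
{
    uint64_t z = ((uint64_t)v << 1) ^ (uint64_t)(v >> 63); /* zig-zag */
    int n = 0;
    while (z > 0x7F) {
        out[n++] = (uint8_t)(z & 0x7F) | 0x80;
        z >>= 7;
    }
    out[n++] = (uint8_t)z;
    return n;
}

int main(void)
{
    uint8_t buf[10];
    int n = encode_long(3183375114LL, buf); /* the sample qip value */
    for (int i = 0; i < n; i++)
        printf("%02x ", buf[i]);
    printf("(%d bytes)\n", n);
    return 0;
}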
The C producer code is as follows:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include "avro.h"
#include "producer.h"
const char PERSON_SCHEMA[] = "{\"type\":\"record\",\"name\":\"data\",\"fields\":[{\"name\":\"qip\",\"type\":\"long\"},{\"name\":\"aip\",\"type\":\"long\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}]}";
const char *file = "avro_file.dat";
const char *brokers = "xxxxx:9092";
const char *topic = "topic1";
void print_avro_value(avro_value_t *value)
{
    char *json;
    if (!avro_value_to_json(value, 1, &json)) {
        printf("%s\n", json);
        free(json);
    }
}
avro_schema_t init_schema()
{
    avro_schema_t test_schema;
    avro_schema_error_t error;
    if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA),
                              &test_schema, &error)) {
        fprintf(stderr, "schema error\n");
        exit(EXIT_FAILURE);
    }
    return test_schema;
}
void add_data(avro_writer_t writer, avro_schema_t schema, int64_t qip, int64_t aip, const char* domain, const char* type)
{
avro_datum_t data = avro_record(schema);
avro_datum_t dqip = avro_int64(qip);
avro_datum_t daip = avro_int64(aip);
avro_datum_t ddomain = avro_string(domain);
avro_datum_t dtype = avro_string(type);
avro_record_set(data, "qip", dqip);
avro_record_set(data, "aip", daip);
avro_record_set(data, "domain", ddomain);
avro_record_set(data, "type", dtype);
avro_write_data(writer, NULL, data); /* serialize the record into the memory writer */
avro_datum_decref(dqip);
avro_datum_decref(daip);
avro_datum_decref(ddomain);
avro_datum_decref(dtype);
avro_datum_decref(data);
}
int main(int argc, char* argv[])
{
int len = 0;
avro_schema_t schema;
avro_writer_t mem_writer;
char buf[1024];
char tmp[4][500]={{0x00}};
FILE *fp = fopen("test","r");
if(!fp)
{
printf("open test file error!\n");
return -1;
}
schema = init_schema();
mem_writer = avro_writer_memory(buf, sizeof(buf)); /* note: buf doubles as the fgets line buffer and the Avro output buffer; the fields are copied into tmp before the record is serialized */
while(fgets(buf, 1024,fp)!=NULL)
{
size_t n = strlen(buf);
if (n > 0 && buf[n-1] == '\n') buf[n-1] = '\0'; /* strip the trailing newline */
if(sscanf(buf, "%s%s%s%s", tmp[0],tmp[1],tmp[2],tmp[3])!=4) continue;
add_data(mem_writer, schema, strtoll(tmp[0], NULL, 10), strtoll(tmp[1], NULL, 10), tmp[2], tmp[3]); /* strtoll, since the IP values exceed 32-bit range */
len = (int)avro_writer_tell(mem_writer); /* number of Avro bytes now in buf */
printf("data len = %d\n", len);
kafka_putdata(buf, len, brokers, topic); /* producer helper built on librdkafka; the article omits it, a sketch is given below */
memset(tmp, 0x00, sizeof(tmp));
memset(buf, 0x00, sizeof(buf));
avro_writer_reset(mem_writer);
}
fclose(fp);
avro_writer_free(mem_writer);
return 0;
}
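The article does not show kafka_putdata() from producer.h. Assuming it simply publishes a byte buffer to the given topic, a minimal librdkafka sketch could look like the following; only the signature is taken from the call in main(), everything else is an assumption. Creating and tearing down a producer per call is wasteful, and a real implementation would keep one rd_kafka_t alive for the life of the process:

#include <librdkafka/rdkafka.h>

/* Hypothetical sketch of kafka_putdata(). */
int kafka_putdata(const char *payload, int len, const char *brokers, const char *topic)
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        rd_kafka_conf_destroy(conf);
        return -1;
    }
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk)
        return -1;
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
    /* RD_KAFKA_MSG_F_COPY: librdkafka copies the payload, so buf can be reused. */
    if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, (size_t)len, NULL, 0, NULL) == -1) {
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return -1;
    }
    rd_kafka_flush(rk, 10 * 1000); /* wait up to 10 s for delivery */
    rd_kafka_topic_destroy(rkt);
    rd_kafka_destroy(rk);
    return 0;
}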
The C consumer is implemented as follows:
#include "consumer.h"
#include "avro.h"
#include <time.h>
#include <unistd.h>
const char *brokers = "xxxx:9092";
const char *topic = "topic1";
const char *group = "avrotest";
const char PERSON_SCHEMA[] = "{\"type\":\"record\",\"name\":\"data\",\"fields\":[{\"name\":\"qip\",\"type\":\"long\"},{\"name\":\"aip\",\"type\":\"long\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}]}";
avro_schema_t init_schema()
{
avro_schema_t test_schema;
avro_schema_error_t error;
if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA),
&test_schema, &error)) {
fprintf(stderr, "schema error\n");
exit(EXIT_FAILURE);
}
return test_schema;
}
void print_data(avro_reader_t reader, avro_schema_t schema)
{
    avro_datum_t data;
    if (avro_read_data(reader, schema, schema, &data) == 0)
{
int64_t qip;
int64_t aip;
char *domain;
char *type;
avro_datum_t q_datum,a_datum,d_datum,t_datum;
avro_record_get(data, "qip", &q_datum);
avro_int64_get(q_datum, &qip);
avro_record_get(data, "aip", &a_datum);
avro_int64_get(a_datum, &aip);
avro_record_get(data, "domain", &d_datum);
avro_string_get(d_datum, &domain);
avro_record_get(data, "type", &t_datum);
avro_string_get(t_datum, &type);
printf("qip: %lld, aip: %lld,domain: %s,type:%s\n", qip,aip,domain,type);
avro_datum_decref(data);
    }
}
int main(int argc, char* argv[])
{
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *topics;
if(initKafka(&rk, brokers, group, topic, &topics)<0){return -1;}
char buf[1024] = {0x00};
int len = 0;
avro_schema_t schema;
avro_reader_t mem_reader;
schema = init_schema();
mem_reader = avro_reader_memory(buf, 1024);
while(1)
{
get_consumer_msg(rk, buf, &len); /* consumer helper built on librdkafka; the article omits it, a sketch is given below */
if(len == 0) continue;
printf("len=%d\n",len);
print_data(mem_reader,schema);
avro_reader_reset(mem_reader);
memset(buf, 0x00, sizeof(buf));
}
return 0;
}
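The helpers initKafka() and get_consumer_msg() from consumer.h are also not shown. Below is a minimal sketch using librdkafka's high-level consumer API; the signatures follow the calls in main() above and the 1024-byte cap matches buf, but everything else is an assumption:

#include <string.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical sketch of the consumer.h helpers. */
int initKafka(rd_kafka_t **rk, const char *brokers, const char *group,
              const char *topic, rd_kafka_topic_partition_list_t **topics)
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", group, errstr, sizeof(errstr));
    *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!*rk)
        return -1;
    rd_kafka_poll_set_consumer(*rk); /* route messages to consumer_poll() */
    *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(*topics, topic, RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(*rk, *topics) != RD_KAFKA_RESP_ERR_NO_ERROR)
        return -1;
    return 0;
}

void get_consumer_msg(rd_kafka_t *rk, char *buf, int *len)
{
    *len = 0;
    rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
    if (!msg)
        return;
    /* Copy the payload out; the caller's buffer is assumed to hold 1024 bytes. */
    if (!msg->err && msg->len > 0 && msg->len <= 1024) {
        memcpy(buf, msg->payload, msg->len);
        *len = (int)msg->len;
    }
    rd_kafka_message_destroy(msg);
}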
The Makefile below is shared by both C programs:
TARGET=avro-test
SLIB=./avrolib/lib/libavro.a
DLIB=-lz -llzma -lrdkafka
INC=-I. -I./avrolib/include
SOURCES=$(wildcard *.c)
OBJECTS=$(SOURCES:.c=.o)
RM=rm -rf
CC=gcc -g
CFLAGS=-Wall $(INC)

all: $(TARGET)

$(TARGET): $(OBJECTS)
	$(CC) -o $@ $^ $(SLIB) $(DLIB) $(CFLAGS)

%.o: %.c
	$(CC) $(CFLAGS) -c $< -o $@

clean:
	$(RM) $(TARGET) $(OBJECTS) *~
Gradle configuration for the Java consumer:
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.avro', name: 'avro', version: '1.9.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
}
The Avro decoding code is adapted from someone else's work; the original author is unknown, with apologies to them.
package zc;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import java.io.IOException;
public class MyRecordDecoder {
public static GenericRecord genericRecord;
private final DatumReader<GenericRecord> datumReader;
static MyRecordDecoder myRecordDecoder = new MyRecordDecoder();
final String USER_SCHEMA = "{\"type\":\"record\",\"name\":\"data\",\"fields\":[{\"name\":\"qip\",\"type\":\"long\"},{\"name\":\"aip\",\"type\":\"long\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}]}";
public MyRecordDecoder() {
Schema schema = new Schema.Parser().parse(USER_SCHEMA);
datumReader = new SpecificDatumReader<GenericRecord>(schema);
}
public GenericRecord getGenericRecord(BinaryDecoder decoder, byte[] value) throws IOException{
return datumReader.read(null, decoder);
}
public static MyRecordDecoder getInstance() {
if (myRecordDecoder==null)
myRecordDecoder = new MyRecordDecoder();
return myRecordDecoder;
}
}
The Java consumer:
package zc;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaMessageAvro {
public static void main(String[] args) throws Exception {
String inTopic = args[0];
Properties props = new Properties();
props.setProperty("bootstrap.servers", "xxxxx:9092");
props.setProperty("group.id", "flink-topn-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(inTopic));
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
byte[] ss = record.value();
if (ss==null) {
continue;
}
System.out.println("message length: " + ss.length); // ss.toString() would only print the array reference
GenericRecord genericRecord = null;
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(ss, null);
while (!decoder.isEnd()) {
genericRecord = MyRecordDecoder.getInstance().getGenericRecord(decoder, ss);
System.out.println(genericRecord.get("qip").toString()+" "+genericRecord.get("aip").toString()+" "+genericRecord.get("domain").toString()+" "+genericRecord.get("type").toString());
}
}
}
} finally {
consumer.close();
}
}
}
That wraps up how to implement a Kafka Avro producer and consumer in C and Java. Thanks for reading.
Original article: https://my.oschina.net/u/2534959/blog/3119260