In the article "Using Avro-Encoded Messages in Kafka: the Producer Side" I briefly showed how to send Avro messages to Kafka. This article continues from there and explains how to read Avro-formatted messages back out of Kafka. I will not introduce Avro itself again here.
Reading Avro-Formatted Messages from Kafka
Reading Avro-formatted messages from Kafka works just like reading any other kind of message: create the message streams and iterate over them:
ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
        consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
    ....
}
The key question is how to turn the Avro-encoded byte array we read back into the data we want. Here we again use the schema parser introduced earlier:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
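As a quick illustration of how this Injection pairs with the Producer side, the same object can also serialize a record. The following is only a minimal sketch (the field names come from this article, the sample values are made up) showing the serialize/deserialize roundtrip:

// Build a record against the parsed schema and run it through the Injection
// in both directions: apply() serializes, invert() deserializes.
// Assumes org.apache.avro.generic.GenericData is imported.
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-0");
avroRecord.put("str2", "Str 2-0");
avroRecord.put("int1", 0);

byte[] bytes = recordInjection.apply(avroRecord);                   // record -> byte[]
GenericRecord roundTripped = recordInjection.invert(bytes).get();   // byte[] -> record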
The USER_SCHEMA above is the message schema described in the Producer article. The recordInjection object created here uses the parsed schema to deserialize the byte array we read back into the data that was originally written:
GenericRecord record = recordInjection.invert(message).get();
We can then pull out the individual fields with:
record.get("str1")
record.get("str2")
record.get("int1")
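For readers who do not have the Producer article at hand, USER_SCHEMA is an Avro record schema with exactly these three fields. The sketch below is only an assumption about its shape (the record name "iteblog" is invented here); the authoritative definition lives in AvroKafkaProducter:

// A guess at what USER_SCHEMA could look like, inferred from the str1/str2/int1
// fields read above; the real constant is defined in AvroKafkaProducter.
public static final String USER_SCHEMA = "{"
        + "\"type\": \"record\","
        + "\"name\": \"iteblog\","
        + "\"fields\": ["
        + "  { \"name\": \"str1\", \"type\": \"string\" },"
        + "  { \"name\": \"str2\", \"type\": \"string\" },"
        + "  { \"name\": \"int1\", \"type\": \"int\" }"
        + "]}";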
With that in place we can write a complete Consumer that reads Avro data. The code below uses Kafka 0.8.2.2:
package com.iteblog.avro;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by https://www.iteblog.com on 2017-09-20.
 */
public class AvroKafkaConsumer {
    Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
    private final ConsumerConnector consumer;
    private final String topic;

    public AvroKafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        // Parse the schema once and build the Injection used to deserialize every message.
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        for (final KafkaStream stream : streams) {
            ConsumerIterator it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata messageAndMetadata = it.next();
                String key = new String((byte[]) messageAndMetadata.key());
                byte[] message = (byte[]) messageAndMetadata.message();

                // Deserialize the Avro payload back into a GenericRecord.
                GenericRecord record = recordInjection.invert(message).get();
                logger.info("key=" + key + ", str1= " + record.get("str1")
                        + ", str2= " + record.get("str2")
                        + ", int1=" + record.get("int1"));
            }
        }
        consumer.shutdown();
    }

    public static void main(String[] args) {
        AvroKafkaConsumer simpleConsumer =
                new AvroKafkaConsumer("www.iteblog.com:2181", "testgroup", "iteblog");
        simpleConsumer.testConsumer();
    }
}
Running the program
Running it likewise requires avro-1.8.1.jar, slf4j-api-1.7.21.jar, log4j-1.2.17.jar, slf4j-log4j12-1.7.7.jar, scala-library.jar, and a few other related jars on the classpath. It can be run as follows:
CLASSPATH=$CLASSPATH:
for i in /home/iteblog/lib/*.jar ; do
    CLASSPATH=$CLASSPATH:$i
done
java -cp $CLASSPATH:flink-kafka-1.0-SNAPSHOT.jar com.iteblog.avro.AvroKafkaConsumer
When we run the program, it prints output like the following:
09:33:11,917 INFO AvroKafkaConsumer - key=0, str1= Str 1-0, str2= Str 2-0, int1=0
09:33:11,918 INFO AvroKafkaConsumer - key=5, str1= Str 1-5, str2= Str 2-5, int1=5
09:33:11,919 INFO AvroKafkaConsumer - key=1, str1= Str 1-1, str2= Str 2-1, int1=1
09:33:11,920 INFO AvroKafkaConsumer - key=4, str1= Str 1-4, str2= Str 2-4, int1=4
09:33:11,921 INFO AvroKafkaConsumer - key=2, str1= Str 1-2, str2= Str 2-2, int1=2
09:33:11,922 INFO AvroKafkaConsumer - key=6, str1= Str 1-6, str2= Str 2-6, int1=6
09:33:11,923 INFO AvroKafkaConsumer - key=3, str1= Str 1-3, str2= Str 2-3, int1=3
09:33:11,923 INFO AvroKafkaConsumer - key=7, str1= Str 1-7, str2= Str 2-7, int1=7
09:33:11,924 INFO AvroKafkaConsumer - key=8, str1= Str 1-8, str2= Str 2-8, int1=8
09:33:11,926 INFO AvroKafkaConsumer - key=9, str1= Str 1-9, str2= Str 2-9, int1=9
This is exactly the data generated by the Producer program in the previous article.
Implementation for Kafka 0.9.x
If you want to read the data with the Kafka 0.9.x consumer API instead, you can do it like this:
package com.iteblog.avro;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by https://www.iteblog.com on 2017-09-20.
 */
public class AvroKafkaConsumer09 {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");

        Properties props = new Properties();
        // bootstrap.servers must point at Kafka brokers (default port 9092), not ZooKeeper.
        props.put("bootstrap.servers", "www.iteblog.com:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        String topic = "iteblog";
        consumer.subscribe(Collections.singletonList(topic));

        // Parse the schema once and build the Injection used to deserialize every message.
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // The value is the raw Avro byte array; invert() turns it back into a GenericRecord.
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    logger.info("topic = {}, partition = {}, offset = {}, key = {}, str1 = {}",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), genericRecord.get("str1"));
                }
            }
        } finally {
            consumer.close();
        }
    }
}
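The while (true) loop above never exits on its own, so the finally block is only reached if poll() throws. If you want a clean way to stop the consumer, a common pattern with the new consumer API (not part of the original example, shown here only as a sketch) is to call wakeup() from another thread and catch the resulting WakeupException around the poll loop:

// Sketch of a graceful-shutdown pattern: wakeup() makes a blocked poll() throw
// WakeupException, which we treat as the signal to leave the loop and close().
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();               // interrupt the blocking poll()
    try {
        mainThread.join();           // wait for the poll loop to finish and close()
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        // ... handle records exactly as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close();
}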
In the next article I will show how to read Avro data from Kafka with Spark. Stay tuned.