Kafka消息序列化和反序列化(下)-创新互联

有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。对应上面自定义的Company类型的Deserializer就需要实现org.apache.kafka.common.serialization.Deserializer接口,这个接口同样有三个方法:

创新互联是一家集网站建设,武陵企业网站建设,武陵品牌网站建设,网站定制,武陵网站建设报价,网络营销,网络优化,武陵网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

public void configure(Map configs, boolean isKey):用来配置当前类。
public byte[] serialize(String topic, T data):用来执行反序列化。如果data为null建议处理的时候直接返回null而不是抛出一个异常。
public void close():用来关闭当前序列化器。
下面就来看一下DemoSerializer对应的反序列化的DemoDeserializer,详细代码如下:

public class DemoDeserializer implements Deserializer {
    public void configure(Map configs, boolean isKey) {}
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressLen);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name,address);
    }
    public void close() {}
}

有些读者可能对新版的Consumer不是很熟悉,这里顺带着举一个完整的消费示例,并以DemoDeserializer作为消息Value的反序列化器。

Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer consumer = new KafkaConsumer(properties);
consumer.subscribe(Arrays.asList(topic));
try {
   while (true) {
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            System.out.println(info);
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map offsets, Exception exception) {
                if (exception != null) {
                    String error = String.format("Commit failed for offsets {}", offsets, exception);
                    System.out.println(error);
                }
            }
        });
    }
} finally {
    consumer.close();
}

有些时候自定义的类型还可以和Avro、ProtoBuf等联合使用,而且这样更加的方便快捷,比如我们将前面Company的Serializer和Deserializer用Protostuff包装一下,由于篇幅限制,笔者这里只罗列出对应的serialize和deserialize方法,详细参考如下:

public byte[] serialize(String topic, Company data) {
    if (data == null) {
        return null;
    }
    Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    byte[] protostuff = null;
    try {
        protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
    return protostuff;
}

public Company deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    Schema schema = RuntimeSchema.getSchema(Company.class);
    Company ans = new Company();
    ProtostuffIOUtil.mergeFrom(data, ans, schema);
    return ans;
}

如果Company的字段很多,我们使用Protostuff进一步封装一下的方式就显得简洁很多。不过这个不是最主要的,而最主要的是经过Protostuff包装之后,这个Serializer和Deserializer可以向前兼容(新加字段采用默认值)和向后兼容(忽略新加字段),这个特性Avro和Protobuf也都具备。

自定义的类型有一个不得不面对的问题就是Kafka Producer和Kafka Consumer之间的序列化和反序列化的兼容性,试想对于StringSerializer来说,Kafka Consumer可以顺其自然的采用StringDeserializer,不过对于Company这种专用类型,某个服务使用DemoSerializer进行了序列化之后,那么下游的消费者服务必须也要实现对应的DemoDeserializer。再者,如果上游的Company类型改变,下游也需要跟着重新实现一个新的DemoSerializer,这个后面所面临的难题可想而知。所以,如无特殊需要,笔者不建议使用自定义的序列化和反序列化器;如有业务需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包装,尽可能的实现得更加通用且向前后兼容。

题外话,对于Kafka的“深耕者”Confluent来说,还有其自身的一套序列化和反序列化解决方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相关资料,读者如有兴趣可以自行扩展学习。


本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

需要更详细思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取

Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


当前题目:Kafka消息序列化和反序列化(下)-创新互联
网站链接:http://csdahua.cn/article/cejcdo.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流