再看KafkaLag

在《Kafka的Lag计算误区及正确实现》一文中提及了kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,故要将PartitionAssignmentState前的protected修饰符去掉

专注于为中小企业提供网站设计制作、网站制作服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业木兰免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上1000+企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

可以直接将describeGroup返回的结果转换成JSON然后传至监控页面(supported by YANGliiN oba)。代码如下:

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions options =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(options);
ObjectMapper mapper = new ObjectMapper();
//1. 使用jackson-module-scala_2.12
mapper.registerModule(new DefaultScalaModule());
//2. 反序列化时忽略对象不存在的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//3. 将Scala对象序列化成JSON字符串
String source = mapper.writeValueAsString(kafkaConsumerGroupService.describeGroup()._2.get());

这里需要采用的是jackson-module-scala的包实现,如果直接用普通的JSON序列化方式那么会达不到想要的效果,jackson以及jackson-module-scala对应的Maven库如下:


    com.fasterxml.jackson.core
    jackson-core
    2.9.4


    com.fasterxml.jackson.module
    jackson-module-scala_2.12
    2.9.5

注意如果本地安装的Scala版本与所配置的jackson-module-scala版本不一致的话会报出一些异常。发散一下思维:既然可以序列化为JSON,那么完全可以通过JSON再反序列化会对象,只不过通过JSON作为中间媒介,将访问受限的Scala对象转变为Java对象,上面剩余代码如下:

//4. 将JSON字符串反序列化成Java对象
List target = mapper.readValue(source,
        getCollectionType(mapper,List.class,PartitionAssignmentState.class));
//5. 排序
target.sort((o1, o2) -> o1.getPartition() - o2.getPartition());
//6. 打印
printPasList(target);

如此就可以达到与前面几篇文章中关于获取消费者详情功能同样的效果。这里有两个注意要点:

  1. PartitionAssignmentState中的coordinator是Node类型,这个类型需要自定义,Kafka原生的会报错。
  2. 反序列化时Node会有一个empty的属性不识别,解决方案参考代码中的步骤2.

代码更多细节请参考:代码

通过JSON的序列化和反序列化操作实现了原本不能为之的事情,那么思维再发散一下,也可以序列化成字节流,比如通过ByteBuffer进行转换,只不过编程逻辑变得复杂了。

上面这段陈述有可能会让人觉得Scala与Java之间的互操作起来不容易,其实不然,上面这段陈述只是用来补充一下如何获取消费者详情的另一种方法,Scala与Java之间的互操作还是比较简单的,一般情况下都可以直接使用对方的类。对于集合而言,Scala中还有用于Scala与Java集合的互转的scala.collection.JavaConverters(scala2.8.1开始引入),与此雷同的scala.collection.JavaConversions已被标注为@Deprecated(since 2.12.0)。在scala代码中如果需要集合转换,首先引入scala.collection.JavaConverters._,进而显示调用asJava或者asScala方法完成转型。关于Scala与Java集合互转的介绍会在下一篇文章中呈现。


本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

需要更详细思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取

再看 Kafka Lag
再看 Kafka Lag
再看 Kafka Lag
再看 Kafka Lag


本文题目:再看KafkaLag
URL分享:http://csdahua.cn/article/pedoid.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流