如何解析Flume与Kafka整合-创新互联

这篇文章给大家介绍如何解析Flume与Kafka整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

创新互联公司主营东阳网站建设的网络公司,主营网站建设方案,成都APP应用开发,东阳h5成都微信小程序搭建,东阳网站营销推广欢迎东阳等地区企业咨询

Flume与Kafka整合


一、概念
1、Flume:Cloudera 开发的分布式日志收集系统,是一种分布式,可靠且可用的服务,用于高效地收集,汇总和移动大量日志数据。 它具有基于流式数据流的简单而灵活的架构。它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。Flume分为OG、NG版本,其中Flume OG 的最后一个发行版本 0.94.0,之后为NG版本。
 
2、Kafka:作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。在Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。协议是版本控制的,并保持与旧版本的向后兼容性。Kafka提供Java客户端,但客户端可以使用多种语言。

3、Kafka通常用于两大类应用,如下:
   A、构建可在系统或应用程序之间可靠获取数据的实时流数据管道
   B、构建实时流应用程序,用于转换或响应数据流
   C、Kafka每个记录由一个键,一个值和一个时间戳组成。

二、产述背景
    基于大数据领域实现日志数据时时采集及数据传递等需要,据此需求下试着完成flume+kafka扇入、扇出功能整合,其中扇出包括:复制流、复用流等功能性测试。后续根据实际需要,将完善kafka与spark streaming进行整合整理工作。
    注:此文档仅限于功能性测试,性能优化方面请大家根据实际情况增加。

三、部署安装
1、测试环境说明:
   操作系统:CentOS 7
   Flume版本:flume-ng-1.6.0-cdh6.7.0
   Kafka版本:kafka_2.11-0.10.0.1
   JDK版本:JDK1.8.0
   Scala版本:2.11.8
2、测试步骤:
2.1、flume部署
2.1.1、下载安装介质,并解压:

此处)折叠或打开

此处)折叠或打开

此处)折叠或打开

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3.     netcatagent.sources = netcat_sources

  4.     netcatagent.channels = c1 c2

  5.     netcatagent.sinks = logger_sinks kafka_sinks

  6.     

  7.     netcatagent.sources.netcat_sources.type = netcat

  8.     netcatagent.sources.netcat_sources.bind = 0.0.0.0

  9.     netcatagent.sources.netcat_sources.port = 44444

  10.     

  11.     netcatagent.channels.c1.type = memory

  12.     netcatagent.channels.c1.capacity = 1000

  13.     netcatagent.channels.c1.transactionCapacity = 100

  14.     

  15.     netcatagent.channels.c2.type = memory

  16.     netcatagent.channels.c2.capacity = 1000

  17.     netcatagent.channels.c2.transactionCapacity = 100

  18.     

  19.     netcatagent.sinks.logger_sinks.type = logger

  20.     

  21.     netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  22.     netcatagent.sinks.kafka_sinks.topic = test

  23.     netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  24.     netcatagent.sinks.kafka_sinks.requiredAcks = 0

  25.     ##netcatagent.sinks.kafka_sinks.batchSize = 20

  26.     netcatagent.sinks.kafka_sinks.producer.type=sync

  27.     netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  28.     netcatagent.sinks.kafka_sinks.partition.key=0

  29.     netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  30.     netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  31.     netcatagent.sinks.kafka_sinks.max.message.size=1000000

  32.     

  33.     netcatagent.sources.netcat_sources.selector.type = replicating

  34.     

  35.     netcatagent.sources.netcat_sources.channels = c1 c2

  36.     netcatagent.sinks.logger_sinks.channel = c1

  37.     netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、启动各测试命令:
   A、启动flume的agent(于192.168.137.130):
      flume-ng agent --name netcatagent \
       --conf $FLUME_HOME/conf \
       --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
       -Dflume.root.logger=INFO,console
    B、启动kafka消费者(于192.168.137.132):
        kafka-console-consumer.sh \
         --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
          --from-beginning --topic test
     C、测试发送(于192.168.137.130与于192.168.137.132)
telnet发送结果

如何解析Flume与Kafka整合

kafka消费结果

如何解析Flume与Kafka整合

最终logger接收结果

如何解析Flume与Kafka整合

         
   至此flume+kafka扇出--复制流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)测试与验证完成。
   
2.5、flume+kafka扇出--复用流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)

   暂无,后续补充



四、部署安装及验证过程中出现的问题

    1、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,一直未收到kafka数据
        主要原因是在做kafka的配置时在配置文件(server.properties)中写成内容:
       zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
       但在创建topics时,使用的是:
       kafka-topics.sh --create \
   --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
 
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在创建topics的时增加了/kafka
最终使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令检查没有topics信息才发现此问题
   
   解决办法:将两个信息同步即可
   
2、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,启动flume的agent时报错。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more


   解决办法:官网资料存在问题,org.apache.flume.source.kafka,KafkaSource其中不应该包括逗号,改为:org.apache.flume.source.kafka.KafkaSource即可。详细官网
如何解析Flume与Kafka整合

关于如何解析Flume与Kafka整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


本文名称:如何解析Flume与Kafka整合-创新互联
URL地址:http://csdahua.cn/article/cdodeg.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流