kafka-Storm中如何将日志文件打印到local

这篇文章给大家介绍kafka-Storm中如何将日志文件打印到local,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

创新互联专业为企业提供雨花台网站建设、雨花台做网站、雨花台网站设计、雨花台网站制作等企业网站建设、网页设计与制作、雨花台企业网站模板建站服务,十多年雨花台做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

阅读前提:

        1 : 您可能需要对  logback 日志系统有所了解

        2 :您可能需要对于 kafka 有初步的了解

        3:请代码查看之前,请您仔细参考系统的业务图解

    由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:

        业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS

    1: 一个正式环境系统的系统设计图解:

                kafka-Storm中如何将日志文件打印到local

              通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:

            其一: 实时通道

            其二:离线通道

       在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。

      也就是:

            如下图所示:

kafka-Storm中如何将日志文件打印到local

      在kafka之中,通常而言,有如下的 代码 用来处理:

         在这里我们针对了2种日志,有两个Consumer用来处理

package com.mixbox.kafka.consumer;

public class logSave {

	public static void main(String[] args) throws Exception {

		Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
		visitlog.start();

		Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
		orderlog.start();

	}
}

     在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。

package com.mixbox.kafka.consumer;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @author Yin Shuai
 */
public class Consumer_Thread extends Thread {

	// 在事实上我们会依据传递的topic名称,来生成不桐的记录机器
	// private Logger _log_order = LoggerFactory.getLogger("order");
	// private Logger _log_visit = LoggerFactory.getLogger("visit");

	private Logger _log = null;

	private final ConsumerConnector _consumer;
	private final String _topic;

	public Consumer_Thread(String topic) {

		_consumer = kafka.consumer.Consumer
				.createJavaConsumerConnector(createConsumerConfig());
		this._topic = topic;

		_log = LoggerFactory.getLogger(_topic);

		System.err.println("log的名称" + _topic);

	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KafkaProperties.zkConnect);
		// 在这里我们的组ID为logSave
		props.put("group.id", KafkaProperties.logSave);
		props.put("zookeeper.session.timeout.ms", "100000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);

	}

	public void run() {

		Map topicCountMap = new HashMap();
		topicCountMap.put(_topic, new Integer(1));

		Map>> consumerMap = _consumer
				.createMessageStreams(topicCountMap);

		for (KafkaStream kafkaStream : consumerMap.get(_topic)) {
			ConsumerIterator iterator = kafkaStream.iterator();
			while (iterator.hasNext()) {
				MessageAndMetadata next = iterator.next();
				try {

					// 在这里我们分拆了一个Consumer 来处理visit日志
					logFile(next);
					System.out.println("message:"
							+ new String(next.message(), "utf-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void logFile(MessageAndMetadata next)
			throws UnsupportedEncodingException {
		_log.info(new String(next.message(), "utf-8"));
	}

}

    一个简单的小tips:

        logback.xml  ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。




	
	
	

		
		
		
		

		
		
			f:/opt/log/test.%d{yyyy-MM-dd}.log
			
		

		
		
			
				%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n
		
	



	
	

		
			e:/logs/error/error.log
		
		
			
				ERROR
			
			ACCEPT
			DENY
		
		
		
			e:/logs/yuanshi-%d{yyyy-MM-dd}.log
			10
		

		
		
			%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n
		
	


	
	

		E:\logs\file\file.log

		
			INFO
			ACCEPT
			DENY
		

		
		
			e:/logs/venality-%d{yyyy-MM-dd}.log
			
			10
		

		
		
			%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n
		
	


	
		
			E:\logs\visitlog\visit.log
		
		
			%msg%n
		

		
			INFO
		
		
			E:\logs\visit.log.%d{yyyy-MM-dd}
			
		
	
	
		
	


	
		
			E:\logs\orderlog\order.log
		
		
			%msg%n
			
		

		
			INFO
		
		
			E:\logs\order.log.%d{yyyy-MM-dd}
			
		
	
	
		
	


	
		
	

关于kafka-Storm中如何将日志文件打印到local就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


当前题目:kafka-Storm中如何将日志文件打印到local
网站网址:http://csdahua.cn/article/jiccgo.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流