zoukankan      html  css  js  c++  java
  • (一个)kafka-jstorm集群实时日志分析 它 ---------kafka实时日志处理

    package com.doctor.logbackextend;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    import org.apache.commons.lang.RandomStringUtils;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * zookeeper 和kafka环境准备好。

    本地端口号默认设置 * * @author doctor * * @time 2014年10月24日 下午3:14:01 */ public class KafkaAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class); /** 先启动此測试方法,模拟log日志输出到kafka */ @Test public void test_log_producer() { while(true){ LOG.info("test_log_producer : " + RandomStringUtils.random(3, "hello doctro,how are you,and you")); } } /** 再启动此測试方法。模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */ @Test public void test_comsumer(){ Properties props = new Properties(); props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); props.put("group.id", "kafkatest-group"); // props.put("zookeeper.session.timeout.ms", "400"); // props.put("zookeeper.sync.time.ms", "200"); // props.put("auto.commit.interval.ms", "1000"); ConsumerConfig paramConsumerConfig = new ConsumerConfig(props ); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig ); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put("kafka-test", new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test"); for (KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) System.out.println(new String("test_comsumer: " + new String(it.next().message()))); } } }


          为了实时日志处理互联网系统的日志,对于电商来说具有非常重要的意义,比方,淘宝购物时候,你浏览某些商品的时候。系统后台实时日志处理分析后,系统能够向用户实时推荐给用户相关商品。来引导用户的选择等等。

            为了实时日志处理。我们选择kafka集群,日志的处理分析选择jstorm集群,至于jstorm处理的结果,你能够选择保存到数据库里。入hbase、mysql。maridb等。

    系统的日志接口选择了slf4j,logback组合,为了让系统的日志可以写入kafka集群,选择扩展logback Appender。在logback里配置一下。就行自己主动输出日志到kafka集群。

    kafka的集群安装,在此不介绍了,为了模拟真实性,zookeeper本地集群也安装部署了。


    以下是怎样扩展logback Appender

    package com.doctor.logbackextend;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.AppenderBase;
    
    public class KafkaAppender extends AppenderBase<ILoggingEvent> {
    
    	private String topic;
    	private String zookeeperHost;
    	
    
    	private String broker;
    	private Producer<String, String> producer;
    	private Formatter formatter;
    	
    	public String getBroker() {
    		return broker;
    	}
    
    	public void setBroker(String broker) {
    		this.broker = broker;
    	}
    	@Override
    	protected void append(ILoggingEvent eventObject) {
    		String message = this.formatter.formate(eventObject);
    		this.producer.send(new KeyedMessage<String, String>(this.topic, message));
    
    	}
    
    	@Override
    	public void start() {
    		if (this.formatter == null) {
    			this.formatter = new MessageFormatter();
    		}
    		
    		super.start();
    		Properties props = new Properties();
    		props.put("zk.connect", this.zookeeperHost);
    		props.put("metadata.broker.list", this.broker);
    		props.put("serializer.class", "kafka.serializer.StringEncoder");
    		
    		ProducerConfig config = new ProducerConfig(props);
    		this.producer = new Producer<String, String>(config);
    	}
    
    	@Override
    	public void stop() {
    		super.stop();
    		this.producer.close();
    	}
    
    	
    	
    	public String getTopic() {
    		return topic;
    	}
    
    	public void setTopic(String topic) {
    		this.topic = topic;
    	}
    
    	public String getZookeeperHost() {
    		return zookeeperHost;
    	}
    
    	public void setZookeeperHost(String zookeeperHost) {
    		this.zookeeperHost = zookeeperHost;
    	}
    
    	public Producer<String, String> getProducer() {
    		return producer;
    	}
    
    	public void setProducer(Producer<String, String> producer) {
    		this.producer = producer;
    	}
    
    
    	public Formatter getFormatter() {
    		return formatter;
    	}
    
    	public void setFormatter(Formatter formatter) {
    		this.formatter = formatter;
    	}
    	
    	
    	
    	/**
    	 * 格式化日志格式
    	 * @author doctor
    	 *
    	 * @time   2014年10月24日 上午10:37:17
    	 */
    	interface Formatter{
    		String formate(ILoggingEvent event);
    	}
    	
    	public static class MessageFormatter implements Formatter{
    
    		@Override
    		public String formate(ILoggingEvent event) {
    			
    			return event.getFormattedMessage();
    		}
    		
    	}
    }
    


    对于日志的输出格式MessageFormatter没有特殊处理,由于仅仅是模拟一下,你能够制定你的格式,入json等。

    在logback.xml的配置例如以下:

    <appender name="kafka" class="com.doctor.logbackextend.KafkaAppender">
     		<topic>kafka-test</topic>
     		<!-- <zookeeperHost>127.0.0.1:2181</zookeeperHost> -->
     		<!-- <broker>127.0.0.1:9092</broker> -->
     		<zookeeperHost>127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183</zookeeperHost>
     		<broker>127.0.0.1:9092,127.0.0.1:9093</broker>
     	</appender>
     	
     	
    	<root level="all">
    		<appender-ref ref="stdout" />
    		<appender-ref ref="defaultAppender" />
    		<appender-ref ref="kafka" />
    	</root>

      <zookeeperHost>
        我本地启动了三个zookeer。依据配置。你能够知道是怎样配置的吧。

       kafka集群的broker我配置了两个,都是在本地机器。


    測试代码:

    package com.doctor.logbackextend;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    import org.apache.commons.lang.RandomStringUtils;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * zookeeper 和kafka环境准备好。本地端口号默认设置
     * 
     * @author doctor
     *
     * @time   2014年10月24日 下午3:14:01
     */
    public class KafkaAppenderTest {
    	private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);
    	
    
    	/** 先启动此測试方法,模拟log日志输出到kafka */
    	@Test
    	public void test_log_producer() {
    		while(true){
    			LOG.info("test_log_producer : "  + RandomStringUtils.random(3, "hello doctro,how are you,and you"));
    		}
    	}
    	
    	
    	/** 再启动此測试方法,模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */
    	@Test
    	public void test_comsumer(){
    		Properties props = new Properties();
    		props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
    		props.put("group.id", "kafkatest-group");
    //		props.put("zookeeper.session.timeout.ms", "400");
    //		props.put("zookeeper.sync.time.ms", "200");
    //		props.put("auto.commit.interval.ms", "1000");
    		ConsumerConfig paramConsumerConfig = new ConsumerConfig(props );
    		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig );
    		
    		Map<String, Integer> topicCountMap = new HashMap<>();
    		topicCountMap.put("kafka-test", new Integer(1));
    		Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap);
    		List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test");
    		
    		for (KafkaStream<byte[], byte[]> stream : streams) {
    			ConsumerIterator<byte[], byte[]> it = stream.iterator();
    			while(it.hasNext())
    			System.out.println(new String("test_comsumer: " + new String(it.next().message())));
    		}
    		
    		
    	}
    
    }
    


    结果,明天再附上截图。

    版权声明:本文博客原创文章,博客,未经同意,不得转载。

  • 相关阅读:
    第 1 章 第 11 题 图纸传递问题
    第 1 章 第 10 题 主键查找问题 哈希表实现
    第 1 章 第 9 题 使用未初始化数组问题 设立辅助数组实现
    第 1 章 第 8 题 分批排序问题( 扩展 ) 位向量实现
    第 1 章 第 7 题 位向量中的异常处理问题
    JAVA实现多线程处理批量发送短信、APP推送
    转载的一些面试题
    使用Flexible实现手淘H5页面的终端适配
    2016前端代码总结
    移动前端的一些坑和解决方法(外观表现)
  • 原文地址:https://www.cnblogs.com/yxwkf/p/4614262.html
Copyright © 2011-2022 走看看