zoukankan      html  css  js  c++  java
  • 用ELK 实时处理搜索日志

    转载请标明原处:http://blog.csdn.net/hu948162999/article/details/50563110


    本来这块业务 是放到SolrCloud上去的 , 然后 採用solr的facet统计查询,

    详细代码參考之前写的文章:http://blog.csdn.net/hu948162999/article/details/50162643

       近期遇到SolrCloud 遇到一些问题。。查询db时间过长,SolrCloud的长连接CloudSolrServer老timeout。索引的效率也不够满

    意。了稳定。临时先还原solr单机版本号(上线时,被运维打回来了)。搜索日志就用elasticsearch实时去处理。

       大概流程:

       基于日志系统ELK 的原型下。參考ELK处理nginx日志文章:http://blog.csdn.net/hu948162999/article/details/50502875

    还是用logstash正则去解析搜索日志。搜索日志採用log4j生成。logstash检測到传递给elasticsearch。

    log4j:

    log4j.appender.E.layout.ConversionPattern= %d|%m%n

    logstash配置 

    新增logstash_search.conf:

    input {
        file {
            type => "searchword"
            path => ["/home/work/log/hotword/data"]
        }
    }
    filter {
        grok {
            match => [
               "message", "%{TIMESTAMP_ISO8601:timestamp}|{%{GREEDYDATA:kvs}}"
            ]
        }
        kv {
            source => "kvs"
            field_split => ","
            value_split => "="
            trimkey => " "
        }
        date {
            match => ["timestamp" , "YYYY-MM-dd HH:mm:ss,SSS"]
        }
    }
    output {
        elasticsearch {
            hosts => ["host1:9200", "host2:9200", "host3:9200", "host4:9200"]
            index => "searchword-%{+YYYY.MM.dd}"
        }
    }


      这里要注意聚合操作的时候。

    Logstash 自带有一个优化好的模板。其默认的mapping,string类型都是analyzer。

    也就是说,默认分

    词是採用单字分词的。

      改动默认的logstash mapping模板。參考 http://udn.yyuap.com/doc/logstash-best-practice-cn/output/elasticsearch.html

    结构例如以下:



    启动logstash:

    nohup  bin/logstash -f conf/logstash_search.conf &

    运行搜索測试。

    能够立即在elasticsearch的插件上看到该搜索行为日志的数据索引。这就是elk的实时性了。



    elasticsearch java端

    參考指定mapping和聚合查询代码:

    	Client client=esobj.getClient();
    		SearchResponse response = client.prepareSearch("searchword*").setTypes("searchword").addAggregation(AggregationBuilders.terms("hotword").field("keyword")).execute().actionGet();
            Terms terms = response.getAggregations().get("hotword");

    	
    	/**
    	 * 初始化索引
    	 * @param client
    	 * @param indexName
    	 * @param indexType
    	 * @param cols
    	 * @return 初始化成功,返回true。否则返回false
    	 * @throws Exception
    	 */
    	public static boolean initIndexMapping(Client client, String indexName, String indexType, List<ColumnInfo> cols) throws Exception {
    		if(StringUtil.isEmpty(indexName) || StringUtil.isEmpty(indexType)) {
    			return false;
    		}
    		indexName = indexName.toLowerCase();
            indexType = indexType.toLowerCase();
    		//推断索引库是否存在
    		if(indicesExists(client, indexName)) {
    			 OpenIndexRequestBuilder openIndexBuilder = new OpenIndexRequestBuilder(client.admin().indices(), OpenIndexAction.INSTANCE);
                 openIndexBuilder.setIndices(indexName).execute().actionGet();
    		}else{
    			 //不存在则新建索引库
    			 client.admin().indices().prepareCreate(indexName).execute().actionGet();
    		}
    		
    		TypesExistsRequest ter = new TypesExistsRequest(new String[]{indexName.toLowerCase()}, indexType);
    		boolean typeExists = client.admin().indices().typesExists(ter).actionGet().isExists();
    		//假设 存在 返回!不能覆盖mapping
    		if(typeExists) {
    			return true;
    		}
    		//定义索引字段属性
    		XContentBuilder mapping = jsonBuilder().startObject().startObject(indexType).startObject("properties");
    		for (ColumnInfo col : cols) {
            	String colName = col.getName().toLowerCase().trim();
            	String colType = col.getType().toLowerCase().trim();
            	        	
            	if("string".equals(colType)) {
            		mapping.startObject(colName).field("type", colType).field("store", ""+col.isStore()).field("indexAnalyzer", col.getIndexAnalyzer()).field("searchAnalyzer", col.getSearchAnalyzer()).field("include_in_all", col.isStore()).field("boost", col.getBoost()).endObject();
            	}else if("long".equals(colType)) {        		
            		mapping.startObject(colName).field("type", colType).field("index", "not_analyzed").field("include_in_all", false).endObject();
            	}else if("date".equals(colType)) {
            		mapping.startObject(colName).field("type", colType).field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd").field("index", "not_analyzed").field("include_in_all", false).endObject();
            	}else {
            		mapping.startObject(colName).field("type", "string").field("index", "not_analyzed").endObject();
            	}
                
            }
            mapping.endObject().endObject().endObject();
            PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(indexType).source(mapping);
            PutMappingResponse response = client.admin().indices().putMapping(mappingRequest).actionGet();
    		return response.isAcknowledged();
    	}




  • 相关阅读:
    类与继承
    闭包、原型链和继承
    ajax(下)和“承诺”
    ajax(上)
    Ubuntu电源键软关机设置
    金老师语录摘要(七)
    金老师语录摘要(六)
    金老师语录摘要(四)
    金老师语录摘要(三)
    金老师语录摘要(二)
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7357779.html
Copyright © 2011-2022 走看看