ElasticSearch: Bulk-Loading Large Volumes of Data

    I recently started on a task that involves handling very large volumes of data.

    The current situation is this: a data-collection program is responsible for storing and retrieving large volumes of data, and later on we may also need to run statistics over it.

    Data distribution

    13 collection points periodically write their results into 4 files (a new batch of small files is generated every 5 minutes):

    Name                                                 Size (bytes)
    gather_1_2014-02-27-14-50-0.txt                      568497
    gather_1_2014-02-27-14-50-1.txt                      568665
    gather_1_2014-02-27-14-50-2.txt                      568172
    gather_1_2014-02-27-14-50-3.txt                      568275

    In step with that, a shell script loads the four files into a single table, tab_tmp_2014_2_27, in the sybase_iq database.

    The daily volume is roughly 300 million records, so the small files add up to about 3 GB per day. With that many small files and a single table that large, a composite-primary-key query that used to return in about 2 s now takes 5 to 10 minutes.

    Given the above, the current storage structure needs to be optimized.

    I first took a look at how a related system handles this: catior stores its data in a round-robin database, which is convenient for generating MRTG graphs but poorly suited to statistics. So we brought in elasticsearch to speed up retrieval over the large data set.

    Test platform

    CPU: AMD Opteron(tm) Processor 6136 64bit 2.4GHz * 32
    Memory: 64 GB
    Disk: 1.5 TB
    OS: Red Hat Enterprise Linux Server release 6.4 (Santiago)

    Directory layout of the files to be read:

    [test@test001 data]$ ls
    0  1  2  3

     Simple test code:

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.commons.io.LineIterator;
    import org.apache.log4j.Logger;
    
    public class FileReader
    {
    
    	private File file;
    	private String splitCharactor;
    	private Map<String, Class<?>> colNames;
    	private static final Logger LOG = Logger.getLogger(FileReader.class);
    
    	/**
    	 * @param file
    	 *            the file to read
    	 * @param splitCharactor
    	 *            separator used to split each line (a regex)
    	 * @param colNames
    	 *            column names mapped to their types, in column order
    	 */
    	public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames)
    	{
    		this.file = file;
    		this.splitCharactor = splitCharactor;
    		this.colNames = colNames;
    	}
    
    	/**
    	 * Read the file into one map of column name to value per line.
    	 * 
    	 * @return the parsed rows
    	 * @throws Exception if the file does not exist
    	 */
    	public List<Map<String, Object>> readFile() throws Exception
    	{
    		List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
    		if (!file.isFile())
    		{
    			throw new Exception("File does not exist: " + file.getName());
    		}
    		LineIterator lineIterator = null;
    		try
    		{
    			lineIterator = FileUtils.lineIterator(file, "UTF-8");
    			while (lineIterator.hasNext())
    			{
    				String line = lineIterator.next();
    				String[] values = line.split(splitCharactor);
    				if (colNames.size() != values.length)
    				{
    					continue;
    				}
    				Map<String, Object> map = new HashMap<String, Object>();
    				Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet()
    						.iterator();
    				int count = 0;
    				while (iterator.hasNext())
    				{
    					Entry<String, Class<?>> entry = iterator.next();
    					Object value = values[count];
    					if (!String.class.equals(entry.getValue()))
    					{
    						value = entry.getValue().getMethod("valueOf", String.class)
    								.invoke(null, value);
    					}
    					map.put(entry.getKey(), value);
    					count++;
    				}
    				list.add(map);
    			}
    		}
    		catch (IOException e)
    		{
    			LOG.error("File reading line error." + e.toString(), e);
    		}
    		finally
    		{
    			LineIterator.closeQuietly(lineIterator);
    		}
    		return list;
    	}
    }
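
     A minimal usage sketch of FileReader (the demo class, sample line, and values are invented for illustration; the file name follows the listing above):

    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    public class FileReaderDemo
    {
    	public static void main(String[] args) throws Exception
    	{
    		// A LinkedHashMap is essential: entry order decides which
    		// '$'-separated column each name/type is bound to.
    		LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
    		colNames.put("aa", Long.class);   // converted via Long.valueOf(String)
    		colNames.put("bb", String.class); // kept as-is
    		colNames.put("dd", Integer.class);
    
    		// "\\$" because String.split() takes a regex and '$' is a metacharacter.
    		FileReader reader = new FileReader(
    				new File("/export/home/es/data/0/gather_1_2014-02-27-14-50-0.txt"),
    				"\\$", colNames);
    
    		// A line "42$x$7" becomes {aa=42L, bb="x", dd=7}; lines whose field
    		// count differs from colNames.size() are skipped.
    		List<Map<String, Object>> rows = reader.readFile();
    		System.out.println(rows.size() + " rows parsed");
    	}
    }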
    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    
    import com.alibaba.fastjson.JSON;
    
    public class StreamIntoEs
    {
    
    	public static class ChildThread extends Thread
    	{
    
    		int number;
    
    		public ChildThread(int number)
    		{
    			this.number = number;
    		}
    
    		@Override
    		public void run()
    		{
    			Settings settings = ImmutableSettings.settingsBuilder()
    					.put("client.transport.sniff", true)
    					.put("client.transport.ping_timeout", 100)
    					.put("cluster.name", "elasticsearch").build();
    			TransportClient client = new TransportClient(settings)
    					.addTransportAddress(new InetSocketTransportAddress("192.168.32.228",
    							9300));
    			File dir = new File("/export/home/es/data/" + number);
    			LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
    			colNames.put("aa", Long.class);
    			colNames.put("bb", String.class);
    			colNames.put("cc", String.class);
    			colNames.put("dd", Integer.class);
    			colNames.put("ee", Long.class);
    			colNames.put("ff", Long.class);
    			colNames.put("hh", Long.class);
    			int count = 0;
    			long startTime = System.currentTimeMillis();
    			for (File file : dir.listFiles())
    			{
    				int currentCount = 0;
    				long startCurrentTime = System.currentTimeMillis();
    				FileReader reader = new FileReader(file, "\\$", colNames); // '$' must be regex-escaped
    				BulkResponse resp = null;
    				BulkRequestBuilder bulkRequest = client.prepareBulk();
    				try
    				{
    					List<Map<String, Object>> results = reader.readFile();
    					for (Map<String, Object> col : results)
    					{
    					bulkRequest.add(client.prepareIndex("flux", "fluxdata")
    							.setSource(JSON.toJSONString(col))
    							// composite id: these keys must be present in colNames
    							// (aa..hh above stand in for the real column names)
    							.setId(col.get("getway") + "##" + col.get("port_info") + "##"
    									+ col.get("device_id") + "##" + col.get("collecttime")));
    						count++;
    						currentCount++;
    					}
    					resp = bulkRequest.execute().actionGet();
    				}
    				catch (Exception e)
    				{
    					e.printStackTrace();
    				}
    				long endCurrentTime = System.currentTimeMillis();
    				System.out.println("[thread-" + number + "-]per count:" + currentCount);
    				System.out.println("[thread-" + number + "-]per time:"
    						+ (endCurrentTime - startCurrentTime));
    				System.out.println("[thread-" + number + "-]per count/s:"
    						+ (float) currentCount / (endCurrentTime - startCurrentTime)
    						* 1000);
    				System.out.println("[thread-" + number + "-]per count/s:"
    						+ resp.toString());
    			}
    			long endTime = System.currentTimeMillis();
    			System.out.println("[thread-" + number + "-]total count:" + count);
    			System.out.println("[thread-" + number + "-]total time:"
    					+ (endTime - startTime));
    			System.out.println("[thread-" + number + "-]total count/s:" + (float) count
    					/ (endTime - startTime) * 1000);
    			client.close(); // release the transport client's resources
    		}
    	}
    
    	public static void main(String args[])
    	{
    		for (int i = 0; i < 4; i++)
    		{
    			ChildThread childThread = new ChildThread(i);
    			childThread.start();
    		}
    	}
    }

     Four threads handle the load; each file is parsed completely and then submitted as one bulk request. A chunked variant is sketched below.
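
     One refinement worth considering (my own sketch, not part of the original test): instead of one bulk request per file, flush every few thousand documents so each request stays a predictable size. BATCH_SIZE, the class, and the method name are assumptions; the index/type names follow the code above.

    import java.util.List;
    import java.util.Map;
    
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.transport.TransportClient;
    
    import com.alibaba.fastjson.JSON;
    
    public class ChunkedBulk
    {
    	private static final int BATCH_SIZE = 5000; // assumed; tune against heap and cluster load
    
    	public static int bulkInChunks(TransportClient client, List<Map<String, Object>> rows)
    	{
    		int indexed = 0;
    		BulkRequestBuilder bulk = client.prepareBulk();
    		for (Map<String, Object> col : rows)
    		{
    			bulk.add(client.prepareIndex("flux", "fluxdata")
    					.setSource(JSON.toJSONString(col)));
    			if (bulk.numberOfActions() >= BATCH_SIZE)
    			{
    				BulkResponse resp = bulk.execute().actionGet();
    				if (resp.hasFailures())
    				{
    					System.err.println(resp.buildFailureMessage());
    				}
    				indexed += BATCH_SIZE;
    				bulk = client.prepareBulk(); // start a fresh batch
    			}
    		}
    		if (bulk.numberOfActions() > 0) // flush the remainder
    		{
    			indexed += bulk.numberOfActions();
    			bulk.execute().actionGet();
    		}
    		return indexed;
    	}
    }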

    Initialization script:

    curl -XDELETE 'http://192.168.32.228:9200/flux/'
    curl -XPUT 'http://192.168.32.228:9200/flux/' -d '
    {
         "index" : {
              "number_of_shards" : 5,
              "number_of_replicas" : 0,
              "refresh_interval" : "-1",
              "translog.flush_threshold_ops" : "100000"
         }
    }'
    curl -XPUT 'http://192.168.32.228:9200/flux/fluxdata/_mapping' -d '
    {
        "fluxdata" : {
            "properties" : {
                "aa" : {"type" : "long", "index" : "not_analyzed"},
                "bb" : {"type" : "string", "index" : "not_analyzed"},
                "cc" : {"type" : "string", "index" : "not_analyzed"},
                "dd" : {"type" : "integer", "index" : "not_analyzed"},
                "ee" : {"type" : "long", "index" : "no"},
                "ff" : {"type" : "long", "index" : "no"},
                "gg" : {"type" : "long", "index" : "no"},
                "hh" : {"type" : "long", "index" : "no"},
                "ii" : {"type" : "long", "index" : "no"},
                "jj" : {"type" : "long", "index" : "no"},
                "kk" : {"type" : "long", "index" : "no"}
            }
        }
    }'
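
     Because refresh_interval is -1 during the load, indexed documents only become searchable once refresh is turned back on. A minimal sketch of restoring it through the ES 1.x Java admin API (the helper class is invented; it reuses the TransportClient from the code above, and 1s is simply the default interval):

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    
    public class RestoreRefresh
    {
    	// Re-enable periodic refresh after the bulk load, then force one
    	// refresh so everything indexed so far becomes visible.
    	public static void restore(TransportClient client)
    	{
    		client.admin().indices().prepareUpdateSettings("flux")
    				.setSettings(ImmutableSettings.settingsBuilder()
    						.put("index.refresh_interval", "1s").build())
    				.execute().actionGet();
    		client.admin().indices().prepareRefresh("flux").execute().actionGet();
    	}
    }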

     Performance results for reference:

    Without the refresh_interval tuning (default refresh):
    [test@test001 bin]$ more StreamIntoEs.out|grep total
    [thread-2-]total count:1199411
    [thread-2-]total time:1223718
    [thread-2-]total count/s:980.1368
    [thread-1-]total count:1447214
    [thread-1-]total time:1393528
    [thread-1-]total count/s:1038.5253
    [thread-0-]total count:1508043
    [thread-0-]total time:1430167
    [thread-0-]total count/s:1054.4524
    [thread-3-]total count:1650576
    [thread-3-]total time:1471103
    [thread-3-]total count/s:1121.9989
    sum: 4195.1134 count/s
    
    With refresh_interval set to -1:
    [test@test001 bin]$ more StreamIntoEs.out |grep total
    [thread-2-]total count:1199411
    [thread-2-]total time:996111
    [thread-2-]total count/s:1204.0938
    [thread-1-]total count:1447214
    [thread-1-]total time:1163207
    [thread-1-]total count/s:1244.1586
    [thread-0-]total count:1508043
    [thread-0-]total time:1202682
    [thread-0-]total count/s:1253.9
    [thread-3-]total count:1650576
    [thread-3-]total time:1236239
    [thread-3-]total count/s:1335.1593
    sum: 5037.3117 count/s
    
    With refresh_interval set to -1, plus field type conversion:
    [test@test001 bin]$ more StreamIntoEs.out |grep total
    [thread-2-]total count:1199411
    [thread-2-]total time:1065229
    [thread-2-]total count/s:1125.9653
    [thread-1-]total count:1447214
    [thread-1-]total time:1218342
    [thread-1-]total count/s:1187.8552
    [thread-0-]total count:1508043
    [thread-0-]total time:1230474
    [thread-0-]total count/s:1225.5789
    [thread-3-]total count:1650576
    [thread-3-]total time:1274027
    [thread-3-]total count/s:1295.5581
    sum: 4834.9575 count/s
    
    With refresh_interval set to -1, field type conversion, and explicit document ids:
    [thread-2-]total count:1199411
    [thread-2-]total time:912251
    [thread-2-]total count/s:1314.7817
    [thread-1-]total count:1447214
    [thread-1-]total time:1067117
    [thread-1-]total count/s:1356.1906
    [thread-0-]total count:1508043
    [thread-0-]total time:1090577
    [thread-0-]total count/s:1382.7937
    [thread-3-]total count:1650576
    [thread-3-]total time:1128490
    [thread-3-]total count/s:1462.6412
    sum: 5516.4072 count/s
    

     On average, 580 MB of data takes about 20 minutes to load; the resulting index files come to roughly 1.76 GB.
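
     The payoff is that the composite-primary-key query that had degraded to minutes in the SQL table becomes a direct get by _id. A minimal sketch (class and parameter names are invented; the id layout matches the one built during indexing above):

    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.transport.TransportClient;
    
    public class IdLookup
    {
    	// Point lookup by the "getway##port_info##device_id##collecttime"
    	// id assembled at index time.
    	public static String lookup(TransportClient client, String getway,
    			String portInfo, String deviceId, String collecttime)
    	{
    		String id = getway + "##" + portInfo + "##" + deviceId + "##" + collecttime;
    		GetResponse resp = client.prepareGet("flux", "fluxdata", id)
    				.execute().actionGet();
    		return resp.isExists() ? resp.getSourceAsString() : null;
    	}
    }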

     Related test results can be found here:

     elasticsearch performance test

     
