zoukankan      html  css  js  c++  java
  • Put queue for MemoryTransaction of capacity 10000 full, consider committing more frequently, increasing capacity or increasing thread count flume capacity 事件数

    package com.test;

    import org.apache.http.*;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.DefaultBHttpClientConnection;
    import org.apache.http.impl.DefaultConnectionReuseStrategy;
    import org.apache.http.message.BasicHttpEntityEnclosingRequest;
    import org.apache.http.protocol.*;

    import java.net.Socket;
    import java.util.UUID;

    public class ElementalHttpPost {
    public static void main(String[] args) throws Exception {
    String strBatch = "[";
    String s = "http://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.javahttp://hc.apache.org/httpcomponents-core-ga/httpcore/examples/org/apache/http/examples/ElementalHttpPost.java";
    try {
    for (int i = 0; i < 30000; i++) {
    System.out.println(i);

    strBatch += ",{"headers":{"timestamp":"434324343","host":"random_host.example.com"},"body":"abc" + i + "---" + System.currentTimeMillis() + "-" + UUID.randomUUID() + "---" + s + ""}";
    }
    strBatch += "]";
    strBatch = strBatch.replace("[,", "[");
    System.out.println(strBatch);
    HttpEntity httpEntity = new StringEntity(strBatch, ContentType.create("application/json", Consts.UTF_8));
    m(httpEntity);
    Thread.sleep(1000);
    System.out.println(Thread.currentThread());
    } finally {
    }
    }

    static void m(HttpEntity httpEntity) throws Exception {
    HttpProcessor processor = HttpProcessorBuilder.create()
    .add(new RequestContent())
    .add(new RequestTargetHost())
    .add(new ResponseConnControl())
    .add(new RequestUserAgent("Test/1.1"))
    .add(new RequestExpectContinue(true))
    .build();
    HttpRequestExecutor requestExecutor = new HttpRequestExecutor();
    HttpCoreContext coreContext = HttpCoreContext.create();
    HttpHost host = new HttpHost("101.201.41.72", 50000);
    coreContext.setTargetHost(host);

    DefaultBHttpClientConnection conn = new DefaultBHttpClientConnection(8 * 10254 * 1024);
    ConnectionReuseStrategy reuseStrategy = DefaultConnectionReuseStrategy.INSTANCE;
    Socket socket = new Socket(host.getHostName(), host.getPort());
    conn.bind(socket);
    try {
    BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST", "/");
    request.setEntity(httpEntity);
    requestExecutor.preProcess(request, processor, coreContext);
    HttpResponse response = requestExecutor.execute(request, conn, coreContext);
    requestExecutor.postProcess(response, processor, coreContext);
    if (!reuseStrategy.keepAlive(response, coreContext)) {
    conn.close();
    } else {
    System.out.println("Connection kept alive...");
    }
    } finally {
    conn.close();
    }
    }
    }

    [root@d log]# ps -aux | grep java
    root 550 0.2 1.9 6000924 641384 pts/1 Sl+ 17:39 0:13 /usr/java/jdk1.8.0_101/bin/java -Xmx20m -Dflume.root.logger=INFO,console -Xms2048m -Xmx2048m -cp /data/UnifiedLog/flume/conf:/data/UnifiedLog/flume/lib/*:/lib/* -Djava.library.path= org.apache.flume.node.Application -f /data/UnifiedLog/flume/conf/httpSourceApp.conf -n a1
    root 11382 0.0 0.0 112712 964 pts/9 S+ 19:07 0:00 grep --color=auto java
    elsearch 22711 19.3 36.3 140956640 11930396 ? S<l Sep18 16185:36 /usr/java/jdk1.8.0_101/bin/java -Xms10g -Xmx10g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true -Des.path.home=/usr/local/elasticsearch-2.4.1 -cp /usr/local/elasticsearch-2.4.1/lib/elasticsearch-2.4.1.jar:/usr/local/elasticsearch-2.4.1/lib/* org.elasticsearch.bootstrap.Elasticsearch start -d
    root 24280 0.0 0.0 122092 1288 ? Sl Sep18 45:21 /usr/local/cloudmonitor/wrapper/bin/./wrapper /usr/local/cloudmonitor/wrapper/bin/../conf/wrapper.conf wrapper.syslog.ident=cloudmonitor wrapper.pidfile=/usr/local/cloudmonitor/wrapper/bin/./cloudmonitor.pid wrapper.daemonize=TRUE wrapper.name=cloudmonitor wrapper.displayname=cloudmonitor wrapper.statusfile=/usr/local/cloudmonitor/wrapper/bin/./cloudmonitor.status wrapper.java.statusfile=/usr/local/cloudmonitor/wrapper/bin/./cloudmonitor.java.status wrapper.lockfile=/var/lock/subsys/cloudmonitor wrapper.script.version=3.5.27
    root 24282 0.5 0.2 2516408 67812 ? Sl Sep18 487:36 /usr/local/cloudmonitor/jre/bin/java -Djava.compiler=none -XX:-UseGCOverheadLimit -XX:NewRatio=1 -XX:SurvivorRatio=8 -XX:+UseSerialGC -Djava.io.tmpdir=../../tmp -Xms16m -Xmx32m -Djava.library.path=../lib:../../lib -classpath ../lib/wrappertest.jar:../lib/wrapper.jar:../../config:../../lib/agent-commons-1.2.11.jar:../../lib/agent-core-1.2.11.jar:../../lib/agent-model-1.2.11.jar:../../lib/aopalliance-1.0.jar:../../lib/commons-logging-1.2.jar:../../lib/commons-net-3.5.jar:../../lib/gson-2.4.jar:../../lib/jvm-plugin-1.2.11.jar:../../lib/log4j-1.2.16.jar:../../lib/metrics-core-3.0.2.jar:../../lib/sigar-1.6.5.132.jar:../../lib/slf4j-api-1.7.5.jar:../../lib/spring-aop-4.2.4.RELEASE.jar:../../lib/spring-beans-4.2.4.RELEASE.jar:../../lib/spring-context-4.2.4.RELEASE.jar:../../lib/spring-core-4.2.4.RELEASE.jar:../../lib/spring-expression-4.2.4.RELEASE.jar:../../lib/system-plugin-1.2.11.jar:../../lib/updater-1.2.11-jar-with-dependencies.jar -Dwrapper.key=vaz0l5Js_x6xfRzG -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.disable_console_input=TRUE -Dwrapper.pid=24280 -Dwrapper.version=3.5.27 -Dwrapper.native_library=wrapper -Dwrapper.arch=x86 -Dwrapper.service=TRUE -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1 com.aliyun.tianji.cloudmonitor.Application
    [root@d log]# cat /data/UnifiedLog/flume/conf/httpSourceApp.conf
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1

    a1.sources.r1.type=http
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=50000
    a1.sources.r1.channels=c1

    a1.sinks.k1.channel=c1
    #a1.sinks.k1.type = com.product.FlumeApp
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /data/UnifiedLog/log
    a1.sinks.k1.batchSize=100
    #a1.sinks.k1.pathManager=%y%m%d%H%M%S
    a1.sinks.k1.pathManager=DEFAULT
    a1.sinks.k1.pathManager.extension=log
    a1.sinks.k1.pathManager.prefix=webTrack
    a1.sinks.k1.rollInterval=30
    #sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
    a1.sinks.k1.sink.serializer = text
    #capacity 100 The maximum number of events stored in the channel
    #transactionCapacity 100 The maximum number of events the channel will take from a source or give to a sink per transaction

    a1.channels.c1.type=memory
    #a1.channels.c1.capacity=1000
    #a1.channels.c1.transactionCapacity=100
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=10000

    [root@d log]#







  • 相关阅读:
    18个功能强大的HTML5 和JavaScript游戏引擎库
    10 个超棒的 jQuery 视频插件
    HTML5播放视频音频
    CSS代码重构与优化之路
    推荐10款web前端的 HTML5 开发框架和开发工具
    JS中日期相关函数
    技术笔记1:java.sql.SQLException: Access denied for user 'root'@'localhost' (using password)
    图解HTTP阅读笔记(1)-网络基础TCP/IP
    框架和设计模式的区别
    SSH与MVC
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9965515.html
Copyright © 2011-2022 走看看