zoukankan      html  css  js  c++  java
  • 框架安装

    kafka

    下载
    http://kafka.apache.org/downloads.html
    解压
    tar -zxvf kafka_2.10-0.8.1.1.tgz
    启动服务
    首先启动zookeeper服务
    bin/zookeeper-server-start.sh config/zookeeper.properties
    启动Kafka
    bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
    创建topic
    创建一个"test"的topic,一个分区一个副本
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    查看主题
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    查看主题详情
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    删除主题
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

    创建生产者 producer
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    创建消费者 consumer
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class TestProducer {
    
    	public static void main(String[] args) {
    		Properties originalProps = new Properties();
    		//broker
    		originalProps.put("metadata.broker.list", "192.168.1.111:9092");
    		//把数据序列化到broker
    		originalProps.put("serializer.class", "kafka.serializer.StringEncoder");
    		originalProps.put("request.required.acks", "1");
    		Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(originalProps ));
    		for(int j = 0; j < 100; j++) {
    			producer.send(new KeyedMessage<String, String>("testkafka", null, j+"kafka"));
    		}
    		producer.close();
    	}
    package testkafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    public class TestConsumer {
    	public static void main(String[] args) {
    		Properties originalProps = new Properties();
    		originalProps.put("zookeeper.connect", "192.168.1.111:2181");
    		originalProps.put("group.id", "234");
    		originalProps.put("serializer.class", "kafka.serializer.StringEncoder");
    		ConsumerConnector consumer = Consumer
    				.createJavaConsumerConnector(new ConsumerConfig(originalProps));
    
    		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    		topicCountMap.put("testkafka", 1);
    		StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    		StringDecoder valueDecoder = new StringDecoder(
    				new VerifiableProperties());
    		Map<String, List<KafkaStream<String, String>>> topicMessageStreams = consumer
    				.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    		KafkaStream<String, String> kafkaStream = topicMessageStreams.get(
    				"testkafka").get(0);
    		ConsumerIterator<String, String> iterator = kafkaStream.iterator();
    		while (iterator.hasNext()) {
    			MessageAndMetadata<String, String> next = iterator.next();
    			System.out.println(next.message());
    		}
    	}
    }
    }

      

    flume

    下载上传解压

    $ cp conf/flume-conf.properties.template conf/flume.conf
    $ cp conf/flume-env.sh.template conf/flume-env.sh
    配置
    JAVA_HOME
    启动
    bin/flume-ng agent-conf -f ./conf/agent1.conf -n agent1 -Dflume.root.logger=DEBUG,console

    启动客户端
    $ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F ~/.bashrc

    ——————————————————————————————————————————————————————————

    storm安装部署

    网址: http://www.cnblogs.com/panfeng412/archive/2012/11/30/how-to-install-and-deploy-storm-cluster.html

    2.4 修改storm.yaml配置文件

    Storm发行版本解压目录下有一个conf/storm.yaml文件,用于配置Storm。默认配置在这里可以查看。conf/storm.yaml中的配置选项将覆盖defaults.yaml中的默认配置。以下配置选项是必须在conf/storm.yaml中进行配置的:

    1) storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址,其格式如下:

    storm.zookeeper.servers:
      - "111.222.333.444"
      - "555.666.777.888"

    如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项

    2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:

    storm.local.dir: "/home/admin/storm/workdir"

    3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为"/usr/local/lib:/opt/local/lib:/usr/lib",一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。

    4) nimbus.host: Storm集群Nimbus机器地址,各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件,如:

    nimbus.host: "111.222.333.444"

    5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

    2.5 启动Storm各个后台进程

    最后一步,启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Storm才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Supervisors被重启,运行中的Topologies不会受到影响。

    以下是启动Storm各个后台进程的方式:

    1. Nimbus: 在Storm主控节点上运行"bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行;
    2. Supervisor: 在Storm各个工作节点上运行"bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;
    3. UI: 在Storm主控节点上运行"bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。

    注意事项:

    1. Storm后台进程被启动后,将在Storm安装部署目录下的logs/子目录下生成各个进程的日志文件。
    2. 经测试,Storm UI必须和Storm Nimbus部署在同一台机器上,否则UI无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。
    3. 为了方便使用,可以将bin/storm加入到系统环境变量中。

    至此,Storm集群已经部署、配置完毕,可以向集群提交拓扑运行了。

    bin/storm nimbus >/dev/null 2>&1 &

    bin/storm supervisor >/dev/null 2>&1 &

    bin/storm ui >/dev/null 2>&1 &

    3. 向集群提交任务

    1)启动Storm Topology:

    storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

    其中,allmycode.jar是包含Topology实现代码的jar包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3为org.me.MyTopology执行时需要传入的参数。

    2)停止Storm Topology:

    storm kill {toponame}

    其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称。

    4. 参考资料

    1. https://github.com/nathanmarz/storm/wiki/Tutorial

    2. https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster

  • 相关阅读:
    HDU 1677
    HDU 1672 Cuckoo Hashing
    HDU 2586 + HDU 4912 最近公共祖先
    最大流 Dinic + Sap 模板
    网络流算法小结(转)
    Malformed network data报错解决方法
    js window.open 参数设置
    java图片高质量缩放类
    struts2 I18n问题 国际化
    java.lang.Exception: Socket bind failed 服务器端口冲突-->修改端口
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5605568.html
Copyright © 2011-2022 走看看