zoukankan      html  css  js  c++  java
  • storm1.1.0 drpc 部署和调用测试

    一、配置集群storm.yaml文件,配置drpc.server

     

    二、部署到linux上,开启nimbus,drpc,supervisor 等服务 

    /opt/module/storm-1.1.0/bin/storm nimbus &

    /opt/module/storm-1.1.0/bin/storm drpc &

    /opt/module/storm-1.1.0/bin/storm supervisor &

    /opt/module/storm-1.1.0/bin/storm ui &

    三、编写DrpcTopology程序。如下:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.eyecool.framework.olive.compute;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.LocalDRPC;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    import com.eyecool.framework.olive.compute.bolt.ExclamationBolt;
    
    public class XFaceTopologyTest {
    
        public static void main(String[] args) throws Exception {
            run(args);
        }
    
        public static String spout_name = "raw-spout";
    
        protected static int run(String[] args) throws Exception {
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("lookup");
            builder.addBolt(new ExclamationBolt(),3);
            Config conf = new Config();
            conf.setDebug(false);
            conf.setNumWorkers(1);
            if (args != null && args.length > 0) {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
            } else {
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("drpc-XFace", conf, builder.createLocalTopology(drpc));
                System.out.println("Results for 'hello':" + drpc.execute("lookup", "hello"));
                System.out.println("Results for 'hello':" + drpc.execute("lookup", "hello12"));
    
                Utils.sleep(1000000000);
                cluster.killTopology("drpc-XFace");
                cluster.shutdown();
            }
            return 0;
        }
    }
    View Code
    package com.eyecool.framework.olive.compute.bolt;
    
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class ExclamationBolt extends BaseBasicBolt {
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "return-info"));
        }
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Object arg = tuple.getValue(0);
            String retInfo = tuple.getString(1);
            System.out.println("v0: "+arg +" v1: "+retInfo);
            collector.emit(new Values(arg, retInfo + "!!!"));
        }
    
    }
    View Code

    四、提交执行任务

    /opt/module/storm-1.1.0/bin/storm jar olive-computeservice-storm-drpc-1.0.0-jar-with-dependencies.jar com.eyecool.framework.olive.compute.XFaceTopologyTest olive

    五、页面可以看到系统成功运行

    六、客户端代码调用测试,常遇到以下错误:

    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:108)
    at org.apache.storm.security.auth.ThriftClient.<init>(ThriftClient.java:69)
    at org.apache.storm.utils.DRPCClient.<init>(DRPCClient.java:44)
    at org.apache.storm.utils.DRPCClient.<init>(DRPCClient.java:39)
    at ClientTest.main(ClientTest.java:16)
    Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.security.auth.AuthUtils.GetTransportPlugin(AuthUtils.java:267)
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:89)
    ... 4 more
    Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.storm.security.auth.AuthUtils.GetTransportPlugin(AuthUtils.java:263)
    ... 5 more

    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.utils.DRPCClient;
    import org.apache.storm.utils.Utils;
    
    public class ClientTest {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            try {
                Config conf = new Config();
                conf.setDebug(false);
                Map config = Utils.readDefaultConfig();
                @SuppressWarnings("resource")
                DRPCClient client = new DRPCClient(conf, "192.168.0.188", 3772);// drpc
                String result = client.execute("lookup", "hello world ");// 调用drpcTest函数,传递参数为hello
            
                System.out.println(result);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    View Code

    正确的读取storm yaml 默认配置文件

    Map config = Utils.readDefaultConfig();

     

     就这么简单,完成!!!

  • 相关阅读:
    js获取Session问题 dodo
    复制一个datatable的指定行到另外一个datatable dodo
    sqlserver数据库备份与还原语句 dodo
    net软件测试实战技术大全 dodo
    AJAX 浏览器支持 dodo
    使用 vs2005进行负载测试 dodo
    sql使用in批量删除 dodo
    各种浏览器兼容存在的方法:Xenocode Browser Sandbox dodo
    C#调用Windows API函数 dodo
    ewebeditor在ie8下报错 dodo
  • 原文地址:https://www.cnblogs.com/eyecool/p/7264974.html
Copyright © 2011-2022 走看看