  • Storm installation

    1. Install the JDK

      wget --no-check-certificate --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie;" http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.rpm

      Install it with rpm -ivh jdk-8u45-linux-x64.rpm

      Verify the installation by running javac

      Install the components and tools ZeroMQ needs on CentOS:

      yum install gcc

      yum install gcc-c++

      yum install make

      yum install uuid-devel

      yum install libuuid-devel

      yum install libtool

    2. Install ZooKeeper

      wget http://mirror.bjtu.edu.cn/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz

      tar -zxvf zookeeper-3.4.6.tar.gz

      cp -R zookeeper-3.4.6 /usr/local/

      ln -s /usr/local/zookeeper-3.4.6/  /usr/local/zookeeper

      vim /etc/profile

      export ZOOKEEPER_HOME="/path/to/zookeeper" # set to the ZooKeeper install path; log and data files live under it

      cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

      mkdir /tmp/zookeeper
      mkdir /var/log/zookeeper
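      The copied zoo_sample.cfg already works for a standalone node. A minimal zoo.cfg consistent with the directories created above might look like this (the values shown are the stock sample defaults, given purely for illustration):

      ```properties
      # basic time unit in milliseconds
      tickTime=2000
      # ticks allowed for initial follower sync (only matters in a quorum)
      initLimit=10
      # ticks allowed between follower sync requests
      syncLimit=5
      # snapshot directory (matches the mkdir above)
      dataDir=/tmp/zookeeper
      # transaction log directory (matches the mkdir above)
      dataLogDir=/var/log/zookeeper
      # client connection port
      clientPort=2181
      ```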

    3. Install ZeroMQ and jzmq

      Install ZeroMQ, then jzmq:

      wget http://download.zeromq.org/zeromq-2.2.0.tar.gz

      tar zxf zeromq-2.2.0.tar.gz

       cd zeromq-2.2.0

      ./configure

      make

      make install

      sudo ldconfig  # refresh the shared-library cache

      ZeroMQ is now installed.

      Install jzmq (Java must already be installed):

      yum install git

       git clone git://github.com/nathanmarz/jzmq.git

      cd jzmq

      ./autogen.sh

      ./configure

      make

      make install

      jzmq is now installed. Note: if the ./autogen.sh step fails with "autogen.sh: error: could not find libtool. libtool is required to run autogen.sh", libtool is missing; fix it with yum install libtool*.

    4. Install Storm

      wget http://cloud.github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
      unzip storm-0.8.1.zip
      mv storm-0.8.1 /usr/local/
      ln -s /usr/local/storm-0.8.1/ /usr/local/storm
      vim /etc/profile
      export STORM_HOME=/usr/local/storm-0.8.1
      export PATH=$PATH:$STORM_HOME/bin
      At this point the single-node Storm installation is complete.
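      Storm reads its settings from conf/storm.yaml under the install directory. A minimal single-node configuration might look like the following sketch (the loopback addresses and local dir are illustrative; nimbus.host is the key used by pre-1.0 releases such as 0.8.1):

      ```yaml
      # ZooKeeper ensemble (one local node here)
      storm.zookeeper.servers:
        - "127.0.0.1"
      # where the Nimbus daemon runs (pre-1.0 key name)
      nimbus.host: "127.0.0.1"
      # scratch directory for jars and local state
      storm.local.dir: "/tmp/storm"
      # one port per worker slot on this supervisor
      supervisor.slots.ports:
        - 6700
        - 6701
      ```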

    5. Starting and operating Storm
      1. Start ZooKeeper
        zkServer.sh start
      2. Start Nimbus
        storm nimbus &
      3. Start the Supervisor
        storm supervisor &
      4. Start the UI
        storm ui &
      5. Deploy a topology
        storm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
      6. Kill a topology
        storm kill {toponame}
      7. Activate a topology
        storm activate {toponame}
      8. Deactivate a topology
        storm deactivate {toponame}
      9. List all topologies
        storm list

      Check the running processes with jps, and view the UI by opening http://localhost:8080 in a browser.

      6. Killing a Storm topology remotely

      String topologyName = "topology001";
      boolean kill = false;
      Map conf = Utils.readStormConfig();
      // Nimbus server address
      conf.put(Config.NIMBUS_HOST, "172.16.1.100");
      // Nimbus Thrift port
      conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
      Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();
      List<TopologySummary> topologyList = client.getClusterInfo().get_topologies();
      for (TopologySummary topologySummary : topologyList) {
          if (topologySummary.get_name().equals(topologyName)) {
              KillOptions killOpts = new KillOptions();
              // delay before the kill takes effect, in seconds
              killOpts.set_wait_secs(5);
              client.killTopologyWithOpts(topologyName, killOpts);
              kill = true;
              System.out.println("killed " + topologyName);
          }
      }
      if (!kill)
          System.out.println(topologyName + " not started");
      Monitoring a Storm Cluster and Topologies with the Thrift API
      2015-01-15      Source: xcc's blog

      How do you monitor a Storm cluster and the topologies running on it?

      Storm has you covered: it supports a Thrift client/server architecture. A Thrift server process runs on the machine hosting the Nimbus component, and by writing a Thrift client that queries it you can retrieve whatever cluster and topology data you want, then feed it into a monitoring platform such as Zabbix, which is what I currently use.

      With the overall flow clear, let's put it into practice.

      1 Install Thrift

      Since we will use Thrift against Storm's source code to generate the Thrift client's Java sources, Thrift must be installed first; version 0.9.2 is used here.

      Download the package from the official site: http://thrift.apache.org/

      Build and install: ./configure && make && make install

      Verify: thrift --version

      If it prints Thrift version 0.9.2, the installation succeeded.

      2 Generate the Thrift client code

      First download the Storm source code; the latest 0.9.3 release is used here: http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz

      Unpack it and generate the code: thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift

      A gen-java folder appears in the current directory; it contains the Thrift client's Java source code.

      3 Using the Thrift client API

      Next, create a Maven project to fetch the monitoring data.

      The project builds a Jar that takes a command and custom arguments as input, then prints the result.

      Invoking it from the command line makes it easy to hook into a monitoring system; of course, adapt the calling convention to your own situation.

      Once the project is created, copy in the code generated under gen-java.

      Add the matching Thrift library version to pom.xml:

      <dependency>
          <groupId>org.apache.thrift</groupId>
          <artifactId>libthrift</artifactId>
          <version>0.9.2</version>
      </dependency>

      First, write some Thrift-related helper classes.

      ClientInfo.java

      package com.damacheng009.storm.monitor.thrift;
       
      import org.apache.thrift.protocol.TBinaryProtocol;
      import org.apache.thrift.transport.TFramedTransport;
      import org.apache.thrift.transport.TSocket;
       
      import backtype.storm.generated.Nimbus;
       
      /**
       * Holds the pieces of a single Thrift client connection
       * @author jb-xingchencheng
       *
       */
      public class ClientInfo {
          private TSocket tsocket;
          private TFramedTransport tTransport;
          private TBinaryProtocol tBinaryProtocol;
          private Nimbus.Client client;
       
          public TSocket getTsocket() {
              return tsocket;
          }
       
          public void setTsocket(TSocket tsocket) {
              this.tsocket = tsocket;
          }
       
          public TFramedTransport gettTransport() {
              return tTransport;
          }
       
          public void settTransport(TFramedTransport tTransport) {
              this.tTransport = tTransport;
          }
       
          public TBinaryProtocol gettBinaryProtocol() {
              return tBinaryProtocol;
          }
       
          public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) {
              this.tBinaryProtocol = tBinaryProtocol;
          }
       
          public Nimbus.Client getClient() {
              return client;
          }
       
          public void setClient(Nimbus.Client client) {
              this.client = client;
          }
      }
      ClientManager.java
      package com.damacheng009.storm.monitor.thrift;
       
      import org.apache.thrift.protocol.TBinaryProtocol;
      import org.apache.thrift.transport.TFramedTransport;
      import org.apache.thrift.transport.TSocket;
      import org.apache.thrift.transport.TTransportException;
       
      import backtype.storm.generated.Nimbus;
       
      /**
       * Creates and closes Thrift clients
       * @author jb-xingchencheng
       *
       */
      public class ClientManager {
          public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException {
              ClientInfo client = new ClientInfo();
              TSocket tsocket = new TSocket(nimbusHost, nimbusPort);
              TFramedTransport tTransport = new TFramedTransport(tsocket);
              TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
              Nimbus.Client c = new Nimbus.Client(tBinaryProtocol);
              tTransport.open();
              client.setTsocket(tsocket);
              client.settTransport(tTransport);
              client.settBinaryProtocol(tBinaryProtocol);
              client.setClient(c);
               
              return client; 
          }
           
          public static void closeClient(ClientInfo client) {
              if (null == client) {
                  return;
              }
               
              if (null != client.gettTransport()) {
                  client.gettTransport().close();
              }
               
              if (null != client.getTsocket()) {
                  client.getTsocket().close();
              }
          }
      }
      Now you can write your own logic to fetch cluster and topology data; essentially everything shown in Storm's UI can be retrieved. Here is just one simple example: getting the number of exceptions a given topology has raised, along with their stack traces. The rest of the project you can customize as you like.

      Here is the entry class:

      Main.java

      package com.damacheng009.storm.monitor;
       
      import com.damacheng009.storm.monitor.logic.Logic;
       
      /**
       * Entry point
       * @author jb-xingchencheng
       *
       */
      public class Main {
          // Nimbus connection info
          public static String NIMBUS_HOST = "192.168.180.36";
          public static int NIMBUS_PORT = 6627;
       
          /**
           * Command format: CMD [ARG0] [ARG1] ... (further arguments)
           * @param args
           */
          public static void main(String[] args) {
              if (args.length < 3) {
                  return;
              }
               
              NIMBUS_HOST = args[0];
              NIMBUS_PORT = Integer.parseInt(args[1]);
               
              String cmd = args[2];
              String result = "-1";
              if (cmd.equals("get_topo_exp_size")) {
                  String topoName = args[3];
                  result = Logic.getTopoExpSize(topoName);
              } else if (cmd.equals("get_topo_exp_stack_trace")) {
                  String topoName = args[3];
                  result = Logic.getTopoExpStackTrace(topoName);
              }
               
              System.out.println(result);
          }
      }

      When testing, just change HOST and PORT to your actual values.

      Next, the logic class itself.

      Logic.java

      package com.damacheng009.storm.monitor.logic;
       
      import java.util.Date;
      import java.util.List;
      import java.util.Set;
       
      import com.damacheng009.storm.monitor.Main;
      import com.damacheng009.storm.monitor.thrift.ClientInfo;
      import com.damacheng009.storm.monitor.thrift.ClientManager;
       
      import backtype.storm.generated.ClusterSummary;
      import backtype.storm.generated.ErrorInfo;
      import backtype.storm.generated.TopologyInfo;
      import backtype.storm.generated.TopologySummary;
       
      public class Logic {
          /**
           * Returns the number of errors reported for a given topology
           * @param topoName
           * @return
           */
          public static String getTopoExpSize(String topoName) {
              ClientInfo client = null;
              int errorTotal = 0;
               
              try {
                  client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
                   
                  ClusterSummary clusterSummary = client.getClient().getClusterInfo();
                  List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
                  for (TopologySummary ts : topoSummaryList) {
                      if (ts.getName().equals(topoName)) {
                          TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                          Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                          for (String errorKey : errorKeySet) {
                              List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                              errorTotal += listErrorInfo.size();
                          }
                          break;
                      }
                  }
                   
                  return String.valueOf(errorTotal);
              } catch (Exception e) {
                  return "-1";
              } finally {
                  ClientManager.closeClient(client);
              }  
          }
       
          /**
           * Returns the error stack traces for a given topology
           * @param topoName
           * @return
           */
          public static String getTopoExpStackTrace(String topoName) {
              ClientInfo client = null;
              StringBuilder error = new StringBuilder();
               
              try {
                  client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
                   
                  ClusterSummary clusterSummary = client.getClient().getClusterInfo();
                  List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
                  for (TopologySummary ts : topoSummaryList) {
                      if (ts.getName().equals(topoName)) {
                          TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                          // collect the error info
                          Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                          for (String errorKey : errorKeySet) {
                              List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                              for (ErrorInfo ei : listErrorInfo) {
                                  // time the error occurred
                                  long expTime = (long) ei.getError_time_secs() * 1000;
                                  // current time
                                  long now = System.currentTimeMillis();
                                   
                                  // The full error history is returned, so restrict it to a window as needed.
                                  // Errors older than 5 minutes are skipped here, since we poll every 5 minutes.
                                  if (now - expTime > 1000 * 60 * 5) {
                                      continue;
                                  }
                                   
                                  error.append(new Date(expTime) + " ");
                                  error.append(ei.getError() + " ");
                              }
                          }
                           
                          break;
                      }
                  }
                   
                  return error.toString().isEmpty() ? "none" : error.toString();
              } catch (Exception e) {
                  return "-1";
              } finally {
                  ClientManager.closeClient(client);
              }
          }
      }
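      The 5-minute window check in getTopoExpStackTrace is easy to get wrong, because error_time_secs is in Unix seconds while System.currentTimeMillis() is in milliseconds. A standalone sketch of just that conversion and filter (the class and method names are mine, not part of the project above):

      ```java
      public class ErrorWindowDemo {
          // Returns true if an error reported at errorTimeSecs (Unix seconds)
          // falls within the last windowMillis of nowMillis (Unix milliseconds).
          static boolean withinWindow(long errorTimeSecs, long nowMillis, long windowMillis) {
              long expTime = errorTimeSecs * 1000L; // seconds -> milliseconds
              return nowMillis - expTime <= windowMillis;
          }

          public static void main(String[] args) {
              long now = 1_700_000_000_000L;            // a fixed "now" for the demo, in ms
              long fourMinAgo = now / 1000 - 4 * 60;    // 4 minutes back, in seconds
              long sixMinAgo = now / 1000 - 6 * 60;     // 6 minutes back, in seconds
              long window = 5 * 60 * 1000L;             // 5 minutes, in milliseconds
              System.out.println(withinWindow(fourMinAgo, now, window)); // true
              System.out.println(withinWindow(sixMinAgo, now, window));  // false
          }
      }
      ```

      Mixing the units (comparing seconds against milliseconds directly) would make every error look ancient and silently filter everything out, which is why the conversion is worth isolating.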

      Finally, package it as a Jar and plug it into your monitoring system. In Zabbix, for example, each metric can be set up as a custom item, with the Zabbix agent configured to run the Jar from the command line to fetch the data.

      The testing process is skipped here.

      That is the current state of my Storm monitoring practice.

  • Original source: https://www.cnblogs.com/leo3689/p/5158138.html