0. Install the JDK
wget --no-check-certificate --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie;" http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.rpm
Install it with rpm -ivh jdk-8u45-linux-x64.rpm
Verify the installation by running javac
1. Install the packages and tools ZeroMQ needs on CentOS:
yum install gcc
yum install gcc-c++
yum install make
yum install uuid-devel
yum install libuuid-devel
yum install libtool
2. Install ZooKeeper
wget http://mirror.bjtu.edu.cn/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
cp -R zookeeper-3.4.6 /usr/local/
ln -s /usr/local/zookeeper-3.4.6/ /usr/local/zookeeper
vim /etc/profile
export ZOOKEEPER_HOME="/path/to/zookeeper" # point this at your ZooKeeper install; its logs and data files live under it
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
mkdir /tmp/zookeeper
mkdir /var/log/zookeeper
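To make ZooKeeper actually use the two directories created above, point zoo.cfg at them. A minimal sketch (the paths are just the ones from this walkthrough; tickTime and clientPort are the stock defaults from zoo_sample.cfg):

```
# /usr/local/zookeeper/conf/zoo.cfg -- minimal standalone setup
tickTime=2000
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
```

Note that /tmp is wiped on reboot, so for anything beyond a quick test you would move dataDir somewhere persistent.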
-
3. Install ZeroMQ and jzmq
wget http://download.zeromq.org/zeromq-2.2.0.tar.gz
tar zxf zeromq-2.2.0.tar.gz
cd zeromq-2.2.0
./configure
make
make install
sudo ldconfig  # refresh the shared-library cache
ZeroMQ is now installed.
Install jzmq (make sure Java is installed first):
yum install git
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
That's it; jzmq is installed. Note: if ./autogen.sh fails with "autogen.sh: error: could not find libtool. libtool is required to run autogen.sh", libtool is missing; fix it with yum install libtool*.
4. Install Storm
wget http://cloud.github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
mv storm-0.8.1 /usr/local/
ln -s /usr/local/storm-0.8.1/ /usr/local/storm
vim /etc/profile
export STORM_HOME=/usr/local/storm-0.8.1
export PATH=$PATH:$STORM_HOME/bin
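Storm also reads its cluster settings from conf/storm.yaml. For a single machine talking to the local ZooKeeper, a minimal sketch might look like this (the hosts, ports, and local dir are assumptions for this walkthrough, not required values):

```yaml
# /usr/local/storm/conf/storm.yaml
storm.zookeeper.servers:
  - "127.0.0.1"
nimbus.host: "127.0.0.1"
storm.local.dir: "/tmp/storm"
supervisor.slots.ports:
  - 6700
  - 6701
```

Each entry under supervisor.slots.ports is one worker slot, so this supervisor could run at most two workers.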
That completes a single-node Storm installation.
5. Start Storm and basic operations
- Start ZooKeeper: zkServer.sh start
- Start Nimbus: storm nimbus &
- Start the Supervisor: storm supervisor &
- Start the UI: storm ui &
- Deploy a topology: storm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
- Kill a topology: storm kill {toponame}
- Activate a topology: storm activate {toponame}
- Deactivate a topology: storm deactivate {toponame}
- List all topologies: storm list
Check the running processes with jps, then open the UI by pointing a browser at http://localhost:8080
6. Killing a Storm topology remotely
The same thing can be done programmatically through the Nimbus client:

```java
import java.util.List;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillTopology {
    public static void main(String[] args) throws Exception {
        String topologyName = "topology001";
        boolean killed = false;
        Map conf = Utils.readStormConfig();
        // Nimbus host
        conf.put(Config.NIMBUS_HOST, "172.16.1.100");
        // Nimbus Thrift port
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();
        List<TopologySummary> topologyList = client.getClusterInfo().get_topologies();
        for (TopologySummary topologySummary : topologyList) {
            if (topologySummary.get_name().equals(topologyName)) {
                KillOptions killOpts = new KillOptions();
                // grace period, in seconds, before the topology is killed
                killOpts.set_wait_secs(5);
                client.killTopologyWithOpts(topologyName, killOpts);
                killed = true;
                System.out.println("killed " + topologyName);
            }
        }
        if (!killed) {
            System.out.println(topologyName + " not started");
        }
    }
}
```
Monitoring a Storm cluster and its topologies with the Thrift API
How do you monitor a Storm cluster and the topologies running on it?
Storm has this covered: it supports a Thrift client/server architecture. The machine hosting the Nimbus component runs a Thrift server process, so we can write a Thrift client that queries that server for whatever cluster and topology data we want, and feed the results into a monitoring platform such as Zabbix (which is what I currently use).
With the overall flow clear, let's put it into practice.
1 Install Thrift
We will use Thrift to compile Storm's IDL into the Java sources for a Thrift client, so Thrift must be installed first; version 0.9.2 is used here.
Download the release from the official site: http://thrift.apache.org/
Build and install: ./configure && make && make install
Verify: thrift --version
If it prints "Thrift version 0.9.2", the installation succeeded.
2 Generate the Thrift client code
First download the Storm source; the latest 0.9.3 release is used here: http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz
Unpack it and generate the code: thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift
A gen-java folder appears in the current directory; it contains the Java sources for the Thrift client.
3 Use the Thrift client API
Next, create a Maven project to fetch the monitoring data.
The project builds a Jar that takes a command plus custom arguments and prints the result. Invoking it from the command line makes it easy to hook into a monitoring system, though you can adapt the invocation style to your own situation.
Once the project is created, copy in the sources generated under gen-java.
In pom.xml, declare the Thrift library matching the installed version:
```xml
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.9.2</version>
</dependency>
```

First, a couple of Thrift helper classes.
ClientInfo.java

```java
package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;

import backtype.storm.generated.Nimbus;

/**
 * Holds the pieces that make up one Thrift client connection.
 * @author jb-xingchencheng
 */
public class ClientInfo {
    private TSocket tsocket;
    private TFramedTransport tTransport;
    private TBinaryProtocol tBinaryProtocol;
    private Nimbus.Client client;

    public TSocket getTsocket() {
        return tsocket;
    }

    public void setTsocket(TSocket tsocket) {
        this.tsocket = tsocket;
    }

    public TFramedTransport gettTransport() {
        return tTransport;
    }

    public void settTransport(TFramedTransport tTransport) {
        this.tTransport = tTransport;
    }

    public TBinaryProtocol gettBinaryProtocol() {
        return tBinaryProtocol;
    }

    public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) {
        this.tBinaryProtocol = tBinaryProtocol;
    }

    public Nimbus.Client getClient() {
        return client;
    }

    public void setClient(Nimbus.Client client) {
        this.client = client;
    }
}
```

ClientManager.java

```java
package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import backtype.storm.generated.Nimbus;

/**
 * Opens and closes Thrift client connections.
 * @author jb-xingchencheng
 */
public class ClientManager {
    public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException {
        ClientInfo client = new ClientInfo();
        TSocket tsocket = new TSocket(nimbusHost, nimbusPort);
        TFramedTransport tTransport = new TFramedTransport(tsocket);
        TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
        Nimbus.Client c = new Nimbus.Client(tBinaryProtocol);
        tTransport.open();
        client.setTsocket(tsocket);
        client.settTransport(tTransport);
        client.settBinaryProtocol(tBinaryProtocol);
        client.setClient(c);
        return client;
    }

    public static void closeClient(ClientInfo client) {
        if (null == client) {
            return;
        }
        if (null != client.gettTransport()) {
            client.gettTransport().close();
        }
        if (null != client.getTsocket()) {
            client.getTsocket().close();
        }
    }
}
```

With these in place you can write whatever logic you need to pull cluster and topology data; essentially everything shown on Storm's UI is available through this API. This article sticks to one simple example: getting the number of exceptions a topology has raised, plus their stack traces. Everything else you can customize as you like. Next comes the entry class:
Main.java
```java
package com.damacheng009.storm.monitor;

import com.damacheng009.storm.monitor.logic.Logic;

/**
 * Entry point.
 * @author jb-xingchencheng
 */
public class Main {
    // Nimbus connection info (overridden by the command line)
    public static String NIMBUS_HOST = "192.168.180.36";
    public static int NIMBUS_PORT = 6627;

    /**
     * Argument format: HOST PORT CMD [ARG0] [ARG1] ...
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            return;
        }
        NIMBUS_HOST = args[0];
        NIMBUS_PORT = Integer.parseInt(args[1]);
        String cmd = args[2];

        String result = "-1";
        if (cmd.equals("get_topo_exp_size")) {
            String topoName = args[3];
            result = Logic.getTopoExpSize(topoName);
        } else if (cmd.equals("get_topo_exp_stack_trace")) {
            String topoName = args[3];
            result = Logic.getTopoExpStackTrace(topoName);
        }
        System.out.println(result);
    }
}
```

When testing, just substitute your own host and port on the command line.
And finally the concrete logic class.
Logic.java
```java
package com.damacheng009.storm.monitor.logic;

import java.util.Date;
import java.util.List;
import java.util.Set;

import com.damacheng009.storm.monitor.Main;
import com.damacheng009.storm.monitor.thrift.ClientInfo;
import com.damacheng009.storm.monitor.thrift.ClientManager;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;

public class Logic {
    /**
     * Count the exceptions recorded for a topology.
     */
    public static String getTopoExpSize(String topoName) {
        ClientInfo client = null;
        int errorTotal = 0;
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            ClusterSummary clusterSummary = client.getClient().getClusterInfo();
            List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
            for (TopologySummary ts : topoSummaryList) {
                if (ts.getName().equals(topoName)) {
                    TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                    Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                    for (String errorKey : errorKeySet) {
                        List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                        errorTotal += listErrorInfo.size();
                    }
                    break;
                }
            }
            return String.valueOf(errorTotal);
        } catch (Exception e) {
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }

    /**
     * Return the exception stack traces recorded for a topology.
     */
    public static String getTopoExpStackTrace(String topoName) {
        ClientInfo client = null;
        StringBuilder error = new StringBuilder();
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            ClusterSummary clusterSummary = client.getClient().getClusterInfo();
            List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
            for (TopologySummary ts : topoSummaryList) {
                if (ts.getName().equals(topoName)) {
                    TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                    // collect the error info
                    Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                    for (String errorKey : errorKeySet) {
                        List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                        for (ErrorInfo ei : listErrorInfo) {
                            // when the exception happened
                            long expTime = (long) ei.getError_time_secs() * 1000;
                            // and the time now
                            long now = System.currentTimeMillis();
                            // The API returns the full error history, so we can restrict it
                            // to a window that suits us; since this check runs every 5 min,
                            // anything older than 5 min has already been reported.
                            if (now - expTime > 1000 * 60 * 5) {
                                continue;
                            }
                            error.append(new Date(expTime) + " ");
                            error.append(ei.getError() + " ");
                        }
                    }
                    break;
                }
            }
            return error.toString().isEmpty() ? "none" : error.toString();
        } catch (Exception e) {
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }
}
```

Finally, package everything into a Jar and it is ready to feed a monitoring system: in Zabbix, for example, each metric can be a custom item, with the Zabbix client configured to run the Jar from the command line to collect the value.
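The 5-minute window in getTopoExpStackTrace is the easiest part to get wrong, because Nimbus reports error times in seconds while System.currentTimeMillis() returns milliseconds. Here is a standalone sketch of just that filtering step, with made-up timestamps (only the unit conversion and comparison mirror the logic above):

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorWindowDemo {
    // Keep only error timestamps (in seconds, as Nimbus reports them)
    // that fall within the last windowMillis of nowMillis.
    static List<Integer> recentErrors(List<Integer> errorTimeSecs, long nowMillis, long windowMillis) {
        List<Integer> recent = new ArrayList<Integer>();
        for (int secs : errorTimeSecs) {
            long expTime = (long) secs * 1000;  // seconds -> milliseconds
            if (nowMillis - expTime <= windowMillis) {
                recent.add(secs);
            }
        }
        return recent;
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L * 1000;     // pretend "now", in milliseconds
        List<Integer> errors = new ArrayList<Integer>();
        errors.add(1_000_000_000 - 10);       // 10 s ago   -> kept
        errors.add(1_000_000_000 - 4 * 60);   // 4 min ago  -> kept
        errors.add(1_000_000_000 - 10 * 60);  // 10 min ago -> dropped
        List<Integer> recent = recentErrors(errors, now, 5 * 60 * 1000);
        System.out.println(recent.size());    // prints 2
    }
}
```

Without the cast to long before multiplying, a timestamp in seconds overflows int when converted to milliseconds, which is why the original code casts first.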
I will skip the testing process here.
That is where my Storm monitoring practice stands for now.
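As a concrete sketch of the Zabbix hookup described above: the agent side can expose the Jar through flexible UserParameter entries, where $1 receives the topology name from the item key. The item keys and the Jar path below are illustrative assumptions, not part of the project:

```
# in zabbix_agentd.conf (path varies by install)
UserParameter=storm.topo.exp.size[*],java -jar /opt/storm-monitor.jar 192.168.180.36 6627 get_topo_exp_size $1
UserParameter=storm.topo.exp.stack[*],java -jar /opt/storm-monitor.jar 192.168.180.36 6627 get_topo_exp_stack_trace $1
```

A Zabbix item with key storm.topo.exp.size[topology001] would then run the Jar and store whatever number it prints.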