zoukankan      html  css  js  c++  java
  • flume与Mosquitto的集成

    文章来自:http://www.cnblogs.com/hark0623/p/4173714.html   转发请注明 

    因业务需求,需要flume收集MQTT(Mosquitto)的数据。  方法就是flume自定义source,source中来订阅(subscribe)MQTT

    flume source的java代码如下:

    package com.yhx.sensor.flume.source;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDrivenSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.AbstractSource;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    
    public class MQTTSource extends AbstractSource implements EventDrivenSource,
            Configurable {
        /**
         * The initialization method for the Source. The context contains all the
         * Flume configuration info, and can be used to retrieve any configuration
         * values necessary to set up the Source.
         */
        @Override
        public void configure(Context arg0) {
            // TODO Auto-generated method stub
    
        }
    
        SimpleMqttClient client = null;
    
        /**
         * Start any dependent systems and begin processing events.
         */
        @Override
        public void start() {
            // TODO Auto-generated method stub
            // super.start();
            client = new SimpleMqttClient();
            client.runClient();
        }
    
        /**
         * Stop processing events and shut any dependent systems down.
         */
        @Override
        public void stop() {
            // TODO Auto-generated method stub
            // super.stop();
            if (client != null) {
                client.closeConn();
            }
        }
    
        // public static void main(String[] args) {
        // SimpleMqttClient smc = new SimpleMqttClient();
        // smc.runClient();
        // }
    
        public class SimpleMqttClient implements MqttCallback {
    
            MqttClient myClient;
            MqttConnectOptions connOpt;
    
            String BROKER_URL = "tcp://192.168.116.128:1883";
            String M2MIO_DOMAIN = "192.168.116.128";
            String M2MIO_STUFF = "yhx";
            String M2MIO_THING = "yhx_flume";
            // String M2MIO_USERNAME = "<m2m.io username>";
            // String M2MIO_PASSWORD_MD5 =
            // "<m2m.io password (MD5 sum of password)>";
    
            Boolean subscriber = true;
            Boolean publisher = false;
    
            /**
             * 
             * connectionLost This callback is invoked upon losing the MQTT
             * connection.
             * 
             */
            @Override
            public void connectionLost(Throwable t) {
                System.out.println("Connection lost!");
                // code to reconnect to the broker would go here if desired
            }
    
            public void closeConn() {
                if (myClient != null) {
                    if (myClient.isConnected()) {
                        try {
                            myClient.disconnect();
                        } catch (MqttException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
    
            /**
             * 
             * deliveryComplete This callback is invoked when a message published by
             * this client is successfully received by the broker.
             * 
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // System.out.println("Pub complete" + new
                // String(token.getMessage().getPayload()));
            }
    
            /**
             * 
             * messageArrived This callback is invoked when a message is received on
             * a subscribed topic.
             * 
             */
            @Override
            public void messageArrived(String topic, MqttMessage message)
                    throws Exception {
                // System.out
                // .println("-------------------------------------------------");
                // // System.out.println("| Topic:" + topic.getName());
                // System.out.println("| Topic:" + topic);
                // System.out
                // .println("| Message: " + new String(message.getPayload()));
                // System.out
                // .println("-------------------------------------------------");
    
    
                Map<String, String> headers = new HashMap<String, String>();
                //headers.put("curDate", df.format(new Date()));
    
                Event flumeEvent = EventBuilder.withBody(message.getPayload(),
                        headers);
                try {
                    getChannelProcessor().processEvent(flumeEvent);
                } catch (Exception e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
    
            }
    
            /**
             * 
             * runClient The main functionality of this simple example. Create a
             * MQTT client, connect to broker, pub/sub, disconnect.
             * 
             */
            public void runClient() {
                // setup MQTT Client
                String clientID = M2MIO_THING;
                connOpt = new MqttConnectOptions();
    
                connOpt.setCleanSession(true);
                connOpt.setKeepAliveInterval(3000);
                // connOpt.setUserName(M2MIO_USERNAME);
                // connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());
    
                // Connect to Broker
                try {
                    myClient = new MqttClient(BROKER_URL, clientID);
                    myClient.setCallback(this);
                    myClient.connect(connOpt);
                } catch (MqttException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
    
                System.out.println("Connected to " + BROKER_URL);
    
                // setup topic
                // topics on m2m.io are in the form <domain>/<stuff>/<thing>
                String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/"
                        + M2MIO_THING;
                System.out.println("myTopic:" + myTopic);
                MqttTopic topic = myClient.getTopic(myTopic);
    
                // subscribe to topic if subscriber
                if (subscriber) {
                    try {
                        int subQoS = 0;
                        myClient.subscribe(myTopic, subQoS);
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                // publish messages if publisher
                if (publisher) {
                    for (int i = 1; i <= 10; i++) {
                        String pubMsg = "{"pubmsg":" + i + "}";
                        int pubQoS = 0;
                        MqttMessage message = new MqttMessage(pubMsg.getBytes());
                        message.setQos(pubQoS);
                        message.setRetained(false);
    
                        // Publish the message
                        System.out.println("Publishing to topic "" + topic
                                + "" qos " + pubQoS);
                        MqttDeliveryToken token = null;
                        try {
                            // publish message to broker
                            token = topic.publish(message);
                            // Wait until the message has been delivered to the
                            // broker
                            token.waitForCompletion();
                            Thread.sleep(100);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
    
                // disconnect
                try {
                    // wait to ensure subscribed messages are delivered
                    if (subscriber) {
                        while (true) {
                            Thread.sleep(5000);
                        }
                    }
                    // myClient.disconnect();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                }
            }
    
        }
    
    }

    打JAR包注意要把Class-Path写上,如下:

    Manifest-Version: 1.0
    Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar

    将打好的JAR包放到flume的lib目录(注意,class-path说明的jar包在lib一定要有。 如果没有,则放上去)

    接着修改一下flume的配置文件,如下(主要是sourceMqtt ,看这个。  因为我这块同时还监听了UDP):

    # Agent "a1" runs two independent pipelines side by side:
    #   sourceMqtt -> channelMqtt -> sinkMqtt   (MQTT, the subject of this article)
    #   sourceUdp  -> channelUdp  -> sinkUdp    (syslog over UDP)
    a1.sources = sourceMqtt sourceUdp
    a1.sinks = sinkMqtt sinkUdp
    a1.channels = channelMqtt channelUdp
    
    # ---- MQTT pipeline ----
    # Source: the custom MQTTSource class, referenced by its fully-qualified name
    a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource
    
    # Sink: the built-in logger sink
    a1.sinks.sinkMqtt.type = logger
    
    # Channel: buffers events in memory between source and sink
    a1.channels.channelMqtt.type = memory
    a1.channels.channelMqtt.capacity = 1000
    a1.channels.channelMqtt.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # (note: a source takes "channels", a sink takes a single "channel")
    a1.sources.sourceMqtt.channels = channelMqtt
    a1.sinks.sinkMqtt.channel = channelMqtt
    
    
    
    # Leftover from an earlier layout in which the UDP pipeline was a separate
    # agent "a2"; it is now part of agent a1 (see the first three lines).
    # a2.sources = sourceUdp
    # a2.sinks = sinkUdp
    # a2.channels = channelUdp
    
    # ---- UDP pipeline ----
    # Source: syslog over UDP, listening on all interfaces, port 12459
    a1.sources.sourceUdp.type = syslogudp
    a1.sources.sourceUdp.host = 0.0.0.0
    a1.sources.sourceUdp.port = 12459
    a1.sources.sourceUdp.interceptors=interceptorUdp
    
    # Custom interceptor, referenced through its nested Builder class
    a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder
    
    # Sink: the built-in logger sink
    a1.sinks.sinkUdp.type = logger
    
    # Channel: buffers events in memory between source and sink
    a1.channels.channelUdp.type = memory
    a1.channels.channelUdp.capacity = 1000
    a1.channels.channelUdp.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.sourceUdp.channels = channelUdp
    a1.sinks.sinkUdp.channel = channelUdp

    配置文件保存至flume目录下的conf,叫flume.conf

    然后flume启动命令如下 

    bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1
  • 相关阅读:
    〖Linux〗zigbee实验之cc2430移植tinyos2.x的步骤(Ubuntu13.10)
    〖Linux〗clang3.4的编译与安装
    〖Linux〗打开qtcreater出现错误的解决方法
    〖Linux〗gvim使用alt+1,2,3..进行标签页切换
    〖Linux〗Ubuntu13.10 安装qt开发环境
    c#, 输出二进制
    unity shader在小米2s上的问题
    unity, animtion倒放
    反射矩阵(reflection matrix)推导
    unity, 用unity profiler进行真机profile,需要退出360
  • 原文地址:https://www.cnblogs.com/hark0623/p/4173714.html
Copyright © 2011-2022 走看看