估计有2年没搞storm了,今天由于工作需要重新搞一次,
不过需要使用最新版本的storm了。使用的是1.2.3版本,
写了程序,报了一个错误,如下
13886 [main] INFO o.a.s.d.s.Supervisor - Starting supervisor with id 87f0e450-46bf-4545-b86f-2a9f961ad24d at host vm1. Exception in thread "main" java.lang.IllegalStateException: Spout 'ThingSpout' contains a non-serializable field of type rexel.topo.ThingSpout$$Lambda$1/109961541, which was instantiated prior to topology creation. rexel.topo.ThingSpout$$Lambda$1/109961541 should be instantiated within the prepare method of 'ThingSpout at the earliest. at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:143) at rexel.topo.MainTopology.main(MainTopology.java:34) Caused by: java.lang.RuntimeException: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541 at org.apache.storm.utils.Utils.javaSerialize(Utils.java:240) at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:138) ... 1 more Caused by: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.storm.utils.Utils.javaSerialize(Utils.java:236) ... 2 more
工程结构如下:
MainTopology代码如下:
package rexel.topo; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import rexel.bean.PropertiesBean; import rexel.utils.PropertiesUtils; public class MainTopology { private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance(); public static void main(String[] args) throws Exception { PropertiesBean propertiesBean = propertiesUtils.readProperties(); if (propertiesBean == null) { return; } TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("ThingSpout", new ThingSpout(), 1); builder.setBolt("ThingBolt", new ThingBolt(), 1).shuffleGrouping("ThingSpout"); Config config = new Config(); config.setNumWorkers(1); config.setMessageTimeoutSecs(60); config.setMaxSpoutPending(100); config.setNumAckers(1);if (Boolean.valueOf(args[0])) { StormSubmitter.submitTopology("MainTopology", config, builder.createTopology()); } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("MainTopology", config, builder.createTopology()); } } }
ThingSpout代码如下:
package rexel.topo; import java.net.URI; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Hashtable; import java.util.Map; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import org.apache.commons.codec.binary.Base64; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import rexel.bean.PropertiesBean; import rexel.utils.CommonUtils; import rexel.utils.PropertiesUtils; public class ThingSpout extends BaseRichSpout { private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance(); private static final long serialVersionUID = 1L; private static LinkedBlockingQueue<byte[]> queue; private static SpoutOutputCollector collector = null; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { PropertiesBean propertiesBean = propertiesUtils.readProperties(); if (propertiesBean == null) { return; } collector = spoutOutputCollector; queue = new LinkedBlockingQueue<>(1000); //参数说明,请参见:AMQP客户端接入说明。 String accessKey = propertiesBean.getAccessKey(); String accessSecret = propertiesBean.getAccessSecret(); String uid = propertiesBean.getUid(); String regionId = propertiesBean.getRegionId(); String consumerGroupId = "AsgdwvkMT3ygC2IwT9GD000100"; long timeStamp = System.currentTimeMillis(); //签名方法:支持hmacmd5,hmacsha1和hmacsha256 String signMethod = "hmacsha1"; //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。 //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。 String clientId = CommonUtils.getDeviceUnique(); //UserName组装方法,请参见文档:AMQP客户端接入说明。 String userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod + ",timestamp=" + timeStamp + ",authId=" + accessKey + ",consumerGroupId=" + consumerGroupId + "|"; //password组装方法,请参见文档:AMQP客户端接入说明。 String signContent = "authId=" + accessKey + "×tamp=" + timeStamp; String password = doSign(signContent,accessSecret, signMethod); //按照qpid-jms的规范,组装连接URL。 String connectionUrl = "failover:(amqps://" + uid + ".iot-amqp." + regionId + ".aliyuncs.com:5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30"; Hashtable<String, String> hashtable = new Hashtable<>(); hashtable.put("connectionfactory.SBCF",connectionUrl); hashtable.put("queue.QUEUE", "default"); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); try { Context context = new InitialContext(hashtable); ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); Destination queue = (Destination)context.lookup("QUEUE"); // Create Connection Connection connection = cf.createConnection(userName, password); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); // Create Session // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge() // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // Create Receiver Link MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(messageListener); } catch (Exception e) { e.printStackTrace(); } } @Override public void nextTuple() { try { byte[] body = queue.take(); String uuid = UUID.randomUUID().toString(); collector.emit(new Values((Object) body), uuid); } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("body")); } private MessageListener messageListener = message -> { try { byte[] body = message.getBody(byte[].class); String content = new String(body); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); System.out.println("receive message" + ", topic = " + topic + ", msgId = " + messageId + ", content = " + content); queue.put(body); } catch (Exception e) { e.printStackTrace(); } }; private JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() { /** * 连接成功建立。 */ @Override public void onConnectionEstablished(URI remoteURI) { System.out.println("onConnectionEstablished, remoteUri:{}" + remoteURI + "}"); } /** * 尝试过最大重试次数之后,最终连接失败。 */ @Override public void onConnectionFailure(Throwable error) { System.out.println("onConnectionFailure, {" + error.getMessage() + "}"); } /** * 连接中断。 */ @Override public void onConnectionInterrupted(URI remoteURI) { System.out.println("onConnectionInterrupted, remoteUri:{" + remoteURI + "}"); } /** * 连接中断后又自动重连上。 */ @Override public void onConnectionRestored(URI remoteURI) { System.out.println("onConnectionRestored, remoteUri:{" + remoteURI + "}"); } @Override public void onInboundMessage(JmsInboundMessageDispatch envelope) {} @Override public void onSessionClosed(Session session, Throwable cause) {} @Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} @Override public void onProducerClosed(MessageProducer producer, Throwable cause) {} }; private String doSign(String toSignString, String secret, String signMethod) { SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); byte[] rawHmac = null; try { Mac mac = Mac.getInstance(signMethod); mac.init(signingKey); rawHmac = mac.doFinal(toSignString.getBytes()); } catch (NoSuchAlgorithmException | InvalidKeyException e) { e.printStackTrace(); } return Base64.encodeBase64String(rawHmac); } }
ThingBolt代码如下:
package rexel.topo; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; public class ThingBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } @Override public void execute(Tuple tuple) { String content = new String(tuple.getBinaryByField("body")); System.out.println("receive message: {" + content + "}"); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
错误的原因是在Spout中有没有序列化的代码,将以下两个对象前面加上static之后,问题解决。
private MessageListener messageListener → private static MessageListener messageListener
private JmsConnectionListener myJmsConnectionListener → private static JmsConnectionListener myJmsConnectionListener