示例代码 在这可以下载
好了，上面即是 qpid 的参考手册，很多有用的资料都在上面几个链接里
还是看看 示例代码吧 HelloWorld开始
package com.undergrowth.qpid;

import java.io.InputStream;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Minimal Qpid "Hello world" example: looks up a connection factory and a
 * destination through JNDI, sends one text message to the destination and
 * reads it back on the same session.
 */
public class QpidHelloWorld {

    /** JNDI configuration file, resolved relative to this class on the classpath. */
    private static final String CONFIG_RESOURCE = "hello.properties";

    public static void main(String[] args) {
        QpidHelloWorld hello = new QpidHelloWorld();
        hello.runTest();
    }

    /**
     * Runs the send/receive round trip. Any failure is reported on stderr;
     * the connection and the JNDI context are released in every case.
     */
    private void runTest() {
        Context context = null;
        Connection connection = null;
        try {
            Properties properties = loadProperties();

            // Build the JNDI context from the properties file; the file names
            // PropertiesFileInitialContextFactory as the initial context factory.
            context = new InitialContext(properties);

            // Look up the connection factory registered as "qpidConnectionfactory".
            ConnectionFactory connectionFactory =
                    (ConnectionFactory) context.lookup("qpidConnectionfactory");

            // Create the connection, e.g.
            //   amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'
            // Connection URL format:
            //   amqp://[<user>:<pass>@][<clientid>]<virtualhost>[?<option>='<value>'[&<option>='<value>']]
            // brokerlist format:
            //   brokerlist=<transport>://<host>[:<port>](?<param>='<value>')(&<param>='<value>')*
            connection = connectionFactory.createConnection();
            connection.start();

            // Create a non-transacted, auto-acknowledging session on the connection.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Look up the destination; here it is configured as amq.topic.
            // This exchange type is used to support the classic publish/subscribe paradigm,
            // i.e. amq.topic is similar to the JMS publish/subscribe model.
            Destination destination = (Destination) context.lookup("topicExchange");

            // Create a producer and a consumer on the same destination.
            MessageProducer messageProducer = session.createProducer(destination);
            MessageConsumer messageConsumer = session.createConsumer(destination);

            // Build and send the text message.
            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message);

            // Receive the message.
            // This call blocks indefinitely until a message is produced
            // or until this message consumer is closed.
            message = (TextMessage) messageConsumer.receive();
            System.out.println(message.getText());
        } catch (Exception exp) {
            exp.printStackTrace();
        } finally {
            // Release resources even when a step above failed, so a broken run
            // does not leave the broker connection open.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception exp) {
                    exp.printStackTrace();
                }
            }
            if (context != null) {
                try {
                    context.close();
                } catch (Exception exp) {
                    exp.printStackTrace();
                }
            }
        }
    }

    /**
     * Loads {@code hello.properties} from the classpath location of this class.
     *
     * @return the loaded JNDI properties
     * @throws IllegalStateException if the resource cannot be found
     * @throws java.io.IOException if reading the resource fails
     */
    private Properties loadProperties() throws java.io.IOException {
        InputStream resourceAsStream = this.getClass().getResourceAsStream(CONFIG_RESOURCE);
        if (resourceAsStream == null) {
            // Fail with a clear message instead of a NullPointerException from Properties.load.
            throw new IllegalStateException("Classpath resource not found: " + CONFIG_RESOURCE);
        }
        try {
            Properties properties = new Properties();
            properties.load(resourceAsStream);
            return properties;
        } finally {
            resourceAsStream.close();
        }
    }
}
配置文件 hello.properties
# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] connectionfactory.qpidConnectionfactory = amqp://guest:guest@test/?brokerlist='tcp://localhost:5672' # Register an AMQP destination in JNDI # destination.[jniName] = [Address Format] destination.topicExchange = amq.topic
上面的代码都加了必要的注释，就不多说了，详情在上面的参考网址上都可以找到
当然 上面的代码运行时 需要先运行qpid-server 脚本 启动qpid代理才行
启动完 qpid-server 后 控制台即可访问 http://localhost:8080/management
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package com.undergrowth.qpid;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

/**
 * Sends a single {@link MapMessage} carrying nested lists and maps to the
 * {@code message_queue} address on a local broker.
 */
public class MapSender {

    /**
     * Connects to the broker, builds the map message and sends it.
     *
     * @param args unused
     * @throws Exception if connecting, building or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // "create: always" means the queue is created if it does not exist yet.
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageProducer producer = session.createProducer(queue);

            MapMessage m = session.createMapMessage();

            // Scalar values are attached as message properties.
            m.setIntProperty("Id", 987654321);
            m.setStringProperty("name", "Widget");
            m.setDoubleProperty("price", 0.99);

            // Structured values are stored as entries of the map body.
            List<String> colors = new ArrayList<String>();
            colors.add("red");
            colors.add("green");
            colors.add("white");
            m.setObject("colours", colors);

            Map<String, Double> dimensions = new HashMap<String, Double>();
            dimensions.put("length", 10.2);
            dimensions.put("width", 5.1);
            dimensions.put("depth", 2.0);
            m.setObject("dimensions", dimensions);

            List<List<Integer>> parts = new ArrayList<List<Integer>>();
            parts.add(Arrays.asList(1, 2, 5));
            parts.add(Arrays.asList(8, 2, 5));
            m.setObject("parts", parts);

            // A map entry that itself nests the collections above.
            Map<String, Object> specs = new HashMap<String, Object>();
            specs.put("colours", colors);
            specs.put("dimensions", dimensions);
            specs.put("parts", parts);
            m.setObject("specs", specs);

            producer.send(m);
        } finally {
            // Always release the broker connection, even when building or
            // sending the message fails.
            connection.close();
        }
    }
}
</pre></p><p><pre name="code" class="java">/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package com.undergrowth.qpid; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; public class MapReceiver { public static void main(String[] args) throws Exception { Connection connection = new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); MessageConsumer consumer = session.createConsumer(queue); MapMessage m = (MapMessage)consumer.receive(); System.out.println(m); connection.close(); } }
对于地址参数 附加一张图
在上面的源码上 还有另外几个例子 如传递ListMessage 还有带参数的传递之类的
对于broker的概念 下面几张图 比较好 就截出来了