代码:
namespace RabbitMQDemo { public partial class PublishSubscribe : Form { private string exchangeName = "logs"; private string exchangeType = ExchangeType.Fanout;//广播模式 Action<string, TextBox> SetText; private readonly static PublishSubscribe _PublishSubscribe; static PublishSubscribe() { _PublishSubscribe = new PublishSubscribe(); } /// <summary> /// 单例模式 /// </summary> public static PublishSubscribe SingleForm { get { return _PublishSubscribe; } } private PublishSubscribe() { CheckForIllegalCrossThreadCalls = false; InitializeComponent(); ReceiveMsg(txtConsumer1);//消费者1 ReceiveMsg(txtConsumer2);//消费者2 SetText += OnSetText; } private void btnSendMsg_Click(object sender, EventArgs e) { SendMsg(); } /// <summary> /// 发送消息 /// </summary> private void SendMsg() { string message = txtPublisher.Text; if (message.Trim().Length <= 0) { MessageBox.Show("请输入要发送的消息"); } var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare( exchange: exchangeName, type: exchangeType); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body); } } /// <summary> /// 接收消息 /// </summary> private void ReceiveMsg(TextBox box) { try { var factory = new ConnectionFactory() { HostName = "localhost" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); //声明交换机 channel.ExchangeDeclare( exchange: exchangeName, type: exchangeType); //rabbitmq随机生成队列名 durable=false exclusive=true, autodelete=true //exchangeName发来的消息在接收端永远都是新的队列在接收 var queueName = channel.QueueDeclare().QueueName; //绑定队列和交换机 //必须绑定了名为exchangeName的queueName队列才能收到消息 channel.QueueBind( queue: queueName, exchange: exchangeName, routingKey: ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msg = Encoding.UTF8.GetString(ea.Body); txtConsumer1.Invoke(SetText, msg, box); }; channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer); } catch (Exception ex) { MessageBox.Show(ex.ToString()); } } private void OnSetText(string text, TextBox box) { box.Text += string.Format("{0} ", text); } } }
界面:
大概流程:
生产者发送一条消息通过exchange交换机绑定到2个队列上,两个队列上都会有这一条消息,消费者1,2向队列取出消息并做处理
测试结果: