博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习笔记4-使用fanout交换器
阅读量:7026 次
发布时间:2019-06-28

本文共 6890 字,大约阅读时间需要 22 分钟。

fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果。

本里使用实际业务中常见的例子,

订单系统:创建订单,然后发送一个事件消息

积分系统:发送订单的积分奖励

短信平台:发送订单的短信

 

消息生产者SenderWithFanoutExchange

1 package com.yzl.test3; 2  3 import java.util.Date; 4  5 import com.google.gson.Gson; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 /**11  *    使用fanout交换器产生事件,消费者订阅事件做相应的处理12  * @author: yzl13  * @date: 2016-10-2214  */15 public class SenderWithFanoutExchange {16     //交换器名称17     private static final String EXCHANGE_NAME = "myFanoutExchange";18     19     public static void main(String[] args) throws Exception {20         //连接到rabbitmq服务器21         ConnectionFactory factory = new ConnectionFactory();22         factory.setHost("localhost");23         Connection connection = factory.newConnection();24         //创建一个信道25         final Channel channel = connection.createChannel();26         //定义一个名字为topicExchange的fanout类型的exchange27         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");28         29       //创建一个时间的Event对象30         EventObj createOrderEvent = null;31         for(int i=1; i<10; i++){32             createOrderEvent = new EventObj();33             createOrderEvent.setUserId(Long.valueOf(i));34             createOrderEvent.setCreateTime(new Date());35             createOrderEvent.setEventType("create_order");36             //转成JSON37             String msg = new Gson().toJson(createOrderEvent);38             39             System.out.println("send msg:" + msg);40             41             //使用order_event路由键来发送该事件消息42             channel.basicPublish(EXCHANGE_NAME, "order_event", null, msg.getBytes());43             44             Thread.sleep(1000);45         }46         47         channel.close();48         connection.close();49     }50 }

消费消费者ReceiverWithFanoutExchange

1 package com.yzl.test3; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope;10 import com.rabbitmq.client.AMQP.BasicProperties;11 12 /**13  * 使用fanout交换器接收订单事件消息14  * 15  * @author: yzl16  * @date: 2016-10-2217  */18 public class ReceiverWithFanoutExchange {19     // 交换器名称20     private static final String EXCHANGE_NAME = "myFanoutExchange";21     //接收订单事件并发放积分的队列22     private static final String QUEUE_ORDER_REWARD_POINTS = "rewardOrderPoints";23     //发放订单积分的路由键24     private static final String ROUTING_KEY_ORDER_POINTS = "reward_order_points";25     //接收订单事件并发短信的队列26     private static final String QUEUE_ORDER_SEND_SMS = "sendOrderSms";27     //发送订单短信的路由键28     private static final String ROUTING_KEY_ORDER_SMS = "send_order_sms";29     30     private static Channel channel = null;31     32     static{33         try{34             // 连接到rabbitmq服务器35             ConnectionFactory factory = new ConnectionFactory();36             factory.setHost("localhost");37             Connection connection = factory.newConnection();38             // 创建一个信道39             channel = connection.createChannel();40             // 定义一个名字为myFanoutExchange的fanout类型的exchange41             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");42         }catch (Exception e) {43             // TODO: handle exception44         }45     }46     47     /**48      * 发放订单的积分奖励49      */50     public static void rewardPoints() throws Exception {51         channel.queueDeclare(QUEUE_ORDER_REWARD_POINTS, false, false, false, null);52         channel.queueBind(QUEUE_ORDER_REWARD_POINTS, EXCHANGE_NAME, ROUTING_KEY_ORDER_POINTS);53         54         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){55             @Override56             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)57                     throws IOException {58                 String msg = new String(body);59                 System.out.println("积分系统接收到订单创建的事件消息 :" + msg);60                 System.out.println("准备发放积分.....");61             }62         });63     }64     65     /**66      * 发送订单成功的短信67      */68     public static void sendSms() throws Exception {69         channel.queueDeclare(QUEUE_ORDER_SEND_SMS, false, false, false, null);70         channel.queueBind(QUEUE_ORDER_SEND_SMS, EXCHANGE_NAME, ROUTING_KEY_ORDER_SMS);71         72         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){73             @Override74             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)75                     throws IOException {76                 String msg = new String(body);77                 System.out.println("短信平台接收到订单创建的事件消息 :" + msg);78                 System.out.println("准备发送短信.....");79             }80         });81     }82 83     public static void main(String[] args) throws Exception {84         rewardPoints();85         sendSms();86     }87 }

运行结果输出:

1 send msg:{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}2 send msg:{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}3 send msg:{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}4 send msg:{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}5 send msg:{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}6 send msg:{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}7 send msg:{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}8 send msg:{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}9 send msg:{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
1 积分系统接收到订单创建的事件消息 :{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"} 2 准备发放积分..... 3 短信平台接收到订单创建的事件消息 :{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"} 4 准备发送短信..... 5 积分系统接收到订单创建的事件消息 :{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"} 6 准备发放积分..... 7 短信平台接收到订单创建的事件消息 :{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"} 8 准备发送短信..... 9 积分系统接收到订单创建的事件消息 :{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}10 准备发放积分.....11 短信平台接收到订单创建的事件消息 :{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}12 准备发送短信.....13 积分系统接收到订单创建的事件消息 :{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}14 准备发放积分.....15 短信平台接收到订单创建的事件消息 :{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}16 准备发送短信.....17 积分系统接收到订单创建的事件消息 :{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}18 准备发放积分.....

转载地址:http://cpoxl.baihongyu.com/

你可能感兴趣的文章
如何造一个迷你Vue
查看>>
java的8种基础类型
查看>>
在 Android 设备上搭建 Web 服务器
查看>>
Spring工作原理及流程
查看>>
解决1px的border在移动端变粗的问题
查看>>
JavaScript实现数组去重的常见方式
查看>>
中文域名nginx配置
查看>>
Nodejs教程20:WebSocket之二:用原生实现WebSocket应用
查看>>
InterviewMap —— Javascript (二)
查看>>
js数组操作
查看>>
比特币重回4000美元的关口
查看>>
传统短视频直播平台和新兴一对一交友源码力与美的结合
查看>>
表格的表头有斜线
查看>>
LeetCode 206——反转链表
查看>>
撩课大前端-面试宝典-第七篇
查看>>
开源大数据周刊-第3期
查看>>
java版 b2b2c o2o电子商务云商平台spring cloud+springmvc+mybatis
查看>>
区块链100讲:Hyperledger Fabric 区块链多机部署
查看>>
【中后台应用】从表单抽象到表单中台
查看>>
重学前端学习笔记(十九)--JavaScript中的函数
查看>>