fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果。
本里使用实际业务中常见的例子,
订单系统:创建订单,然后发送一个事件消息
积分系统:发送订单的积分奖励
短信平台:发送订单的短信
消息生产者SenderWithFanoutExchange
1 package com.yzl.test3; 2 3 import java.util.Date; 4 5 import com.google.gson.Gson; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 /**11 * 使用fanout交换器产生事件,消费者订阅事件做相应的处理12 * @author: yzl13 * @date: 2016-10-2214 */15 public class SenderWithFanoutExchange {16 //交换器名称17 private static final String EXCHANGE_NAME = "myFanoutExchange";18 19 public static void main(String[] args) throws Exception {20 //连接到rabbitmq服务器21 ConnectionFactory factory = new ConnectionFactory();22 factory.setHost("localhost");23 Connection connection = factory.newConnection();24 //创建一个信道25 final Channel channel = connection.createChannel();26 //定义一个名字为topicExchange的fanout类型的exchange27 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");28 29 //创建一个时间的Event对象30 EventObj createOrderEvent = null;31 for(int i=1; i<10; i++){32 createOrderEvent = new EventObj();33 createOrderEvent.setUserId(Long.valueOf(i));34 createOrderEvent.setCreateTime(new Date());35 createOrderEvent.setEventType("create_order");36 //转成JSON37 String msg = new Gson().toJson(createOrderEvent);38 39 System.out.println("send msg:" + msg);40 41 //使用order_event路由键来发送该事件消息42 channel.basicPublish(EXCHANGE_NAME, "order_event", null, msg.getBytes());43 44 Thread.sleep(1000);45 }46 47 channel.close();48 connection.close();49 }50 }
消费消费者ReceiverWithFanoutExchange
1 package com.yzl.test3; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope;10 import com.rabbitmq.client.AMQP.BasicProperties;11 12 /**13 * 使用fanout交换器接收订单事件消息14 * 15 * @author: yzl16 * @date: 2016-10-2217 */18 public class ReceiverWithFanoutExchange {19 // 交换器名称20 private static final String EXCHANGE_NAME = "myFanoutExchange";21 //接收订单事件并发放积分的队列22 private static final String QUEUE_ORDER_REWARD_POINTS = "rewardOrderPoints";23 //发放订单积分的路由键24 private static final String ROUTING_KEY_ORDER_POINTS = "reward_order_points";25 //接收订单事件并发短信的队列26 private static final String QUEUE_ORDER_SEND_SMS = "sendOrderSms";27 //发送订单短信的路由键28 private static final String ROUTING_KEY_ORDER_SMS = "send_order_sms";29 30 private static Channel channel = null;31 32 static{33 try{34 // 连接到rabbitmq服务器35 ConnectionFactory factory = new ConnectionFactory();36 factory.setHost("localhost");37 Connection connection = factory.newConnection();38 // 创建一个信道39 channel = connection.createChannel();40 // 定义一个名字为myFanoutExchange的fanout类型的exchange41 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");42 }catch (Exception e) {43 // TODO: handle exception44 }45 }46 47 /**48 * 发放订单的积分奖励49 */50 public static void rewardPoints() throws Exception {51 channel.queueDeclare(QUEUE_ORDER_REWARD_POINTS, false, false, false, null);52 channel.queueBind(QUEUE_ORDER_REWARD_POINTS, EXCHANGE_NAME, ROUTING_KEY_ORDER_POINTS);53 54 channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){55 @Override56 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)57 throws IOException {58 String msg = new String(body);59 System.out.println("积分系统接收到订单创建的事件消息 :" + msg);60 System.out.println("准备发放积分.....");61 }62 });63 }64 65 /**66 * 发送订单成功的短信67 */68 public static void sendSms() throws Exception {69 channel.queueDeclare(QUEUE_ORDER_SEND_SMS, false, false, false, null);70 channel.queueBind(QUEUE_ORDER_SEND_SMS, EXCHANGE_NAME, ROUTING_KEY_ORDER_SMS);71 72 channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){73 @Override74 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)75 throws IOException {76 String msg = new String(body);77 System.out.println("短信平台接收到订单创建的事件消息 :" + msg);78 System.out.println("准备发送短信.....");79 }80 });81 }82 83 public static void main(String[] args) throws Exception {84 rewardPoints();85 sendSms();86 }87 }
运行结果输出:
1 send msg:{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}2 send msg:{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}3 send msg:{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}4 send msg:{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}5 send msg:{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}6 send msg:{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}7 send msg:{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}8 send msg:{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}9 send msg:{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
1 积分系统接收到订单创建的事件消息 :{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"} 2 准备发放积分..... 3 短信平台接收到订单创建的事件消息 :{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"} 4 准备发送短信..... 5 积分系统接收到订单创建的事件消息 :{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"} 6 准备发放积分..... 7 短信平台接收到订单创建的事件消息 :{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"} 8 准备发送短信..... 9 积分系统接收到订单创建的事件消息 :{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}10 准备发放积分.....11 短信平台接收到订单创建的事件消息 :{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}12 准备发送短信.....13 积分系统接收到订单创建的事件消息 :{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}14 准备发放积分.....15 短信平台接收到订单创建的事件消息 :{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}16 准备发送短信.....17 积分系统接收到订单创建的事件消息 :{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}18 准备发放积分.....