From b48c2d48c192187cf7fb4fb8645819a67da9d981 Mon Sep 17 00:00:00 2001 From: yuyang <2265829957@qq.com> Date: Thu, 16 Sep 2021 15:46:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AE=9E=E6=97=B6=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../order/api/XhpcPileOrderController.java | 51 ++++++++++--------- .../com/xhpc/order/rabbitmq/RecvDemo.java | 8 --- .../com/xhpc/order/rabbitmq/SendDemo.java | 11 ---- .../OrderNotificationWebSocketController.java | 9 ---- .../xhpc/wxma/rabbitm/RabbitmConsumer.java | 12 ----- 5 files changed, 26 insertions(+), 65 deletions(-) diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/api/XhpcPileOrderController.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/api/XhpcPileOrderController.java index 09bd78c4..d0bcd78b 100644 --- a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/api/XhpcPileOrderController.java +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/api/XhpcPileOrderController.java @@ -3,15 +3,13 @@ package com.xhpc.order.api; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSONObject; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; +import com.xhpc.common.api.WebSocketService; import com.xhpc.common.core.domain.R; import com.xhpc.common.core.web.controller.BaseController; import com.xhpc.common.data.redis.CacheOrderData; import com.xhpc.common.data.redis.CacheRealtimeData; import com.xhpc.common.domain.XhpcRate; import com.xhpc.common.redis.service.RedisService; -import com.xhpc.common.util.ConnectionRabbitMQUtil; import com.xhpc.order.domain.XhpcChargeOrder; import com.xhpc.order.domain.XhpcHistoryOrder; import com.xhpc.order.service.IXhpcChargeOrderService; @@ -43,6 +41,9 @@ public class XhpcPileOrderController extends BaseController { @Autowired private IXhpcRealTimeOrderService xhpcRealTimeOrderService; + @Autowired + private WebSocketService webSocketService; + private static final Logger logger = LoggerFactory.getLogger(XhpcPileOrderController.class); /** * 测试 @@ -81,7 +82,7 @@ public class XhpcPileOrderController extends BaseController { map.put("message", remark); JSONObject json = new JSONObject(map); //消息对了内容 - rabbimt(userId + "##" + json); + webSocketService.getMessage(userId+"",json.toString()); return R.ok(); } @@ -165,7 +166,7 @@ public class XhpcPileOrderController extends BaseController { map.put("code", code); JSONObject json = new JSONObject(map); //消息对了内容 - rabbimt(userId + "##" + json); + webSocketService.getMessage(userId+"",json.toString()); return R.ok(); } @@ -192,14 +193,14 @@ public class XhpcPileOrderController extends BaseController { Map map = xhpcRealTimeOrderService.addOrderTime(cacheRealtimeData, xhpcChargeOrder, orderNo, 1); JSONObject json = new JSONObject(map); //消息对了内容 - rabbimt(userId + "##" + json); + webSocketService.getMessage(userId+"",json.toString()); } catch (Exception e) { Map map = new HashMap<>(); map.put("code", 500); map.put("userId", xhpcChargeOrder.getUserId()); JSONObject json = new JSONObject(map); //消息对了内容 - rabbimt(userId + "##" + json); + webSocketService.getMessage(userId+"",json.toString()); return R.fail(500,"无实时数据"); } return R.ok(); @@ -328,7 +329,7 @@ public class XhpcPileOrderController extends BaseController { map.put("code", 500); map.put("userId", userId); JSONObject json = new JSONObject(map); - rabbimt(userId + "##" + json); + webSocketService.getMessage(userId+"",json.toString()); }catch (Exception e){ e.printStackTrace(); return R.fail(500,"添加订单回调失败"); @@ -367,22 +368,22 @@ public class XhpcPileOrderController extends BaseController { return userId; } - private void rabbimt(String message) { - //发送消息队列 - try { - // 1、获取到连接 - Connection connection = ConnectionRabbitMQUtil.getConnection(); - // 2、从连接中创建通道,使用通道才能完成消息相关的操作 - Channel channel = connection.createChannel(); - // 3、声明(创建)队列 - channel.queueDeclare("webSocket", false, false, false, null); - // 4、消息内容 - channel.basicPublish("", "webSocket", null, message.getBytes()); - channel.close(); - connection.close(); - } catch (Exception e) { - - } - } +// private void rabbimt(String message) { +// //发送消息队列 +// try { +// // 1、获取到连接 +// Connection connection = ConnectionRabbitMQUtil.getConnection(); +// // 2、从连接中创建通道,使用通道才能完成消息相关的操作 +// Channel channel = connection.createChannel(); +// // 3、声明(创建)队列 +// channel.queueDeclare("webSocket", false, false, false, null); +// // 4、消息内容 +// channel.basicPublish("", "webSocket", null, message.getBytes()); +// channel.close(); +// connection.close(); +// } catch (Exception e) { +// +// } +// } } diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java index 899b2e2a..1b93aa8d 100644 --- a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java @@ -20,8 +20,6 @@ public class RecvDemo { Connection connection = ConnectionRabbitMQUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); - // 声明队列 - //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments /** * 参数明细 * 1、queue 队列名称 @@ -44,12 +42,6 @@ public class RecvDemo { */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - //测试自动ack,模拟异常 - //int i=1/0; - //交换机 - String exchange = envelope.getExchange(); - //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 - long deliveryTag = envelope.getDeliveryTag(); // body 即消息体 String msg = new String(body,"utf-8"); System.out.println(" [x] received : " + msg + "!"); diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java index fc693952..9111d0e1 100644 --- a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java @@ -37,8 +37,6 @@ public class SendDemo { channel.queueDeclare(NAME, false, false, false, null); // 4、消息内容 String message = "Hello World!!!!"; - // 向指定的队列中发送消息 - //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") @@ -47,15 +45,6 @@ public class SendDemo { * 4、body,消息内容 */ channel.basicPublish("", NAME, null, message.getBytes()); -// // 循环发布任务 -// for (int i = 0; i < 50; i++) { -// // 消息内容 -// String message = "task .. " + i; -// channel.basicPublish("", NAME, null, message.getBytes()); -// System.out.println(" [x] Sent '" + message + "'"); -// -// Thread.sleep(i * 2); -// } //关闭通道和连接(资源关闭最好用try-catch-finally语句处理) channel.close(); connection.close(); diff --git a/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/controller/OrderNotificationWebSocketController.java b/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/controller/OrderNotificationWebSocketController.java index ba7930a4..f8f5d1e7 100644 --- a/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/controller/OrderNotificationWebSocketController.java +++ b/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/controller/OrderNotificationWebSocketController.java @@ -1,18 +1,12 @@ package com.xhpc.wxma.controller; -import cn.hutool.core.date.DateUtil; import com.xhpc.common.core.domain.R; import com.xhpc.common.core.web.controller.BaseController; import com.xhpc.wxma.socket.OrderNotificationWebSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import java.util.Calendar; - /** * @author yuyang * @date 2021/8/11 17:47 @@ -20,7 +14,6 @@ import java.util.Calendar; @RestController public class OrderNotificationWebSocketController extends BaseController { - private static final Logger logger = LoggerFactory.getLogger(OrderNotificationWebSocketController.class); @GetMapping("/test") public void test(@RequestParam String userId){ OrderNotificationWebSocket.sendMessage(userId,"有新订单啦"); @@ -28,9 +21,7 @@ public class OrderNotificationWebSocketController extends BaseController { @GetMapping("/orderWebSocket/getMessage") public R getMessage(@RequestParam String userId, @RequestParam String message){ - logger.info("<<<<1111<<<<<<接收时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>"); OrderNotificationWebSocket.sendMessage(userId,message); - logger.info("<<<<1111<<<<发送时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>"); return R.ok(); } diff --git a/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/rabbitm/RabbitmConsumer.java b/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/rabbitm/RabbitmConsumer.java index a57dc9fd..e96bfaf9 100644 --- a/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/rabbitm/RabbitmConsumer.java +++ b/xhpc-modules/xhpc-wxma/src/main/java/com/xhpc/wxma/rabbitm/RabbitmConsumer.java @@ -1,17 +1,13 @@ package com.xhpc.wxma.rabbitm; -import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.*; import com.xhpc.common.util.ConnectionRabbitMQUtil; import com.xhpc.wxma.socket.OrderNotificationWebSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.io.IOException; -import java.util.Calendar; /** * @author yuyang @@ -22,8 +18,6 @@ public class RabbitmConsumer implements ApplicationRunner { //队列名称 private final static String NAME="webSocket"; - private static final Logger logger = LoggerFactory.getLogger(RabbitmConsumer.class); - @Override public void run(ApplicationArguments args) throws Exception { try { @@ -38,15 +32,9 @@ public class RabbitmConsumer implements ApplicationRunner { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - String exchange = envelope.getExchange(); - //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 - //long deliveryTag = envelope.getDeliveryTag(); - // body 即消息体 - logger.info("<<<<<<<<<<收到消息队列时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>"); String msg = new String(body,"utf-8"); String[] split = msg.split("##"); OrderNotificationWebSocket.sendMessage(split[0],split[1]); - logger.info("<<<<<<<<<<消息队列发送时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>"); } }; channel.basicConsume(NAME, true, consumer);