优化实时数据

This commit is contained in:
yuyang 2021-09-16 15:46:05 +08:00
parent 24f55a7626
commit b48c2d48c1
5 changed files with 26 additions and 65 deletions

View File

@ -3,15 +3,13 @@ package com.xhpc.order.api;
import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel; import com.xhpc.common.api.WebSocketService;
import com.rabbitmq.client.Connection;
import com.xhpc.common.core.domain.R; import com.xhpc.common.core.domain.R;
import com.xhpc.common.core.web.controller.BaseController; import com.xhpc.common.core.web.controller.BaseController;
import com.xhpc.common.data.redis.CacheOrderData; import com.xhpc.common.data.redis.CacheOrderData;
import com.xhpc.common.data.redis.CacheRealtimeData; import com.xhpc.common.data.redis.CacheRealtimeData;
import com.xhpc.common.domain.XhpcRate; import com.xhpc.common.domain.XhpcRate;
import com.xhpc.common.redis.service.RedisService; import com.xhpc.common.redis.service.RedisService;
import com.xhpc.common.util.ConnectionRabbitMQUtil;
import com.xhpc.order.domain.XhpcChargeOrder; import com.xhpc.order.domain.XhpcChargeOrder;
import com.xhpc.order.domain.XhpcHistoryOrder; import com.xhpc.order.domain.XhpcHistoryOrder;
import com.xhpc.order.service.IXhpcChargeOrderService; import com.xhpc.order.service.IXhpcChargeOrderService;
@ -43,6 +41,9 @@ public class XhpcPileOrderController extends BaseController {
@Autowired @Autowired
private IXhpcRealTimeOrderService xhpcRealTimeOrderService; private IXhpcRealTimeOrderService xhpcRealTimeOrderService;
@Autowired
private WebSocketService webSocketService;
private static final Logger logger = LoggerFactory.getLogger(XhpcPileOrderController.class); private static final Logger logger = LoggerFactory.getLogger(XhpcPileOrderController.class);
/** /**
* 测试 * 测试
@ -81,7 +82,7 @@ public class XhpcPileOrderController extends BaseController {
map.put("message", remark); map.put("message", remark);
JSONObject json = new JSONObject(map); JSONObject json = new JSONObject(map);
//消息对了内容 //消息对了内容
rabbimt(userId + "##" + json); webSocketService.getMessage(userId+"",json.toString());
return R.ok(); return R.ok();
} }
@ -165,7 +166,7 @@ public class XhpcPileOrderController extends BaseController {
map.put("code", code); map.put("code", code);
JSONObject json = new JSONObject(map); JSONObject json = new JSONObject(map);
//消息对了内容 //消息对了内容
rabbimt(userId + "##" + json); webSocketService.getMessage(userId+"",json.toString());
return R.ok(); return R.ok();
} }
@ -192,14 +193,14 @@ public class XhpcPileOrderController extends BaseController {
Map<String, Object> map = xhpcRealTimeOrderService.addOrderTime(cacheRealtimeData, xhpcChargeOrder, orderNo, 1); Map<String, Object> map = xhpcRealTimeOrderService.addOrderTime(cacheRealtimeData, xhpcChargeOrder, orderNo, 1);
JSONObject json = new JSONObject(map); JSONObject json = new JSONObject(map);
//消息对了内容 //消息对了内容
rabbimt(userId + "##" + json); webSocketService.getMessage(userId+"",json.toString());
} catch (Exception e) { } catch (Exception e) {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
map.put("code", 500); map.put("code", 500);
map.put("userId", xhpcChargeOrder.getUserId()); map.put("userId", xhpcChargeOrder.getUserId());
JSONObject json = new JSONObject(map); JSONObject json = new JSONObject(map);
//消息对了内容 //消息对了内容
rabbimt(userId + "##" + json); webSocketService.getMessage(userId+"",json.toString());
return R.fail(500,"无实时数据"); return R.fail(500,"无实时数据");
} }
return R.ok(); return R.ok();
@ -328,7 +329,7 @@ public class XhpcPileOrderController extends BaseController {
map.put("code", 500); map.put("code", 500);
map.put("userId", userId); map.put("userId", userId);
JSONObject json = new JSONObject(map); JSONObject json = new JSONObject(map);
rabbimt(userId + "##" + json); webSocketService.getMessage(userId+"",json.toString());
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
return R.fail(500,"添加订单回调失败"); return R.fail(500,"添加订单回调失败");
@ -367,22 +368,22 @@ public class XhpcPileOrderController extends BaseController {
return userId; return userId;
} }
private void rabbimt(String message) { // private void rabbimt(String message) {
//发送消息队列 // //发送消息队列
try { // try {
// 1获取到连接 // // 1获取到连接
Connection connection = ConnectionRabbitMQUtil.getConnection(); // Connection connection = ConnectionRabbitMQUtil.getConnection();
// 2从连接中创建通道使用通道才能完成消息相关的操作 // // 2从连接中创建通道使用通道才能完成消息相关的操作
Channel channel = connection.createChannel(); // Channel channel = connection.createChannel();
// 3声明创建队列 // // 3声明创建队列
channel.queueDeclare("webSocket", false, false, false, null); // channel.queueDeclare("webSocket", false, false, false, null);
// 4消息内容 // // 4消息内容
channel.basicPublish("", "webSocket", null, message.getBytes()); // channel.basicPublish("", "webSocket", null, message.getBytes());
channel.close(); // channel.close();
connection.close(); // connection.close();
} catch (Exception e) { // } catch (Exception e) {
//
} // }
} // }
} }

View File

@ -20,8 +20,6 @@ public class RecvDemo {
Connection connection = ConnectionRabbitMQUtil.getConnection(); Connection connection = ConnectionRabbitMQUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成 //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel(); Channel channel = connection.createChannel();
// 声明队列
//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/** /**
* 参数明细 * 参数明细
* 1queue 队列名称 * 1queue 队列名称
@ -44,12 +42,6 @@ public class RecvDemo {
*/ */
@Override @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//测试自动ack模拟异常
//int i=1/0;
//交换机
String exchange = envelope.getExchange();
//消息idmq在channel中用来标识消息的id可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体 // body 即消息体
String msg = new String(body,"utf-8"); String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!"); System.out.println(" [x] received : " + msg + "!");

View File

@ -37,8 +37,6 @@ public class SendDemo {
channel.queueDeclare(NAME, false, false, false, null); channel.queueDeclare(NAME, false, false, false, null);
// 4消息内容 // 4消息内容
String message = "Hello World!!!!"; String message = "Hello World!!!!";
// 向指定的队列中发送消息
//参数String exchange, String routingKey, BasicProperties props, byte[] body
/** /**
* 参数明细 * 参数明细
* 1exchange交换机如果不指定将使用mq的默认交换机设置为"" * 1exchange交换机如果不指定将使用mq的默认交换机设置为""
@ -47,15 +45,6 @@ public class SendDemo {
* 4body消息内容 * 4body消息内容
*/ */
channel.basicPublish("", NAME, null, message.getBytes()); channel.basicPublish("", NAME, null, message.getBytes());
// // 循环发布任务
// for (int i = 0; i < 50; i++) {
// // 消息内容
// String message = "task .. " + i;
// channel.basicPublish("", NAME, null, message.getBytes());
// System.out.println(" [x] Sent '" + message + "'");
//
// Thread.sleep(i * 2);
// }
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理) //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
channel.close(); channel.close();
connection.close(); connection.close();

View File

@ -1,18 +1,12 @@
package com.xhpc.wxma.controller; package com.xhpc.wxma.controller;
import cn.hutool.core.date.DateUtil;
import com.xhpc.common.core.domain.R; import com.xhpc.common.core.domain.R;
import com.xhpc.common.core.web.controller.BaseController; import com.xhpc.common.core.web.controller.BaseController;
import com.xhpc.wxma.socket.OrderNotificationWebSocket; import com.xhpc.wxma.socket.OrderNotificationWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.Calendar;
/** /**
* @author yuyang * @author yuyang
* @date 2021/8/11 17:47 * @date 2021/8/11 17:47
@ -20,7 +14,6 @@ import java.util.Calendar;
@RestController @RestController
public class OrderNotificationWebSocketController extends BaseController { public class OrderNotificationWebSocketController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(OrderNotificationWebSocketController.class);
@GetMapping("/test") @GetMapping("/test")
public void test(@RequestParam String userId){ public void test(@RequestParam String userId){
OrderNotificationWebSocket.sendMessage(userId,"有新订单啦"); OrderNotificationWebSocket.sendMessage(userId,"有新订单啦");
@ -28,9 +21,7 @@ public class OrderNotificationWebSocketController extends BaseController {
@GetMapping("/orderWebSocket/getMessage") @GetMapping("/orderWebSocket/getMessage")
public R getMessage(@RequestParam String userId, @RequestParam String message){ public R getMessage(@RequestParam String userId, @RequestParam String message){
logger.info("<<<<1111<<<<<<接收时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>");
OrderNotificationWebSocket.sendMessage(userId,message); OrderNotificationWebSocket.sendMessage(userId,message);
logger.info("<<<<1111<<<<发送时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>");
return R.ok(); return R.ok();
} }

View File

@ -1,17 +1,13 @@
package com.xhpc.wxma.rabbitm; package com.xhpc.wxma.rabbitm;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.*; import com.rabbitmq.client.*;
import com.xhpc.common.util.ConnectionRabbitMQUtil; import com.xhpc.common.util.ConnectionRabbitMQUtil;
import com.xhpc.wxma.socket.OrderNotificationWebSocket; import com.xhpc.wxma.socket.OrderNotificationWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.util.Calendar;
/** /**
* @author yuyang * @author yuyang
@ -22,8 +18,6 @@ public class RabbitmConsumer implements ApplicationRunner {
//队列名称 //队列名称
private final static String NAME="webSocket"; private final static String NAME="webSocket";
private static final Logger logger = LoggerFactory.getLogger(RabbitmConsumer.class);
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
try { try {
@ -38,15 +32,9 @@ public class RabbitmConsumer implements ApplicationRunner {
// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用 // 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用
@Override @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
//消息idmq在channel中用来标识消息的id可用于确认消息已接收
//long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
logger.info("<<<<<<<<<<收到消息队列时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>");
String msg = new String(body,"utf-8"); String msg = new String(body,"utf-8");
String[] split = msg.split("##"); String[] split = msg.split("##");
OrderNotificationWebSocket.sendMessage(split[0],split[1]); OrderNotificationWebSocket.sendMessage(split[0],split[1]);
logger.info("<<<<<<<<<<消息队列发送时间<<<<<<<<<<<<<<"+ DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss")+">>>>>>>>>>>>>>>>>");
} }
}; };
channel.basicConsume(NAME, true, consumer); channel.basicConsume(NAME, true, consumer);