From c1b585936e1f4f4d4dd549ee8f4f9a326b4ea1de Mon Sep 17 00:00:00 2001 From: yuyang <2265829957@qq.com> Date: Sat, 7 Aug 2021 17:29:45 +0800 Subject: [PATCH] RabbitMQ Demo --- xhpc-modules/xhpc-order/pom.xml | 6 +- .../xhpc/order/domain/XhpcRealTimeOrder.java | 350 ++++++++++++++++++ .../order/mapper/XhpcRealTimeOrderMapper.java | 17 + .../com/xhpc/order/rabbitmq/RecvDemo.java | 77 ++++ .../com/xhpc/order/rabbitmq/SendDemo.java | 71 ++++ .../service/IXhpcRealTimeOrderService.java | 16 + .../impl/XhpcRealTimeOrderServiceImpl.java | 28 ++ .../order/util/ConnectionRabbitMQUtil.java | 31 ++ .../mapper/XhpcRealTimeOrderMapper.xml | 210 +++++++++++ 9 files changed, 805 insertions(+), 1 deletion(-) create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/domain/XhpcRealTimeOrder.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/mapper/XhpcRealTimeOrderMapper.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/IXhpcRealTimeOrderService.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/impl/XhpcRealTimeOrderServiceImpl.java create mode 100644 xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/util/ConnectionRabbitMQUtil.java create mode 100644 xhpc-modules/xhpc-order/src/main/resources/mapper/XhpcRealTimeOrderMapper.xml diff --git a/xhpc-modules/xhpc-order/pom.xml b/xhpc-modules/xhpc-order/pom.xml index 650d1fa3..3f0686e1 100644 --- a/xhpc-modules/xhpc-order/pom.xml +++ b/xhpc-modules/xhpc-order/pom.xml @@ -21,7 +21,11 @@ - + + com.rabbitmq + amqp-client + 5.7.1 + com.alibaba.cloud diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/domain/XhpcRealTimeOrder.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/domain/XhpcRealTimeOrder.java new file mode 100644 index 00000000..2d1468c7 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/domain/XhpcRealTimeOrder.java @@ -0,0 +1,350 @@ +package com.xhpc.order.domain; + +import com.xhpc.common.core.web.domain.BaseEntity; + +/** + * @author yuyang + * @date 2021/8/7 15:06 + */ +public class XhpcRealTimeOrder extends BaseEntity { + + /** + * 充电订单实时数据 + */ + private Long realTimeOrderId; + /** + * 充电订单id + */ + private Long chargingOrderId; + /** + * 交易流水号 + */ + private String transactionNumber; + /** + * 桩编号 + */ + private String pileNumber; + /** + * 枪编号 + */ + private String gunNumber; + /** + * 枪是否归位(0否 1 是) + */ + private Integer pileGunStatus; + /** + * 是否插枪(0否 1是) + */ + private Long vehicleGunStatus; + /** + * 电压 + */ + private Double voltage; + /** + * 电流 + */ + private Double electricCurrent; + /** + * 枪线温度 + */ + private Double gunLineTemperature; + /** + * 枪线编码 + */ + private Double gunLineNumber; + /** + * SOC + */ + private String soc; + /** + * 电池组最高温度 + */ + private Double maxTemperature; + /** + * 累计充电时间 + */ + private String chargingTime; + /** + * 剩余时间 + */ + private String remainingTime; + /** + * 充电度数 + */ + private Double chargingDegree; + /** + * 计损充电度数 + */ + private Double lossChargingDegree; + /** + * 已充金额 + */ + private Double amountCharged; + /** + * 硬件故障 + */ + private String hardwareFault; + /** + * (状态(0离线 1故障 2空闲 3充电)) + */ + private Integer status; + /** + * 备注 + */ + private String remark; + /** + * 充电用户id + */ + private Long userId; + /** + * 场站id + */ + private Long chargingStationId; + /** + * 0桩停止充电 1 远程停止充电 2充电中 + */ + private Integer type; + + public Long getChargingOrderId() { + + return chargingOrderId; + } + + public void setChargingOrderId(Long chargingOrderId) { + + this.chargingOrderId = chargingOrderId; + } + + public String getTransactionNumber() { + + return transactionNumber; + } + + public void setTransactionNumber(String transactionNumber) { + + this.transactionNumber = transactionNumber; + } + + public String getPileNumber() { + + return pileNumber; + } + + public void setPileNumber(String pileNumber) { + + this.pileNumber = pileNumber; + } + + public Integer getPileGunStatus() { + + return pileGunStatus; + } + + public void setPileGunStatus(Integer pileGunStatus) { + + this.pileGunStatus = pileGunStatus; + } + + public Long getVehicleGunStatus() { + + return vehicleGunStatus; + } + + public void setVehicleGunStatus(Long vehicleGunStatus) { + + this.vehicleGunStatus = vehicleGunStatus; + } + + public Double getVoltage() { + + return voltage; + } + + public void setVoltage(Double voltage) { + + this.voltage = voltage; + } + + public Double getElectricCurrent() { + + return electricCurrent; + } + + public void setElectricCurrent(Double electricCurrent) { + + this.electricCurrent = electricCurrent; + } + + public Double getGunLineTemperature() { + + return gunLineTemperature; + } + + public void setGunLineTemperature(Double gunLineTemperature) { + + this.gunLineTemperature = gunLineTemperature; + } + + public Double getGunLineNumber() { + + return gunLineNumber; + } + + public void setGunLineNumber(Double gunLineNumber) { + + this.gunLineNumber = gunLineNumber; + } + + public String getSoc() { + + return soc; + } + + public void setSoc(String soc) { + + this.soc = soc; + } + + public Double getMaxTemperature() { + + return maxTemperature; + } + + public void setMaxTemperature(Double maxTemperature) { + + this.maxTemperature = maxTemperature; + } + + public String getChargingTime() { + + return chargingTime; + } + + public void setChargingTime(String chargingTime) { + + this.chargingTime = chargingTime; + } + + public String getRemainingTime() { + + return remainingTime; + } + + public void setRemainingTime(String remainingTime) { + + this.remainingTime = remainingTime; + } + + public Double getChargingDegree() { + + return chargingDegree; + } + + public void setChargingDegree(Double chargingDegree) { + + this.chargingDegree = chargingDegree; + } + + public Double getLossChargingDegree() { + + return lossChargingDegree; + } + + public void setLossChargingDegree(Double lossChargingDegree) { + + this.lossChargingDegree = lossChargingDegree; + } + + public Double getAmountCharged() { + + return amountCharged; + } + + public void setAmountCharged(Double amountCharged) { + + this.amountCharged = amountCharged; + } + + public String getHardwareFault() { + + return hardwareFault; + } + + public void setHardwareFault(String hardwareFault) { + + this.hardwareFault = hardwareFault; + } + + public Integer getStatus() { + + return status; + } + + public void setStatus(Integer status) { + + this.status = status; + } + + @Override + public String getRemark() { + + return remark; + } + + @Override + public void setRemark(String remark) { + + this.remark = remark; + } + + public Long getUserId() { + + return userId; + } + + public void setUserId(Long userId) { + + this.userId = userId; + } + + public Long getChargingStationId() { + + return chargingStationId; + } + + public void setChargingStationId(Long chargingStationId) { + + this.chargingStationId = chargingStationId; + } + + public Integer getType() { + + return type; + } + + public void setType(Integer type) { + + this.type = type; + } + + public Long getRealTimeOrderId() { + + return realTimeOrderId; + } + + public void setRealTimeOrderId(Long realTimeOrderId) { + + this.realTimeOrderId = realTimeOrderId; + } + + public String getGunNumber() { + + return gunNumber; + } + + public void setGunNumber(String gunNumber) { + + this.gunNumber = gunNumber; + } + +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/mapper/XhpcRealTimeOrderMapper.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/mapper/XhpcRealTimeOrderMapper.java new file mode 100644 index 00000000..d89b8c51 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/mapper/XhpcRealTimeOrderMapper.java @@ -0,0 +1,17 @@ +package com.xhpc.order.mapper; + +import com.xhpc.order.domain.XhpcRealTimeOrder; + +/** + * @author yuyang + * @date 2021/8/7 15:09 + */ +public interface XhpcRealTimeOrderMapper { + + /** + * 添加实时充电订单 + * @param xhpcRealTimeOrder + * @return + */ + int addXhpcRealTimeOrder(XhpcRealTimeOrder xhpcRealTimeOrder); +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java new file mode 100644 index 00000000..2700a8ed --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/RecvDemo.java @@ -0,0 +1,77 @@ +package com.xhpc.order.rabbitmq; + +import com.rabbitmq.client.*; +import com.xhpc.order.util.ConnectionRabbitMQUtil; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author yuyang + * @date 2021/8/7 16:45 + */ +public class RecvDemo { + //队列名称 + private final static String NAME="xhpc"; + + public static void main(String[] args) { + try { + // 获取到连接 + Connection connection = ConnectionRabbitMQUtil.getConnection(); + //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 + Channel channel = connection.createChannel(); + // 声明队列 + //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments + /** + * 参数明细 + * 1、queue 队列名称 + * 2、durable 是否持久化,如果持久化,mq重启后队列还在 + * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 + * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) + * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 + */ + channel.queueDeclare(NAME, false, false, false, null); + //实现消费方法 + DefaultConsumer consumer = new DefaultConsumer(channel){ + // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 + /** + * 当接收到消息后此方法将被调用 + * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume + * @param envelope 信封,通过envelope + * @param properties 消息属性 + * @param body 消息内容 + * @throws IOException + */ + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + //测试自动ack,模拟异常 + //int i=1/0; + //交换机 + String exchange = envelope.getExchange(); + //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 + long deliveryTag = envelope.getDeliveryTag(); + // body 即消息体 + String msg = new String(body,"utf-8"); + System.out.println(" [x] received : " + msg + "!"); + } + }; + + // 监听队列,第二个参数:是否自动进行消息确认。 + //参数:String queue, boolean autoAck, Consumer callback + /** + * 参数明细: + * 1、queue 队列名称 + * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 + * 3、callback,消费方法,当消费者接收到消息要执行的方法 + */ + channel.basicConsume(NAME, true, consumer); + + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + }catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java new file mode 100644 index 00000000..8f23eb51 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/rabbitmq/SendDemo.java @@ -0,0 +1,71 @@ +package com.xhpc.order.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.xhpc.order.util.ConnectionRabbitMQUtil; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; +/** + * @author yuyang + * @date 2021/8/7 16:37 + */ +public class SendDemo { + + //队列名称 + private final static String NAME = "xhpc"; + + + public static void main(String[] args) { + + try { + // 1、获取到连接 + Connection connection = ConnectionRabbitMQUtil.getConnection(); + // 2、从连接中创建通道,使用通道才能完成消息相关的操作 + Channel channel = connection.createChannel(); + // 3、声明(创建)队列 + + /** + * 参数明细 + * 1、queue 队列名称 + * 2、durable 是否持久化,如果持久化,mq重启后队列还在 + * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 + * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) + * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 + */ + //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments + channel.queueDeclare(NAME, false, false, false, null); + // 4、消息内容 + String message = "Hello World!!!!"; + // 向指定的队列中发送消息 + //参数:String exchange, String routingKey, BasicProperties props, byte[] body + /** + * 参数明细: + * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") + * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 + * 3、props,消息的属性 + * 4、body,消息内容 + */ + channel.basicPublish("", NAME, null, message.getBytes()); +// // 循环发布任务 +// for (int i = 0; i < 50; i++) { +// // 消息内容 +// String message = "task .. " + i; +// channel.basicPublish("", NAME, null, message.getBytes()); +// System.out.println(" [x] Sent '" + message + "'"); +// +// Thread.sleep(i * 2); +// } + //关闭通道和连接(资源关闭最好用try-catch-finally语句处理) + channel.close(); + connection.close(); + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + + } +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/IXhpcRealTimeOrderService.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/IXhpcRealTimeOrderService.java new file mode 100644 index 00000000..6553855b --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/IXhpcRealTimeOrderService.java @@ -0,0 +1,16 @@ +package com.xhpc.order.service; + +/** + * @author yuyang + * @date 2021/8/7 15:07 + */ +public interface IXhpcRealTimeOrderService { + + /** + * 添加实时订单数据 + * @param orderNo 交易流水号 + * @param status + */ + public void addXhpcRealTimeOrder(String orderNo,Integer status); + +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/impl/XhpcRealTimeOrderServiceImpl.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/impl/XhpcRealTimeOrderServiceImpl.java new file mode 100644 index 00000000..6db457d3 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/service/impl/XhpcRealTimeOrderServiceImpl.java @@ -0,0 +1,28 @@ +package com.xhpc.order.service.impl; + +import com.xhpc.common.redis.service.RedisService; +import com.xhpc.order.service.IXhpcRealTimeOrderService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author yuyang + * @date 2021/8/7 15:07 + */ +@Service +public class XhpcRealTimeOrderServiceImpl implements IXhpcRealTimeOrderService { + + @Autowired + private RedisService redisService; + + /** + * 添加实时数据 + * @param orderNo 交易流水号 + * @param status + */ + @Override + public void addXhpcRealTimeOrder(String orderNo, Integer status) { + + } + +} diff --git a/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/util/ConnectionRabbitMQUtil.java b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/util/ConnectionRabbitMQUtil.java new file mode 100644 index 00000000..6532d177 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/java/com/xhpc/order/util/ConnectionRabbitMQUtil.java @@ -0,0 +1,31 @@ +package com.xhpc.order.util; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * @author yuyang + * @date 2021/8/6 16:04 + */ +public class ConnectionRabbitMQUtil { + /** + * 建立与RabbitMQ的连接 + * @return + * @throws Exception + */ + public static Connection getConnection() throws Exception { + //定义连接工厂 + ConnectionFactory factory = new ConnectionFactory(); + //设置服务地址 + factory.setHost("118.24.137.203"); + //端口 + factory.setPort(5673); + //设置账号信息,用户名、密码、vhost + //factory.setVirtualHost("/guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq + factory.setUsername("admin"); + factory.setPassword("admin"); + // 通过工厂获取连接 + Connection connection = factory.newConnection(); + return connection; + } +} diff --git a/xhpc-modules/xhpc-order/src/main/resources/mapper/XhpcRealTimeOrderMapper.xml b/xhpc-modules/xhpc-order/src/main/resources/mapper/XhpcRealTimeOrderMapper.xml new file mode 100644 index 00000000..452124d0 --- /dev/null +++ b/xhpc-modules/xhpc-order/src/main/resources/mapper/XhpcRealTimeOrderMapper.xml @@ -0,0 +1,210 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + insert into xhpc_real_time_order + + + charging_order_id, + + + transaction_number, + + + pile_number, + + + gun_number, + + + pile_gun_status, + + + vehicle_gun_status, + + + voltage, + + + electric_current, + + + gun_line_temperature, + + + gun_line_number, + + + soc, + + + max_temperature, + + + charging_time, + + + remaining_time, + + + charging_degree, + + + loss_charging_degree, + + + amount_charged, + + + hardware_fault, + + + status, + + + create_time, + + + create_by, + + + update_time, + + + update_by, + + + remark, + + + user_id, + + + type, + + + charging_station_id + + + + + #{chargingOrderId}, + + + #{transactionNumber}, + + + #{pileNumber}, + + + #{gunNumber}, + + + #{pileGunStatus}, + + + #{vehicleGunStatus}, + + + #{voltage}, + + + #{electricCurrent}, + + + #{gunLineTemperature}, + + + #{gunLineNumber}, + + + #{soc}, + + + #{maxTemperature}, + + + #{chargingTime}, + + + #{remainingTime}, + + + #{chargingDegree}, + + + #{lossChargingDegree}, + + + #{amountCharged}, + + + #{hardwareFault}, + + + #{status}, + + + #{createTime}, + + + #{createBy}, + + + #{updateTime}, + + + #{updateBy}, + + + #{remark}, + + + #{userId}, + + + #{type}, + + + #{chargingStationId} + + + + + + \ No newline at end of file