RabbitMQ Demo

This commit is contained in:
yuyang 2021-08-07 17:29:45 +08:00
parent 2656a69b3e
commit c1b585936e
9 changed files with 805 additions and 1 deletions

View File

@ -21,7 +21,11 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<!-- SpringCloud Alibaba Nacos --> <!-- SpringCloud Alibaba Nacos -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>

View File

@ -0,0 +1,350 @@
package com.xhpc.order.domain;
import com.xhpc.common.core.web.domain.BaseEntity;
/**
* @author yuyang
* @date 2021/8/7 15:06
*/
public class XhpcRealTimeOrder extends BaseEntity {
/**
* 充电订单实时数据
*/
private Long realTimeOrderId;
/**
* 充电订单id
*/
private Long chargingOrderId;
/**
* 交易流水号
*/
private String transactionNumber;
/**
* 桩编号
*/
private String pileNumber;
/**
* 枪编号
*/
private String gunNumber;
/**
* 枪是否归位0否 1
*/
private Integer pileGunStatus;
/**
* 是否插枪0否 1是
*/
private Long vehicleGunStatus;
/**
* 电压
*/
private Double voltage;
/**
* 电流
*/
private Double electricCurrent;
/**
* 枪线温度
*/
private Double gunLineTemperature;
/**
* 枪线编码
*/
private Double gunLineNumber;
/**
* SOC
*/
private String soc;
/**
* 电池组最高温度
*/
private Double maxTemperature;
/**
* 累计充电时间
*/
private String chargingTime;
/**
* 剩余时间
*/
private String remainingTime;
/**
* 充电度数
*/
private Double chargingDegree;
/**
* 计损充电度数
*/
private Double lossChargingDegree;
/**
* 已充金额
*/
private Double amountCharged;
/**
* 硬件故障
*/
private String hardwareFault;
/**
* (状态0离线 1故障 2空闲 3充电)
*/
private Integer status;
/**
* 备注
*/
private String remark;
/**
* 充电用户id
*/
private Long userId;
/**
* 场站id
*/
private Long chargingStationId;
/**
* 0桩停止充电 1 远程停止充电 2充电中
*/
private Integer type;
public Long getChargingOrderId() {
return chargingOrderId;
}
public void setChargingOrderId(Long chargingOrderId) {
this.chargingOrderId = chargingOrderId;
}
public String getTransactionNumber() {
return transactionNumber;
}
public void setTransactionNumber(String transactionNumber) {
this.transactionNumber = transactionNumber;
}
public String getPileNumber() {
return pileNumber;
}
public void setPileNumber(String pileNumber) {
this.pileNumber = pileNumber;
}
public Integer getPileGunStatus() {
return pileGunStatus;
}
public void setPileGunStatus(Integer pileGunStatus) {
this.pileGunStatus = pileGunStatus;
}
public Long getVehicleGunStatus() {
return vehicleGunStatus;
}
public void setVehicleGunStatus(Long vehicleGunStatus) {
this.vehicleGunStatus = vehicleGunStatus;
}
public Double getVoltage() {
return voltage;
}
public void setVoltage(Double voltage) {
this.voltage = voltage;
}
public Double getElectricCurrent() {
return electricCurrent;
}
public void setElectricCurrent(Double electricCurrent) {
this.electricCurrent = electricCurrent;
}
public Double getGunLineTemperature() {
return gunLineTemperature;
}
public void setGunLineTemperature(Double gunLineTemperature) {
this.gunLineTemperature = gunLineTemperature;
}
public Double getGunLineNumber() {
return gunLineNumber;
}
public void setGunLineNumber(Double gunLineNumber) {
this.gunLineNumber = gunLineNumber;
}
public String getSoc() {
return soc;
}
public void setSoc(String soc) {
this.soc = soc;
}
public Double getMaxTemperature() {
return maxTemperature;
}
public void setMaxTemperature(Double maxTemperature) {
this.maxTemperature = maxTemperature;
}
public String getChargingTime() {
return chargingTime;
}
public void setChargingTime(String chargingTime) {
this.chargingTime = chargingTime;
}
public String getRemainingTime() {
return remainingTime;
}
public void setRemainingTime(String remainingTime) {
this.remainingTime = remainingTime;
}
public Double getChargingDegree() {
return chargingDegree;
}
public void setChargingDegree(Double chargingDegree) {
this.chargingDegree = chargingDegree;
}
public Double getLossChargingDegree() {
return lossChargingDegree;
}
public void setLossChargingDegree(Double lossChargingDegree) {
this.lossChargingDegree = lossChargingDegree;
}
public Double getAmountCharged() {
return amountCharged;
}
public void setAmountCharged(Double amountCharged) {
this.amountCharged = amountCharged;
}
public String getHardwareFault() {
return hardwareFault;
}
public void setHardwareFault(String hardwareFault) {
this.hardwareFault = hardwareFault;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
@Override
public String getRemark() {
return remark;
}
@Override
public void setRemark(String remark) {
this.remark = remark;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public Long getChargingStationId() {
return chargingStationId;
}
public void setChargingStationId(Long chargingStationId) {
this.chargingStationId = chargingStationId;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public Long getRealTimeOrderId() {
return realTimeOrderId;
}
public void setRealTimeOrderId(Long realTimeOrderId) {
this.realTimeOrderId = realTimeOrderId;
}
public String getGunNumber() {
return gunNumber;
}
public void setGunNumber(String gunNumber) {
this.gunNumber = gunNumber;
}
}

View File

@ -0,0 +1,17 @@
package com.xhpc.order.mapper;
import com.xhpc.order.domain.XhpcRealTimeOrder;
/**
* @author yuyang
* @date 2021/8/7 15:09
*/
public interface XhpcRealTimeOrderMapper {
/**
* 添加实时充电订单
* @param xhpcRealTimeOrder
* @return
*/
int addXhpcRealTimeOrder(XhpcRealTimeOrder xhpcRealTimeOrder);
}

View File

@ -0,0 +1,77 @@
package com.xhpc.order.rabbitmq;
import com.rabbitmq.client.*;
import com.xhpc.order.util.ConnectionRabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author yuyang
* @date 2021/8/7 16:45
*/
public class RecvDemo {
//队列名称
private final static String NAME="xhpc";
public static void main(String[] args) {
try {
// 获取到连接
Connection connection = ConnectionRabbitMQUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1queue 队列名称
* 2durable 是否持久化如果持久化mq重启后队列还在
* 3exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除
* 5arguments 参数可以设置一个队列的扩展参数比如可设置存活时间
*/
channel.queueDeclare(NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume
* @param envelope 信封通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//测试自动ack模拟异常
//int i=1/0;
//交换机
String exchange = envelope.getExchange();
//消息idmq在channel中用来标识消息的id可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!");
}
};
// 监听队列第二个参数是否自动进行消息确认
//参数String queue, boolean autoAck, Consumer callback
/**
* 参数明细
* 1queue 队列名称
* 2autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复
* 3callback消费方法当消费者接收到消息要执行的方法
*/
channel.basicConsume(NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,71 @@
package com.xhpc.order.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xhpc.order.util.ConnectionRabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author yuyang
* @date 2021/8/7 16:37
*/
public class SendDemo {
//队列名称
private final static String NAME = "xhpc";
public static void main(String[] args) {
try {
// 1获取到连接
Connection connection = ConnectionRabbitMQUtil.getConnection();
// 2从连接中创建通道使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3声明创建队列
/**
* 参数明细
* 1queue 队列名称
* 2durable 是否持久化如果持久化mq重启后队列还在
* 3exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除
* 5arguments 参数可以设置一个队列的扩展参数比如可设置存活时间
*/
//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(NAME, false, false, false, null);
// 4消息内容
String message = "Hello World!!!!";
// 向指定的队列中发送消息
//参数String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细
* 1exchange交换机如果不指定将使用mq的默认交换机设置为""
* 2routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称
* 3props消息的属性
* 4body消息内容
*/
channel.basicPublish("", NAME, null, message.getBytes());
// // 循环发布任务
// for (int i = 0; i < 50; i++) {
// // 消息内容
// String message = "task .. " + i;
// channel.basicPublish("", NAME, null, message.getBytes());
// System.out.println(" [x] Sent '" + message + "'");
//
// Thread.sleep(i * 2);
// }
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,16 @@
package com.xhpc.order.service;
/**
* @author yuyang
* @date 2021/8/7 15:07
*/
public interface IXhpcRealTimeOrderService {
/**
* 添加实时订单数据
* @param orderNo 交易流水号
* @param status
*/
public void addXhpcRealTimeOrder(String orderNo,Integer status);
}

View File

@ -0,0 +1,28 @@
package com.xhpc.order.service.impl;
import com.xhpc.common.redis.service.RedisService;
import com.xhpc.order.service.IXhpcRealTimeOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author yuyang
* @date 2021/8/7 15:07
*/
@Service
public class XhpcRealTimeOrderServiceImpl implements IXhpcRealTimeOrderService {
@Autowired
private RedisService redisService;
/**
* 添加实时数据
* @param orderNo 交易流水号
* @param status
*/
@Override
public void addXhpcRealTimeOrder(String orderNo, Integer status) {
}
}

View File

@ -0,0 +1,31 @@
package com.xhpc.order.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author yuyang
* @date 2021/8/6 16:04
*/
public class ConnectionRabbitMQUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("118.24.137.203");
//端口
factory.setPort(5673);
//设置账号信息用户名密码vhost
//factory.setVirtualHost("/guest");//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mq
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}

View File

@ -0,0 +1,210 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xhpc.order.mapper.XhpcRealTimeOrderMapper">
<resultMap type="com.xhpc.order.domain.XhpcRealTimeOrder" id="XhpcRealTimeOrderMap">
<result column="real_time_order_id" property="realTimeOrderId"/>
<result column="charging_order_id" property="chargingOrderId"/>
<result column="transaction_number" property="transactionNumber"/>
<result column="pile_number" property="pileNumber"/>
<result column="gun_number" property="gunNumber"/>
<result column="pile_gun_status" property="pileGunStatus"/>
<result column="vehicle_gun_status" property="vehicleGunStatus"/>
<result column="voltage" property="voltage"/>
<result column="electric_current" property="electricCurrent"/>
<result column="gun_line_temperature" property="gunLineTemperature"/>
<result column="gun_line_number" property="gunLineNumber"/>
<result column="soc" property="soc"/>
<result column="max_temperature" property="maxTemperature"/>
<result column="charging_time" property="chargingTime"/>
<result column="remaining_time" property="remainingTime"/>
<result column="charging_degree" property="chargingDegree"/>
<result column="loss_charging_degree" property="lossChargingDegree"/>
<result column="amount_charged" property="amountCharged"/>
<result column="hardware_fault" property="hardwareFault"/>
<result column="status" property="status"/>
<result column="create_time" property="createTime"/>
<result column="create_by" property="createBy"/>
<result column="update_time" property="updateTime"/>
<result column="update_by" property="updateBy"/>
<result column="remark" property="remark"/>
<result column="user_id" property="userId"/>
<result column="type" property="type"/>
<result column="charging_station_id" property="chargingStationId"/>
</resultMap>
<insert id="addXhpcRealTimeOrder" parameterType="com.xhpc.order.domain.XhpcRealTimeOrder" useGeneratedKeys="true"
keyProperty="realTimeOrderId">
insert into xhpc_real_time_order
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="null != chargingOrderId ">
charging_order_id,
</if>
<if test="null != transactionNumber ">
transaction_number,
</if>
<if test="null != pileNumber ">
pile_number,
</if>
<if test="null != gunNumber ">
gun_number,
</if>
<if test="null != pileGunStatus ">
pile_gun_status,
</if>
<if test="null != vehicleGunStatus ">
vehicle_gun_status,
</if>
<if test="null != voltage ">
voltage,
</if>
<if test="null != electricCurrent ">
electric_current,
</if>
<if test="null != gunLineTemperature ">
gun_line_temperature,
</if>
<if test="null != gunLineNumber ">
gun_line_number,
</if>
<if test="null != soc ">
soc,
</if>
<if test="null != maxTemperature ">
max_temperature,
</if>
<if test="null != chargingTime ">
charging_time,
</if>
<if test="null != remainingTime ">
remaining_time,
</if>
<if test="null != chargingDegree ">
charging_degree,
</if>
<if test="null != lossChargingDegree ">
loss_charging_degree,
</if>
<if test="null != amountCharged ">
amount_charged,
</if>
<if test="null != hardwareFault ">
hardware_fault,
</if>
<if test="null != status ">
status,
</if>
<if test="null != createTime ">
create_time,
</if>
<if test="null != createBy ">
create_by,
</if>
<if test="null != updateTime ">
update_time,
</if>
<if test="null != updateBy ">
update_by,
</if>
<if test="null != remark ">
remark,
</if>
<if test="null != userId ">
user_id,
</if>
<if test="null != type ">
type,
</if>
<if test="null != chargingStationId ">
charging_station_id
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="null != chargingOrderId ">
#{chargingOrderId},
</if>
<if test="null != transactionNumber ">
#{transactionNumber},
</if>
<if test="null != pileNumber ">
#{pileNumber},
</if>
<if test="null != gunNumber ">
#{gunNumber},
</if>
<if test="null != pileGunStatus ">
#{pileGunStatus},
</if>
<if test="null != vehicleGunStatus ">
#{vehicleGunStatus},
</if>
<if test="null != voltage ">
#{voltage},
</if>
<if test="null != electricCurrent ">
#{electricCurrent},
</if>
<if test="null != gunLineTemperature ">
#{gunLineTemperature},
</if>
<if test="null != gunLineNumber ">
#{gunLineNumber},
</if>
<if test="null != soc ">
#{soc},
</if>
<if test="null != maxTemperature ">
#{maxTemperature},
</if>
<if test="null != chargingTime ">
#{chargingTime},
</if>
<if test="null != remainingTime ">
#{remainingTime},
</if>
<if test="null != chargingDegree ">
#{chargingDegree},
</if>
<if test="null != lossChargingDegree ">
#{lossChargingDegree},
</if>
<if test="null != amountCharged ">
#{amountCharged},
</if>
<if test="null != hardwareFault ">
#{hardwareFault},
</if>
<if test="null != status ">
#{status},
</if>
<if test="null != createTime ">
#{createTime},
</if>
<if test="null != createBy ">
#{createBy},
</if>
<if test="null != updateTime ">
#{updateTime},
</if>
<if test="null != updateBy ">
#{updateBy},
</if>
<if test="null != remark ">
#{remark},
</if>
<if test="null != userId ">
#{userId},
</if>
<if test="null != type ">
#{type},
</if>
<if test="null != chargingStationId ">
#{chargingStationId}
</if>
</trim>
</insert>
</mapper>