|
@@ -19,9 +19,12 @@ package org.jeecg.modules.api;
|
|
|
*/
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.aspectj.lang.ProceedingJoinPoint;
|
|
|
+import org.aspectj.lang.annotation.Around;
|
|
|
+import org.aspectj.lang.annotation.Pointcut;
|
|
|
+import org.jeecg.boot.starter.rabbitmq.client.RabbitMqClient;
|
|
|
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
|
|
|
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
|
|
|
import org.jeecg.common.annotation.RabbitComponent;
|
|
@@ -35,22 +38,20 @@ import org.jeecg.modules.admin_order.entity.AdminOrder;
|
|
|
import org.jeecg.modules.admin_order.service.IAdminOrderService;
|
|
|
import org.jeecg.modules.admin_security_check.entity.AdminSecurityCheck;
|
|
|
import org.jeecg.modules.admin_security_check.service.IAdminSecurityCheckService;
|
|
|
-import org.jeecg.modules.admin_user_staff.entity.AdminUserStaff;
|
|
|
+import org.jeecg.modules.utils.ConversionUtil;
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
-import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
import org.springframework.amqp.support.AmqpHeaders;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.Profile;
|
|
|
import org.springframework.messaging.handler.annotation.Header;
|
|
|
import org.w3c.dom.Document;
|
|
|
import org.w3c.dom.Node;
|
|
|
import org.w3c.dom.NodeList;
|
|
|
import org.xml.sax.InputSource;
|
|
|
-import org.xml.sax.SAXException;
|
|
|
|
|
|
import javax.xml.parsers.DocumentBuilderFactory;
|
|
|
-import javax.xml.parsers.ParserConfigurationException;
|
|
|
import javax.xml.transform.OutputKeys;
|
|
|
import javax.xml.transform.Transformer;
|
|
|
import javax.xml.transform.TransformerFactory;
|
|
@@ -60,7 +61,6 @@ import javax.xml.xpath.XPath;
|
|
|
import javax.xml.xpath.XPathConstants;
|
|
|
import javax.xml.xpath.XPathExpression;
|
|
|
import javax.xml.xpath.XPathFactory;
|
|
|
-import java.io.IOException;
|
|
|
import java.io.StringReader;
|
|
|
import java.io.StringWriter;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
@@ -85,7 +85,18 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
private AdminDfdlMqService adminDfdlMqService;
|
|
|
@Autowired
|
|
|
private RedisUtil redisUtil;
|
|
|
+ @Autowired
|
|
|
+ private RabbitMqClient rabbitMqClient;
|
|
|
+
|
|
|
+ @Value("${airport.mq.msgHandlerEvent}")
|
|
|
+ private boolean msgHandlerEvent;
|
|
|
+
|
|
|
+ // 过期时间 7天 单位秒
|
|
|
+ private static final Long EXPIRATION_TIME = 60480L;
|
|
|
|
|
|
+ public DemoRabbitMqListener3(){
|
|
|
+ System.out.println("¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥MQ创建了¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥");
|
|
|
+ }
|
|
|
|
|
|
@RabbitListener(queues = "test")
|
|
|
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
|
@@ -145,12 +156,19 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
}
|
|
|
|
|
|
//3.0版本
|
|
|
- //@RabbitListener(queues = "test3")
|
|
|
+ @RabbitListener(queues = "test3")
|
|
|
public void onMessage3(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
|
|
super.onMessage( message, deliveryTag, channel, new MqListener<Message>() {
|
|
|
@Override
|
|
|
public void handler(Message message2, Channel channel) {
|
|
|
String xmlString = new String( message2.getBody(), StandardCharsets.UTF_8 );
|
|
|
+
|
|
|
+ if (msgHandlerEvent){
|
|
|
+ //转发消息到消息总栈测试, 避免本地测试消费消息
|
|
|
+ BaseMap params = new BaseMap();
|
|
|
+ params.put("xml", xmlString);
|
|
|
+ rabbitMqClient.sendMessage("Test3HandlerEvent", params);
|
|
|
+ }
|
|
|
// 将XML字符串转换为字符流
|
|
|
StringReader sr = new StringReader( xmlString );
|
|
|
// 读取XML文档
|
|
@@ -163,9 +181,12 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
XPath xpath = xPathfactory.newXPath();
|
|
|
// 使用XPath表达式获取 节点的值
|
|
|
String STYP = (String) xpath.evaluate( "/MSG/META/STYP", doc, XPathConstants.STRING );
|
|
|
-
|
|
|
+ log.info( "接收到消息,消息类型STYP={},消息数据:",STYP);
|
|
|
+ log.info("json={}",ConversionUtil.xmlToJson(xmlString));
|
|
|
+ log.info("XML={}",xmlString);
|
|
|
// 航班机位动态信息更新(STLS)
|
|
|
if (STYP.equals( "STLS" )) {
|
|
|
+ log.info("开始处理【STLS】航班机位动态信息更新消息");
|
|
|
//航班标识 (航班号)
|
|
|
String FFID = (String) xpath.evaluate( "/MSG/DFLT/FFID", doc,
|
|
|
XPathConstants.STRING );
|
|
@@ -176,7 +197,7 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
String lastChar = FFID.substring( FFID.length() - 1 );
|
|
|
//这是过滤掉 D出站的
|
|
|
if (lastChar.equals( "A" )) {
|
|
|
- log.info( "STLS::" + xmlString );
|
|
|
+ log.info("执行进站逻辑开始");
|
|
|
//机位编号
|
|
|
String CODE = (String) xpath.evaluate( "/MSG/DFLT/STLS/STND/CODE", doc,
|
|
|
XPathConstants.STRING );
|
|
@@ -227,12 +248,15 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
// one.setType( "使用中" );
|
|
|
adminAircraftPositionService.updateById( one );
|
|
|
}
|
|
|
+ log.info("执行进站逻辑结束");
|
|
|
}
|
|
|
+ log.info("结束处理【STLS】航班机位动态信息更新消息");
|
|
|
}
|
|
|
|
|
|
//航班本站起飞消息(DEPE) 飞机起飞事件
|
|
|
if (STYP.equals( "DEPE" )) {
|
|
|
- log.info( "DEPE::" + xmlString );
|
|
|
+ //log.info( "DEPE::" + xmlString );
|
|
|
+ log.info("开始处理【DEPE】航班本站起飞消息");
|
|
|
//航班唯一编号
|
|
|
String FLID = (String) xpath.evaluate( "/MSG/DFLT/FLID", doc,
|
|
|
XPathConstants.STRING );
|
|
@@ -287,11 +311,12 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
}
|
|
|
}
|
|
|
// }
|
|
|
+ log.info("结束处理【DEPE】航班本站起飞消息");
|
|
|
}
|
|
|
|
|
|
//动态航班整表同步事件(DFDL)
|
|
|
if (STYP.equals( "DFDL" )) {
|
|
|
- log.info( "DFDL::" + xmlString );
|
|
|
+ log.info("开始处理【DFDL】动态航班整表同步事件消息");
|
|
|
redisUtil.set( "DFDL", xmlString );
|
|
|
AdminDfdlMq adminDfdlMq = new AdminDfdlMq();
|
|
|
adminDfdlMq.setId( "1" );
|
|
@@ -313,11 +338,12 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
adminAircraftPositionService.updateById( adminAircraftPosition );
|
|
|
//存入缓存 key为关联航班ID(起飞用) value为机位id 为后续清除机位存入缓存
|
|
|
// if (!redisUtil.hasKey( adminAircraftPosition.getFlightAfid() )) {
|
|
|
- redisUtil.set( adminAircraftPosition.getFlightAfid(), adminAircraftPosition.getId() );
|
|
|
+ redisUtil.set( adminAircraftPosition.getFlightAfid(), adminAircraftPosition.getId(), EXPIRATION_TIME );
|
|
|
// }
|
|
|
}
|
|
|
log.info( "同步后::" + adminAircraftPosition.toString() );
|
|
|
}
|
|
|
+ log.info("结束处理【DFDL】动态航班整表同步事件消息");
|
|
|
|
|
|
/* XPathExpression expr = xpath.compile( "//DFLT" );
|
|
|
NodeList nodes = (NodeList) expr.evaluate( doc, XPathConstants.NODESET );
|
|
@@ -432,7 +458,8 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
//航班到达本站消息(ARRE)
|
|
|
|
|
|
if (STYP.equals( "ARRE" )) {
|
|
|
- log.info( "ARRE::" + xmlString );
|
|
|
+ log.info("开始处理【ARRE】航班到达本站消息");
|
|
|
+ //log.info( "ARRE::" + xmlString );
|
|
|
//航班唯一编号
|
|
|
String FLID = (String) xpath.evaluate( "/MSG/DFLT/FLID", doc,
|
|
|
XPathConstants.STRING );
|
|
@@ -456,10 +483,10 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
String dfdl = (String) redisUtil.get( "DFDL" );
|
|
|
xmldfdl = XMLDFDL( dfdl, FLID );
|
|
|
}
|
|
|
- //变化
|
|
|
+ //变化 判断是否有航班变更
|
|
|
if (redisUtil.hasKey( "CFCE_" + FLID )) {
|
|
|
String CFCE = (String) redisUtil.get( "CFCE_" + FLID );
|
|
|
- log.info( "取变更____CFCE______" + CFCE );
|
|
|
+ log.info( "有航班变更,取变更____CFCE______" + CFCE );
|
|
|
xmldfdl2 = XMLCFCE( CFCE );
|
|
|
xmldfdl.setAircraftNum( xmldfdl2.getAircraftNum() );
|
|
|
xmldfdl.setFlightNum( xmldfdl2.getFlightNum() );
|
|
@@ -477,7 +504,7 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
// }
|
|
|
//存入缓存 key为关联航班ID(起飞用) value为机位id 为后续清除机位存入缓存
|
|
|
// if (!redisUtil.hasKey( xmldfdl.getFlightAfid() )) {
|
|
|
- redisUtil.set( xmldfdl.getFlightAfid(), one.getId() );
|
|
|
+ redisUtil.set( xmldfdl.getFlightAfid(), one.getId(),EXPIRATION_TIME );
|
|
|
// }
|
|
|
if (xmldfdl != null) {
|
|
|
one.setAircraftNum( xmldfdl.getAircraftNum() );
|
|
@@ -488,6 +515,7 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
one.setType( "使用中" );
|
|
|
adminAircraftPositionService.updateById( one );
|
|
|
}
|
|
|
+ log.info("结束处理【ARRE】航班到达本站消息");
|
|
|
}
|
|
|
|
|
|
//航班动态信息更新(DFUE)
|
|
@@ -504,15 +532,18 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
|
|
|
//航班更换飞机消息(CFCE)
|
|
|
if (STYP.equals( "CFCE" )) {
|
|
|
- log.info( "CFCE::" + xmlString );
|
|
|
+ log.info("开始处理【CFCE】航班更换飞机消息");
|
|
|
+ //log.info( "CFCE::" + xmlString );
|
|
|
//航班唯一编号
|
|
|
String FLID = (String) xpath.evaluate( "/MSG/DFLT/FLID", doc,
|
|
|
XPathConstants.STRING );
|
|
|
//存入缓存 待落地是取计划里面的数据时变更
|
|
|
// if (!redisUtil.hasKey( "CFCE_" + FLID )) {
|
|
|
//
|
|
|
- redisUtil.set( "CFCE_" + FLID, xmlString );
|
|
|
+ redisUtil.set( "CFCE_" + FLID, xmlString, EXPIRATION_TIME );
|
|
|
+ log.info("存入Redis key: CFCE_{}", FLID);
|
|
|
// }
|
|
|
+ log.info("结束处理【CFCE】航班更换飞机消息");
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -591,7 +622,7 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
// 航班唯一编号
|
|
|
// String FLID = (String) xpath.evaluate( "/DFLT/FLID", doc2, XPathConstants.STRING );
|
|
|
// 关联航班ID AFID
|
|
|
-// String AFID = (String) xpath.evaluate( "/MSG/DFLT/AFID", doc, XPathConstants.STRING );
|
|
|
+ String AFID = (String) xpath.evaluate( "/MSG/DFLT/AFID", doc, XPathConstants.STRING );
|
|
|
//航班标识
|
|
|
String FFID = (String) xpath.evaluate( "/MSG/DFLT/FFID", doc, XPathConstants.STRING );
|
|
|
String[] parts = FFID.split( "-" ); // 分割字符串
|
|
@@ -601,7 +632,7 @@ public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<Object> {
|
|
|
|
|
|
adminAircraftPosition.setAircraftNum( CFNO );
|
|
|
adminAircraftPosition.setFlightNum( HBH );
|
|
|
-// adminAircraftPosition.setFlightAfid( AFID );
|
|
|
+ adminAircraftPosition.setFlightAfid( AFID );
|
|
|
return adminAircraftPosition;
|
|
|
|
|
|
} catch (Exception e) {
|