Optimize radar command code
This commit is contained in:
parent
f2f829a2f8
commit
5946d692c5
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -110,49 +111,88 @@ public class SarControlContext {
|
||||
*/
|
||||
private void sendUdp(SarControlDTO control) {
|
||||
SarControlTypeEnum controlType = control.getControlType();
|
||||
String ip = control.getIp();
|
||||
log.debug("开始发送雷达控制指令[{}]", controlType);
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
byte[] content = pack(control);
|
||||
socket.connect(new InetSocketAddress(ip, PORT));
|
||||
DatagramPacket packet = new DatagramPacket(content, content.length);
|
||||
int failCount = 0;
|
||||
SarErrorDTO info = null;
|
||||
String cacheKey = CacheKey.getSarControlBack(ip); // skyeye:sar:control + ip
|
||||
while (info == null && failCount < RETRY_MAX) {
|
||||
socket.send(packet);
|
||||
// 每0.1秒取回执,1秒后超时
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTime < ANSWER_TIMEOUT) {
|
||||
// 从 Caffeine 获取
|
||||
Cache shortCache = cacheManager.getCache("sar-short-lived");
|
||||
info = CacheUtil.get(shortCache, cacheKey, SarErrorDTO.class);
|
||||
if (info != null) {
|
||||
shortCache.evict(cacheKey); // 相当于 del,读取后删除
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(POLLING_INTERVAL);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (info == null) { // 超时未收到回执
|
||||
failCount++;
|
||||
} else if (info.getExecStatus() == 1) {
|
||||
// 只有发送的数据结构错误时才会返回错误状态,并不会因为业务不允许返回错误
|
||||
throw new ServiceException("控制指令[" + controlType + "]执行状态错误,请重试");
|
||||
}
|
||||
}
|
||||
if (info == null) {
|
||||
throw new SarConnectException("控制指令[" + controlType + "]发送失败,雷达[" + ip + "]无应答,请重试");
|
||||
} else {
|
||||
log.info("雷达控制指令[{}]发送完毕----------------------", controlType);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw ServiceException.errorLog("控制指令[" + controlType + "]发送失败 " + ex.getMessage());
|
||||
String targetIp = control.getIp();
|
||||
log.info("准备发送雷达控制指令 | 类型:{} | 目标IP:{}", controlType, targetIp);
|
||||
String cacheKey = CacheKey.getSarControlBack(targetIp);
|
||||
byte[] payload;
|
||||
try {
|
||||
payload = pack(control);
|
||||
} catch (Exception e) {
|
||||
log.error("打包控制指令失败 | 类型:{} | ip:{} | {}", controlType, targetIp, e.getMessage(), e);
|
||||
throw new ServiceException("控制指令打包失败:" + e.getMessage());
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
InetSocketAddress address = new InetSocketAddress(targetIp, PORT);
|
||||
socket.connect(address);
|
||||
DatagramPacket packet = new DatagramPacket(payload, payload.length);
|
||||
SarErrorDTO response = null;
|
||||
int retryCount = 0;
|
||||
while (response == null && retryCount < RETRY_MAX) {
|
||||
// 发送
|
||||
try {
|
||||
socket.send(packet);
|
||||
log.info("UDP指令已发送 | 第{}次尝试 | 类型:{} | ip:{}",
|
||||
retryCount + 1, controlType, targetIp);
|
||||
} catch (IOException e) {
|
||||
log.warn("发送UDP失败 | 第{}次尝试 | {}", retryCount + 1, e.getMessage());
|
||||
retryCount++;
|
||||
continue;
|
||||
}
|
||||
// 等待回执(轮询方式)
|
||||
response = waitForResponse(cacheKey, targetIp, controlType);
|
||||
if (response == null) {
|
||||
retryCount++;
|
||||
log.warn("第{}次发送未收到回执 | 类型:{} | ip:{}", retryCount, controlType, targetIp);
|
||||
}
|
||||
}
|
||||
// 最终判断
|
||||
if (response == null) {
|
||||
String msg = String.format("雷达[%s]控制指令[%s]发送失败:超时无应答", targetIp, controlType);
|
||||
log.error(msg);
|
||||
throw new SarConnectException(msg + ",请重试");
|
||||
}
|
||||
// 业务状态判断
|
||||
if (response.getExecStatus() == 1) {
|
||||
String msg = String.format("雷达[%s]控制指令[%s]执行失败:状态码=1(数据结构错误?)",
|
||||
targetIp, controlType);
|
||||
log.error(msg);
|
||||
throw new ServiceException(msg + ",请检查指令内容后重试");
|
||||
}
|
||||
log.info("雷达控制指令发送成功 | 类型:{} | ip:{} | 状态:{}", controlType, targetIp, response.getExecStatus());
|
||||
} catch (IOException e) {
|
||||
String msg = String.format("雷达控制指令[%s]网络异常 | ip:%s | %s", controlType, targetIp, e.getMessage());
|
||||
log.error(msg, e);
|
||||
throw ServiceException.errorLog(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询等待缓存中的回执(带超时)
|
||||
*/
|
||||
private SarErrorDTO waitForResponse(String cacheKey, String ip, SarControlTypeEnum controlType) {
|
||||
Cache sarShortCache = cacheManager.getCache("sar-short-lived");
|
||||
if (sarShortCache == null) {
|
||||
log.error("无法获取 sar-short-lived cache");
|
||||
return null;
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < ANSWER_TIMEOUT) {
|
||||
SarErrorDTO dto = CacheUtil.get(sarShortCache, cacheKey, SarErrorDTO.class);
|
||||
if (dto != null) {
|
||||
// 读后即删,防止重复消费
|
||||
sarShortCache.evict(cacheKey);
|
||||
log.info("收到雷达回执 | ip:{} | 控制类型:{} | 状态:{}", ip, controlType, dto.getExecStatus());
|
||||
return dto;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(POLLING_INTERVAL);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("等待雷达回执时被中断 | ip:{}", ip);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null; // 超时
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Loading…
Reference in New Issue
Block a user