package com.smartrm.smartrmmonolith.device.adapter.cloud;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.smartrm.smartrmmonolith.device.domain.event.DeviceFailure;
import com.smartrm.smartrmmonolith.device.domain.event.DeviceFailureCode;
import com.smartrm.smartrmmonolith.device.domain.event.DeviceFailureEvent;
import com.smartrm.smartrmmonolith.device.domain.event.PopSuccessEvent;
import com.smartrm.smartrmmonolith.device.domain.iot.IoTDeviceId;
import com.smartrm.smartrmmonolith.device.domain.repository.VendingMachineRepository;
import com.smartrm.smartrmmonolith.device.domain.slot.SlotVendingMachine;
import com.smartrm.smartrmmonolith.infracore.event.DomainEventBus;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class AmqpClient {
private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue(50000));
//iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。
private static String iotInstanceId = "";
// 指定单个进程启动的连接数
// 单个连接消费速率有限,请参考使用限制,最大64个连接
// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接
private static int connectionCount = 4;
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}
@Override
public void onSessionClosed(Session session, Throwable cause) {
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
}
};
@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecret}")
private String accessSecret;
@Value("${aliyun.iot.consumerGroup}")
private String consumerGroupId;
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
private String clientId = UUID.randomUUID().toString();
//${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
@Value("${aliyun.iot.amqp.host}")
private String host;
@Autowired
private VendingMachineRepository machineRepository;
@Autowired
private DomainEventBus eventBus;
private List<Connection> connections = new ArrayList<>();
private MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(final Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(new Runnable() {
@Override
public void run() {
processMessage(message);
}
});
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod)
throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
@PostConstruct
public void start() throws Exception {
//参数说明,请参见AMQP客户端接入说明文档。
for (int i = 0; i < connectionCount; i++) {
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent, accessSecret, signMethod);
String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(C
没有合适的资源?快使用搜索试试~ 我知道了~
领域驱动设计——实战落地代码【基于SpringBoot的单体项目】.zip
共460个文件
java:271个
jpg:58个
png:55个
需积分: 5 0 下载量 45 浏览量
2024-08-14
10:06:16
上传
评论
收藏 7.31MB ZIP 举报
温馨提示
项目工程资源经过严格测试可直接运行成功且功能正常的情况才上传,可轻松copy复刻,拿到资料包后可轻松复现出一样的项目,本人系统开发经验充足(全栈开发),有任何使用问题欢迎随时与我联系,我会及时为您解惑,提供帮助 【资源内容】:项目具体内容可查看/点击本页面下方的*资源详情*,包含完整源码+工程文件+说明(若有)等。【若无积分,此资源可私信获取】 【本人专注IT领域】:有任何使用问题欢迎随时与我联系,我会及时解答,第一时间为您提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 【适合场景】:相关项目设计中,皆可应用在项目开发、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面中 可借鉴此优质项目实现复刻,也可基于此项目来扩展开发出更多功能 #注 1. 本资源仅用于开源学习和技术交流。不可商用等,一切后果由使用者承担 2. 部分字体及插图等来自网络,若是侵权请联系删除,本人不对所涉及的版权问题或内容负法律责任。收取的费用仅用于收集和整理资料耗费时间的酬劳
资源推荐
资源详情
资源评论
收起资源包目录
领域驱动设计——实战落地代码【基于SpringBoot的单体项目】.zip (460个子文件)
.gitignore 2KB
.gitignore 0B
k3cloud-webapi-sdk.7.6.x.jar 258KB
AmqpClient.java 12KB
VendingMachineRepositoryImpl.java 11KB
AppTradeService.java 8KB
SmartrmMonolithApplicationTests.java 8KB
SlotVendingMachine.java 8KB
TradeVendingMachineRepositoryImpl.java 7KB
PayServiceImpl.java 6KB
Payment.java 6KB
FPOOrderEntry.java 6KB
OrderRepositoryImpl.java 6KB
VendingMachineDeviceServiceImpl.java 5KB
CabinetVendingMachine.java 4KB
CabinetVendingMachine.java 4KB
NormalSlotVendingMachine.java 4KB
Order.java 4KB
DevicePurchaseServiceImpl.java 4KB
CabinetVendingMachine.java 4KB
PaymentRepositoryImpl.java 4KB
CommodityInfoDto.java 4KB
DevicePurchaseServiceImpl.java 4KB
DateParser.java 4KB
ExecutorJobScheduler.java 4KB
AppUserServiceImpl.java 4KB
PropertySchema.java 3KB
Commodity.java 3KB
VendingMachineModelDo.java 3KB
VendingMachineModelDo.java 3KB
SimpleEventBusImpl.java 3KB
StockedCommodity.java 3KB
CommodityRepositoryJsonReader.java 3KB
ERPPurchaseOrder.java 3KB
WxPaymentClientImpl.java 3KB
DeviceSimulatorController.java 3KB
SlotVendingMachine.java 3KB
UserAccount.java 3KB
PaymentDo.java 3KB
WebSecurityConfig.java 2KB
WxContractSigningNotification.java 2KB
UserInfo.java 2KB
TradeController.java 2KB
WxPaymentSuccessDto.java 2KB
CommonExceptionHandler.java 2KB
OrderInfo.java 2KB
PaymentController.java 2KB
RetryExecutorBase.java 2KB
UserAuthenticationFilter.java 2KB
IoTDeviceServiceImpl.java 2KB
RefundExecutor.java 2KB
JsonPropertyConvertor.java 2KB
PaymentStateChangeEventHandler.java 2KB
UserController.java 2KB
MockPaymentPlatformClientImpl.java 2KB
OrderSucceededEvent.java 2KB
UserAccountRepositoryImpl.java 2KB
ImageUrlPropertyJsonParser.java 2KB
MonolithTradePayServiceImpl.java 2KB
OrderMapper.java 2KB
DeviceFailureExecutor.java 2KB
PlainTextPropertyConvertor.java 2KB
DateTimePropertyJsonParser.java 2KB
CommoditySchemaImpl.java 2KB
CommonResponse.java 2KB
PaymentSimulatorController.java 2KB
DatePropertyJsonParser.java 2KB
MonolithTradeDeviceServiceImpl.java 2KB
WechatPayConfig.java 2KB
VendingMachineDeviceService.java 2KB
VendingMachineDo.java 2KB
CabinetVendingMachineLockedEvent.java 2KB
VendingMachine.java 2KB
CabinetVendingMachineLockedEvent.java 2KB
PaymentMapper.java 2KB
CriteriaField.java 2KB
CommodityDumper.java 2KB
JwtUtil.java 2KB
InMemoryCommodityRepositoryImpl.java 1KB
DomainAuthenticationEntryPoint.java 1KB
CommodityServiceImpl.java 1KB
MockCallbackController.java 1KB
ImageUrlPropertyTextParser.java 1KB
CabinetVendingMachineLockedEvent.java 1KB
DomainAccessDeniedHandler.java 1KB
PayService.java 1KB
TradeCommodityService.java 1KB
CommodityController.java 1KB
PaymentEventHandler.java 1KB
RefundSpecification.java 1KB
VendingMachineInstallOrder.java 1KB
MonolithTradeCommodityServiceImpl.java 1KB
MapPropertyJsonParser.java 1KB
OrderCreatedEvent.java 1KB
InstallOrderCreatedEvent.java 1KB
VendingMachineCommodityListDto.java 1KB
DeviceFailureEvent.java 1KB
WxPaymentResultNotificationDto.java 1KB
DeviceFailureEvent.java 1KB
PaymentStateChangeEvent.java 1KB
共 460 条
- 1
- 2
- 3
- 4
- 5
资源评论
热爱技术。
- 粉丝: 2609
- 资源: 7860
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功