1、主要是基于mqtt的消息推送服务,本系统的设计采用springboot+mqtt来实现消息推送
2、采用kafka作为数据获取后暂存点,可以通过接口获取数据(sub),或则通过客户端发送到mqtt订阅的方式获取对应的消息
3、消息传输格式采用json,内部定义了一些常用的消息作为本服务的一个约束
4、需要搭建mqtt服务器,集群或则单机,具体搭建需要参考mqtt
5、这边项目还需要redis作为缓存工具,主要是缓存用户的发送信息
@RestController
@RequestMapping("/api/push-server/")
public class RegisterController {
@Autowired
private RedisTemplate redisTemplate;
/**
* 进行注册
*/
@RequestMapping(value = "unRegist",method = RequestMethod.POST)
public void unRegist(RegistDto registDto){
redisTemplate.boundSetOps(registDto.getTopicPre()).add(registDto.getMark());
}
/**
* 取消注册
* @param registDto
*/
@RequestMapping(value = "regist",method = RequestMethod.POST)
public void regist(RegistDto registDto){
redisTemplate.boundSetOps(registDto.getTopicPre()).remove(registDto.getMark());
}
}
进行对应的订阅渠道注册,主要是消息推送的时候能针对性的推送
/**
* @Auther yidadi
* @Date 17-9-1 下午4:31
*/
@RestController
@RequestMapping("/api/push-server/")
public class PushController {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private MqttClient mqttProducer;
/**
*
* @param pushMsgDto
*/
@RequestMapping(value = "pushTopic",method = RequestMethod.POST)
public void pushTopic(PushMsgDto pushMsgDto){
Set<String> datas = redisTemplate.boundSetOps(pushMsgDto.getTopicPre()).members();
if(datas.isEmpty()){
throw new PushException("the register user is empty");
}else{
datas.forEach(data->{
MqttMessage message = new MqttMessage();
message.setQos(pushMsgDto.getQos());
message.setRetained(false);
message.setPayload(pushMsgDto.getContent().getBytes());
try {
mqttProducer.publish(pushMsgDto.getTopicPre().concat(data),message);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
}
/**
*
* @param pushMsgDto
*/
@RequestMapping(value = "pushOne",method = RequestMethod.POST)
public void pushOne(PushMsgDto pushMsgDto){
MqttMessage message = new MqttMessage();
message.setQos(pushMsgDto.getQos());
message.setRetained(false);
message.setPayload(pushMsgDto.getContent().getBytes());
try {
mqttProducer.publish(pushMsgDto.getTopicPre().concat(pushMsgDto.getMark()),message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
可以直接调用接口的方式发送推送消息,可以单点推送也可针对对应的渠道推送
/**
* @Auther yidadi
* @Date 17-9-4 下午3:42
*/
@RestController
@RequestMapping("/api/push-server/")
public class SubController {
@Autowired
private KafkaProducer kafkaProducer;
/**
*
* @param subMsgDto
*/
@RequestMapping(value = "sub",method = RequestMethod.POST)
public void sub(SubMsgDto subMsgDto){
ProducerRecord producerRecord = new ProducerRecord(subMsgDto.getMqtopic(), JsonUtils.toJson(subMsgDto));
kafkaProducer.send(producerRecord);
}
}
可以通过接口方式的获取客户端发送回来的数据
/**
* @Auther yidadi
* @Date 17-9-4 下午3:38
*/
public class MqttCallBack implements MqttCallback {
private KafkaProducer kafkaProducer;
public MqttCallBack(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
public void connectionLost(Throwable cause) {
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
SubMsgDto subMsgDto = (SubMsgDto)JsonUtils.readFromJson(new String(message.getPayload()),SubMsgDto.class);
kafkaProducer.send(new ProducerRecord(subMsgDto.getMqtopic(),JsonUtils.toJson(subMsgDto)));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
也可以通过mqtt的方式获取对应的渠道消息发送到kafka,消息体里面需要制定对应的kafka的topic
Yuki-^_^
- 粉丝: 3103
- 资源: 3611
最新资源
- 基于大数据环境搭建,本项目为大数据基础镜像组件,Hadoop、Spark、Hive、Tez、Hue、Flink、Zookeeper、Kafka、MySQL等,用
- 基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法详细文档+全部资料.zip
- 基于开源flink,源码阅读注释详细文档+全部资料.zip
- 基于微服务架构的实时计算(Flink)展示平台详细文档+全部资料.zip
- 工具4:股权激励如何实施.xls
- 天津滨海快速交通发展有限公司股权激励机制探讨2.ppt
- 某某交通股份有限公司高层股权激励方案.doc
- 话费管理规定.docx
- 话费补贴申请书.doc
- 交通补贴及移动话费补贴政策.doc
- 话费补贴管理制度.doc
- 电话费补贴管理办法(暂行).doc
- 话费补助管理制度.doc
- 员工话费补贴管理制度.doc
- 手机补贴标准管理办法.doc
- 加班与加班费的控制技巧.ppt
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈