package com.push;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import Redis.utils.JedisUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
 * Redis pub/sub listener that answers pending asynchronous HTTP requests.
 *
 * <p>Requests are registered through {@link #addAsyncContexts(AsyncContext)} and
 * kept in a queue. When a channel is subscribed, the waiting request whose
 * {@code wd} parameter equals the channel name is answered with a subscription
 * acknowledgement containing its UUID. When a message arrives, every waiting
 * request whose {@code uuid} parameter is a member of the Redis set stored under
 * the channel name receives the message. Answered requests are completed and
 * removed from the queue.
 *
 * <p>Created on 2014-3-15
 */
public class RedisTopicListener extends JedisPubSub {

    private static final Logger logger = LoggerFactory
            .getLogger(RedisTopicListener.class);

    /** Channel for which no subscription acknowledgement is sent. */
    private static final String INIT_CHANNEL = "initChannel";

    /** Pending requests waiting for a subscription acknowledgement or a message. */
    private final Queue<AsyncContext> asyncContexts = new ConcurrentLinkedQueue<AsyncContext>();

    public RedisTopicListener() {
    }

    /**
     * Registers a pending request so that later pub/sub events can answer it.
     *
     * @param asyncContext the asynchronous context of the waiting request
     */
    public void addAsyncContexts(AsyncContext asyncContext) {
        asyncContexts.add(asyncContext);
    }

    /**
     * Channel mode: invoked when a channel subscription is closed. Only logs the event.
     *
     * @param channel            the channel name
     * @param subscribedChannels the subscription count reported with the event
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}]--->>【{}】【{}】",
                new Object[]{"onUnsubscribe", "常规模式:关闭订阅时触发", "String channel", "int subscribedChannels", channel, subscribedChannels});
    }

    /**
     * Channel mode: invoked when a channel subscription starts.
     *
     * <p>Unless the channel is {@code "initChannel"}, the first pending request whose
     * {@code wd} parameter equals the channel is answered: its UUID (taken from the
     * {@code uuid} parameter, or freshly generated) is added to the Redis set stored
     * under the channel name, and a JSON acknowledgement is written back.
     *
     * @param channel            the channel name
     * @param subscribedChannels the subscription count reported with the event
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}]--->>【{}】【{}】",
                new Object[]{"onSubscribe", "常规模式:启动订阅时触发", "String channel", "int subscribedChannels", channel, subscribedChannels});
        if (INIT_CHANNEL.equals(channel)) {
            return;
        }
        for (AsyncContext asyncContext : asyncContexts) {
            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
            String wanted = request.getParameter("wd");
            if (wanted == null || !wanted.equals(channel)) {
                continue;
            }
            String uuid = request.getParameter("uuid");
            if (uuid == null) {
                uuid = UUID.randomUUID().toString();
            }
            registerSubscriber(channel, uuid);

            Map<String, String> map = new HashMap<String, String>();
            map.put("subMsg", "订阅了");
            map.put("result", "true");
            map.put("uuid", uuid);
            respondAndRelease(asyncContext, new JSONArray().put(map).toString());
            // Only one pending request is acknowledged per subscribe event.
            break;
        }
    }

    /**
     * Channel mode: invoked when a message arrives on a subscribed channel.
     * The message is forwarded to the matching pending requests.
     *
     * @param channel the channel name
     * @param message the received message
     */
    @Override
    public void onMessage(String channel, String message) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}]--->>【{}】【{}】",
                new Object[]{"onMessage", "常规模式:收到匹配key值的消息时触发", "String channel", "String message", channel, message});
        subMsg2Web(buildMessage(channel, message));
    }

    /**
     * Pattern mode: invoked when a pattern subscription is closed. Only logs the event.
     *
     * @param pattern            the channel pattern
     * @param subscribedChannels the subscription count reported with the event
     */
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}]--->>【{}】【{}】",
                new Object[]{"onPUnsubscribe", "正则模式:关闭正则类型订阅时触发",
                        "String pattern", " int subscribedChannels", pattern, subscribedChannels});
    }

    /**
     * Pattern mode: invoked when a pattern subscription starts. Only logs the event.
     *
     * @param pattern            the channel pattern
     * @param subscribedChannels the subscription count reported with the event
     */
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}]--->>【{}】【{}】",
                new Object[]{"onPSubscribe", "正则模式:启动正则类型订阅时触发",
                        "String pattern", " int subscribedChannels", pattern, subscribedChannels});
    }

    /**
     * Pattern mode: invoked when a message arrives on a channel matching a subscribed
     * pattern. The message is forwarded to the matching pending requests.
     *
     * @param pattern the subscribed pattern
     * @param channel the concrete channel that matched the pattern
     * @param message the received message
     */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // The channel label previously read " int channel" although the parameter is a String.
        logger.info("调用的方法为{},功能为{},\n参数为[{}],[{}][{}]--->>【{}】【{}】【{}】",
                new Object[]{"onPMessage", "正则模式:收到匹配key值的消息时触发",
                        "String pattern", " String channel", " String message", pattern, channel, message});
        logger.debug("channel={}, pending asyncContexts={}", channel, asyncContexts);
        subMsg2Web(buildMessage(channel, message));
    }

    /**
     * Pushes a message to every pending request subscribed to its channel.
     *
     * <p>The channel name is read from the {@code "key"} entry of {@code message}.
     * A pending request receives the message when its {@code uuid} parameter is a
     * member of the Redis set stored under that channel name; requests without a
     * {@code uuid} parameter are skipped. Each recipient is completed and removed
     * from the queue, even when writing its response fails.
     *
     * @param message JSON object carrying at least the {@code "key"} entry; a
     *                {@code null} message or a missing key is logged and ignored
     */
    public void subMsg2Web(JSONObject message) {
        logger.info("调用的方法为{},功能为{},\n参数为[{}]--->>【{}】",
                new Object[]{"subMsg2Web", "推送",
                        "JSONObject message", message});
        if (message == null) {
            logger.warn("subMsg2Web called with a null message; nothing to push");
            return;
        }
        String channelName;
        try {
            channelName = message.getString("key");
        } catch (JSONException e) {
            // Without a channel name there is no set to check membership against.
            logger.error("Message has no \"key\" entry; nothing to push", e);
            return;
        }
        if (asyncContexts.isEmpty()) {
            // Do not borrow a Redis connection when nobody is waiting.
            return;
        }

        // Resolve the recipients first so the Redis connection is held only for the
        // membership checks and is returned exactly once.
        List<AsyncContext> recipients = new ArrayList<AsyncContext>();
        Jedis jedis = JedisUtils.getJedis();
        try {
            for (AsyncContext asyncContext : asyncContexts) {
                HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
                String reqUUID = request.getParameter("uuid");
                // Membership is evaluated per request; a previous match must not leak
                // into requests that carry no uuid.
                if (reqUUID != null && jedis.sismember(channelName, reqUUID)) {
                    recipients.add(asyncContext);
                }
            }
        } finally {
            JedisUtils.returnJedis(jedis);
        }

        if (recipients.isEmpty()) {
            return;
        }
        String payload = new JSONArray().put(message).toString();
        for (AsyncContext asyncContext : recipients) {
            respondAndRelease(asyncContext, payload);
        }
    }

    /** Builds the {@code {"key": channel, "value": message}} payload pushed to clients. */
    private static JSONObject buildMessage(String channel, String message) {
        JSONObject msg = new JSONObject();
        try {
            msg.put("key", channel);
            msg.put("value", message);
        } catch (JSONException e) {
            logger.error("Failed to build push payload for channel " + channel, e);
        }
        return msg;
    }

    /** Adds {@code uuid} to the Redis set stored under {@code channel}, always returning the connection. */
    private void registerSubscriber(String channel, String uuid) {
        Jedis jedis = JedisUtils.getJedis();
        try {
            jedis.sadd(channel, uuid);
        } finally {
            JedisUtils.returnJedis(jedis);
        }
    }

    /** Writes {@code json} to the pending request, then completes it and drops it from the queue. */
    private void respondAndRelease(AsyncContext asyncContext, String json) {
        try {
            HttpServletResponse peer = (HttpServletResponse) asyncContext.getResponse();
            // Status, content type and encoding must be set before the writer is
            // obtained; the encoding cannot be changed afterwards.
            peer.setStatus(HttpServletResponse.SC_OK);
            peer.setContentType("application/json");
            peer.setCharacterEncoding("UTF-8");
            peer.getWriter().write(json);
        } catch (IOException e) {
            logger.error("Failed to write response to pending request", e);
        } finally {
            try {
                asyncContext.complete();
            } finally {
                // Remove the context even if completing it fails, so it is never retried.
                asyncContexts.remove(asyncContext);
                logger.debug("asyncContexts.size() after removal: {}", asyncContexts.size());
                logger.info("删除了一个asyncContext");
            }
        }
    }

    /** Manual smoke test that emits a single formatted log line. */
    public static void main(String[] args) {
        logger.info("调用的方法为{},功能为{},参数为【{}】,【{}】", new Object[]{"onUnsubscribe", "常规模式:关闭订阅时触发", "String channel", "int subscribedChannels"});
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
myJPUSH.zip (65个子文件)
myJPUSH
myJPUSH.iml 11KB
.settings
org.eclipse.wst.jsdt.ui.superType.name 6B
com.genuitec.eclipse.core.prefs 128B
org.eclipse.wst.common.project.facet.core.xml 252B
org.eclipse.wst.common.component 459B
org.eclipse.wst.jsdt.ui.superType.container 49B
.jsdtscope 500B
org.eclipse.jdt.core.prefs 395B
src
Redis
utils
JRedisPoolConfig.java 1KB
JedisUtils.java 3KB
redis.properties 400B
com
push
RedisTopicListener.java 8KB
ReverseAjaxServlet.java 2KB
.idea
uiDesigner.xml 9KB
scopes
scope_settings.xml 143B
misc.xml 818B
vcs.xml 173B
.name 7B
copyright
profiles_settings.xml 115B
modules.xml 265B
encodings.xml 171B
compiler.xml 734B
workspace.xml 24KB
.project 2KB
WebRoot
META-INF
MANIFEST.MF 36B
WEB-INF
web.xml 785B
classes
Redis
utils
JedisUtils.class 2KB
JRedisPoolConfig.class 1KB
redis.properties 400B
com
push
RedisTopicListener.class 7KB
ReverseAjaxServlet.class 4KB
ReverseAjaxServlet$1.class 1KB
lib
servlet-api-3.0.20100224.jar 205KB
ezmorph-1.0.6.jar 84KB
commons-codec-1.3.jar 46KB
commons-fileupload-1.2.jar 52KB
commons-beanutils.jar 116KB
commons-collections.jar 546KB
slf4j-jdk14-1.6.1.jar 9KB
commons-logging-1.0.3.jar 31KB
mail-1.4.jar 380KB
commons-httpclient-3.1.jar 298KB
testatoo-container-core-1.0-rc2.jar 66KB
slf4j-api-1.6.1.jar 25KB
commons-logging-1.1.1.jar 59KB
jetty-all-8.0.0.M2.jar 1.46MB
json-lib-2.3-jdk13.jar 148KB
slf4j-log4j12-1.5.11.jar 9KB
commons-logging.jar 37KB
testatoo-container-jetty-1.0-rc2.jar 5KB
commons-pool.jar 41KB
log4j-1.2.15.jar 383KB
commons-digester.jar 107KB
commons-lang-2.3.jar 240KB
commons-cli-1.2.jar 40KB
jedis.jar 136KB
json-lib-2.4-jdk15.jar 155KB
commons-io-1.4.jar 106KB
jettison-1.2.jar 71KB
baiduMap.jsp 1KB
js
jquery.min.js 89KB
jquery.json-2.2.min.js 2KB
index.jsp 3KB
.mymetadata 290B
.classpath 3KB
共 65 条
- 1
_Ares
- 粉丝: 0
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
前往页