package com.lt.webSocket.websocket.server;
import com.alibaba.fastjson.JSONObject;
import com.lt.webSocket.websocket.listener.RedisMessageListener;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint(path = "${websocket.path}/{arg}",host = "${websocket.host}",port = "${websocket.port}" )
public class WebsocetServer {
@Autowired
private ManagerWebSocketHandler managerWebSocketHandler;
@Autowired
private RedisTemplate redisTemplate;
@Value("${pubMsg.pubChannel}")
private String pubChannel;
/*当有新的连接进入时,对该方法进行回调*/
//BeforeHandshake是在websocket握手前,主要是做一些简单的鉴权和关闭
//获得访问的参数 ServerEndpoint的{arg}的参数 @PathVariable String arg ,加了@RequestParam 就一定要传这个参数,要不会报错
@BeforeHandshake
public void handshake(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){
session.setSubprotocols("stomp");
if (!"ok".equals(req)){
System.out.println("Authentication failed!");
session.close();
}
}
/*当有新的WebSocket连接完成时,对该方法进行回调 */
//reqMap 参数就是url?name=zhangsan&age=10 将name和age作为key 来存储值
/*打开websocket链接,调用当前方法*/
@OnOpen
public void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){
String roomId = String.valueOf(reqMap.getFirst("roomId"));
managerWebSocketHandler.createRoom(roomId,session);
log.info("roomId:{} 链接成功",roomId);
}
/*当有WebSocket连接关闭时,对该方法进行回调*/
@OnClose
public void onClose(Session session) throws IOException {
managerWebSocketHandler.removeSession(session);
log.info("one connection closed");
}
/*当有WebSocket抛出异常时,对该方法进行回调 */
@OnError
public void onError(Session session, Throwable throwable) {
managerWebSocketHandler.removeSession(session);
log.info("长连接异常");
throwable.printStackTrace();
}
/*当接收到字符串消息时,对该方法进行回调*/
@OnMessage
public void onMessage(Session session, String message) {
//心跳 服务端不会主动断。
//如果需求心条,建议在客户端定时发送ping到服务端
if("ping".equals(message)){
//心跳:返回当前时间戳(毫秒值)打日志,但不做其他操作
session.sendText("pong"+System.currentTimeMillis());
return;
}
// session.isActive();//校验session 是否存活
/*
*
* message的格式如下:
* {
* "roomId":123,
* "msg":"我是消息体"
* }
*
* */
/*1、分布式用法*/
//将消息通过redis广播到每台服务器,如果那台服务器本地缓存有当前的房间号,那么就发送消息,没有就不发送
redisTemplate.convertAndSend(pubChannel,message);
/*2、单机用法*/
//如果是单机将上面的分布式用法注释掉,然后打开下面的单机用法发送消息 收到消息,将消息在房间内广播发送,也就是在房间中的用户都能收到消息
/*
String msg = this.parseMsg(message);
managerWebSocketHandler.sendMsg(session,msg);
*/
// session.sendText(message);//给客户端发送信息
}
/*当接收到二进制消息时,对该方法进行回调*/
@OnBinary
public void onBinary(Session session, byte[] bytes) {
for (byte b : bytes) {
System.out.println(b);
}
session.sendBinary(bytes);
}
/*当接收到Netty的事件时,对该方法进行回调 */
@OnEvent
public void onEvent(Session session, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
switch (idleStateEvent.state()) {
case READER_IDLE:
System.out.println("read idle");
break;
case WRITER_IDLE:
System.out.println("write idle");
break;
case ALL_IDLE:
System.out.println("all idle");
break;
default:
break;
}
}
}
public String parseMsg(String message) {
String msg = RedisMessageListener.parse(message);
//String msg = StringEscapeUtils.unescapeJava(message.toString());
//JSONObject json = JSONObject.parseObject(message.toString());
JSONObject json = JSONObject.parseObject(msg);
return json.getString("msg");
}
public static void main(String[] args) {
ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
Object a = concurrentHashMap.putIfAbsent("1", "a");
Object b = concurrentHashMap.putIfAbsent("1", "b");
Object b1 = concurrentHashMap.putIfAbsent("2", "b");
System.out.println(a);
System.out.println(b);
System.out.println(b1);
}
}
评论0