package com.demo.websocket;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.json.JSONUtil;
import com.demo.bean.SignalMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import cn.hutool.core.collection.CollUtil;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_JOIN;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_PEER_CANDIDATE;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_SDP;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_SDP_ANSWER;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_RESP_JOIN;
import static com.demo.bean.SignalMsg.SIGNAL_CMD_SDP_START;
@Slf4j
@ServerEndpoint(value = "/myWebsocket")
@Component
public class MyWebSocket {
private static final Map<String, List<MyWebSocket>> roomMap = new ConcurrentHashMap<>();
private Session session;
private String userId;
private String roomId;
private String description;
private Set<String> candidateSet = new ConcurrentHashSet<>();
@OnMessage
public void echoTextMessage(Session session, String msg) throws IOException {
SignalMsg signalMsg = JSONUtil.toBean(msg, SignalMsg.class);
String cmd = signalMsg.getCmd();
switch (cmd) {
//进入房间
case SIGNAL_CMD_JOIN:
join(signalMsg);
break;
case SIGNAL_CMD_SDP:
sdp(signalMsg);
break;
case SIGNAL_CMD_SDP_ANSWER:
sdpAnswer(signalMsg);
break;
case SIGNAL_CMD_PEER_CANDIDATE:
candidate(signalMsg);
break;
default:
}
}
/**
* 网络协商
*
* @param signalMsg
* @return
* @author 刘朋
* <br/>date 2023-05-15
*/
private void candidate(SignalMsg signalMsg) throws IOException {
String candidate = signalMsg.getMsg();
candidateSet.add(candidate);
log.info("网络协商Candidates=" + candidateSet.size() + candidate);
SignalMsg msg = new SignalMsg();
msg.setCmd(SIGNAL_CMD_PEER_CANDIDATE);
msg.setMsg(candidate);
sendOther(msg);
}
private void sdpAnswer(SignalMsg signalMsg) throws IOException {
description = signalMsg.getMsg();
SignalMsg msg = new SignalMsg();
msg.setCmd(SIGNAL_CMD_SDP_ANSWER);
msg.setMsg(description);
sendOther(msg);
}
/**
* 媒体协商
*
* @param signalMsg
* @return
* @author 刘朋
* <br/>date 2023-05-15
*/
private void sdp(SignalMsg signalMsg) throws IOException {
description = signalMsg.getMsg();
SignalMsg msg = new SignalMsg();
msg.setCmd(SIGNAL_CMD_SDP);
msg.setMsg(description);
sendOther(msg);
}
/**
* 进入房间通知,通知房间其他人 有人进入房间
*
* @param signalMsg
* @return
* @author 刘朋
* <br/>date 2023-05-15
*/
private void join(SignalMsg signalMsg) throws IOException {
this.roomId = signalMsg.getRoomId();
this.userId = signalMsg.getUid();
List<MyWebSocket> myWebSockets = roomMap.get(roomId);
if (Objects.isNull(myWebSockets)) {
myWebSockets = new CopyOnWriteArrayList<>();
roomMap.put(roomId, myWebSockets);
}
//加入房间
myWebSockets.add(this);
log.info("有人进入房间roomId:{} , userId:{}", roomId, userId);
//进入房间应答
SignalMsg respMsg = new SignalMsg();
respMsg.setCmd(SIGNAL_CMD_RESP_JOIN);
session.getBasicRemote().sendText(JSONUtil.toJsonStr(respMsg));
if (myWebSockets.size() > 1) {
SignalMsg msg = new SignalMsg();
msg.setCmd(SIGNAL_CMD_SDP_START);
session.getBasicRemote().sendText(JSONUtil.toJsonStr(msg));
}
}
/**
* 发送给其他人
*
* @param msg
* @throws IOException
*/
private void sendOther(SignalMsg msg) throws IOException {
List<MyWebSocket> myWebSockets = roomMap.get(roomId);
if (myWebSockets.size() > 1) {
for (MyWebSocket myWebSocket : myWebSockets) {
if (!myWebSocket.equals(this)) {
//通知其他用户有人进入房间
myWebSocket.session.getBasicRemote().sendText(JSONUtil.toJsonStr(msg));
}
}
}
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
System.out.println("有新的连接 !");
this.session = session;
}
@OnClose
public void end() {
System.out.println("关闭ws");
List<MyWebSocket> myWebSockets = roomMap.get(roomId);
if (CollUtil.isNotEmpty(myWebSockets)) {
myWebSockets.remove(this);
}
}
@OnError
public void onError(Throwable t) {
System.out.println("Chat Error: " + t.toString());
}
}