package org.mf.socket;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.java_websocket.WebSocket;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
/**
* 类名/接口名
*
* @author mengfei
* @date 2023年09月11日 13:36
*/
public class WebSocketUtil {
private static final Logger LOGGER = LogManager.getLogger(WebSocketUtil.class);
/**
* 自定义server容器,用来记录连接的webSocket
*/
private static CustomWebServer CUSTOM_WEB_SERVER = null;
/**
* 启动websocket服务
* @param port 端口号
* @throws Exception
*/
public static void startServer(Integer port) {
//设置jvm退出时的清理线程
setClearWebSocketTask();
//创建同步阻塞队列,用来获取webSocket启动结果
SynchronousQueue<WebSocketStartResult> synchronousQueue = new SynchronousQueue<>();
CUSTOM_WEB_SERVER = new CustomWebServer(port, synchronousQueue);
CUSTOM_WEB_SERVER.start();
//线程一直阻塞,直到拿到启动结果
WebSocketStartResult startResult = null;
try {
startResult = synchronousQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (startResult.getStartStatus() == 2) {
//启动失败,抛出异常
Exception startException = (Exception) startResult.getStartResult();
LOGGER.error("websocket server启动失败", startException);
throw new RuntimeException(startException);
}
LOGGER.info("websocket server启动成功,端口号:{}", port);
}
private static void setClearWebSocketTask() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
stopServer();
}
});
thread.setName("websocket关闭线程");
thread.setDaemon(true);
Runtime.getRuntime()
.addShutdownHook(thread);
}
public static void stopServer() {
if (CUSTOM_WEB_SERVER != null) {
try {
CUSTOM_WEB_SERVER.stop();
} catch (InterruptedException e) {
LOGGER.error("websocket服务关闭失败", e);
throw new RuntimeException(e);
}
}
}
/**
* 向websocket连接中推送消息,方法加锁,防止多个线程同时推送消息造成错误
* @param messageBody
* @return
*/
public static synchronized void writeMessage(String messageBody) {
if (CUSTOM_WEB_SERVER == null) {
return;
}
ConcurrentHashMap<WebSocket, Integer> connAndStatus = CUSTOM_WEB_SERVER.getConns();
for (Map.Entry<WebSocket, Integer> webSocketAnsStatusEntry : connAndStatus.entrySet()) {
WebSocket conn = webSocketAnsStatusEntry.getKey();
try {
//发送消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("messageBody", messageBody);
conn.send(jsonObject.toJSONString());
} catch (Exception e) {
LOGGER.error("websocket消息推送失败", e);
}
}
};
/**
* websocket启动结果
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
private static class WebSocketStartResult {
/**
* socket server启动状态
*/
private Integer startStatus;
/**
* socket server启动结果信息
* 如果status为1,result中存放启动成功字符串
* 如果status为2,result中存放启动的异常
*/
private Object startResult;
}
/**
* 自定义websocket服务
*/
private static class CustomWebServer extends WebSocketServer {
/**
* 用于通知websocket server启动结果
*/
private SynchronousQueue<WebSocketStartResult> synchronousQueue;
/**
* 记录webSocket的启动状态,0-已创建未启动,1-启动成功,2-启动失败
*/
private Integer startState = 0;
/**
* 存储所有socket连接(使用线程安全的map集合,防止在往连接中推送消息时发生不必要的错误,比如连接已经关闭但还是向其推送消息)
*/
@Getter
private ConcurrentHashMap<WebSocket, Integer> conns = new ConcurrentHashMap<>();
public CustomWebServer(int port, SynchronousQueue<WebSocketStartResult> synchronousQueue) {
super(new InetSocketAddress(port));
this.synchronousQueue = synchronousQueue;
}
/**
* websocket conn打开的回调
* @param conn The <tt>WebSocket</tt> instance this event is occurring on.
* @param handshake The handshake of the websocket instance
*/
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
LOGGER.info("websocket 连接创建,连接:{}", conn);
//将连接加入map
conns.put(conn, 0);
}
/**
* websocket conn收到消息的回调,目前仅处理认证消息,其他消息返回错误
* @param conn The <tt>WebSocket</tt> instance this event is occurring on.
* @param message The UTF-8 decoded message that was received.
*/
@Override
public void onMessage(WebSocket conn, String message) {
LOGGER.info("websocket 接收到消息,连接:{},消息:{}", conn, message);
//在此处处理接收的message
}
/**
* websocket conn被关闭的回调
* @param conn The <tt>WebSocket</tt> instance this event is occurring on.
* @param code The codes can be looked up here: {@link CloseFrame}
* @param reason Additional information string
* @param remote Returns whether or not the closing of the connection was initiated by the remote
* host.
*/
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
LOGGER.info("websocket 连接关闭,连接:{},状态:{},关闭原因:{},是否是远端关闭:{}", conn, code, reason, remote);
//从连接map中移除
conns.remove(conn);
}
/**
* websocketServer发生异常的回调
* @param conn Can be null if there error does not belong to one specific websocket. For example
* if the servers port could not be bound.
* @param ex The exception causing this error
*/
@Override
public void onError(WebSocket conn, Exception ex) {
LOGGER.error("websocket 异常,连接:{},exception:{}", conn, ex);
// 在这里,您可以处理任何错误情况
if (startState == 0) {
//修改启动状态
startState = 2;
//将启动结果的异常放入请求队列中
WebSocketStartResult webSocketStartResult = new WebSocketStartResult(2, ex);
try {
synchronousQueue.put(webSocketStartResult);
} catch (Interrupt
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
使用Java-WebSocket实现基于WebSocket协议的消息推送功能。 运行时先启动WebSocketUtil类(消息推送端/服务端),再启动SocketClient(消息接收端/客户端),如需自定义消息协议可以在发送端的writeMessage方法中定义发送协议,在接收端onMessage方法中定义接收协议。 本示例中消息格式均为json格式传递,服务端指定启动的WebSocket服务启动的端口号,客户端启动的时候指定服务端的ip地址和端口协议,双方即可实现通讯,消息的发送在例子中是一个每隔1秒循环发送的消息,大家可以根据需求自定义发送策略。 本例中还f缺少服务端对socket连接的认证和socket的管理控制等功能,还缺少对消息的分类发送能力,客户端也缺少对消息的筛选功能,感兴趣的同学可以自行拓展进行二次开发,如例子中有错误欢迎大家批评指正!
资源推荐
资源详情
资源评论
收起资源包目录
web-socket-demo.zip (13个子文件)
pom.xml 2KB
src
main
resources
log4j2.xml 493B
java
org
mf
socket
WebSocketUtil.java 9KB
SocketClient.java 3KB
target
classes
log4j2.xml 493B
org
mf
socket
SocketClient.class 2KB
WebSocketUtil$1.class 536B
WebSocketUtil.class 5KB
SocketClient$MyWebSocketClient$1.class 1KB
SocketClient$1.class 2KB
WebSocketUtil$CustomWebServer.class 4KB
WebSocketUtil$WebSocketStartResult.class 2KB
SocketClient$MyWebSocketClient.class 2KB
generated-sources
annotations
共 13 条
- 1
资源评论
码小飞飞飞飞
- 粉丝: 845
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功