package com.example.server.netty;
import com.example.server.biz.UserBiz;
import com.example.server.utils.JsonUtils;
import com.example.server.utils.SpringUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* 客户端消息处理器
*
* @author xiangLong
* @date 2019年5月31日 上午10:36:44
*/
@Slf4j
public class ChatNewHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 用于记录和管理所有客户端的channle,可以用redis取代
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 当客户端连接服务端之后(打开连接)
* 获取客户端的channle,并且放到ChannelGroup中去进行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("add.....");
users.add(ctx.channel());
}
/**
* 处理前端传过来的数据
* 可能是连接成功通知:服务端需要将uid和channel绑定
* 具体聊天消息:服务端需要把消息转发给接收方,此时手机签收状态为未签收
* 手机签收消息成功通知(用户读没读不一定,只是手机接到消息了):服务端更改数据库记录为已签收
* 心跳通知:无需处理
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("read..........");
// 获取客户端传输过来的消息
String content = msg.toString(Charset.forName("GBK"));
Channel currentChannel = ctx.channel();
// 1. 获取客户端发来的消息
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
Integer action = dataContent.getAction();
// 2. 判断消息类型,根据不同的类型来处理不同的业务
if (action == MsgActionEnum.CONNECT.type) {
// 2.1当websocket 第一次open的时候,初始化channel,把用户的channel和userid关联起来
String senderId = dataContent.getChatMsg().getSenderId();
UserChannelRel.put(senderId, currentChannel);
// 测试
for (Channel c : users) {
System.out.println(c.id().asLongText());
}
UserChannelRel.output();
} else if (action == MsgActionEnum.CHAT.type) {
// 2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]
ChatMsg chatMsg = dataContent.getChatMsg();
chatMsg.setFlag(0);
String msgText = chatMsg.getMsg();
String receiverId = chatMsg.getReceiverId();
String senderId = chatMsg.getSenderId();
// 保存消息到数据库,并且标记为 未签收
// 在handler中无法使用依赖注入的方式获取ioc容器中的service对象,需要直接从ioc容器中取
UserBiz userBiz = (UserBiz) SpringUtil.getBean("userBiz");
String msgId = userBiz.saveMsg(chatMsg);
chatMsg.setMsgId(msgId);
DataContent dataContentMsg = new DataContent();
dataContentMsg.setChatMsg(chatMsg);
// 发送消息
// 从全局用户Channel关系中获取接受方的channel
Channel receiverChannel = UserChannelRel.get(receiverId);
if (receiverChannel == null) {
// TODO channel为空代表用户离线,推送消息(JPush,个推,小米推送)
System.err.println("离线");
} else {
// 当receiverChannel不为空的时候,从ChannelGroup去查找对应的channel是否存在
Channel findChannel = users.find(receiverChannel.id());
if (findChannel != null) {
// 用户在线,服务端利用channel将消息发送给接收方
String result = JsonUtils.objectToJson(dataContentMsg);
ByteBuf encoded = ByteBufAllocator.DEFAULT.buffer(4 * result.length());
encoded.writeBytes(result.getBytes("GBK"));
receiverChannel.writeAndFlush(encoded);
} else {
// 用户离线 TODO 推送消息
System.err.println("离线");
}
}
} else if (action == MsgActionEnum.SIGNED.type) {
// 2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]
UserBiz userBiz = (UserBiz)SpringUtil.getBean("userBiz");
// 扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号间隔
String msgIdsStr = dataContent.getExtand();
String msgIds[] = msgIdsStr.split(",");
List<String> msgIdList = new ArrayList<>();
for (String mid : msgIds) {
if (StringUtils.isNotBlank(mid)) {
msgIdList.add(mid);
}
}
System.out.println(msgIdList.toString());
if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) {
// 批量签收
userBiz.updateMsgSigned(msgIdList);
}
} else if (action == MsgActionEnum.KEEPALIVE.type) {
// 2.4 心跳类型的消息,不需要处理。只是为了判断是否关闭channel
System.out.println("收到来自channel为[" + currentChannel + "]的心跳包...");
}
}
/**
* 当客户端断开连接之后(相当于关闭channel)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
System.out.println("客户端被移除,channelId为:" + channelId);
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
users.remove(ctx.channel());
}
/**
* 这个类的统一异常处理方法
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
ctx.channel().close();
users.remove(ctx.channel());
}
}
评论0