使用SpringBoot和Netty实现一对一(互相)简单聊天 - Go语言中文社区

使用SpringBoot和Netty实现一对一(互相)简单聊天


首先看一下效果图:
在这里插入图片描述

依赖前端代码详情请移步:https://github.com/coffcer/vue-chat

本样例前端采用JQuery与Vue + Webpack
为了项目尽可能简单,我们一切从简,具体如下:

  • 不涉及复杂的业务逻辑

  • 测试样例从简(Lucy,Jack,Mike),MYSQL表数据如下:
    在这里插入图片描述

  • 项目存在两个服务器:tomcat服务器,Netty构建的webSocket服务器

  • 项目结构如下:
    在这里插入图片描述

额,DeleteUselessRepository与本项目无关,只是用来清理maven仓库的无效文件夹。

需要指出的是前端模板本身是很优异的,使用了Vue,也就是我们只需要修改Vue的data的数据即可,然后在关键的部位添加我们自己的方法即可。

另外此tomcat服务于netty服务无交集,即不共享session,此为缺陷之一。

下面讲解代码部分:

CacheLoader.java

public class CacheLoader {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public static Map<Double,Channel> channelMap = new ConcurrentHashMap<>();
}

channelGroup和channelMap都是存储客户端的SocketChannel对象的

channelGroup使用原生对象,使用全局事件处理器,主要用来广播消息

channelMap为自己实现,主要是为了完成一对一通信,所以每一个SocketChannel都与其用户id绑定

HttpRequestHandler.java

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    // webSocket标识
    private final String wsUri;

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
        // 如果是webSocket请求,请求地址uri等于wsUri
        if (wsUri.equalsIgnoreCase(fullHttpRequest.uri())) {
            // 将消息发送到下一个channelHandler
            ctx.fireChannelRead(fullHttpRequest.retain());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

因为此处不需要构建http服务(全权交给tomcat处理),故HttpRequestHandler实现就一个功能,判断请求类型,如果是websocket请求则向下转发。

TextWebSocketFrameHandler.java

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private final ChannelGroup channelGroup;

    private ChatService chatService = new ChatServiceImpl();


    public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 如果ws握手完成
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            // 删除pipeLine中处理http请求的handler
            ctx.pipeline().remove(HttpRequestHandler.class);
            // 写一个消息广播到所有的客户端channel
            //channelGroup.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined!"));
            // 将当前客户端channel添加进group
            channelGroup.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        // 将接收的消息通过ChannelGroup转发到所有连接的客户端
        //channelGroup.writeAndFlush(textWebSocketFrame.retain());
        // 前端组装的消息格式是 {"message":{"text":"项目地址","date":"2018-11-28T02:13:52.437Z"},"to":2,"from":1}
        Map<String,Object> msg = GsonUtils.fromJson(textWebSocketFrame.text().toString(),new TypeToken<Map<String,Object>>(){});
        String type = (String) msg.get("type");
        switch (type) {
            case "REGISTER":
                chatService.register(channelHandlerContext,msg);
                break;
            case "SINGLE_SENDING":
                chatService.singleSend(channelHandlerContext,msg);
                break;
        }
    }
}

userEventTriggered是客户端连接请求触发函数

channelRead0中则是我们的核心业务逻辑

  • 前端传来的数据是Json字符串
  • Json字符串中存在type字段用来标识消息类型,from字段标识消息来源,to字段标识消息去向,message则为消息体。
  • 因为前端模板需要,message中应该存在两个字段:date与text。二者顾名思义。

ChatServiceImpl.java

public class ChatServiceImpl implements ChatService{
    @Override
    public void register(ChannelHandlerContext channelHandlerContext, Map<String, Object> msg) {
        CacheLoader.channelMap.put(Double.parseDouble(msg.get("userId").toString()),channelHandlerContext.channel());
    }

    @Override
    public void singleSend(ChannelHandlerContext channelHandlerContext, Map<String, Object> msg) {
        Double to = Double.parseDouble(msg.get("to").toString());
        msg.remove("to");
        msg.remove("type");
        CacheLoader.channelMap.get(to).writeAndFlush(new TextWebSocketFrame(GsonUtils.toJson(msg)));
    }
}

以上两个逻辑函数比较简单,不要吐槽为什么channelMap的id是Double类型,因为我懒QAQ!

单对单聊天中msg需要传到前台的只有from和message两个字段。

ChatServerInitializer.java

public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {

    private final ChannelGroup channelGroup;

    public ChatServerInitializer(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                // 编解码http请求
                .addLast(new HttpServerCodec())
                //聚合解码HttpRequest/HttpContent/LastHttpContent到FullHttpRequest
                //保证接收的Http请求的完整性
                .addLast(new HttpObjectAggregator(64 * 1024))
                // 处理FullHttpRequest
                .addLast(new HttpRequestHandler("/ws"))
                // 处理其他的WebSocketFrame
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                // 处理TextWebSocketFrame
                .addLast(new TextWebSocketFrameHandler(channelGroup));
    }
}

netty支持六种WebSocket框架

在这里插入图片描述

我们只需要处理TextWebSocketFrameHandler,其他交给netty的WebSocketServerProtocolHandler处理

下面是对前端js的改造和新增

  • 改造发送逻辑,新增send方法
  • 改造加载逻辑,新增fetch方法
  • 改造在原末班的main.js进行,新增handle.js

handle.js

var R$_globalVM;
// 加载初始化信息
R$_fetch = function () {
    // 下面的ajax方法为同步
    var result = null;
    $.ajaxSettings.async = false
    $.get("/user/getInitialData",null,function (data) {
        result =  data;
    });
    return result;
    $.ajaxSettings.async = true
}
var socket;
if (!window.WebSocket) {
    window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
    socket = new WebSocket("ws://localhost:2048/ws");
    socket.onmessage = function (event) {
        // 后端发送过来的数据中存在userId,message
        var source = JSON.parse(event.data);
        console.log(source);
        var vm_data = R$_globalVM.$data.sessionList;
        for(var i = 0;i<vm_data.length;i++){
            if (vm_data[i].userId == source.from){
                vm_data[i].messages.push(source.message);
                break;
            }
        }
    };
    // 连接成功1秒后,将用户信息注册到服务器在线用户表
    socket.onopen = setTimeout(function(event){
        console.log("连接开启!");
        var data = {
            "userId" : R$_globalVM.$data.user.id,
            "type" : "REGISTER"
        };
        send(JSON.stringify(data));
    }, 1000);
    socket.onclose = function (event) {
        console.log("连接被关闭");
    };
} else {
    alert("你的浏览器不支持 WebSocket!");
}

function send(value) {
    if (!window.WebSocket) {
        return;
    }
    if (socket.readyState == WebSocket.OPEN) {
        socket.send(value);
    } else {
        alert("连接没有开启.");
    }
}

R$_send = function (value) {
    send(JSON.stringify(value));
    return true;
}

socket.onopen中完成用户注册请求逻辑

socket.onmessage接收到后台发送的消息,判断来源,将message对应加入到sessionList中

/user/getInitialData对应的控制器逻辑:

@RequestMapping(value = "getInitialData",method = RequestMethod.GET)
@ResponseBody
public Map<String,Object> getInitialData(HttpSession session){
    User user = (User) session.getAttribute("user");
    Map<String,Object> result = new HashMap<>();
    // user
    result.put("user",userService.userToMap(user));
    // userList,sessionList
    List<Map<String,Object>> userList = new ArrayList<>();
    List<Map<String,Object>> sessionList = new ArrayList<>();
    String[] friends = user.getFriends().split(",");
    for (String friend : friends){
        userList.add(userService.userToMap(userService.get(Integer.parseInt(friend))));
        Map<String,Object> tmp_map = new HashMap<>();
        tmp_map.put("messages",new ArrayList<>());
        tmp_map.put("userId",Integer.parseInt(friend));
        sessionList.add(tmp_map);
    }
    result.put("userList",userList);
    result.put("sessionList",sessionList);
    return result;
}

需要注意的是,在main.js中需要:

R$_globalVM = new Vue(o["default"])

改造发送逻辑

<div class=m-text><textarea placeholder="按 Ctrl + Enter 发送" v-model=text @keyup=inputing></textarea></div>

找到inputing方法,改造前后对比:

改造前:

inputing: function (e) {
                e.ctrlKey && 13 === e.keyCode && this.text.length && (this.session.messages.push({
                    text: this.text,
                    date: new Date,
                    self: !0 // !0表示是自己发送的消息(其实就是true),0表示是对方发送的消息
                }), this.text = "")
            }

改造后:

inputing: function (e) {
                e.ctrlKey && 13 === e.keyCode && this.text.length && (this.session.messages.push({
                    text: this.text,
                    date: new Date,
                    self: !0 // !0表示是自己发送的消息(其实就是true),0表示是对方发送的消息
                }),R$_send({"message":{
                    text: this.text,
                    date: new Date},
                    // 发给谁的数据
                    "to": this.session.userId,
                    "from":R$_globalVM.$data.user.id,
                    "type":"SINGLE_SENDING"
                }), this.text = "")
            }

改造加载逻辑

改造前:

var i = s(14), o = r(i), n = "VUE-CHAT-v3";
    if (!localStorage.getItem(n)) {
        var a = new Date, l = {
            user: {id: 1, name: "Coffce", img: "dist/images/1.jpg"},
            userList: [{id: 2, name: "示例介绍", img: "dist/images/2.png"}, {
                id: 3,
                name: "webpack",
                img: "dist/images/3.jpg"
            }],
            sessionList: [{
                userId: 2,
                messages: [{
                    text: "Hello,这是一个基于Vue + Webpack构建的简单chat示例,聊天记录保存在localStorge。简单演示了Vue的基础特性和webpack配置。",
                    date: a
                }, {text: "项目地址: https://github.com/coffcer/vue-chat", date: a}]
            }, {userId: 3, messages: []}]
        };
        localStorage.setItem(n, (0, o["default"])(l))
    }
    t["default"] = {
        fetch: function () {
            return JSON.parse(localStorage.getItem(n))
        }, save: function (e) {
            localStorage.setItem(n, (0, o["default"])(e))
        }
    }

改造后:

t["default"] = {
        fetch: function () {
            // 加载历史聊天记录
            return R$_fetch();
        }, save: function (e) {
            // 暂时不做
            //localStorage.setItem(n, (0, o["default"])(e))
            //R$_save((0, o["default"])(e));
        }
    }

ok,至此,大功告成!!至于tomcat服务,对不起,那完全是无关紧要的,不在此处讲解。

看看成果?
运行结果

hah,Jack,Lucy,Mike三人可以愉快地聊天了。

此外,websocket通信也推荐nodejs的socket.io框架,很不错喔!
在这里插入图片描述

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/AZHELL/article/details/84633292
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢