社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
1.登录账号
2.登录后会显示
3.当另一个用户登录
4.两个用户相互交流不会影响其他用户
下面上代码
pom.xml 添加netty的依赖
<!-- Netty, pulled in as the all-in-one "netty-all" artifact. -->
<!-- NOTE(review): 5.0.0.Alpha1 is an alpha release; the handler code in this article -->
<!-- overrides messageReceived(...), so confirm API compatibility before changing the version. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
/**
 * Spring Boot entry point of the chat application.
 *
 * <p>The class also implements {@link CommandLineRunner}; its {@code run}
 * callback is where the Netty server is started.
 */
@SpringBootApplication
public class App implements CommandLineRunner {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    /**
     * {@link CommandLineRunner} callback: constructing {@link NettyService}
     * is what starts Netty (its constructor binds the port and waits on the
     * server channel).
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by the {@link CommandLineRunner} contract
     */
    @Override
    public void run(String... args) throws Exception {
        new NettyService();
    }
}
/**
 * Netty server bootstrap.
 *
 * <p>Constructing an instance binds the server socket and then blocks the
 * calling thread until the server channel is closed; both event-loop groups
 * are shut down afterwards.
 *
 * @author Dream
 */
public class NettyService {

    /** TCP port the server listens on. */
    private static final int PORT = 9099;

    /**
     * Starts the server on {@link #PORT} and waits for it to terminate.
     *
     * <p>Failures are reported on {@code System.err} rather than thrown, so a
     * failed bind does not propagate to the caller.
     */
    public NettyService() {
        System.out.println("启动Netty!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyChannelInitializer());
            Channel channel = serverBootstrap.bind(PORT).sync().channel();
            // Block here until the server channel is closed.
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println(e);
        } catch (Exception e) {
            System.err.println(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Assembles the handler pipeline for every accepted channel.
 *
 * @author Dream
 */
public class MyChannelInitializer extends ChannelInitializer<Channel> {

    /**
     * Installs the HTTP codec, the HTTP aggregator, the chunked-write handler
     * and finally the matching handler, in that order.
     *
     * @param channel the newly registered channel
     * @throws Exception if the pipeline cannot be set up
     */
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // Encodes/decodes HTTP messages (the WebSocket handshake starts as HTTP).
        pipeline.addLast("http-codec", new HttpServerCodec());
        // Assembles HTTP message parts into a single full message (limit passed: 65536).
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        // Support for writing chunked content.
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        // Application logic: pairs every two logged-in users into one room.
        pipeline.addLast("handler", new MyMatchingHandler());
    }
}
/**
 * Per-connection handler: performs the WebSocket handshake, pairs logged-in
 * users two by two into a private room, and relays chat messages inside that
 * room.
 *
 * <p>Protocol (JSON-encoded {@link Mage}):
 * <ul>
 *   <li>{@code message == "10001"} - login request; once two users are
 *       waiting, each receives the other's login message with the room id.</li>
 *   <li>{@code message == "20002"} - sent to the remaining user when the peer
 *       leaves.</li>
 *   <li>{@code message == "-10001"} - sent back when the id is already logged
 *       in; the connection is then closed.</li>
 *   <li>anything else - a chat message relayed to everyone in
 *       {@code mage.getTable()}.</li>
 * </ul>
 *
 * <p>NOTE(review): session cleanup lives in {@link #close}, which is only
 * reached through an outbound close (e.g. after a WebSocket Close frame).
 * An abrupt disconnect presumably needs the same cleanup elsewhere - confirm.
 *
 * @author Dream
 */
public class MyMatchingHandler extends SimpleChannelInboundHandler<Object> {

    /** Login request / "you are matched" notification. */
    private static final String CODE_LOGIN = "10001";
    /** Tells the remaining user that the peer went offline. */
    private static final String CODE_PEER_OFFLINE = "20002";
    /** Rejection: the id is already logged in. */
    private static final String CODE_ALREADY_ONLINE = "-10001";

    /**
     * Guards the "enqueue, check size, poll two" sequence and the removal of a
     * waiting user, so two event-loop threads cannot poll the same pair.
     */
    private static final Object MATCH_LOCK = new Object();

    private WebSocketServerHandshaker handshaker;
    private ChannelHandlerContext ctx;
    private String sessionId;
    private String name;

    /**
     * Dispatches an inbound message: a full HTTP request goes to the handshake
     * logic, a WebSocket frame to the chat logic; anything else is ignored.
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
        if (o instanceof FullHttpRequest) { // plain HTTP (handshake)
            handleHttpRequest(ctx, (FullHttpRequest) o);
        } else if (o instanceof WebSocketFrame) { // WebSocket traffic
            handleWebSocketFrame(ctx, (WebSocketFrame) o);
        }
    }

    /** Flushes everything written while handling the current read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Reports the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not lose the cause silently; the file reports errors on System.err.
        System.err.println(cause);
        ctx.close();
    }

    /**
     * Closes the channel and unregisters this user: the user is removed from
     * the waiting queue and the login registry, the room is dropped, and every
     * other room member is told the user went offline.
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        super.close(ctx, promise);

        final String id = this.sessionId;
        if (id == null) {
            // Never logged in (or already cleaned up): nothing is registered,
            // and ConcurrentHashMap would throw on a null key.
            return;
        }
        this.sessionId = null; // make the cleanup idempotent

        // A user still waiting for a partner must not be matched after leaving.
        synchronized (MATCH_LOCK) {
            InformationOperateMapMatching.queue.removeIf(waiting -> id.equals(waiting.getMage().getId()));
        }

        final Mage offline = new Mage();
        offline.setName(this.name);
        offline.setMessage(CODE_PEER_OFFLINE);

        String table = InformationOperateMapMatching.login.get(id);
        if (table != null) {
            // Notify every user of the room that has not gone offline.
            ConcurrentMap<String, InformationOperateMapMatching> room = InformationOperateMapMatching.map.get(table);
            if (room != null) {
                room.forEach((peerId, peer) -> {
                    if (id.equals(peerId)) { // value comparison, not reference comparison
                        return;
                    }
                    try {
                        peer.sead(offline);
                    } catch (Exception e) {
                        System.err.println(e);
                    }
                });
            }
            InformationOperateMapMatching.map.remove(table);
        }
        InformationOperateMapMatching.login.remove(id);
    }

    /**
     * Handles the HTTP request that opens a WebSocket connection and completes
     * the handshake.
     *
     * @param ctx     channel context
     * @param request the aggregated HTTP request
     * @throws Exception if the handshake fails
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // Reject requests that failed to decode or are not WebSocket upgrades.
        // NOTE(review): the header value is compared case-sensitively.
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // Regular WebSocket upgrade request: build the handshake response.
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) { // unsupported WebSocket version
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else { // complete the handshake
            handshaker.handshake(ctx.channel(), request);
            // Keep the context so the server can push data to this client later.
            this.ctx = ctx;
        }
    }

    /**
     * Writes an HTTP response; for non-200 responses the status text becomes
     * the body, and the connection is closed afterwards.
     *
     * @param ctx      channel context
     * @param request  the request being answered (consulted for keep-alive)
     * @param response the response to send
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }
        // Close the connection when it is not keep-alive or the response is an error.
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Handles one WebSocket frame: Close and Ping are answered at protocol
     * level, text frames are parsed as {@link Mage} and treated as login or
     * chat message. Other frame types are only reported.
     *
     * @param ctx   channel context
     * @param frame the received frame
     * @throws Exception if parsing or sending fails
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Close request.
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping -> Pong.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported; binary frames are not.
        if (!(frame instanceof TextWebSocketFrame)) {
            System.err.println("------------------error--------------------------");
            return;
        }

        String text = ((TextWebSocketFrame) frame).text();
        System.out.println("mage : " + text);
        Mage mage = Mage.strJson2Mage(text);
        if (mage == null || mage.getMessage() == null) {
            // strJson2Mage returns null for empty input; a missing message cannot be routed.
            System.err.println("ignored message without content: " + text);
            return;
        }

        if (CODE_LOGIN.equals(mage.getMessage())) {
            handleLogin(ctx, mage);
        } else {
            relayToRoom(mage);
        }
    }

    /**
     * Registers a user and, once two users are waiting, puts them into a new
     * room and tells each about the other.
     */
    private void handleLogin(ChannelHandlerContext ctx, Mage mage) throws Exception {
        String id = mage.getId();
        if (id == null) {
            System.err.println("ignored login without id");
            return;
        }
        // putIfAbsent makes "check and register" one atomic step.
        if (InformationOperateMapMatching.login.putIfAbsent(id, "") != null) {
            // Already logged in: tell the client and drop the connection. The
            // session id is deliberately NOT recorded, so closing this
            // connection can never log out the legitimate owner of the id.
            mage.setMessage(CODE_ALREADY_ONLINE);
            sendWebSocket(mage.toJson());
            ctx.close();
            return;
        }

        // Remember who this connection belongs to, for cleanup in close().
        this.sessionId = id;
        this.name = mage.getName();

        InformationOperateMapMatching first = null;
        InformationOperateMapMatching second = null;
        synchronized (MATCH_LOCK) {
            InformationOperateMapMatching.offer(ctx, mage);
            if (InformationOperateMapMatching.queue.size() >= 2) {
                first = InformationOperateMapMatching.queue.poll();
                second = InformationOperateMapMatching.queue.poll();
            }
        }
        if (first == null || second == null) {
            return; // still waiting for a partner
        }

        String tableId = UUID.randomUUID().toString();
        first.setTableId(tableId);
        second.setTableId(tableId);
        InformationOperateMapMatching.add(first.getChannelHandlerContext(), first.getMage());
        InformationOperateMapMatching.add(second.getChannelHandlerContext(), second.getMage());
        // Each side receives the other's login message (which carries the room id).
        first.sead(second.getMage());
        second.sead(first.getMage());
    }

    /** Sends a chat message to every user in the room named by the message. */
    private void relayToRoom(Mage mage) {
        String table = mage.getTable();
        ConcurrentMap<String, InformationOperateMapMatching> room =
                table == null ? null : InformationOperateMapMatching.map.get(table);
        if (room == null) {
            System.err.println("ignored message for unknown room: " + table);
            return;
        }
        room.forEach((id, iom) -> {
            try {
                iom.sead(mage);
            } catch (Exception e) {
                System.err.println(e);
            }
        });
    }

    /**
     * Pushes a text frame to this connection's client.
     *
     * @param msg the text to send
     * @throws Exception if the handshake has not completed or the context was
     *                   removed from the pipeline
     */
    public void sendWebSocket(String msg) throws Exception {
        if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
            throw new IllegalStateException("尚未握手成功,无法向客户端发送WebSocket消息");
        }
        this.ctx.channel().write(new TextWebSocketFrame(msg));
        this.ctx.flush();
    }
}
/**
 * Shared registry of connected users.
 *
 * <p>Static state:
 * <ul>
 *   <li>{@link #map} - room id to (user id to entry) for matched users;</li>
 *   <li>{@link #queue} - users that logged in and wait for a partner;</li>
 *   <li>{@link #login} - user id to room id; the visible caller registers a
 *       user with an empty string until a room is assigned.</li>
 * </ul>
 *
 * <p>An instance pairs one user's channel context with that user's latest
 * {@link Mage}.
 */
public class InformationOperateMapMatching {

    public static final ConcurrentMap<String, ConcurrentMap<String, InformationOperateMapMatching>> map = new ConcurrentHashMap<>();
    public static final Queue<InformationOperateMapMatching> queue = new ConcurrentLinkedQueue<>();
    public static final ConcurrentMap<String, String> login = new ConcurrentHashMap<>();

    private ChannelHandlerContext ctx;
    private Mage mage;

    private InformationOperateMapMatching(ChannelHandlerContext ctx, Mage mage) {
        this.ctx = ctx;
        this.mage = mage;
    }

    /**
     * Enqueues a user to wait until another user logs in and can be matched.
     *
     * @param ctx  the user's channel context
     * @param mage the user's login message
     */
    public static void offer(ChannelHandlerContext ctx, Mage mage) {
        queue.offer(new InformationOperateMapMatching(ctx, mage));
    }

    /**
     * Registers a user in the room named by {@code mage.getTable()} and
     * records that room for the user in {@link #login}.
     *
     * @param ctx  the user's channel context
     * @param mage the user's message; its id and table must be non-null
     */
    public static void add(ChannelHandlerContext ctx, Mage mage) {
        InformationOperateMapMatching iom = new InformationOperateMapMatching(ctx, mage);
        // Atomic get-or-create: two users joining the same new room
        // concurrently can no longer overwrite each other's room map.
        map.computeIfAbsent(mage.getTable(), table -> new ConcurrentHashMap<>())
                .put(mage.getId(), iom);
        // replace(): only updates users already present in the login registry.
        login.replace(mage.getId(), mage.getTable());
    }

    /**
     * Removes a user from a room. A room holding at most one entry is dropped
     * as a whole. Unknown rooms and {@code null} arguments are ignored.
     *
     * @param id    user id
     * @param table room id
     */
    public static void delete(String id, String table) {
        if (id == null || table == null) {
            return; // ConcurrentHashMap rejects null keys
        }
        ConcurrentMap<String, InformationOperateMapMatching> cmap = map.get(table);
        if (cmap == null) {
            return;
        }
        if (cmap.size() <= 1) {
            map.remove(table);
        } else {
            cmap.remove(id);
        }
    }

    /**
     * Sends a message to this entry's user as a JSON text frame.
     *
     * @param mage the message to send
     * @throws Exception if the message cannot be serialized
     */
    public void sead(Mage mage) throws Exception {
        this.ctx.writeAndFlush(new TextWebSocketFrame(mage.toJson()));
    }

    /** @return the message stored for this user */
    public Mage getMage() {
        return this.mage;
    }

    /** @return the channel context of this user */
    public ChannelHandlerContext getChannelHandlerContext() {
        return this.ctx;
    }

    /**
     * Assigns a room id to this user's stored message.
     *
     * @param table the room id
     * @return this entry, for chaining
     */
    public InformationOperateMapMatching setTableId(String table) {
        this.mage.setTableId(table);
        return this;
    }
}
/**
 * Chat message exchanged with the browser.
 *
 * <p>Incoming JSON text is parsed into a {@code Mage}; outgoing messages are
 * serialized back to a JSON string.
 */
@Data
public class Mage {

    /** Shared JSON mapper used for both directions. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Chat room the message belongs to. */
    private String table;

    /** User id. */
    private String id;

    /** User name. */
    private String name;

    /** The message that was sent. */
    private String message;

    /**
     * Parses a JSON string into a {@code Mage}.
     *
     * @param message JSON text
     * @return the parsed message, or {@code null} when the input is null or empty
     * @throws Exception if the JSON cannot be mapped to a {@code Mage}
     */
    public static Mage strJson2Mage(String message) throws Exception {
        if (Strings.isNullOrEmpty(message)) {
            return null;
        }
        return MAPPER.readValue(message, Mage.class);
    }

    /**
     * Serializes this message to a JSON string.
     *
     * @return the JSON representation
     * @throws Exception if serialization fails
     */
    public String toJson() throws Exception {
        return MAPPER.writeValueAsString(this);
    }

    /**
     * Sets the chat room and returns this message for chaining.
     *
     * @param table the room id
     * @return this message
     */
    public Mage setTableId(String table) {
        setTable(table);
        return this;
    }
}
前端页面:
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>netty一对一聊天室</title>
<script src="/js/jquery.min.js"></script>
</head>
<body>
<!-- Status banner shown while waiting for a partner. -->
<div id="top">请等待匹配</div>
<!-- Chat area; the page script hides it until a partner has been matched. -->
<div id="bottom">
<div id="title"></div>
<div id = 'users'></div>
<div>
<input type="text" id="mag"/>
<!-- The attribute must be the ASCII "onclick"; a look-alike first letter leaves the button without a handler. -->
<input type="button" value="发送" onclick="send()"/>
</div>
<!-- Message log. -->
<div id="sed" style="height: 300px;width: 500px;border:1px solid;"></div>
</div>
</body>
<script th:inline="javascript">
var user;
var socket;
$(function() {
$('#bottom').hide();
user = [[${user}]];
console.log(user);
var temp = typeof(user);
console.log(temp);
inChat();
});
// Enter the chat room: open the WebSocket, log in on connect, and react to
// the server's messages (matched / peer offline / already online / chat text).
function inChat() {
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (!window.WebSocket) {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
        return;
    }

    // Open the HTML5 WebSocket.
    socket = new WebSocket("ws://127.0.0.1:9099/");

    // Incoming message.
    socket.onmessage = function (data) {
        console.log("socket.onmessage:");
        console.log(data);
        var mage = JSON.parse(data.data);
        console.log(mage.message);
        if (mage.message == '10001') { // 10001: partner came online
            $('#top').hide();
            $('#bottom').show();
            $('#title').text('chat' + mage.table);
            // .text() instead of HTML concatenation: the name comes from the
            // peer and must not be interpreted as markup.
            // NOTE(review): the trailing 't' is kept from the original; it
            // looks like a '\t' that lost its backslash - confirm.
            $('#users').append($('<span>').text(mage.name + 't'));
            user.table = mage.table;
        } else if (mage.message == '20002') { // partner went offline
            $('#bottom').hide();
            $('#top').show();
            $('#top').text('对方已下线');
            socket.close();
        } else if (mage.message == '-10001') { // this user is already online
            $('#bottom').hide();
            $('#top').text('已经在线!');
        } else { // chat message from a user
            // Peer-controlled text is inserted as text, never as HTML.
            $('#sed')
                .append($('<span>').text(mage.name + ' : ' + mage.message))
                .append('<br/>');
        }
    };

    // Connection established.
    socket.onopen = function (data) {
        $('#top').text('链接成功,请等待匹配');
        console.log("socket.onopen:");
        console.log(data);
        user.table = '';
        user.message = '10001';
        delete user.pwd; // never send the password over the chat socket
        console.log(user);
        // Send the user info to enter the chat room.
        socket.send(JSON.stringify(user));
    };

    // Connection closed.
    socket.onclose = function (data) {
        console.log("socket.onclose:");
        console.log(data);
    };

    // Connection error.
    socket.onerror = function (data) {
        console.log("socket.onerror:");
        console.log(data);
    };
}
// Send the text currently in the input box as this user's chat message.
function send() {
    var text = $('#mag').val();
    user.message = text;
    var payload = JSON.stringify(user);
    socket.send(payload);
}
</script>
</html>
源码:https://github.com/dream-broken/springbootEasyFrame
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!