社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
目录
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
package com.superman.testnetty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 程序的入口,负责启动应用
*Timely communication
*/
public class TCNettyServer {
private int port;
public TCNettyServer(int port) {
this.port = port;
}
public void run() {
// 启用两个Reactor线程池【netty是基于NIO的,基于线程处理的】
// 用于接收Client端连接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 进行网络通信读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup);
// 指定使用NioServerSocketChannel这种类型的通道
b.channel(NioServerSocketChannel.class);
/*
* 使用 childHandler 去初始化服务器 添加handler,用来监听已经连接的客户端的Channel的动作和状态。
*
* 被绑定的MyWebSocketChannelHandler()里面设置了服务端初始化参数以及
*/
b.childHandler(new TCWebSocketChannelHandler());
System.out.println("netty服务端开启等待客户端连接....");
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new TCNettyServer(8888).run();
}
}
package com.superman.testnetty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* 初始化连接时候的各个组件
* Timely communication
*
*/
public class TCWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec", new HttpServerCodec());
e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 添加具体的处理器。可以addLast(或者addFirst)多个handler,
// 第一个参数是名字,无具体要求,如果填写null,系统会自动命名。
e.pipeline().addLast("handler", new TCWebSocketHandler());
/**通过使用管道的ChannelPipeline方式来处理请求
* 第一个配置的管道先处理,然后移交给下一个管道来处理,在每个管道处理中
* 各个handler可以决定是否继续或中断
* ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器{@link ChannelPipeline}
* Netty中的事件分为inbound事件和outbound事件。
* inbound事件通常由I/O线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。方法名以file开始{@link ChannelHandlerContext}
* outbound事件类似于发送、刷新、断开连接、绑定本地地址等关闭channel
*/
}
}
package com.superman.testnetty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 接收/处理/响应客户端websocket请求的核心业务处理类
* 通过添加hanlder,我们可以监听Channel的各种动作以及状态的改变,包括连接,绑定,接收消息等。
*
* Timely communication
*/
public class TCWebSocketHandler extends SimpleChannelInboundHandler<Object> {
// 用于服务器端web套接字打开和关闭握手
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "/websocket";
//客户端与服务端创建连接的时候调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
TCChannelManage.group.add(ctx.channel());
System.out.println("客户端与服务端连接开启,客户端remoteAddress:" + ctx.channel().remoteAddress());
}
//客户端与服务端断开连接的时候调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TCChannelManage.group.remove(ctx.channel());
System.out.println("客户端与服务端连接关闭...");
}
//服务端接收客户端发送过来的数据结束之后调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//服务端处理客户端websocket请求的核心方法
protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
/* 传统的HTTP接入(采用http处理方式)
* 第一次握手请求消息由HTTP协议承载,所以它是一个HTTP消息,
* 握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
* 执行handleHttpRequest方法来处理WebSocket握手请求。
*/
// FullHttpRequest是完整的 HTTP请求,协议头和Form数据是在一起的,不用分开读
if (msg instanceof FullHttpRequest) {
handHttpRequest(context, (FullHttpRequest) msg);
}
/**
* WebSocket接入(采用socket处理方式)
* 提交请求消息给服务端,
* WebSocketServerHandler接收到的是已经解码后的WebSocketFrame消息。
*/
else if (msg instanceof WebSocketFrame) {
handWebsocketFrame(context, (WebSocketFrame) msg);
}
/**
* Websocket的数据传输是frame形式传输的,比如会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:
*
* 1)大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。
*
* 2)和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。
*/
}
/**
* 处理客户端与服务端之前的websocket业务
*
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
//判断是否是关闭websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
}
//判断是否是ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// 返回应答消息
String requestMsg = ((TextWebSocketFrame) frame).text();
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "的消息==》" + requestMsg);
String[] array = requestMsg.split(",");
// 先判断通道管理器中是否存在该通道,没有则添加进去
if (!TCChannelManage.hasChannel(array[0])) {
TCChannelManage.userIdAndChannelMap.put(array[0], ctx.channel());
}
if (array[0].length() != 0 && array[1].length() != 0) {
TCChannelManage.send(array[0], array[1], array[2], ctx.channel());
} else if (array[0].length() != 0 && array[1].length() == 0) {
//如果没有指定接收者表示群发array.length() = 2
System.out.println("用户" + array[0] + "群发了一条消息:" + array[2]);
TCChannelManage.group.writeAndFlush(new TextWebSocketFrame("用户" + array[0] + "群发了一条消息:" + array[2]));
} else {
//如果没有指定发送者与接收者表示向服务端发送array.length() = 1
System.out.println("服务端接收用户" + ctx.channel().remoteAddress() + "消息,不再发送出去");
ctx.writeAndFlush(new TextWebSocketFrame("你向服务端发送了消息==》" + array[2]));
}
}
}
/**
* 处理客户端向服务端发起http握手请求的业务
*
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
System.out.println("处理http请求,http方法==>>" + req.getMethod() + ",http地址==>>" + req.getUri());
Map<String, String> parmMap = new HashMap<>();
try {
parmMap = parse(req);
} catch (IOException e) {
e.printStackTrace();
}
// 如果不是WebSocket握手请求消息,那么就返回 HTTP 400 BAD REQUEST 响应给客户端。
if (!req.getDecoderResult().isSuccess()
|| !("websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//如果是握手请求,那么就进行握手
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
// 通过它构造握手响应消息返回给客户端,
// 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码,
// 添加WebSocketEncoder和WebSocketDecoder之后,服务端就可以自动对WebSocket消息进行编解码了
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 服务端向客户端响应消息
*
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 解析GET、POST请求参数
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
*/
public Map<String, String> parse(FullHttpRequest fullReq) throws IOException {
HttpMethod method = fullReq.getMethod();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.getUri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(
new DefaultHttpDataFactory(false), fullReq);
List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
for(InterfaceHttpData data:postData){
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
parmMap.put(attribute.getName(), attribute.getValue());
}
}
} else {
// 不支持其它方法
System.out.println("不支持其他方法提交的参数");
}
return parmMap;
}
@Override
protected void channelRead0(ChannelHandlerContext arg0, Object arg1)
throws Exception {
messageReceived(arg0,arg1);
}
}
package com.superman.testnetty;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 存储整个工程的全局配置
* Timely communication
*
*/
public class TCChannelManage {
/**
* 存储每一个客户端接入进来时的channel对象
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 读锁
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
public static ConcurrentMap<String, Channel> userIdAndChannelMap = new ConcurrentHashMap<>();
public static void send(String senderId, String receiverId, String message, Channel senderChannel) {
// 发送肯定是A要给B发,A就是发消息的对象,B可以是人,机器等对象
try {
rwLock.readLock().lock();
// 1.寻找receiverId的channel
Channel receiverChannel = userIdAndChannelMap.get(receiverId);
if (receiverChannel == null) {
// 使用发送者的通道告知发送者,你要发的那个人不在线
senderChannel.writeAndFlush(new TextWebSocketFrame(receiverId + "不在线"));
return;
}
// 2.发送。A给B发,B若要收到消息,其实是通过B的channel给B发消息
receiverChannel.writeAndFlush(new TextWebSocketFrame(senderId + "发来消息===》" + message));
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
public static boolean hasChannel(String id) {
Channel channel = userIdAndChannelMap.get(id);
if (channel == null) {
return false;
} else {
return true;
}
}
}
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
<title>WebSocket客户端</title>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8888/websocket");
socket.onmessage = function(event){
var ta = document.getElementById('responseContent');
ta.value += event.data + "rn";
};
socket.onopen = function(event){
var ta = document.getElementById('responseContent');
ta.value = "你当前的浏览器支持WebSocket,请进行后续操作rn";
};
socket.onclose = function(event){
var ta = document.getElementById('responseContent');
ta.value = "";
ta.value = "WebSocket连接已经关闭rn";
};
}else{
alert("您的浏览器不支持WebSocket");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功!!");
}
}
</script>
</head>
<body>
<H3>及时唠嗑</H3>
<form onSubmit="return false;">
<input type = "text" name = "senderId" placeholder="发送者唯一id"/>
<br/><br/>
<input type = "text" name = "receiverId" placeholder="接收者唯一id"/>
<br/><br/>
<input type = "text" name = "message" placeholder="消息内容"/>
<br/><br/>
<input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.senderId.value+','+this.form.receiverId.value+','+this.form.message.value)"/>
<hr color="red"/>
<h2>客户端接收到服务端返回的应答消息</h2>
<textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
</form>
</body>
</html>
https://download.csdn.net/download/weixin_42749765/11263481
java -jar -server -Xms4G -Xmx4G -XX:NewSize=3584m -XX:PermSize=64m -XX:SurvivorRatio=1 -XX:+UseParallelGC -XX:-UseAdaptiveSizePolicy
轻松处理器支持1w往上
ok
持续更新
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!