社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
HttpRequestEncoder | 对 HTTP 请求进行解码,用于服务端入站 |
HttpResponseEncoder | 对 HTTP 响应进行解码,用于客户端入站 |
HttpRequestDecoder | 对 HTTP 请求进行编码,用于客户端出站 |
HttpResponseDecoder | 对 HTTP 响应进行编码,用于服务端出站 |
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder()); //Http请求入站解码
pipeline.addLast("encoder", new HttpResponseEncoder()); //Http响应出站编码
}
}
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpResponseDecoder()); //Http响应入站解码
pipeline.addLast("encoder", new HttpRequestEncoder()); //Http请求出站编码
}
}
public class HttpServer {
private final int port;
public HttpServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new HttpServerChannelInitializer())
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = bootstrap.bind().sync();
future.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new HttpServer(23333).start();
}
//HttpSevrer channlHandler 初始器
private class HttpServerChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//入站 Http 请求解码
ch.pipeline().addLast("httpDecode", new HttpRequestDecoder());
//出站 Http 响应编码
ch.pipeline().addLast("httpEncode", new HttpResponseEncoder());
//服务端业务处理器
ch.pipeline().addLast("httpHandler", new HttpServerHandler());
}
}
}
public class HttpServerHandler extends ChannelInboundHandlerAdapter{
private ByteBufReader reader;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印 HttpRequest header 和 content
if(msg instanceof HttpRequest){
HttpRequest request = (HttpRequest)msg;
System.out.println("HttpRequest Header:" + request.headers());
reader = new ByteBufReader((int)HttpUtil.getContentLength(request));
}else if(msg instanceof HttpContent){
HttpContent httpContent = (HttpContent)msg;
ByteBuf buf = httpContent.content();
reader.reading(buf);
buf.release();
if(reader.isEnd()){
String content = reader.getStringContent(Charset.forName("UTF-8"));
System.out.println("HttpRequest Content: "+ content);
String responseContent = "[server check] - " + content;
//构建HttpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);
//添加响应正文
response.content().writeBytes(Unpooled.wrappedBuffer(responseContent.getBytes(CharsetUtil.UTF_8)));
//修改响应头
response.headers().set("Content-Type","text/plain;charset=utf-8");
response.headers().set("Content-Length",response.content().readableBytes());
response.headers().set("Connection",HttpHeaderValues.KEEP_ALIVE);
//发送 HttpResponse
ctx.writeAndFlush(response);
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public class ByteBufReader {
private ByteBuf bufTmp; //ByteBuf 内部储存
private boolean end; //标记 ByteBuf 是否已经写入结束
public ByteBufReader(int length) {
bufTmp = Unpooled.buffer(length);
}
public boolean isEnd() {
return end;
}
//读取缓冲区数据
public void reading(ByteBuf datas){
datas.readBytes(bufTmp,datas.readableBytes());
if(bufTmp.writableBytes() == 0)
end = true;
}
//获取缓冲区数据的字节组,该方法是一次性的
public byte[] getBytesContent(){
if(end){
byte[] content = new byte[bufTmp.readableBytes()];
bufTmp.readBytes(content);
bufTmp.release();
end = false;
return content;
}else{
return null;
}
}
//获取缓冲区数据的转化字符串,该方法是一次性的
public String getStringContent(Charset charset){
byte[] bytes = getBytesContent();
if(bytes == null)
return null;
return new String(bytes,charset);
}
}
public class HttpClient {
public void connect(String host,int port) throws InterruptedException, URISyntaxException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.remoteAddress(host,port)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new HttpClientChannelInitializer());
ChannelFuture future = bootstrap.connect().sync();
//向服务端发送请求
URI uri = new URI("http://" + host + ":" + port);
String requestContent = "Netty rockets!";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,uri.toASCIIString());
//添加请求正文
request.content().writeBytes(Unpooled.wrappedBuffer(requestContent.getBytes(CharsetUtil.UTF_8)));
//添加请求头
request.headers().set("Host",host+":"+port);
request.headers().set("Content-Type","text/plain;charset=utf-8");
request.headers().set("Content-Length",requestContent.length());
request.headers().set("Connection",request.content().readableBytes());
//发送请求
future.channel().writeAndFlush(request);
future.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException, URISyntaxException {
new HttpClient().connect("127.0.0.1",23333);
}
//HttpSevrer channlHandler 初始器
private class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//入站 Http 响应解码
ch.pipeline().addLast("httpDecode", new HttpResponseDecoder());
//出站 Http 请求编码
ch.pipeline().addLast("httpEncode", new HttpRequestEncoder());
//客户端业务处理器
ch.pipeline().addLast("httpHandler", new HttpClientHandler());
}
}
}
public class HttpClientHandler extends ChannelInboundHandlerAdapter{
private ByteBufReader reader;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印 HttpReponse header 和 content
if(msg instanceof HttpResponse){
HttpResponse response = (HttpResponse)msg;
System.out.println("HttpResponse Header: "+ response.headers());
reader = new ByteBufReader((int) HttpUtil.getContentLength(response));
}else if(msg instanceof HttpContent){
HttpContent httpContent = (HttpContent)msg;
ByteBuf buf = httpContent.content();
reader.reading(buf);
buf.release();
if(reader.isEnd()){
String content = reader.getStringContent(CharsetUtil.UTF_8);
System.out.println("HttpResponse Content:" + content);
ctx.close();
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//刷新缓冲区并关闭该远程连接
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
HttpRequest Header:[Host: 127.0.0.1:23333, Content-Type: text/plain;charset=utf-8, Content-Length: 14, Connection: 14] HttpRequest Content: Netty rockets! |
HttpResponse Header:[Content-Type: text/plain;charset=utf-8, Content-Length: 31, Connection: keep-alive] HttpResponse Content:[server check] - Netty rockets! |
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!