一、概述
本文使用Netty实现了一个简单的聊天程序,旨在理解Netty的API使用、粘包拆包、编解码以及处理器链(ChannelPipeline)等内容。
TCP是面向字节流的协议,本身不保留消息边界,因此应用层需要对接收到的TCP数据的粘包、拆包情况进行必要处理。粘包拆包如下图:

参考:
- servlet是如何处理粘包问题的?
- 怎样判断已经接收完HTTP客户端的请求数据?
- tomcat NIO处理报文 是否需要拆包 粘包
二、Server端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
public class NettyServer { private static final int LISTEN_PORT = 9000; private static final ByteBuf DELIMITER = Unpooled.copiedBuffer("_".getBytes()); private static final int MAX_FRAME_LENGTH = 1024;
public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, DELIMITER)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ChatServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(LISTEN_PORT).sync(); System.out.println("ChatServer started.");
channelFuture.channel().closeFuture().await(); }
static class ChatServerHandler extends SimpleChannelInboundHandler { private static ChannelGroup onlineClients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override public void channelActive(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); String msg = String.format("Client[%s] online", channel.remoteAddress());
onlineClients.add(channel); System.out.println(msg); onlineClients.writeAndFlush(msg + "_", itemChannel -> !itemChannel.equals(channel)); }
@Override public void channelInactive(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); String msg = String.format("Client[%s] offline", channel.remoteAddress());
onlineClients.remove(channel); System.out.println(msg + "_"); onlineClients.writeAndFlush(msg + "_", itemChannel -> !itemChannel.equals(channel)); }
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); msg = String.format("Client[%s] says: %s", channel.remoteAddress(), msg);
System.out.println(msg); onlineClients.writeAndFlush(msg + "_", itemChannel -> !itemChannel.equals(channel)); } } }
|
三、Client端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
public class NettyClient { private static final int LISTEN_PORT = 9000; private static final ByteBuf DELIMITER = Unpooled.copiedBuffer("_".getBytes()); private static final int MAX_FRAME_LENGTH = 1024;
public static void main(String[] args) throws InterruptedException { EventLoopGroup eventExecutors = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, DELIMITER)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ChatClientHandler()); } });
ChannelFuture channelFuture = bootstrap.connect("localhost", LISTEN_PORT); channelFuture.await(); System.out.println("Client connected server.");
sendMsgWithConsole(channelFuture.channel());
channelFuture.channel().closeFuture().await(); }
private static void sendBatchMsg(Channel channel) { int i = 20; while (i-- > 0) { ByteBuf byteBuf = Unpooled.copiedBuffer("I am here._".getBytes()); channel.writeAndFlush(byteBuf); } }
private static void sendMsgWithConsole(Channel channel) { Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); System.out.println("Console input:" + s); channel.writeAndFlush(s + "_"); } }
static class ChatClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Client online"); }
@Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("Client offline"); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(msg); } } }
|
# 反应器多线程模式
参考:Scalable_IO_in_Java
