博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一个超级简单的使用netty NIO实现服务端和客户端的栗子
阅读量:4134 次
发布时间:2019-05-25

本文共 6789 字,大约阅读时间需要 22 分钟。

前置条件

  1. IDEA开发工具
  2. maven管理依赖
  3. 熟悉了netty的核心组件,看这个案例会爽一点。如果不是很清楚,可以看我的这篇文章: 

  另外这个案例我直接使用的韩顺平老师的案例。因为我写也是一样,我们就是为了学习,能CV的代码我就不一个一个敲了。 这段代码本来注释已经比较多了,我自己也加了一部分的注释。

引入maven依赖

  先创建一个项目

  在pom引入依赖,轻松搞定,netty给我们准好了一个大而全的包,不需要额外的引入其他的包。

io.netty
netty-all
4.1.20.Final

自定义服务端的处理逻辑代码

   一般自定义的逻辑代码,我们为了方便,都去继承netty的channelHanerAdapter。 这样可以省去一部分通用的功能的开发。如果你想要自己实现,也行。看下图channelHanerAdapter实际上也是实现了 ChannelInboundHandler接口。

  这实际上是一个适配器模式,来减轻我们自定义的业务逻辑接入netty的负担。

package netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/** * @author angus * @create 2021-06-26 12:57 * 说明 * 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范) * 2. 这时我们自定义一个Handler , 才能称为一个handler */public class NettyServerHandler extends ChannelInboundHandlerAdapter{	//读取数据实际(这里我们可以读取客户端发送的消息)	/**	 * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址	 * 2. Object msg: 就是客户端发送的数据 默认Object	 */	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{		System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());		System.out.println("server ctx =" + ctx);		System.out.println("看看channel 和 pipeline的关系");		Channel channel = ctx.channel();		ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站		//将 msg 转成一个 ByteBuf		//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.		ByteBuf buf = (ByteBuf) msg;		System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));		System.out.println("客户端地址:" + channel.remoteAddress());	}	//数据读取完毕	@Override	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {		//writeAndFlush 是 write + flush		//将数据写入到缓存,并刷新		//一般讲,我们对这个发送的数据进行编码		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));	}	//处理异常, 一般是需要关闭通道	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		ctx.close();	}}

 

编写服务端的引导程序

  注释已经比较清晰,我就不赘述了。

package netty.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import netty.NettyServerHandler;/** * @author angus * @create 2021-06-26 12:53 */public class NettyServer {	public static void main(String[] args) {		//创建BossGroup 和 WorkerGroup		//说明		//1. 创建两个线程组 bossGroup 和 workerGroup		//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成		//3. 两个都是无限循环		//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数		//   默认实际 cpu核数 * 2		EventLoopGroup bossGroup = new NioEventLoopGroup(1);		EventLoopGroup workerGroup = new NioEventLoopGroup(); //8		try {			//创建服务器端的启动对象,配置参数, 俗称程序引导器			ServerBootstrap bootstrap = new ServerBootstrap();			//使用链式编程来进行设置			bootstrap.group(bossGroup, workerGroup) //设置两个线程组,服务端是两个线程组,原因就是服务端实际上就是主从的 reactor模型。					.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现, 这里实际上就是传入一个类型,由这个决定了你要使用nio,还是OIO					.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数					.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态					//          .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup					.childHandler(new ChannelInitializer
() {//创建一个通道初始化对象(匿名对象),这个程序里边使用的是匿名对象,你也可以单独拿出去,用一个类实现 ChannelInitializer //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue // 拿到pipeline 放置真正处理业务逻辑的 自定义逻辑处理单元 这个是由使用者定义的。其中NettyServerHandler 是一个channelHandler。 // 因为pipeline里边只能放ChanelHandler,所以自定义的ChanelHandler需要 去实现一个 ChannelInboundHandlerAdapter。通过ChannelInboundHandlerAdapter符合了netty的规范。 ch.pipeline().addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道进行监听 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

 编写client端的处理逻辑

  这个实际上非常简单,就是模拟了一下客户端,发送一条数据。

package netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author angus * @create 2021-06-26 18:16 */public class NettyClientHandler extends ChannelInboundHandlerAdapter {	//当通道就绪就会触发该方法	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		System.out.println("client " + ctx);		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));	}	//当通道有读取事件时,会触发	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {		ByteBuf buf = (ByteBuf) msg;		System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));		System.out.println("服务器的地址: " + ctx.channel().remoteAddress());	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		cause.printStackTrace();		ctx.close();	}}

客户端引导程序

package netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author angus * @create 2021-06-26 18:16 */public class NettyClientHandler extends ChannelInboundHandlerAdapter {	//当通道就绪就会触发该方法	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		System.out.println("client " + ctx);		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));	}	//当通道有读取事件时,会触发	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {		ByteBuf buf = (ByteBuf) msg;		System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));		System.out.println("服务器的地址: " + ctx.channel().remoteAddress());	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		cause.printStackTrace();		ctx.close();	}}

转载地址:http://cmsvi.baihongyu.com/

你可能感兴趣的文章
A - Average distance HDU - 2376(树形)
查看>>
B - Adding Digits CodeForces - 260A
查看>>
Party at Hali-Bula POJ - 3342(树形dp)
查看>>
E - Balls and Boxes CodeForces - 260C(思维)
查看>>
A - Anniversary party HDU - 1520(没有上司的舞会)
查看>>
B - Greg's Workout CodeForces - 255A(思维)
查看>>
E - Code Parsing CodeForces - 255B(思维)
查看>>
D - Undoubtedly Lucky Numbers CodeForces - 244B(数论 )
查看>>
Minimal coverage(贪心 区间覆盖)
查看>>
201709-5 除法 ccf(树状数组)
查看>>
little w and Segment Coverage(差分)
查看>>
Weak Pair HDU - 5877(dfs+树状数组+离散化+二分)
查看>>
Codeforces Round #572 (Div. 2)(ABCD1D2E)
查看>>
Query on a tree HDU - 3804(线段树求区间最大+树链剖分)
查看>>
Doom HDU - 5239(线段树+思维)
查看>>
Educational Codeforces Round 68 (Rated for Div. 2)(ABCD)
查看>>
Codeforces Round #535 (Div. 3)(ABCDE1E2)
查看>>
权值线段树小结(hdu多校,普通平衡树,郁闷的出纳员)
查看>>
Basketball Exercise CodeForces - 1195C(动态规划dp)
查看>>
Codeforces Global Round 4(ABCDE)
查看>>