扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:申请域名、网络空间、营销软件、网站建设、潞城网站维护、网站推广。
Java代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize = highWaterMark) {
if (newWriteBufferSize - messageSize highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if ("receiveBufferSizePredictorFactory".equals(key)) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize lowWaterMark) {
if (newWriteBufferSize + messageSize = lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable() {
return (getInterestOps() OP_WRITE) == 0;
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize = lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位低水位,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps = ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位高水位,此时等于没有注册写操作
}
} else {//超过高水位counter=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize = highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
} else {
interestOps = ~Channel.OP_WRITE;
}
}
} else {
interestOps = ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
}
return interestOps;
}
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
在之前的文章中,我们介绍了在同一个netty程序中支持多个不同的服务,它的逻辑很简单,就是在一个主程序中启动多个子程序,每个子程序通过一个BootStrap来绑定不同的端口,从而达到访问不同端口就访问了不同服务的目的。
但是多个端口虽然区分度够高,但是使用起来还是有诸多不便,那么有没有可能只用一个端口来统一不同的协议服务呢?
今天给大家介绍一下在netty中使用同一端口运行不同协议的方法,这种方法叫做port unification。
在讲解自定义port unification之前,我们来看下netty自带的port unification,比如SocksPortUnificationServerHandler。
我们知道SOCKS的主要协议有3中,分别是SOCKS4、SOCKS4a和SOCKS5,他们属于同一种协议的不同版本,所以肯定不能使用不同的端口,需要在同一个端口中进行版本的判断。
具体而言,SocksPortUnificationServerHandler继承自ByteToMessageDecoder,表示是将ByteBuf转换成为对应的Socks对象。
那他是怎么区分不同版本的呢?
在decode方法中,传入了要解码的ByteBuf in,首先获得它的readerIndex:
我们知道SOCKS协议的第一个字节表示的是版本,所以从in ByteBuf中读取第一个字节作为版本号:
有了版本号就可以通过不同的版本号进行处理,具体而言,对于SOCKS4a,需要添加Socks4ServerEncoder和Socks4ServerDecoder:
对于SOCKS5来说,需要添加Socks5ServerEncoder和Socks5InitialRequestDecoder两个编码和解码器:
这样,一个port unification就完成了,其思路就是通过传入的同一个端口的ByteBuf的首字节,来判断对应的SOCKS的版本号,从而针对不同的SOCKS版本进行处理。
在本例中,我们将会创建一个自定义的Port Unification,用来同时接收HTTP请求和gzip请求。
在这之前,我们先看一下两个协议的magic word,也就是说我们拿到一个ByteBuf,怎么能够知道这个是一个HTTP协议,还是传输的一个gzip文件呢?
先看下HTTP协议,这里我们默认是HTTP1.1,对于HTTP1.1的请求协议,下面是一个例子:
HTTP请求的第一个单词就是HTTP请求的方法名,具体而言有八种方法,分别是:
OPTIONS
返回服务器针对特定资源所支持的HTTP请求方法。也可以利用向Web服务器发送'*'的请求来测试服务器的功能性。
HEAD
向服务器索要与GET请求相一致的响应,只不过响应体将不会被返回。这一方法可以在不必传输整个响应内容的情况下,就可以获取包含在响应消息头中的元信息。
GET
向特定的资源发出请求。注意:GET方法不应当被用于产生“副作用”的操作中,例如在Web Application中。其中一个原因是GET可能会被网络蜘蛛等随意访问。
POST
向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在请求体中。POST请求可能会导致新的资源的建立和/或已有资源的修改。
PUT
向指定资源位置上传其最新内容。
DELETE
请求服务器删除Request-URI所标识的资源。
TRACE
回显服务器收到的请求,主要用于测试或诊断。
CONNECT
HTTP/1.1协议中预留给能够将连接改为管道方式的代理服务器。
那么需要几个字节来区分这八个方法呢?可以看到一个字节是不够的,因为我们有POST和PUT,他们的第一个字节都是P。所以应该使用2个字节来作为magic word。
对于gzip协议来说,它也有特殊的格式,其中gzip的前10个字节是header,其中第一个字节是0x1f,第二个字节是0x8b。
这样我们用两个字节也能区分gzip协议。
这样,我们的handler逻辑就出来了。首先从byteBuf中取出前两个字节,然后对其进行判断,区分出是HTTP请求还是gzip请求:
对应的,我们还需要对其添加相应的编码和解码器,对于gzip来说,netty提供了ZlibCodecFactory:
对于HTTP来说,netty也提供了HttpRequestDecoder和HttpResponseEncoder还有HttpContentCompressor来对HTTP消息进行编码解码和压缩。
添加了编码和解码器之后,如果你想自定义一些操作,只需要再添加自定义的对应的消息handler即可,非常的方便。
本文的例子可以参考: learn-netty4
[netty 基本使用- 作为http服务器][gcssloop]
[gcssloop]:
ServerSocket.java
** ServerInitializer.java **
** ServerHandler.java 处理业务 **
** ClientSocket.java **
** Clientinitializer.java **
**ClientHandler.java 处理业务 **
** 可以多次和服务器端通信的写法 **
netty 常用的处理大数据分包传输问题的解决类。
编码类,自动将
+----------------+
| "HELLO, WORLD" |
+----------------+
格式的数据转换成
+--------+----------------+
+--------+----------------+
格式的数据
[netty 数据分包、组包、粘包处理机制][123]
[123]:
根据我们前面分析的,接收到消息后,为了避免在I/O线程里执行耗时的操作,一般都会使用线程池来执行业务处理逻辑.
那是使用Netty提供给我们的方法,传入一个线程池还是使用我们自己定义的线程池好呢?
先来看Netty给我们提供的
即我们添加handler的时候可以传入一个线程池进去
DefaultEventExecutorGroup
它与NioEventLoop之间的区别又是什么?
其次
也就是说使用netty提供默认的,是绑定的.如下图
如果采用自定义线程池时,优化方向就是锁消除.
可以使用Disruptor或者使用ChannelId与业务线程池中的某个业务进行绑定
链接:
Netty模块存在closeFuture().sync()和close().sync()关闭端口,但是使用起来是两种情况
例如f.channel().closeFuture().sync() 是等待服务端监听端口关闭
该方法进行阻塞,等待服务端链路关闭之后继续执行。
这种模式一般都是使用Netty模块主动向服务端发送请求,然后最后结束才使用
例如f.channel().close().sync() 则作为服务端启用Netty模块接收情况使用。
一般在Netty模块对应@PreDestroy方法里面使用该方法来结束服务
如果是用Netty模块作为服务端,在@PreDestroy方法中使用
f.channel().closeFuture().sync() 来停止服务时候,在weblogic部署时会出现停止不了应用情况。
解决这种情况就是将 f.channel().closeFuture().sync() 改 f.channel().close().sync() 即可
netty实现多个handler顺序调用在netty中,一次数据交互,可以由多个handler去处理,例如handler1和handler2,那么,在前面那个handler的messageReceived的最后要加上ctx.sendUpstream(e);理论请见:AChannelEventcanbehandledbyeitheraChannelUpstreamHandleroraChannelDownstreamHandlerandbeforwardedtotheclosesthandlerbycallingChannelHandlerContext.sendUpstream(ChannelEvent)orChannelHandlerContext.sendDownstream(ChannelEvent).代码:复制代码publicclassHandler1extendsSimpleChannelUpstreamHandler{@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente){System.out.println("1messagereceived");Stringa="11";Objecto=a;ctx.getChannel().write(a);ctx.sendUpstream(e);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsException{e.getChannel().close();}}复制代码复制代码publicclassHandler2extendsSimpleChannelUpstreamHandler{@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente){System.out.println("2messagereceived");e.getChannel().close();}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsException{e.getChannel().close();}}复制代码复制代码publicclassTcpServer{publicstaticvoidmain(String[]args){System.out.println("startingatcpserver");ServerBootstrapsb=newServerBootstrap(newNioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));sb.setPipelineFactory(newPKServerPipelineFactory());sb.setOption("child.tcpNoDelay",true);sb.setOption("child.keepAlive",true);sb.bind(newInetSocketAddress(9999));}}
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流