vb.nettyp类 vbnet class

netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:申请域名、网络空间、营销软件、网站建设、潞城网站维护、网站推广。

Java代码

public void eventSunk(

ChannelPipeline pipeline, ChannelEvent e) throws Exception {

if (e instanceof ChannelStateEvent) {

……

} else if (e instanceof MessageEvent) {

MessageEvent event = (MessageEvent) e;

NioSocketChannel channel = (NioSocketChannel) event.getChannel();

boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue

assert offered;

channel.worker.writeFromUserCode(channel);

}

}

WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。

Java代码

public boolean offer(MessageEvent e) {

boolean success = queue.offer(e);

assert success;

int messageSize = getMessageSize(e);

int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);

int highWaterMark = getConfig().getWriteBufferHighWaterMark();

if (newWriteBufferSize = highWaterMark) {

if (newWriteBufferSize - messageSize highWaterMark) {

highWaterMarkCounter.incrementAndGet();

if (!notifying.get()) {

notifying.set(Boolean.TRUE);

fireChannelInterestChanged(AbstractNioChannel.this);

notifying.set(Boolean.FALSE);

}

}

}

return true;

}

fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K

Java代码

public boolean setOption(String key, Object value) {

if (super.setOption(key, value)) {

return true;

}

if ("writeBufferHighWaterMark".equals(key)) {

setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));

} else if ("writeBufferLowWaterMark".equals(key)) {

setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));

} else if ("writeSpinCount".equals(key)) {

setWriteSpinCount(ConversionUtil.toInt(value));

} else if ("receiveBufferSizePredictorFactory".equals(key)) {

setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);

} else if ("receiveBufferSizePredictor".equals(key)) {

setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);

} else {

return false;

}

return true;

}

然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K

Java代码

public MessageEvent poll() {

MessageEvent e = queue.poll();

if (e != null) {

int messageSize = getMessageSize(e);

int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);

int lowWaterMark = getConfig().getWriteBufferLowWaterMark();

if (newWriteBufferSize == 0 || newWriteBufferSize lowWaterMark) {

if (newWriteBufferSize + messageSize = lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下

highWaterMarkCounter.decrementAndGet();

if (isConnected() !notifying.get()) {

notifying.set(Boolean.TRUE);

fireChannelInterestChanged(AbstractNioChannel.this);

notifying.set(Boolean.FALSE);

}

}

}

}

return e;

}

超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的

Java代码

public boolean isWritable() {

return (getInterestOps() OP_WRITE) == 0;

}

public int getInterestOps() {

if (!isOpen()) {

return Channel.OP_WRITE;

}

int interestOps = getRawInterestOps();

int writeBufferSize = this.writeBufferSize.get();

if (writeBufferSize != 0) {

if (highWaterMarkCounter.get() 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1

int lowWaterMark = getConfig().getWriteBufferLowWaterMark();

if (writeBufferSize = lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位低水位,此时等于注册写操作

interestOps |= Channel.OP_WRITE;

} else {

interestOps = ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位高水位,此时等于没有注册写操作

}

} else {//超过高水位counter=0,意味着当前数据量小于高水位

int highWaterMark = getConfig().getWriteBufferHighWaterMark();

if (writeBufferSize = highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作

interestOps |= Channel.OP_WRITE;

} else {

interestOps = ~Channel.OP_WRITE;

}

}

} else {

interestOps = ~Channel.OP_WRITE;//写队列没数据,没有注册写操作

}

return interestOps;

}

即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位低水位 则不可写,否则可写

如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。

netty系列之:一口多用,使用同一端口运行不同协议

在之前的文章中,我们介绍了在同一个netty程序中支持多个不同的服务,它的逻辑很简单,就是在一个主程序中启动多个子程序,每个子程序通过一个BootStrap来绑定不同的端口,从而达到访问不同端口就访问了不同服务的目的。

但是多个端口虽然区分度够高,但是使用起来还是有诸多不便,那么有没有可能只用一个端口来统一不同的协议服务呢?

今天给大家介绍一下在netty中使用同一端口运行不同协议的方法,这种方法叫做port unification。

在讲解自定义port unification之前,我们来看下netty自带的port unification,比如SocksPortUnificationServerHandler。

我们知道SOCKS的主要协议有3中,分别是SOCKS4、SOCKS4a和SOCKS5,他们属于同一种协议的不同版本,所以肯定不能使用不同的端口,需要在同一个端口中进行版本的判断。

具体而言,SocksPortUnificationServerHandler继承自ByteToMessageDecoder,表示是将ByteBuf转换成为对应的Socks对象。

那他是怎么区分不同版本的呢?

在decode方法中,传入了要解码的ByteBuf in,首先获得它的readerIndex:

我们知道SOCKS协议的第一个字节表示的是版本,所以从in ByteBuf中读取第一个字节作为版本号:

有了版本号就可以通过不同的版本号进行处理,具体而言,对于SOCKS4a,需要添加Socks4ServerEncoder和Socks4ServerDecoder:

对于SOCKS5来说,需要添加Socks5ServerEncoder和Socks5InitialRequestDecoder两个编码和解码器:

这样,一个port unification就完成了,其思路就是通过传入的同一个端口的ByteBuf的首字节,来判断对应的SOCKS的版本号,从而针对不同的SOCKS版本进行处理。

在本例中,我们将会创建一个自定义的Port Unification,用来同时接收HTTP请求和gzip请求。

在这之前,我们先看一下两个协议的magic word,也就是说我们拿到一个ByteBuf,怎么能够知道这个是一个HTTP协议,还是传输的一个gzip文件呢?

先看下HTTP协议,这里我们默认是HTTP1.1,对于HTTP1.1的请求协议,下面是一个例子:

HTTP请求的第一个单词就是HTTP请求的方法名,具体而言有八种方法,分别是:

OPTIONS

返回服务器针对特定资源所支持的HTTP请求方法。也可以利用向Web服务器发送'*'的请求来测试服务器的功能性。

HEAD

向服务器索要与GET请求相一致的响应,只不过响应体将不会被返回。这一方法可以在不必传输整个响应内容的情况下,就可以获取包含在响应消息头中的元信息。

GET

向特定的资源发出请求。注意:GET方法不应当被用于产生“副作用”的操作中,例如在Web Application中。其中一个原因是GET可能会被网络蜘蛛等随意访问。

POST

向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在请求体中。POST请求可能会导致新的资源的建立和/或已有资源的修改。

PUT

向指定资源位置上传其最新内容。

DELETE

请求服务器删除Request-URI所标识的资源。

TRACE

回显服务器收到的请求,主要用于测试或诊断。

CONNECT

HTTP/1.1协议中预留给能够将连接改为管道方式的代理服务器。

那么需要几个字节来区分这八个方法呢?可以看到一个字节是不够的,因为我们有POST和PUT,他们的第一个字节都是P。所以应该使用2个字节来作为magic word。

对于gzip协议来说,它也有特殊的格式,其中gzip的前10个字节是header,其中第一个字节是0x1f,第二个字节是0x8b。

这样我们用两个字节也能区分gzip协议。

这样,我们的handler逻辑就出来了。首先从byteBuf中取出前两个字节,然后对其进行判断,区分出是HTTP请求还是gzip请求:

对应的,我们还需要对其添加相应的编码和解码器,对于gzip来说,netty提供了ZlibCodecFactory:

对于HTTP来说,netty也提供了HttpRequestDecoder和HttpResponseEncoder还有HttpContentCompressor来对HTTP消息进行编码解码和压缩。

添加了编码和解码器之后,如果你想自定义一些操作,只需要再添加自定义的对应的消息handler即可,非常的方便。

本文的例子可以参考: learn-netty4

netty基本使用- socket通信

[netty 基本使用- 作为http服务器][gcssloop]

[gcssloop]:

ServerSocket.java

** ServerInitializer.java **

** ServerHandler.java 处理业务 **

** ClientSocket.java **

** Clientinitializer.java **

**ClientHandler.java 处理业务 **

** 可以多次和服务器端通信的写法 **

netty 常用的处理大数据分包传输问题的解决类。

编码类,自动将

+----------------+

| "HELLO, WORLD" |

+----------------+

格式的数据转换成

+--------+----------------+

+--------+----------------+

格式的数据

[netty 数据分包、组包、粘包处理机制][123]

[123]:

Netty实战6——Netty业务处理线程池的选择

根据我们前面分析的,接收到消息后,为了避免在I/O线程里执行耗时的操作,一般都会使用线程池来执行业务处理逻辑.

那是使用Netty提供给我们的方法,传入一个线程池还是使用我们自己定义的线程池好呢?

先来看Netty给我们提供的

即我们添加handler的时候可以传入一个线程池进去

DefaultEventExecutorGroup

它与NioEventLoop之间的区别又是什么?

其次

也就是说使用netty提供默认的,是绑定的.如下图

如果采用自定义线程池时,优化方向就是锁消除.

可以使用Disruptor或者使用ChannelId与业务线程池中的某个业务进行绑定

链接:

技巧分享-20周-Netty的closeFuture().sync()和close().sync()

Netty模块存在closeFuture().sync()和close().sync()关闭端口,但是使用起来是两种情况

例如f.channel().closeFuture().sync() 是等待服务端监听端口关闭

该方法进行阻塞,等待服务端链路关闭之后继续执行。

这种模式一般都是使用Netty模块主动向服务端发送请求,然后最后结束才使用

例如f.channel().close().sync() 则作为服务端启用Netty模块接收情况使用。

一般在Netty模块对应@PreDestroy方法里面使用该方法来结束服务

如果是用Netty模块作为服务端,在@PreDestroy方法中使用

f.channel().closeFuture().sync() 来停止服务时候,在weblogic部署时会出现停止不了应用情况。

解决这种情况就是将 f.channel().closeFuture().sync() 改 f.channel().close().sync() 即可

如何保证netty执行事件是顺序而且高效

netty实现多个handler顺序调用在netty中,一次数据交互,可以由多个handler去处理,例如handler1和handler2,那么,在前面那个handler的messageReceived的最后要加上ctx.sendUpstream(e);理论请见:AChannelEventcanbehandledbyeitheraChannelUpstreamHandleroraChannelDownstreamHandlerandbeforwardedtotheclosesthandlerbycallingChannelHandlerContext.sendUpstream(ChannelEvent)orChannelHandlerContext.sendDownstream(ChannelEvent).代码:复制代码publicclassHandler1extendsSimpleChannelUpstreamHandler{@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente){System.out.println("1messagereceived");Stringa="11";Objecto=a;ctx.getChannel().write(a);ctx.sendUpstream(e);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsException{e.getChannel().close();}}复制代码复制代码publicclassHandler2extendsSimpleChannelUpstreamHandler{@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente){System.out.println("2messagereceived");e.getChannel().close();}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsException{e.getChannel().close();}}复制代码复制代码publicclassTcpServer{publicstaticvoidmain(String[]args){System.out.println("startingatcpserver");ServerBootstrapsb=newServerBootstrap(newNioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));sb.setPipelineFactory(newPKServerPipelineFactory());sb.setOption("child.tcpNoDelay",true);sb.setOption("child.keepAlive",true);sb.bind(newInetSocketAddress(9999));}}


当前名称:vb.nettyp类 vbnet class
URL标题:http://csdahua.cn/article/hgehid.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流