博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocket mq通讯模型源码分析
阅读量:5877 次
发布时间:2019-06-19

本文共 21319 字,大约阅读时间需要 71 分钟。

hot3.png

 

该章节主要深入分析rmq是如何构建自己的通讯模型的,主要从以下四点分析:

1、总概论述

2、rmq的通讯协议实现。

3、rmq的三种通讯方式。

4、rmq的线程处理模型。

 

1、总概论述

对于rocket mq的通讯来说,也就是rpc调用的实现,例如,producer端向broker发起一次send(message), 其实就是向broker发起一次rpc,broker再根据通讯方式,响应produer的处理结果。

本文主要是对rmq如何设计通讯协议,如何实现rpc三种调用方式,其中包括invokeSync、invokeAsync、invokeOneway以及如何设计线程处理模型来处理业务。这里稍微提一下,这里不会涉及太多netty的细节,以及本章的阅读对象最好熟悉netty的使用。

 

2、rmq的通讯协议实现

rmq使用netty作为主要的通讯框架,当然,高可用部分纯粹使用nio来实现,这部分等以后讲到rmq高可用是如何实现时再来详细分析。这里主要分析的是rmq的remote,也即通讯模块。

先看一下rmq是如何抽象的,类图2-1:

2-1

总结一下:    NettyRemotingAbstract,该类主要是rmq三种通讯方式以及线程处理模型的一个抽象。    NettyRemotingClient,该类主要负责客户端的通讯处理。    NettyRemotingServer,该类主要负责服务端的通讯处理。    RemotingCommand,该类是所有远程处理请求的抽象。

我们先看一下rmq是如何使用netty的:

图2-2

绝大部分使用过netty框架的程序猿来封装自己的通讯模块时,基本上都是使用图1-2的代码流程,这里稍微研读一下。

2.1 rmq对于reactor模型的实现选择:

if(RemotingUtil.isLinuxPlatform()&& nettyServerConfig.isUseEpollNativeSelector()) {  this.eventLoopGroupSelector=newEpollEventLoopGroup(..);}else{  this.eventLoopGroupSelector=newNioEventLoopGroup(..);}

从源码上可以看出,如果jvm所处的是linux平台,则会选择使用netty自己重新实现的一套reactor模型,使用平台相关的EventLoopGroup,这可以产生更少的垃圾回收以及更快的性能。这里再单独说一下epoll和poll的核心区别,epoll轮询所有有效的注册事件,例如连接成功事件,可写事件或可读事件;而poll是无差别轮询,即会轮询所有的事件。因此性能上epoll会更快更有效。

2.2 netty的配置选项:

rmq使用PooledByteBufAllocator.DEFAULT属性,表明ByteBuf的分配方式是使用池化技术,相比普通方式创建的ByteBuf,也即朝生夕灭的java对象,性能相对来说会快不少。

2.3 netty Handler:

2.3.1 NettyEncoder 编码器

在说协议编码之前,我们先看看rmq的协议是如何设计的,下图是截取rmq源码的一个协议设计说明:

图2-3

从图中可以看出rmq的通讯协议是挺简洁的。

length:消息总长度,int 类型 ,4字节

header length:消息头长度,int 类型 ,4字节
header data :消息头字节数据
body data:消息体字节数据

接下来,我们看看NettyEncoder的核心实现,以下代码为抽取主干的代码:

public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)            throws Exception {      ....            //step1,编码消息头            ByteBuffer header = remotingCommand.encodeHeader();            //step2,写入消息头的            out.writeBytes(header);            //step3,消息体由客户端编码            byte[] body = remotingCommand.getBody();            if (body != null) {                out.writeBytes(body);            ....    }

接着跟入remotingCommand.encodeHeader():

public ByteBuffer encodeHeader(final intbodyLength) {// 消息总长度int length =4;//存放消息头的字节内容byte[] headerData;//step1:正真执行消息头的编码headerData =this.headerEncode();//累加消息头长度length += headerData.length;//累加消息体的长度,到这里其实整个消息的长度已算出length += bodyLength//应为这里只存放消息头的内容,因此需要把消息体内容除去。ByteBuffer result = ByteBuffer.allocate(4+ length - bodyLength);result.putInt(length);//step2:接下来的4个字节,需要通过位移以及掩码运算才可以得出headerLength 以及序列化方式。result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));....}

这里rocketmq 的序列化方式有两种,一种是SerializeType.JSON,使用fastjson序列化;另外一种就是SerializeType.ROCKETMQ,根据协议头所需字段逐个写入。进入rocketMQProtocolEncode方法后,说一下几个关键字段:

2-4

如上代码片段,有三个红圈的地方,一个就是extFields,该字段存储的是消息的扩张字段,例如我们做分布式跟踪的时候,就可以把traceId放入;第二个是opaque,该字段是一次rpc请求的标识;最后一个是flag,这个是系统消息的掩码标识,例如消息是否压缩等等。

headerEncode()分析完后,我们看另外一个方法markProtocolType(headerData.length,serializeTypeCurrentRPC),进入该方法看看:

byte[] result =new byte[4];result[0] = type.getCode();result[1] = (byte) ((source >>16) &0xFF);result[2] = (byte) ((source >>8) &0xFF);result[3] = (byte) (source &0xFF);return result;

总结来说就是一个4字节、int类型的header length存储位,前面高一位的字节是序列化方式,后面低24位则是真正的消息头部长度;为什么是24位呢?我们看一下NettyDecoder.FRAME_MAX_LENGTH属性,就是消息解码时,最大的数据帧长度为16777216也就是2^24 - 1,刚好是24位,表明每条消息的大小不能超过16m。

到这里为止,我们分析完了NettyEncoder编码器,NettyDecoder是一个逆过程,这里读者可以自行分析了。

2.3.2 IdleStateHandler编码器:

该Handler是netty自带的一个读写空闲监测处理器,rmq集群内所有端到端的连接方式均采取长连接,所以需要空闲监测,通过该方式,可以主动释放一些无用、空闲的连接。当监听到例如写事件过时时,表明该连接是空闲的,因此会传播一个IdleStateEvent,NettyConnetManageHandler收到该事件以后,会异步关闭该连接。NettyConnetManageHandler会异步处理所有抛出异常信息的连接通道。

2.3.3NettyServerHandler编码器:

该handler就是具体的业务处理器了,该处理器在下面讲到rmq的线程处理模型时再分析。

以上就是rmq如何使用netty构造自己的通讯协议以及一些用法。

 

3、rmq的三种通讯方式

rmq的rpc通讯方式刚刚提到,一共有三种。分别是invokesync同步调用,同步等待结果;invokeAsync同步调用,异步回调结果;invokeoneway,同步调用,直接返回。

用订外卖来举例说明,A打电话给B订外卖,invokesync调用就是A会一直等待B,直到B把外卖送到,期间,A什么也做不了;invokeAsync就是A打给B明确订外卖以后,A会先去处理别的事情,等外卖到了,再由B主动通知A拿外卖,这时A才放下手中的活,去拿外卖;invokeoneway就是A直接告诉B去订外卖,等B明确收到通知后,A就不关注结果了,B收到订外卖的通知后,就直接准备外卖,但也不会通知A了。

我们分析一下NettyRemotingAbstract通讯抽象是如何实现的。

3.1 invokeSync:

public RemotingCommand invokeSyncImpl(...) {        //获取请求标识,该标识在RemotingCommand 创建时,便经过 int opaque = requestId.getAndIncrement()生成。        //而RemotingCommand.requestId = new AtomicInteger(0) ,该属性是属于RemotingCommand的类属性,并且getAndIncrement是线程安全的,        //所以,在RemotingCommand 创建实例变量时,便可以生成一个唯一的opaque标识了。        final int opaque = request.getOpaque();        try {            //rmq使用CountDownLatch实现了同步调用的Future模式            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);            //step 1->responseTable.put            //放入响应缓存里,key为opaque,可以以responseFuture一一对应            this.responseTable.put(opaque, responseFuture);            final SocketAddress addr = channel.remoteAddress();             //step 2->channel.writeAndFlush(request),正真发起网络请求            //这里使用了netty 的ChannelFutureListener为了监听发送结果,这里使用了闭包的方式实现理,这里使用了闭包的方式实现            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {            //step 3-> 注册发送消息结果,ChannelFuture返回true,则说明发送消息成功了。但任然需要等待响应。注册该监听器,是为了避免发送失败,但客户端任然在等待,直到超时的情况,否则,如果短时间内被调用方不可用,就会导致大量线程在闲置等待响应结果。                @Override                public void operationComplete(ChannelFuture f) throws Exception {                    if (f.isSuccess()) {                        responseFuture.setSendRequestOK(true);                        return;                    } else {                        responseFuture.setSendRequestOK(false);                    }                    responseTable.remove(opaque);                    responseFuture.setCause(f.cause());                    responseFuture.putResponse(null);                    PLOG.warn("send a request command to channel <" + addr + "> failed.");                }            });            //step 4->responseFuture.waitResponse,这里同步等待远程调用的响应结果            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);            if (null == responseCommand) {                if (responseFuture.isSendRequestOK()) {                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,                        responseFuture.getCause());                } else {                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());                }            }            return responseCommand;        } finally {            //这里一定要移除缓存的请求            this.responseTable.remove(opaque);        }    }    //step 5->这里直接跳到,作为调用方处理被调用方返回的响应结果    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {        final int opaque = cmd.getOpaque();        //step 6->通过返回的响应请求标识,得到相对应的ResponseFuture         final ResponseFuture responseFuture = responseTable.get(opaque);        if (responseFuture != null) {            responseFuture.setResponseCommand(cmd);            responseFuture.release();            responseTable.remove(opaque);            if (responseFuture.getInvokeCallback() != null) {                executeInvokeCallback(responseFuture);            } else {                //step 7->//解除同步等待                responseFuture.putResponse(cmd);            }        } else {            PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));            PLOG.warn(cmd.toString());        }    }

总结一下invokeSync的实现:

step1,第一步,先创建一个FutureResponse作为value,并通过opaque,也即请求标识作为key,放入responseTable结果缓存表中。

step2,第二步, 调用channel.writeAndFlush(request)向被调用方发起网络请求,并通过闭包的方式注册.发送结果监听。

step3,第三步,如果ChannelFuture返回true,则说明发送消息成功了。但仍然需要等待响应。注册该监听器,是为了避免发送失败,但客户端仍然在等待,直到超时,否则,如果短时间内被调用方不可用,就会导致大量线程在闲置等待响应结果。

step4,第四步,同步等待远程调用的响应结果:

3-1

 

可以看到里面使用了CountDwonLatch实现同步等待。

step 5,第五步,processResponseCommand作为【调用方】处理【被调用方】返回的响应结果的入口

step 6,第六步,通过返回的响应请求标识,得到相对应的ResponseFuture

step 7,最后responseFuture.putResponse(...),作为解除对应于ResponseFuture阻塞的请求:

3-2

这里顺便提一句,同步调用是通过线程池来限制请求个数的,从而达到限流的目的。

3.2 invokeAsync:

public void invokeAsyncImpl(...,  final InvokeCallback invokeCallback)        {        //获取请求唯一标识        final int opaque = request.getOpaque();        //semaphoreAsync此处的信号量是为了控制异步请求的个数,这里的默认最大并发个数为65535,并给出等待超时时间,        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);        if (acquired) {            //代码走到这里,说明已经获取请求成功            //这里利用原生的信号量,封装了一个只能释放一次的信号量,就是说,一次异步请求,只能释放一次资源,            //这里其实也算是一种防范式编程,为了避免一次请求会释放多次信号,导致别的请求无法释放的情况。            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);            //step1->缓存请求-响应结果键值对,这里的invokeCallback是客户端实现的回调实例。            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);            this.responseTable.put(opaque, responseFuture);            try {                //step2->发起网络请求,并注册【发送结果监听】                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                    @Override                    public void operationComplete(ChannelFuture f) throws Exception {                        if (f.isSuccess()) {                            responseFuture.setSendRequestOK(true);                            return;                        } else {                            responseFuture.setSendRequestOK(false);                        }                        responseFuture.putResponse(null);                        responseTable.remove(opaque);                        try {                             //这里为什么发送失败还要执行回调结果呢?其实也是一种防范行为,确保回调结果只执行一次。                            executeInvokeCallback(responseFuture);                        } catch (Throwable e) {                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);                        } finally {                            responseFuture.release();                        }                        ...                    }                });            } catch (Exception e) {                responseFuture.release();                PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);            }        } else {            ...            //获取请求不成功,会抛出请求过多异常            throw new RemotingTooMuchRequestException(info);        }    }   //step 3->这里直接跳至以下,作为【调用方】处理【被调用方】返回的响应结果   public void processResponseCommand(...) {        final int opaque = cmd.getOpaque();        final ResponseFuture responseFuture = responseTable.get(opaque);        if (responseFuture != null) {            responseFuture.setResponseCommand(cmd);            //释放信号量,only once            responseFuture.release();                         responseTable.remove(opaque);            //step 4->执行客户端注册的回调防范,代码走到这里,说明请求->响应的整个流程都成功了,并且也只会仅执行一次            if (responseFuture.getInvokeCallback() != null) {                executeInvokeCallback(responseFuture);            } else {                responseFuture.putResponse(cmd);            }        ...    }        //into step 4:    private void executeInvokeCallback(...) {        ...                 //异步执行回调方法                executor.submit(new Runnable() {                    @Override                    public void run() {                        try {                           //继续跟入executeInvokeCallback()方法                            responseFuture.executeInvokeCallback();                        } catch (Throwable e) {                            PLOG.warn("execute callback in executor exception, and callback throw", e);                        }                    }                });          ...    }   //into executeInvokeCallback()     public void executeInvokeCallback() {        if (invokeCallback != null) {            //使用AtomicBoolean状态位,在并发情况下,回调也仅执行一次。            if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {                invokeCallback.operationComplete(this);            }        }    }

总结一下,异步调用和同步调用的流程其实差不多,同样需要请求标识来获取响应结果,但异步请求就多了一个请求个数的限制,防止请求数过多,起了限流作用。并且需要客户端注册回调方法,在请求->响应整个过程成功后,异步执行回调。

3.3invokeOneway:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {        //step1->通过状态位标识该请求是oneway的rpc方式,当被调用方发现该request是oneway的形式后,仅处理请求业务,不再响应回调用方。        request.markOnewayRPC();        //step2->通过型号量控制oneway请求个数        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);        if (acquired) {            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);            try {                //step3->发送请求                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                    @Override                     //step4->确保请求发送后,结束oneway调用                    public void operationComplete(ChannelFuture f) throws Exception {                        once.release();                        if (!f.isSuccess()) {                            PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");                        }                    }                });            }            ...        }        ...    }

总结一下oneway调用,其实就是调用方确保请求发送成功后,客户端就直接返回了。

以上就是rmq的三种rpc方式的实现。

 

4、rmq的线程处理模型

在讲这个之前,先说一下netty的一个比较好的实战方式,就是在reactor线程中,我们最好不要注册一个同步的业务处理器,因为有可能业务处理比较耗时,从而导致大量的连接或读写事件无法及时处理而堆积,这样不仅会影响系统的整体吞吐,导致无法发挥出netty的优势,更有可能会导致频繁gc。

因此我们需要一个异步执行的业务处理器,来避免上述问题。我们来看看rmq如何设计该业务处理器,也就是往pipeline注册的入站事件,对于server端来说是NettyServerHandler ,对于client端来说是NettyClientHandler,两类处理器的线程模型是一样的,只是对应的业务处理不一样而已,因此,这里我们选择NettyServerHandler 来分析。在分析前,我们先看看rmq通讯模型抽象类NettyRemotingAbstract的一个属性字段processorTable

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =

new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

从代码上可以看出,processorTable就是一个HashMap:

key是Integer 类的请求码,rmq会对一类业务用一个处理器,也就是NettyRequestProcessor实现类来抽象,因此一个业务类的请求处理器与一个请求码相对应,这里我们先看看NettyRequestProcessor该抽象类的继承关系(图4-1):

 

图4-1

以红色框的PullMessageProcessor处理器为例,对应的request code为常量RequestCode.PULL_MESSAGE(值为11)。它是broker专门处理consumer客户端的拉取消息请求的一个业务逻辑处理器,本章先不分析如何实现这些业务细节,等到以后分析具体的业务实现时,例如客户端是如何从broker获取消息等流程时,再从源码上分析PullMessageProcessor是如何实现的。

 

value是一个Pair抽象:

其实pair就是一个简单的[对.绑定]抽象,以Pair<NettyRequestProcessor, ExecutorService>为例,在一类NettyRequestProcessor请求处理器,就绑定一个线程池,也就是由单独的线程池处理这类业务。

总体的效果就是,我们可以通过RequestCode获取具体的【业务处理器类】中【具体的处理方法】,再交由绑定的线程池异步处理。

processorTable是如何初始化的呢?我们以broker初始化为例。

BrokerController.initialize()时,在通过registerProcessor()

public void registerProcessor() {      ...        //向processorTable注册处理器,key为requestCode,value为具体业务处理器实例        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);                /**         * QueryMessageProcessor         */        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);         ...    }    @Override    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {        ExecutorService executorThis = executor;         //如果不想为该处理器指定一个线程池,则使用公共的线程池        if (null == executor) {            executorThis = this.publicExecutor;        }        //绑定业务处理器以及对应的线程池        Pair
pair = new Pair
(processor, executorThis); //向processorTable注册。 this.processorTable.put(requestCode, pair); }

以上为线程模型的初始化过程。接下来分析如何运行。

NettyServerHandler具体处理业务逻辑的流程如下:

class NettyServerHandler extends SimpleChannelInboundHandler
{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { //step1->将反序列化的消息交由processMessageReceived处理 processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { //站在被调用方的角度,处理调用方的业务请求 case REQUEST_COMMAND: //step2->假如我们是被调用方,继续进入处理方法 processRequestCommand(ctx, cmd); break; //站在调用方的角度,处理被调用方返回的结果 case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } //step3->进入处理业务逻辑的核心方法 public void processRequestCommand(...) { //根据上面分析,通过请求码获取具体的业务请求器,以及线程池绑定对 final Pair
matched = this.processorTable.get(cmd.getCode()); //如果获取不到指定的,则使用默认处理器绑定对 final Pair
pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { //step4->构造匿名Runnable实现类,在里面执行真正的逻辑任务 Runnable run = new Runnable() { @Override public void run() { try { RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } //step6->真正具体的业务方法 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } //非oneway请求下,将结果写回【调用方】 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { ... } ... } } catch (Throwable e) { ... } } }; //有些业务处理器需要熔断限流保护,当rejectRequest返回true的时候,表明该处理器已经处于高负荷状态,主动拒绝请求。 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { //Runnable匿名实现类再次封装成统一的请求任务 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); //step5->往绑定的线程池提交任务 pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { //以下的异常处理就是【被调用方】通知【调用方】线程池的任务队列已满,无法继续处理,即请求过多。 if ((System.currentTimeMillis() % 10000) == 0) { PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " // + pair.getObject2().toString() // + " request code: " + cmd.getCode()); } //确保请求方式为非oneway的情况下,才通知调用方 if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } ... }

总结一下【被调用方】接受到请求后的处理流程:

step1,读的入站事件,经过反序列化以后,构造出RemotingCommand实体,该类是rmq所有远程调用的抽象类,由type属性决定是Request或Response请求,然后将实体传播到NettyServerHandler (服务端),交由processMessageReceived方法处理。

step2,我们上一步已假设是request类型,因此进入processRequestCommand方法。

step3,进入该方法后,根据RequestCode,获取【业务处理器】和【线程池】绑定对。

step4,用Runnable匿名实现封装具体业务处理逻辑,即run方法执行真正的逻辑。

step5,往绑定的线程池提交任务。

step6,交由NettyRequestProcessor实现类,通过RequestCode,调用具体的业务逻辑处理;当然,如果客户端有主动向broker注册RPCHook,也即处理逻辑前后置处理,便会在业务处理前以及业务处理后执行,通常用于监控比较多。最后再根据返回的response是否为null以及请求方式,来决定是否响应【调用方】。

以上为rmq的三种通讯方式的实现分析。

 

到这里,我们已经清楚rmq是如何设计通讯协议,如何实现rpc三种调用方式,以及业务处理的线程模型,接下我们就可以很方便地分析rmq的所有业务处理流程了。

作者:萝卜头4lbt
链接:http://www.jianshu.com/p/ddd8b5514a93
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

转载于:https://my.oschina.net/xiaominmin/blog/1592953

你可能感兴趣的文章
jxl最全使用方法
查看>>
linux-镜像下载
查看>>
CURL访问举例
查看>>
Vue 全选/取消全选,反选/取消反选
查看>>
thinkphp实现导航高亮的简单方法
查看>>
sp 获取数据库文件路径,用于暴力迁移服务器
查看>>
体现临床实际基线疾病活动度的早期RA患者中, 治疗起效时间对临床和放射学的影响...
查看>>
强直性脊柱炎活动指数(ASDAS)在日常诊疗种评估生物制剂治疗患者的应用-来自葡萄牙登记系统风湿病患者...
查看>>
RH133读书笔记(11)-Lab 11 System Rescue and Troubleshooting
查看>>
C# Winform 大全开发手册
查看>>
Mysql中一级缓存二级缓存区别
查看>>
面试题--在一个字符串中查找重复次数最多的字符(转)
查看>>
QTP的那些事--有关qtp中的action模板的使用
查看>>
VBS进价编程必须学会的WMI介绍
查看>>
常见Failed to load ApplicationContext异常解决方案!!
查看>>
Activiti 各个节点涉及的表
查看>>
POJ 2373 Dividing the Path (单调队列优化DP)题解
查看>>
小程序之web-view打开外部链接
查看>>
转|poj2234maches game|博弈论
查看>>
删除公共字符
查看>>