该章节主要深入分析rmq是如何构建自己的通讯模型的,主要从以下四点分析:
1、总概论述
2、rmq的通讯协议实现。
3、rmq的三种通讯方式。
4、rmq的线程处理模型。
1、总概论述
对于rocket mq的通讯来说,也就是rpc调用的实现,例如,producer端向broker发起一次send(message), 其实就是向broker发起一次rpc,broker再根据通讯方式,响应produer的处理结果。
本文主要是对rmq如何设计通讯协议,如何实现rpc三种调用方式,其中包括invokeSync、invokeAsync、invokeOneway以及如何设计线程处理模型来处理业务。这里稍微提一下,这里不会涉及太多netty的细节,以及本章的阅读对象最好熟悉netty的使用。
2、rmq的通讯协议实现
rmq使用netty作为主要的通讯框架,当然,高可用部分纯粹使用nio来实现,这部分等以后讲到rmq高可用是如何实现时再来详细分析。这里主要分析的是rmq的remote,也即通讯模块。
先看一下rmq是如何抽象的,类图2-1:
2-1
总结一下: NettyRemotingAbstract,该类主要是rmq三种通讯方式以及线程处理模型的一个抽象。 NettyRemotingClient,该类主要负责客户端的通讯处理。 NettyRemotingServer,该类主要负责服务端的通讯处理。 RemotingCommand,该类是所有远程处理请求的抽象。
我们先看一下rmq是如何使用netty的:
图2-2
绝大部分使用过netty框架的程序猿来封装自己的通讯模块时,基本上都是使用图1-2的代码流程,这里稍微研读一下。
2.1 rmq对于reactor模型的实现选择:
if(RemotingUtil.isLinuxPlatform()&& nettyServerConfig.isUseEpollNativeSelector()) { this.eventLoopGroupSelector=newEpollEventLoopGroup(..);}else{ this.eventLoopGroupSelector=newNioEventLoopGroup(..);}
从源码上可以看出,如果jvm所处的是linux平台,则会选择使用netty自己重新实现的一套reactor模型,使用平台相关的EventLoopGroup,这可以产生更少的垃圾回收以及更快的性能。这里再单独说一下epoll和poll的核心区别,epoll轮询所有有效的注册事件,例如连接成功事件,可写事件或可读事件;而poll是无差别轮询,即会轮询所有的事件。因此性能上epoll会更快更有效。
2.2 netty的配置选项:
rmq使用PooledByteBufAllocator.DEFAULT属性,表明ByteBuf的分配方式是使用池化技术,相比普通方式创建的ByteBuf,也即朝生夕灭的java对象,性能相对来说会快不少。
2.3 netty Handler:
2.3.1 NettyEncoder 编码器
在说协议编码之前,我们先看看rmq的协议是如何设计的,下图是截取rmq源码的一个协议设计说明:图2-3
从图中可以看出rmq的通讯协议是挺简洁的。
length:消息总长度,int 类型 ,4字节
header length:消息头长度,int 类型 ,4字节 header data :消息头字节数据 body data:消息体字节数据
接下来,我们看看NettyEncoder的核心实现,以下代码为抽取主干的代码:
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { .... //step1,编码消息头 ByteBuffer header = remotingCommand.encodeHeader(); //step2,写入消息头的 out.writeBytes(header); //step3,消息体由客户端编码 byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); .... }
接着跟入remotingCommand.encodeHeader():
public ByteBuffer encodeHeader(final intbodyLength) {// 消息总长度int length =4;//存放消息头的字节内容byte[] headerData;//step1:正真执行消息头的编码headerData =this.headerEncode();//累加消息头长度length += headerData.length;//累加消息体的长度,到这里其实整个消息的长度已算出length += bodyLength//应为这里只存放消息头的内容,因此需要把消息体内容除去。ByteBuffer result = ByteBuffer.allocate(4+ length - bodyLength);result.putInt(length);//step2:接下来的4个字节,需要通过位移以及掩码运算才可以得出headerLength 以及序列化方式。result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));....}
这里rocketmq 的序列化方式有两种,一种是SerializeType.JSON,使用fastjson序列化;另外一种就是SerializeType.ROCKETMQ,根据协议头所需字段逐个写入。进入rocketMQProtocolEncode
方法后,说一下几个关键字段:
2-4
如上代码片段,有三个红圈的地方,一个就是extFields,该字段存储的是消息的扩张字段,例如我们做分布式跟踪的时候,就可以把traceId放入;第二个是opaque,该字段是一次rpc请求的标识;最后一个是flag,这个是系统消息的掩码标识,例如消息是否压缩等等。
headerEncode()
分析完后,我们看另外一个方法markProtocolType(headerData.length,serializeTypeCurrentRPC)
,进入该方法看看:
byte[] result =new byte[4];result[0] = type.getCode();result[1] = (byte) ((source >>16) &0xFF);result[2] = (byte) ((source >>8) &0xFF);result[3] = (byte) (source &0xFF);return result;
总结来说就是一个4字节、int类型的header length存储位,前面高一位的字节是序列化方式,后面低24位则是真正的消息头部长度;为什么是24位呢?我们看一下NettyDecoder.FRAME_MAX_LENGTH属性,就是消息解码时,最大的数据帧长度为16777216也就是2^24 - 1,刚好是24位,表明每条消息的大小不能超过16m。
到这里为止,我们分析完了NettyEncoder编码器,NettyDecoder是一个逆过程,这里读者可以自行分析了。
2.3.2 IdleStateHandler编码器:
该Handler是netty自带的一个读写空闲监测处理器,rmq集群内所有端到端的连接方式均采取长连接,所以需要空闲监测,通过该方式,可以主动释放一些无用、空闲的连接。当监听到例如写事件过时时,表明该连接是空闲的,因此会传播一个IdleStateEvent,NettyConnetManageHandler收到该事件以后,会异步关闭该连接。NettyConnetManageHandler会异步处理所有抛出异常信息的连接通道。2.3.3NettyServerHandler编码器:
该handler就是具体的业务处理器了,该处理器在下面讲到rmq的线程处理模型时再分析。以上就是rmq如何使用netty构造自己的通讯协议以及一些用法。
3、rmq的三种通讯方式
rmq的rpc通讯方式刚刚提到,一共有三种。分别是invokesync同步调用,同步等待结果;invokeAsync同步调用,异步回调结果;invokeoneway,同步调用,直接返回。
用订外卖来举例说明,A打电话给B订外卖,invokesync调用就是A会一直等待B,直到B把外卖送到,期间,A什么也做不了;invokeAsync就是A打给B明确订外卖以后,A会先去处理别的事情,等外卖到了,再由B主动通知A拿外卖,这时A才放下手中的活,去拿外卖;invokeoneway就是A直接告诉B去订外卖,等B明确收到通知后,A就不关注结果了,B收到订外卖的通知后,就直接准备外卖,但也不会通知A了。
我们分析一下NettyRemotingAbstract通讯抽象是如何实现的。
3.1 invokeSync:
public RemotingCommand invokeSyncImpl(...) { //获取请求标识,该标识在RemotingCommand 创建时,便经过 int opaque = requestId.getAndIncrement()生成。 //而RemotingCommand.requestId = new AtomicInteger(0) ,该属性是属于RemotingCommand的类属性,并且getAndIncrement是线程安全的, //所以,在RemotingCommand 创建实例变量时,便可以生成一个唯一的opaque标识了。 final int opaque = request.getOpaque(); try { //rmq使用CountDownLatch实现了同步调用的Future模式 final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); //step 1->responseTable.put //放入响应缓存里,key为opaque,可以以responseFuture一一对应 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); //step 2->channel.writeAndFlush(request),正真发起网络请求 //这里使用了netty 的ChannelFutureListener为了监听发送结果,这里使用了闭包的方式实现理,这里使用了闭包的方式实现 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { //step 3-> 注册发送消息结果,ChannelFuture返回true,则说明发送消息成功了。但任然需要等待响应。注册该监听器,是为了避免发送失败,但客户端任然在等待,直到超时的情况,否则,如果短时间内被调用方不可用,就会导致大量线程在闲置等待响应结果。 @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); PLOG.warn("send a request command to channel <" + addr + "> failed."); } }); //step 4->responseFuture.waitResponse,这里同步等待远程调用的响应结果 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { //这里一定要移除缓存的请求 this.responseTable.remove(opaque); } } //step 5->这里直接跳到,作为调用方处理被调用方返回的响应结果 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); //step 6->通过返回的响应请求标识,得到相对应的ResponseFuture final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseFuture.release(); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { //step 7->//解除同步等待 responseFuture.putResponse(cmd); } } else { PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); PLOG.warn(cmd.toString()); } }
总结一下invokeSync的实现:
step1,第一步,先创建一个FutureResponse作为value,并通过opaque,也即请求标识作为key,放入responseTable结果缓存表中。step2,第二步, 调用channel.writeAndFlush(request)向被调用方发起网络请求,并通过闭包的方式注册.发送结果监听。
step3,第三步,如果ChannelFuture返回true,则说明发送消息成功了。但仍然需要等待响应。注册该监听器,是为了避免发送失败,但客户端仍然在等待,直到超时,否则,如果短时间内被调用方不可用,就会导致大量线程在闲置等待响应结果。
step4,第四步,同步等待远程调用的响应结果:
3-1
可以看到里面使用了CountDwonLatch实现同步等待。
step 5,第五步,processResponseCommand
作为【调用方】处理【被调用方】返回的响应结果的入口
step 6,第六步,通过返回的响应请求标识,得到相对应的ResponseFuture
step 7,最后responseFuture.putResponse(...)
,作为解除对应于ResponseFuture阻塞的请求:
3-2
这里顺便提一句,同步调用是通过线程池来限制请求个数的,从而达到限流的目的。
3.2 invokeAsync:public void invokeAsyncImpl(..., final InvokeCallback invokeCallback) { //获取请求唯一标识 final int opaque = request.getOpaque(); //semaphoreAsync此处的信号量是为了控制异步请求的个数,这里的默认最大并发个数为65535,并给出等待超时时间, boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { //代码走到这里,说明已经获取请求成功 //这里利用原生的信号量,封装了一个只能释放一次的信号量,就是说,一次异步请求,只能释放一次资源, //这里其实也算是一种防范式编程,为了避免一次请求会释放多次信号,导致别的请求无法释放的情况。 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); //step1->缓存请求-响应结果键值对,这里的invokeCallback是客户端实现的回调实例。 final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { //step2->发起网络请求,并注册【发送结果监听】 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseFuture.putResponse(null); responseTable.remove(opaque); try { //这里为什么发送失败还要执行回调结果呢?其实也是一种防范行为,确保回调结果只执行一次。 executeInvokeCallback(responseFuture); } catch (Throwable e) { PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } ... } }); } catch (Exception e) { responseFuture.release(); PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { ... //获取请求不成功,会抛出请求过多异常 throw new RemotingTooMuchRequestException(info); } } //step 3->这里直接跳至以下,作为【调用方】处理【被调用方】返回的响应结果 public void processResponseCommand(...) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); //释放信号量,only once responseFuture.release(); responseTable.remove(opaque); //step 4->执行客户端注册的回调防范,代码走到这里,说明请求->响应的整个流程都成功了,并且也只会仅执行一次 if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); } ... } //into step 4: private void executeInvokeCallback(...) { ... //异步执行回调方法 executor.submit(new Runnable() { @Override public void run() { try { //继续跟入executeInvokeCallback()方法 responseFuture.executeInvokeCallback(); } catch (Throwable e) { PLOG.warn("execute callback in executor exception, and callback throw", e); } } }); ... } //into executeInvokeCallback() public void executeInvokeCallback() { if (invokeCallback != null) { //使用AtomicBoolean状态位,在并发情况下,回调也仅执行一次。 if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { invokeCallback.operationComplete(this); } } }
总结一下,异步调用和同步调用的流程其实差不多,同样需要请求标识来获取响应结果,但异步请求就多了一个请求个数的限制,防止请求数过多,起了限流作用。并且需要客户端注册回调方法,在请求->响应整个过程成功后,异步执行回调。
3.3invokeOneway:public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { //step1->通过状态位标识该请求是oneway的rpc方式,当被调用方发现该request是oneway的形式后,仅处理请求业务,不再响应回调用方。 request.markOnewayRPC(); //step2->通过型号量控制oneway请求个数 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { //step3->发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override //step4->确保请求发送后,结束oneway调用 public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } ... } ... }
总结一下oneway调用,其实就是调用方确保请求发送成功后,客户端就直接返回了。
以上就是rmq的三种rpc方式的实现。
4、rmq的线程处理模型
在讲这个之前,先说一下netty的一个比较好的实战方式,就是在reactor线程中,我们最好不要注册一个同步的业务处理器,因为有可能业务处理比较耗时,从而导致大量的连接或读写事件无法及时处理而堆积,这样不仅会影响系统的整体吞吐,导致无法发挥出netty的优势,更有可能会导致频繁gc。
因此我们需要一个异步执行的业务处理器,来避免上述问题。我们来看看rmq如何设计该业务处理器,也就是往pipeline注册的入站事件,对于server端来说是NettyServerHandler
,对于client端来说是NettyClientHandler
,两类处理器的线程模型是一样的,只是对应的业务处理不一样而已,因此,这里我们选择NettyServerHandler
来分析。在分析前,我们先看看rmq通讯模型抽象类NettyRemotingAbstract
的一个属性字段processorTable
:
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
从代码上可以看出,processorTable就是一个HashMap:
key是Integer 类的请求码,rmq会对一类业务用一个处理器,也就是NettyRequestProcessor
实现类来抽象,因此一个业务类的请求处理器与一个请求码相对应,这里我们先看看NettyRequestProcessor
该抽象类的继承关系(图4-1):
图4-1
以红色框的PullMessageProcessor
处理器为例,对应的request code为常量RequestCode.PULL_MESSAGE
(值为11)。它是broker专门处理consumer客户端的拉取消息请求的一个业务逻辑处理器,本章先不分析如何实现这些业务细节,等到以后分析具体的业务实现时,例如客户端是如何从broker获取消息等流程时,再从源码上分析PullMessageProcessor
是如何实现的。
value是一个Pair抽象:
其实pair就是一个简单的[对.绑定]抽象,以Pair<NettyRequestProcessor, ExecutorService>
为例,在一类NettyRequestProcessor
请求处理器,就绑定一个线程池,也就是由单独的线程池处理这类业务。 总体的效果就是,我们可以通过RequestCode获取具体的【业务处理器类】中【具体的处理方法】,再交由绑定的线程池异步处理。
那processorTable
是如何初始化的呢?我们以broker初始化为例。
在BrokerController.initialize()
时,在通过registerProcessor()
public void registerProcessor() { ... //向processorTable注册处理器,key为requestCode,value为具体业务处理器实例 this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); /** * QueryMessageProcessor */ NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); ... } @Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; //如果不想为该处理器指定一个线程池,则使用公共的线程池 if (null == executor) { executorThis = this.publicExecutor; } //绑定业务处理器以及对应的线程池 Pairpair = new Pair (processor, executorThis); //向processorTable注册。 this.processorTable.put(requestCode, pair); }
以上为线程模型的初始化过程。接下来分析如何运行。
NettyServerHandler
具体处理业务逻辑的流程如下: class NettyServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { //step1->将反序列化的消息交由processMessageReceived处理 processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { //站在被调用方的角度,处理调用方的业务请求 case REQUEST_COMMAND: //step2->假如我们是被调用方,继续进入处理方法 processRequestCommand(ctx, cmd); break; //站在调用方的角度,处理被调用方返回的结果 case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } //step3->进入处理业务逻辑的核心方法 public void processRequestCommand(...) { //根据上面分析,通过请求码获取具体的业务请求器,以及线程池绑定对 final Pair matched = this.processorTable.get(cmd.getCode()); //如果获取不到指定的,则使用默认处理器绑定对 final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { //step4->构造匿名Runnable实现类,在里面执行真正的逻辑任务 Runnable run = new Runnable() { @Override public void run() { try { RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } //step6->真正具体的业务方法 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } //非oneway请求下,将结果写回【调用方】 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { ... } ... } } catch (Throwable e) { ... } } }; //有些业务处理器需要熔断限流保护,当rejectRequest返回true的时候,表明该处理器已经处于高负荷状态,主动拒绝请求。 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { //Runnable匿名实现类再次封装成统一的请求任务 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); //step5->往绑定的线程池提交任务 pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { //以下的异常处理就是【被调用方】通知【调用方】线程池的任务队列已满,无法继续处理,即请求过多。 if ((System.currentTimeMillis() % 10000) == 0) { PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " // + pair.getObject2().toString() // + " request code: " + cmd.getCode()); } //确保请求方式为非oneway的情况下,才通知调用方 if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } ... }
总结一下【被调用方】接受到请求后的处理流程:
step1,读的入站事件,经过反序列化以后,构造出RemotingCommand
实体,该类是rmq所有远程调用的抽象类,由type属性决定是Request或Response请求,然后将实体传播到NettyServerHandler (服务端),交由processMessageReceived
方法处理。 step2,我们上一步已假设是request类型,因此进入processRequestCommand
方法。
step3,进入该方法后,根据RequestCode,获取【业务处理器】和【线程池】绑定对。
step4,用Runnable匿名实现封装具体业务处理逻辑,即run方法执行真正的逻辑。
step5,往绑定的线程池提交任务。
step6,交由NettyRequestProcessor
实现类,通过RequestCode,调用具体的业务逻辑处理;当然,如果客户端有主动向broker注册RPCHook,也即处理逻辑前后置处理,便会在业务处理前以及业务处理后执行,通常用于监控比较多。最后再根据返回的response是否为null以及请求方式,来决定是否响应【调用方】。
以上为rmq的三种通讯方式的实现分析。
到这里,我们已经清楚rmq是如何设计通讯协议,如何实现rpc三种调用方式,以及业务处理的线程模型,接下我们就可以很方便地分析rmq的所有业务处理流程了。
作者:萝卜头4lbt 链接:http://www.jianshu.com/p/ddd8b5514a93 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。