`

ActiveMQ Producer Flow Control(生产者流量控制)

阅读更多

转载自:http://windows9834.blog.163.com/blog/static/2734500420131192144239/

对官方文档:http://activemq.apache.org/producer-flow-control.html  进行翻译。

在ActiveMQ4.x版本中,使用了流量控制措施,流量控制是用TCP流量控制实现的。因生产者受流量限制使消费者的底层网络连接将被挂起(等待消息),以强制进行流量控制限制。这个策略非常高效,但是如果有多个生产者和消费者共享同一个连接的时候,可能会导致死锁。

在ActiveMQ5.0版本中,我们可以分别对一个共享连接上的各个生产者进行流量控制,而不需要挂起整个连接。“流量控制”意味着当代理 (broker)检测到目标(destination)的内存容量,或temp-或file-store超过了限制,消息流的速度可能被减慢。生产者将会被阻塞直至资源可用,或者收到一个JMSException异常:这种行为是可配置的,下面的<systemUsage>章节会描述到。

 

值得注意的是,当memoryLimit或<systemUsage>限制达到的时候,<systemUsage>默认的设置会引起生产者阻塞:这种阻塞行为有时会被误解为“挂起的生产者”,而事实是生产者只是被挂起,还是活动的,只是一直在等待,直到有可用空间。

.同步发送的消息将会自动对每一个生产者使用流量控制;这一般只针对于持久性消息同步发送,除非您启用useAsyncSend的标志。

.当生产者使用异步发送消息时,一般来说,就是发送非持久化消息的生产者,不需要等候来自代理broker的任何确认回复消息;所以,如果内存限制被超过了,将不会被通知。如果你真的想什么时候都能知道代理broker的限制被超过了,则需要配置ProducerWindowSize这一连接选项,这样就算是异步消息也会对每一个生产者进行流量控制。

ActiveMQConnectionFactory connctionFactory =...
connctionFactory.setProducerWindowSize(1024000);

ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理broker的最大字节数据量,这个确认消息用来告诉生产者,代理broker已经收到先前发送的消息了。

也就是说,
ProducerWindowSize是指在收到broker确认应答之前,生产者能够传送消息给broker的最大信息量。
即使是异步发送消息,生产者也是在收到broker确认应答之后才把下一条消息传给broker。当使用异步传送的时候,可以设置jms.producerWindowSize(单位为字节)的属性,当生产者中等待发送的信息量到达设置的值时,即使没有收到broker的应答消息,生产者同样会把这些消息发给broker。

另外,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要知道queue或者topic的内存使用是否达到限制,你那你可以简单的将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。


如果你愿意,你可以通过在代理broker的配置中,对特定JMS的queue和toipc禁止流量控制,在目的地(destination)的策略(policy)中的producerFlowControl标志设置为false,使代理broker上特定的JMS queue和topic无效,例如:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntrytopic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>

查看 Broker Configuration.
需要注意的是,自从ActiveMQ 5.x中引入新的文件游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>。

<policyEntryqueue=">" producerFlowControl="true" memoryLimit="1mb">    
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>

上面的片段可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为1Mb。

How Producer Flow Control works(生产者流量控制是如何工作的)

如果你发送一条持久化消息(这样就会有一个对OpenWire响应消息的期望),代理broker将会给生产者发送一个ProducerAck 应答消息。该消息会通知生产者其先前的发送窗口已经被处理了,所以它现在可以发送另外的窗口了。这有点像消费者应答,不过是反向的。

Advantage 优势

所以,一个很好的生产者再发送更多的数据之前,会等待生产者应答,以此来避免对代理broker的冲击(并且如果出现了一个比较慢的消费者,强制代理阻塞整个连接)。如果你想知道这部分的源代码是怎么实现的,可以看一下ActiveMQMessageProducer的代码。

 

虽然一个客户端可以完全忽略生产者的所有应答消息,并且处理慢消费者的时候,代理可以在需要的时候拖延传送;虽然这意味着它将拖延整个连接。

Configure Client-Side Exceptions(配置客户端的异常)

应对当代理broker空间不足,而导致send()方法的无限期阻塞操作的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace属性设置为true,代理broker将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,并将其传递给客户端。下面是一个这种配置的示例:

<systemUsage>
<systemUsagesendFailIfNoSpace="true">
<memoryUsage>
<memoryUsagelimit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>

设置这个属性的好处是,客户端可以捕获javax.jms.ResourceAllocationException异常,过后,并重试send()操作,而不是无限期地傻等下去。

从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性被加了进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待了指定时间之后才触发。如果在配置的等待时间过去之后,代理broker上的空间仍然没有被释放,仅当这个时候send()方法才会失败,并且在客户端抛出异常。下面是一个示例:

<systemUsage>
<systemUsagesendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsagelimit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>

定义超时时间的单位是毫秒,所以上面的例子将会在当send()方法失败并对客户端抛出异常之前,等待三秒。这个属性的优点是,它仅仅阻塞配置指定的时间,而不是立即抛出发送失败的异常,或者是无限期阻塞。这个属性不仅在代理broker端提供了一个改进,还对客户端提供了一个改进,使得客户端能捕获异常,稍后并重试send() 操作。

Disabling Flow Control(禁用流量控制)

一个常见的要求是禁用流量控制,使得消息分发能够持续进行,直到所有可用的磁盘空间被等待发送(pending)的消息占用耗尽(无论是持久化的还是配置了非持久化的)。要实现这个要求,你可以使用消息游标(Message Cursors)。

 

System usage(系统资源占用)

你还可以通过<systemUsage>元素的一些属性来减慢生产者。来看一眼下面的例子:

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsagelimit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsagelimit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsagelimit="10 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>

你可以为非持久化的消息(NON_PERSISTENT messages)设置内存限制,为持久化消息(PERSISTENT messages)设置磁盘空间,以及为临时消息设置总的空间,代理broker将在减慢生产者发送速度之前使用这些空间。使用上述的默认设置,当资源使用完时,代理broker将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics