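Message Queue - RocketMQ - Retry Parameter Settings
Maximum retry and re-consume settings in practice
Both producers and consumers can run into failures. A producer may fail to send because of a network error or a system problem. A consumer may hit an exception while processing a message and want to consume it again.
Producer retries
1 ResponseCode: the ResponseCode returned by the Broker decides whether a retry is needed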
producer.addRetryResponseCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
// These codes are already registered for retry by default
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));
// Other codes that can be registered for retry are listed in ResponseCode
public class ResponseCode extends RemotingSysResponseCode {
public static final int FLUSH_DISK_TIMEOUT = 10;
public static final int SLAVE_NOT_AVAILABLE = 11;
public static final int FLUSH_SLAVE_TIMEOUT = 12;
public static final int MESSAGE_ILLEGAL = 13;
public static final int SERVICE_NOT_AVAILABLE = 14;
public static final int VERSION_NOT_SUPPORTED = 15;
public static final int NO_PERMISSION = 16;
public static final int TOPIC_NOT_EXIST = 17;
public static final int TOPIC_EXIST_ALREADY = 18;
public static final int PULL_NOT_FOUND = 19;
public static final int PULL_RETRY_IMMEDIATELY = 20;
public static final int PULL_OFFSET_MOVED = 21;
public static final int QUERY_NOT_FOUND = 22;
public static final int SUBSCRIPTION_PARSE_FAILED = 23;
public static final int SUBSCRIPTION_NOT_EXIST = 24;
public static final int SUBSCRIPTION_NOT_LATEST = 25;
public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
public static final int FILTER_DATA_NOT_EXIST = 27;
public static final int FILTER_DATA_NOT_LATEST = 28;
public static final int TRANSACTION_SHOULD_COMMIT = 200;
public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
public static final int TRANSACTION_STATE_UNKNOW = 202;
public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
public static final int NO_BUYER_ID = 204;
public static final int NOT_IN_CURRENT_UNIT = 205;
public static final int CONSUMER_NOT_ONLINE = 206;
public static final int CONSUME_MSG_TIMEOUT = 207;
public static final int NO_MESSAGE = 208;
public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
public static final int DELETE_ACL_CONFIG_FAILED = 210;
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
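The check described in item 1 can be pictured as a simple set lookup. The following is a minimal sketch, not RocketMQ code: the class RetryCodeRegistry and its method shouldRetry are hypothetical names, and the numeric values are copied from the ResponseCode listing above (SYSTEM_ERROR is left out because its value is not shown in that listing).

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for the producer's retry-code bookkeeping; not a RocketMQ class.
class RetryCodeRegistry {
    // Values copied from the ResponseCode listing in this article.
    static final int FLUSH_SLAVE_TIMEOUT = 12;
    static final int SERVICE_NOT_AVAILABLE = 14;
    static final int NO_PERMISSION = 16;
    static final int TOPIC_NOT_EXIST = 17;
    static final int NO_BUYER_ID = 204;
    static final int NOT_IN_CURRENT_UNIT = 205;

    // Codes that trigger a retry out of the box (subset of the default set shown above).
    private final Set<Integer> retryCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
        TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE, NO_PERMISSION, NO_BUYER_ID, NOT_IN_CURRENT_UNIT));

    // Mirrors the idea of producer.addRetryResponseCode(...): opt one more code into retrying.
    void addRetryResponseCode(int code) {
        retryCodes.add(code);
    }

    // A response is retried only if its code has been registered.
    boolean shouldRetry(int code) {
        return retryCodes.contains(code);
    }
}
```

In this model FLUSH_SLAVE_TIMEOUT is not retried until it has been added explicitly, which is the situation the addRetryResponseCode call in item 1 addresses.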
2 setRetryAnotherBrokerWhenNotStoreOK
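Controls whether a failed send is retried on a different Broker node: when the flag is true, the client automatically tries to deliver the message to another Broker. The flag defaults to false, and in the 4.x client source it is consulted on the synchronous path, when the returned send status is not SEND_OK.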
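As a rough illustration of what the flag changes, the sketch below walks a list of Brokers and either moves on to the next one or gives up after the first failure. Everything here is hypothetical (the class BrokerFailoverModel, the broker names, the storeOk predicate); the real client selects message queues rather than broker names and is also bounded by the retry count.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of "retry another broker when the store result is not OK"; not RocketMQ code.
class BrokerFailoverModel {
    /**
     * Returns the name of the broker that stored the message, or null if the send failed.
     * storeOk reports whether a given broker stored the message successfully.
     */
    static String send(List<String> brokers, boolean retryAnotherBroker, Predicate<String> storeOk) {
        for (String broker : brokers) {
            if (storeOk.test(broker)) {
                return broker; // stored successfully
            }
            if (!retryAnotherBroker) {
                return null; // flag is off: do not try the remaining brokers
            }
        }
        return null; // every broker failed
    }
}
```

With two brokers where only the second one is healthy, the model succeeds on the second broker when the flag is true and fails when it is false.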
3 setRetryTimesWhenSendFailed: retry count for synchronous sends, default 2
4 setRetryTimesWhenSendAsyncFailed: retry count for asynchronous sends, default 2
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
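A retry count of 2 means up to three attempts in total: the first send plus two retries. The sketch below models that loop under this assumption; SendRetryModel and sendWithRetry are hypothetical names, and the attemptSucceeds callback stands in for the actual network call.

```java
import java.util.function.IntPredicate;

// Hypothetical model of a send loop with internal retries; not RocketMQ code.
class SendRetryModel {
    /**
     * Tries to send up to 1 + retryTimes times.
     * attemptSucceeds receives the 1-based attempt number and reports whether it succeeded.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int sendWithRetry(int retryTimes, IntPredicate attemptSucceeds) {
        int timesTotal = 1 + retryTimes; // first attempt plus the configured retries
        for (int attempt = 1; attempt <= timesTotal; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1; // retries exhausted, the caller sees a send failure
    }
}
```

As the Javadoc above warns, a retried send may be delivered more than once, so the consuming side should be prepared to handle duplicates.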
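For asynchronous sends, a thread pool (ExecutorService) can be supplied through the setter below; the default executor is created as follows: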
producer.setAsyncSenderExecutor();
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
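If the default pool does not fit, an application could build its own executor along the same lines. The following is a minimal sketch using only the JDK; the class AsyncSenderExecutors, the thread-name prefix, and the sizes passed in are illustrative choices, not values recommended by RocketMQ.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that builds a fixed-size pool with a bounded queue.
class AsyncSenderExecutors {
    static ThreadPoolExecutor newBoundedExecutor(int threads, int queueCapacity) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                // Named threads make async send activity easy to spot in thread dumps.
                return new Thread(r, "CustomAsyncSender_" + threadIndex.incrementAndGet());
            }
        };
        return new ThreadPoolExecutor(
            threads,                      // core pool size
            threads,                      // maximum pool size (fixed-size pool)
            60, TimeUnit.SECONDS,         // keep-alive for idle threads
            new LinkedBlockingQueue<Runnable>(queueCapacity), // bounded backlog
            factory);
    }
}
```

An executor built this way would then be handed to the producer with producer.setAsyncSenderExecutor(executor), and the application stays responsible for shutting it down.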
Consumer retries
1 Set the maximum number of re-consume attempts
setMaxReconsumeTimes
consumer.setMaxReconsumeTimes();
/**
* Max re-consume times.
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
*
* If messages are re-consumed more than {@link #maxReconsumeTimes} before success.
*/
private int maxReconsumeTimes = -1;
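The Javadoc above says that the default of -1 is resolved differently per consumption mode. A minimal sketch of that resolution follows; ReconsumeLimits and its methods are hypothetical names, and what happens to a message once the limit is reached is deliberately not modeled.

```java
// Hypothetical model of how maxReconsumeTimes = -1 is interpreted; not RocketMQ code.
class ReconsumeLimits {
    // Resolves the configured value to the effective limit for the given mode.
    static int effectiveMax(int maxReconsumeTimes, boolean orderly) {
        if (maxReconsumeTimes != -1) {
            return maxReconsumeTimes; // an explicit setting is used as-is
        }
        // -1 means 16 in concurrently mode and Integer.MAX_VALUE in orderly mode.
        return orderly ? Integer.MAX_VALUE : 16;
    }

    // True once a message has already been re-consumed as often as the limit allows.
    static boolean retriesExhausted(int reconsumeTimes, int maxReconsumeTimes, boolean orderly) {
        return reconsumeTimes >= effectiveMax(maxReconsumeTimes, orderly);
    }
}
```

For example, with the default setting a message in concurrently mode counts as exhausted after 16 re-consume attempts, while in orderly mode it is effectively retried without limit.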
2 Return a consume status that asks for the message to be re-consumed after a short delay
ConsumeOrderlyStatus#SUSPEND_CURRENT_QUEUE_A_MOMENT
ConsumeConcurrentlyStatus#RECONSUME_LATER
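The usual pattern is to wrap the business logic in a try/catch and return the "retry later" status when it throws. The sketch below shows the shape of that pattern for the concurrent case without depending on the RocketMQ client: the Status enum is a local stand-in that mirrors the constant names of ConsumeConcurrentlyStatus, and RetryingHandler and consume are hypothetical names.

```java
import java.util.function.Consumer;

// Hypothetical model of a message handler that requests redelivery on failure.
class RetryingHandler {
    // Local stand-in mirroring RocketMQ's ConsumeConcurrentlyStatus constants.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static Status consume(String body, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(body);
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // Processing failed: ask for the message to be delivered again later.
            return Status.RECONSUME_LATER;
        }
    }
}
```

In an orderly listener the same structure would return SUSPEND_CURRENT_QUEUE_A_MOMENT instead of RECONSUME_LATER.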