1. 问题描述
rocketmq 生产者发消息正常 mq后台也可以看到发出的消息
但是消费者一直没消费 好像订阅没成功
2. 问题排查
通过上图查看 确实没有检测到订阅者
3. 问题解决
线上环境是 两台机器 共四个实例
项目中订阅了两个不同地址不同topic的mq
然后那个instanceName会有命名冲突
ip@进程id
重新修改instanceName的value
//设置instanceName
defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+"");spring-rocketmq.xml
consumer.java
package com.x.mq.base;import com.x.mq.EvaluationMessageListener;import org.apache.commons.lang.math.JVMRandom;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Random;/** * Created by admin on 2018/1/31. */public class EvaluationConsumer { private static final Logger logger = LoggerFactory.getLogger(EvaluationConsumer.class); private DefaultMQPushConsumer defaultMQPushConsumer; private EvaluationMessageListener evaluationMessageListener; private String tag; private String topic; public EvaluationMessageListener getEvaluationMessageListener() { return evaluationMessageListener; } public void setEvaluationMessageListener(EvaluationMessageListener evaluationMessageListener) { this.evaluationMessageListener = evaluationMessageListener; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public DefaultMQPushConsumer getDefaultMQPushConsumer() { return defaultMQPushConsumer; } public void setDefaultMQPushConsumer(DefaultMQPushConsumer defaultMQPushConsumer) { this.defaultMQPushConsumer = defaultMQPushConsumer; } public void init() throws MQClientException { // 订阅指定Topic下tags defaultMQPushConsumer.subscribe(topic, tag); // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 // 如果非第一次启动,那么按照上次消费的位置继续消费 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置为集群消费(区别于广播消费):集群只消费一次,广播会被多个消费者消费 defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);// defaultMQPushConsumer.registerMessageListener(messageListenerConcurrently); defaultMQPushConsumer.registerMessageListener(evaluationMessageListener); //设置instanceName defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+""); // 批量消费 defaultMQPushConsumer.setConsumeMessageBatchMaxSize(20); // Consumer对象在使用之前必须要调用start初始化,初始化一次即可 defaultMQPushConsumer.start(); logger.info("==rocketmq==DefaultMQPushConsumer start success! consumerGroup:{},nameServiceAddr:{},topic:{},tag:{}",defaultMQPushConsumer.getConsumerGroup(),defaultMQPushConsumer.getNamesrvAddr(),topic,tag); } public void destroy(){ defaultMQPushConsumer.shutdown(); }}