博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq Don't have SubscriptionGroup
阅读量:7117 次
发布时间:2019-06-28

本文共 3333 字,大约阅读时间需要 11 分钟。

1. 问题描述

   rocketmq 生产者发消息正常 mq后台也可以看到发出的消息

   但是消费者一直没消费 好像订阅没成功

 

2. 问题排查

通过上图查看 确实没有检测到订阅者

 

 

3. 问题解决

线上环境是 两台机器 共四个实例

项目中订阅了两个不同地址不同topic的mq

然后那个instanceName会有命名冲突

ip@进程id 

重新修改instanceName的value

//设置instanceName

 defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+"");

spring-rocketmq.xml

View Code

consumer.java

package com.x.mq.base;import com.x.mq.EvaluationMessageListener;import org.apache.commons.lang.math.JVMRandom;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Random;/** * Created by admin on 2018/1/31. */public class EvaluationConsumer {    private static final Logger logger = LoggerFactory.getLogger(EvaluationConsumer.class);    private DefaultMQPushConsumer defaultMQPushConsumer;    private EvaluationMessageListener evaluationMessageListener;    private String tag;    private String topic;    public EvaluationMessageListener getEvaluationMessageListener() {        return evaluationMessageListener;    }    public void setEvaluationMessageListener(EvaluationMessageListener evaluationMessageListener) {        this.evaluationMessageListener = evaluationMessageListener;    }    public String getTag() {        return tag;    }    public void setTag(String tag) {        this.tag = tag;    }    public String getTopic() {        return topic;    }    public void setTopic(String topic) {        this.topic = topic;    }    public DefaultMQPushConsumer getDefaultMQPushConsumer() {        return defaultMQPushConsumer;    }    public void setDefaultMQPushConsumer(DefaultMQPushConsumer defaultMQPushConsumer) {        this.defaultMQPushConsumer = defaultMQPushConsumer;    }    public void init() throws MQClientException {        // 订阅指定Topic下tags        defaultMQPushConsumer.subscribe(topic, tag);        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置为集群消费(区别于广播消费):集群只消费一次,广播会被多个消费者消费 defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);// defaultMQPushConsumer.registerMessageListener(messageListenerConcurrently); defaultMQPushConsumer.registerMessageListener(evaluationMessageListener); //设置instanceName defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+""); // 批量消费 defaultMQPushConsumer.setConsumeMessageBatchMaxSize(20); // Consumer对象在使用之前必须要调用start初始化,初始化一次即可
defaultMQPushConsumer.start(); logger.info("==rocketmq==DefaultMQPushConsumer start success! consumerGroup:{},nameServiceAddr:{},topic:{},tag:{}",defaultMQPushConsumer.getConsumerGroup(),defaultMQPushConsumer.getNamesrvAddr(),topic,tag); } public void destroy(){ defaultMQPushConsumer.shutdown(); }}
View Code

 

转载于:https://www.cnblogs.com/rocky-fang/p/8608174.html

你可能感兴趣的文章
比较好玩的动态添加网页元素
查看>>
可替代的C语言开发环境
查看>>
Word 2003中打开最近操作过的文档的两种推荐的方法
查看>>
一条长为L的绳子,一面靠墙,另外三边组成矩形,问此矩形最大面积能是多少?...
查看>>
保持Service不被Kill掉的方法--双Service守护 && Android实现双进程守护 2
查看>>
从源码分析常见的基于Array的数据结构动态扩容机制
查看>>
重新认识javascript的settimeout和异步
查看>>
【组合数学+动态规划】在如下8*6的矩阵中,请计算从A移动到B一共有____种走法。要求每次只能向上或向右移动一格,并且不能经过P。...
查看>>
前几天入手一大菠萝,写个初始化教程
查看>>
CSS布局 ——从display,position, float属性谈起
查看>>
SoapUI Pro Project Solution Collection-DataSource(jdbc,excel)
查看>>
全国主要城市不同日照标准的间距系数
查看>>
python网络爬虫(一):网络爬虫科普与URL含义
查看>>
UVA 11732 - strcmp() Anyone?(Trie)
查看>>
Vue v-bind的使用
查看>>
36.5. height / width
查看>>
动手实践虚拟网络 - 每天5分钟玩转 OpenStack(10)
查看>>
【Python】supervisor 工具介绍
查看>>
浅谈嵌入式软件的未来发展
查看>>
Spark源码分析之二:Job的调度模型与运行反馈
查看>>