RocketMQ: Getting Started, Installation, Walkthrough, and Public-Network Configuration

Download & installation

Download page: http://rocketmq.apache.org/docs/quick-start/

Extract the archive: tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz

Enter the RocketMQ home directory: cd rocketmq-all-4.4.0-bin-release

Open the firewall ports: 9876 (NameServer) and 10909-10912 (Broker)

Modify the configuration

    1. Memory settings in the startup scripts

cd bin
grep "Xmx" *
vim runbroker.sh
vim runserver.sh
vim tools.sh

In each script, shrink the JVM memory settings so they fit a small server. For example, change

     JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
     JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"

to

     JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
     JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"

    2. Broker configuration

vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=YOUR_PUBLIC_IP:9876
brokerIP1=YOUR_PUBLIC_IP
# Whether to auto-create topics: set to false in production, true for testing
autoCreateTopicEnable=true
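
A wrong brokerIP1 or namesrvAddr is an easy mistake to make in a public-network setup, so a small sanity check on broker.conf can save some debugging. The following is only a sketch under stated assumptions: the class BrokerConfCheck and its rules are hypothetical helpers written for this article, not part of RocketMQ, and it relies on java.util.Properties being able to parse the simple key=value lines shown above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: flags settings in broker.conf that commonly break external access.
class BrokerConfCheck {

    // Properties accepts "key = value", "key=value" and '#' comment lines.
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns human-readable warnings; an empty list means nothing suspicious was found.
    static List<String> check(Properties props) {
        List<String> warnings = new ArrayList<>();
        String brokerIp = props.getProperty("brokerIP1", "").trim();
        String namesrv = props.getProperty("namesrvAddr", "").trim();
        String autoCreate = props.getProperty("autoCreateTopicEnable", "").trim();

        if (brokerIp.isEmpty()) {
            warnings.add("brokerIP1 is not set");
        } else if (brokerIp.equals("localhost") || brokerIp.startsWith("127.")) {
            warnings.add("brokerIP1 is a loopback address; remote clients cannot reach it");
        }
        if (namesrv.isEmpty()) {
            warnings.add("namesrvAddr is not set");
        } else if (!namesrv.contains(":")) {
            warnings.add("namesrvAddr should look like host:port");
        }
        if (autoCreate.equalsIgnoreCase("true")) {
            warnings.add("autoCreateTopicEnable=true is intended for testing only");
        }
        return warnings;
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get(args.length > 0 ? args[0] : "conf/broker.conf");
        if (!Files.exists(path)) {
            System.out.println("No such file: " + path);
            return;
        }
        String text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        List<String> warnings = check(parse(text));
        if (warnings.isEmpty()) {
            System.out.println("No obvious problems found");
        }
        warnings.forEach(System.out::println);
    }
}
```

Run it from the RocketMQ home directory, or pass the path to broker.conf as the first argument.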

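Startup

1. Start the NameServer (registry): nohup sh bin/mqnamesrv -n YOUR_PUBLIC_IP:9876 > mqnamesrv.log 2>&1 &

2. Check that the port is listening on 0.0.0.0:9876 or YOUR_PUBLIC_IP:9876: netstat -anpt | grep 9876

3. Start the Broker (data node): nohup sh bin/mqbroker -n YOUR_PUBLIC_IP:9876 -c conf/broker.conf > broker.log 2>&1 &

4. Verify that the Broker registered successfully (cluster info): bin/mqadmin clusterList -n YOUR_PUBLIC_IP:9876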
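
Because the NameServer and Broker are exposed on a public IP here, it can help to confirm from the client machine that the ports are actually reachable before running any client code. A minimal sketch, assuming the default ports 9876, 10909 and 10911; the class name PortCheck and the 2-second timeout are illustrative choices, not part of RocketMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: tries a plain TCP connect to each RocketMQ port.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unresolved
        }
    }

    public static void main(String[] args) {
        // Pass the server's public IP as the first argument; defaults to the local machine.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int[] ports = {9876, 10909, 10911};
        for (int port : ports) {
            System.out.printf("%s:%d reachable=%b%n", host, port, isReachable(host, port, 2000));
        }
    }
}
```

A successful connect only shows that something is listening and the firewall lets traffic through; it does not prove the Broker advertises the right address, which is what brokerIP1 controls.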

Common commands

cd rocketmq-all-4.4.0-bin-release/
--- Cluster
Query cluster info: bin/mqadmin clusterList -n localhost:9876

Print the Broker configuration: bin/mqbroker -m -n localhost:9876

Update the Broker configuration: bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876

View Broker statistics: bin/mqadmin brokerStatus -n localhost:9876 -b localhost:10911

--- Subscription groups
Create a subscription group: bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName

List consumer groups: bin/mqadmin consumerProgress -n localhost:9876

Show a consumer group's client IPs: bin/mqadmin consumerStatus -g GroupName -n localhost:9876

Show a consumer group's message backlog: bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

Delete a subscription group: bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName

--- Topics
Create a topic: bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName

List topics: bin/mqadmin topicList -n localhost:9876

Send test messages to a topic: bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024

Print a topic's messages: bin/mqadmin printMsg -n localhost:9876 -t TopicName

Show a topic's status and statistics: bin/mqadmin topicStatus -n localhost:9876 -t TopicName

Show which clusters a topic belongs to: bin/mqadmin topicClusterList -n localhost:9876 -t TopicName

Delete a topic: bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName

Show a topic's routing info: bin/mqadmin topicRoute -n localhost:9876 -t TopicName

Query a message by ID: bin/mqadmin queryMsgById -i msgId -n localhost:9876

Query a message by offset: bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName
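
When topics need to be created repeatedly (for example in a deployment script), the mqadmin commands can be driven from Java with ProcessBuilder. This is a rough sketch, not an official API: MqAdminRunner is a hypothetical class, it uses only the updateTopic flags shown in this list (-c, -n, -t), and it assumes the working directory is the RocketMQ home so that bin/mqadmin resolves.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper around the bin/mqadmin script.
class MqAdminRunner {

    // Builds the argument list for: bin/mqadmin updateTopic -c <cluster> -n <namesrv> -t <topic>
    static List<String> updateTopicCommand(String namesrvAddr, String cluster, String topic) {
        return Arrays.asList("bin/mqadmin", "updateTopic",
                "-c", cluster, "-n", namesrvAddr, "-t", topic);
    }

    // Runs the command, forwarding its output to this process, and returns the exit code.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        List<String> command = updateTopicCommand("localhost:9876", "DefaultCluster", "TopicName");
        System.out.println(String.join(" ", command));
        // Only execute when explicitly asked, so a dry run just prints the command.
        if (args.length > 0 && args[0].equals("run")) {
            System.exit(run(command));
        }
    }
}
```

Passing the arguments as a list rather than one concatenated string avoids shell quoting problems if a topic or cluster name ever contains unusual characters.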

 

Java example

  1. pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
    </dependencies>

 

  2. Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("GroupName");
        // Specify the name server address (replace IP with the server's public IP).
        producer.setNamesrvAddr("IP:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicName" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message; it is delivered to one of the brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

 

  3. Consumer

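import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) {
        String topicName = "TopicName";
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("GroupName");
        consumer.setNamesrvAddr("IP:9876");
        try {
            consumer.subscribe(topicName, "*");
            /**
             * On the consumer's first start, choose whether to consume from the head
             * or the tail of the queue.<br>
             * On later starts, consumption resumes from the last consumed position.
             */
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            /**
             * For ordered messages, the listener must be a MessageListenerOrderly,
             * and the return value must be a ConsumeOrderlyStatus.
             */
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    // Enable auto-commit; without it, messages are consumed again after
                    // the consumer restarts, even if SUCCESS was returned.
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg : msgs) {
                            // Decode the body with the same charset the producer used.
                            String recString = new String(msg.getBody(), StandardCharsets.UTF_8);
                            System.out.println(recString);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Consumption failed: suspend this queue briefly, then retry.
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // Consumption succeeded.
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}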
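
The Producer encodes the message body with RemotingHelper.DEFAULT_CHARSET (which is "UTF-8") and the Consumer decodes it as UTF-8, so both sides have to agree on the charset or non-ASCII text will come out garbled. The round trip can be sketched without a broker; BodyCodec is a hypothetical helper used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: the encode/decode pair that producer and consumer must agree on.
class BodyCodec {

    // What the producer does before building a Message.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer does with MessageExt.getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Hello RocketMQ 你好";
        byte[] body = encode(original);
        String decoded = decode(body);
        System.out.println(body.length + " bytes, round trip ok: " + original.equals(decoded));
    }
}
```

Using the StandardCharsets constant instead of the "UTF-8" string also removes the need to handle UnsupportedEncodingException.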

  4. Plugins for packaging as a jar [optional]

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- JDK version -->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- fully qualified name of the main class -->
                                    <mainClass>com.package.Consumer</mainClass>
                                </transformer>
                            </transformers>
                            <artifactSet> </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 
