搭建部署:
1. 部署平台和部署方式:
Ubuntu:22.10
部署方式:源码安装部署
a. 下载源码到本地:rocketmq-all-5.3.1-source-release.zip
$ unzip rocketmq-all-5.3.1-source-release.zip // 解压缩
$ cd rocketmq-all-5.3.1-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U // maven 构建
$ cd distribution/target/rocketmq-5.3.1/rocketmq-5.3.1 // 进入到rocketmq安装目录
2. 启动服务:
a. 修改Name Server启动占用堆栈大小:sudo vi bin/runserver.sh,主要原因是减少rocketmq对本地资源占用过高,修改是在if...else...部分。if部分是jdk版本小于9;else部分是jdk版本大于等于9
b. 修改broker服务器启动占用堆栈大小:sudo vim bin/runbroker.sh, 同样根据版本修改
3. 启动name server服务:nohup sh bin/mqnamesrv &
4. 启动broker服务:nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
-n localhost:9876: 指定name server的地址,也可以通过其他方式指定,例如 export NAMESERV_ADDR="loalhost:9876"
5. name server 负责服务注册,维护Topic到broker的映射关系
6. broker是服务代理。负责维护Topick的消息队列,需要知道name server所在的地址
7. 本地服务验证生产者:sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
通过rocketmq自带的测试程序,作为生产者向rocketmq发送消息
8. 本地验证消费者:sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
通过rocketmq自带的测试程序,为作为消费者消费rocketmq消息
9. java程序使用SDK开发:
a. pom.xml中引入rocketmq的java依赖:(可以通过rocket的源码进行查看)
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version>
</dependency>
需要配置阿里的镜像源, 否则无法安装:
<!-- 阿里云仓库 -->
<mirror>
<id>alimaven</id>
<mirrorOf>central</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
</mirror>
10. 练习代码:
consumer:
package com.cuc;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class Consumer {private static final String PRODUCER_GROUP = "please_rename_unique_group_name";private static final String DEFAULT_NAMESRVADDR="127.0.0.1:9876";private static final String TOPIC = "TopicTest";public static void main(String[] args) {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);try {consumer.subscribe(TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.println(msg);System.out.printf("%s Receive New Messages:%s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}catch (MQClientException e) {e.printStackTrace();}System.out.println("Consumer Started");}
}
producer:
package com.cuc;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {private static final int MESSAGE_COUNT=1;private static final String PRODUCER_GROUP = "please_rename_unique_group_name";private static final String DEFAULT_NAMESRVADDR="127.0.0.1:9876";private static final String TOPIC = "TopicTest";private static final String TAG = "TagA";public static void main(String[] args) throws MQClientException,InterruptedException{// 指定nameserver// 创建生产者和broker// 发送消息DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();for(int i=0;i<MESSAGE_COUNT;i++){try {// 实例化消息Message msg = new Message(TOPIC, TAG, ("Hello Rocketmq").getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息,并返回发送结果SendResult sendResult = producer.send(msg);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}