博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMq消息队列使用
阅读量:6988 次
发布时间:2019-06-27

本文共 6032 字,大约阅读时间需要 20 分钟。

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

比kafka还是有过之无不及,其实kafka文档很丰富

但RocketMQ网上的文章太少,找不到相关的操作教程

于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

下载源码的地址 

  • 首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

    com.alibaba.rocketmq
    rocketmq-client
    3.2.6

Downloads

在linux 下用wget 下载源码然后解压出来

在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

可以 vi runserver.sh

分别给 mqnamesrv mqbroker play.sh 执行的权限

chmod +x  mqnamersrv 

chmod +x  mqbroker 

chmod +x  play.sh 

下面红线框的这段 命令输入错误了,忽略不用看

通过 nohup sh mqnamesrv& 启动 RocketMq

目前没看到结束的命令,也没找到相关的介绍,

我这里用的 ps -ef|grep rocketmq  查到进程pid

然后kill pid号

或则pkill -9 java [慎用]

用jps -v 查看下java进程的参数

 rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

在防火墙配置里面加上 9876端口,设置iptables对外开放

部署Broker 

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

 Master和Slave的配置文件参考conf目录下的配置文件

 Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

 一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

 部署一Master一Slave,集群采用异步复制方式:

 Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

 

 

package com.pgsqlmybatis.common.rocketmq;/***************************************************************** 公司名称    :* 系统名称    :信用管家专业版* 类 名 称    :Ios渠道idfa统计,推广统计用* 功能描述    :* 业务描述    :* 作 者 名    :@Author Royal* 开发日期    :2016-05-15* Created     :IntelliJ IDEA**************************************************************** 修改日期    :* 修 改 者    :* 修改内容    :****************************************************************/import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class Producer {    public static void main(String[] args) {        DefaultMQProducer producer = new DefaultMQProducer("Producer");        producer.setNamesrvAddr("xxxxxxxxxx:9876");        try {            producer.start();            String pushMsg="kafka activeMq rocketMq 消息队列使用1";            Message msg = new Message("PushTopic","push","1",                    pushMsg.getBytes("UTF-8"));            SendResult result = producer.send(msg);            System.out.println("id:" + result.getMsgId() +                    " result:" + result.getSendStatus());            String pushMsg2="海量级消息记录单机测试2";            msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));            result = producer.send(msg);            System.out.println("id:" + result.getMsgId() +                    " result:" + result.getSendStatus());            String pushMsg3="海量级消息记录单机测试3";            msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());            result = producer.send(msg);            System.out.println("id:" + result.getMsgId() +                    " result:" + result.getSendStatus());        } catch (Exception e) {            e.printStackTrace();        } finally {            producer.shutdown();        }    }}

  

启动生成者

 

启动消费者

package com.pgsqlmybatis.common.rocketmq;/***************************************************************** 公司名称    :* 系统名称    :信用管家专业版* 类 名 称    :Ios渠道idfa统计,推广统计用* 功能描述    :* 业务描述    :* 作 者 名    :@Author Royal* 开发日期    :2016-05-15* Created     :IntelliJ IDEA**************************************************************** 修改日期    :* 修 改 者    :* 修改内容    :****************************************************************/import java.io.UnsupportedEncodingException;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {    public static void main(String[] args){        DefaultMQPushConsumer consumer =                new DefaultMQPushConsumer("PushConsumer");        consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");        try {            consumer.subscribe("PushTopic", "push");            /**             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List
list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); String recString= null; try { recString = new String(msg.getBody() ,"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println(recString); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}

   

 

以上为单机实例配置

如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^ 

参考:http://blog.csdn.net/a19881029/article/details/34446629

        http://sofar.blog.51cto.com/353572/1540874

        http://blog.csdn.net/loongshawn/article/details/51086876

       

       

             

       

转载于:https://www.cnblogs.com/fangyuan303687320/p/5495481.html

你可能感兴趣的文章
【PHP 扩展开发】Zephir 基础篇
查看>>
HTML
查看>>
HashMap浅析?
查看>>
字节跳动开源Go结构体标签表达式解释器,成请求参数校验的杀手锏
查看>>
怎么将在线录制的视频转为GIF动态图
查看>>
js的setTimeout和Promise---同步异步和微任务宏任务
查看>>
【剑指offer】顺时针打印矩阵
查看>>
怎么将图片上传封装成指令?
查看>>
leetcode讲解--861. Score After Flipping Matrix
查看>>
聊聊JavaScript和Scala的表达式 Expression
查看>>
[原]数据科学教程: 如何使用 mlflow 管理数据科学工作流
查看>>
npm上创建发布package
查看>>
解决JS文件引用路径多层查找
查看>>
FE.TEST-前端测试初探
查看>>
超详细Dkhadoop虚拟机安装图文教程
查看>>
排序算法上——冒泡排序、插入排序和选择排序
查看>>
JAVA 8 函数式接口--Supplier
查看>>
Android HTTP
查看>>
Dockerfile多阶段构建原理和使用场景
查看>>
476-数字的补数
查看>>