欢迎来到课桌文档! | 帮助中心 课桌文档-建筑工程资料库
课桌文档
全部分类
  • 党建之窗>
  • 感悟体会>
  • 百家争鸣>
  • 教育整顿>
  • 文笔提升>
  • 热门分类>
  • 计划总结>
  • 致辞演讲>
  • 在线阅读>
  • ImageVerifierCode 换一换
    首页 课桌文档 > 资源分类 > DOCX文档下载  

    2022Apache RocketMQ 从入门到实战.docx

    • 资源ID:1422154       资源大小:1.04MB        全文页数:164页
    • 资源格式: DOCX        下载积分:5金币
    快捷下载 游客一键下载
    会员登录下载
    三方登录下载: 微信开放平台登录 QQ登录  
    下载资源需要5金币
    邮箱/手机:
    温馨提示:
    用户名和密码都是您填写的邮箱或者手机号,方便查询和重复下载(系统自动生成)
    支付方式: 支付宝    微信支付   
    验证码:   换一换

    加入VIP免费专享
     
    账号:
    密码:
    验证码:   换一换
      忘记密码?
        
    友情提示
    2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
    3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
    4、本站资源下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。
    5、试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓。

    2022Apache RocketMQ 从入门到实战.docx

    ApacheRoCketMO从入门到实战I目录开篇:我的另一种参与RocketMQ开源社区的方式61.1 RocketMQ核心概念扫盲篇101.2 生产环境中,autoCreateTopicEnable为什么不能设置为true181.3 实战:RoCketMQ学习环境搭建指南篇281.4 RocketMQHA核心工作机制391.5 踩坑记:roCketmq-ConSole消费TPS为0,但消息积压数却在降低是个什么“鬼”491.6 RocketMQ一个新的消费组初次启动时从何处开始消费呢?641.7 一次RocketMQ进程自动退出排查经验分享781.8 RocketMQ主题扩分片后遇到的坑821.9 RocketMQ消息发送systembusy、brokerbusy原因分析与解决方案坑911.10 再谈RocketMQbrokerbusy1041.11 从年末生产故隙解锁RocketMQ集群部署的最佳实践1081.12 RocketMQ一行代码造成大量消息丢失1151.13 RocketMQD1.edger多副本即主从切换实战1211.14 RocketMQmsgld与OffsetMsgId释疑1311.15 RocketMQAC1.使用指南1411.16 RocketMQ消息轨迹-设计篇1511551.17 消息发送常见问题与解决方案开篇:我的另一种参与RocketMQ开源社区的方式夕ROCfcetMQYwnMthful2019OlTSTAM)INGCOVnUBlTORCERTIFICATElU卜torrrtihthl&dinwmz:iRfvQ-"姆CkHiyiPu-3AX”'2WeXE1.洸,1."bjgFi.Z*m,uIhw说到参与开源项目,很多人都理解为成为一名Committer才能算式参与到开源社区的建设?但其实这个就是参与开源项目有代码层面的贡献,也有非代码贡献层面的如技术布道、社区运营(线上直播、线下活动、文档编辑)等。如何参与一个开源项目,容我慢慢道来。-、与RocketMQ相识、相知到在一起在2017年听到阿里巴巴将RocketMQ捐赠给Apache基金会成为Apache的顶级项目,我内心是无比激动,因为终于可以一睹一款高性能的消息中间件的实现原理。通过阅读了RocketMQ官方,以下几个特别的点更是吸引了我的注意,让我下定决心深入研究一番。.RocketMQ为什么性能高效,到底运用了什么“厉害”的技术?.RocketMQ如何实现刷盘(可以类比一下数据库方面的刷盘、red。、undo日志)?.RocketMQ文件存储设计理念、基于文件的Hash索引是怎么实现的?.定时消息、消息过滤等实现原理。.如何进行网络编程(Netty实战)?下定决心后便开始了我的源码分析RocketMQ之旅,大概在4个多月的时间中连续发表了30余篇文章,从Nameserver,消息发送高可用设计、消息存储、消息消费、消息过滤、事务消息等各个方面对其进行了体系化的剖析,边写边分享,边分享边传播,终于得到了机械工业出版社华章分社的杨福川老师的认可,邀请我出书。在杨老师和张工的帮助与指点下,经过将近半年的努力,书稿基本完稿。由于我当时是位名不经传的新人,按照出版行业的惯例,需要找一些该领域内专家大牛帮忙做序或写写推荐语。当时我也是初生牛犊不怕虎,蹦出了一个非常大胆的想法,是不是可以联系到RockaMQ官方的一些大佬,最终我直接锁定了RocketMQ创始人冯嘉大神,希望他能帮我作序推荐,令人惊喜的是冯嘉大神非常平易见人,得知我的来意后,他说了这样一句话:“我是非常愿意为写书的朋友作序,但需要评估一下书稿的质量,如果质量OK,非常愿意效劳”。我备受鼓舞,在和出版社初步沟通后,将试读稿件再加上消息存储整章的内容发给冯嘉大神后,经冯嘉大神认真审稿后,决定帮忙推荐作序,真的非常受鼓舞。随着RockeiMQ技术内幕一书的正式出版上市,并得到广大读者朋友的认可,与官方的联系也越来越多,后面在RocketMQ中国社区负责人青峰大佬的筹备下,我还参与了RockotMQ官方社区的源码解析直播活动、官方文档审稿等工作,并在社区得到了不错的反响。说到这里大家是不是觉得非常奇怪,是不是都认为你只是在写文章,写书,没有真正参与开源社区呀,没有贡献代码,这个算哪门子参与开源社区?其实我一开始连我自己也没有意识到我正在参与一个开源项目,直到我在冯嘉大神为我写的序言中给了我一个新的称号:RockotMQ布道师,从而才真正了解到参与开源的另外一种方式:做一个开源项目的传播者,让更多人更容易的应用它,即降低大众对它的使用门槛。有了新的称号,那就得更加努力,朝着优秀努力,在2019年我又陆续发表了20几篇关于ROCketMQ相关的文章,这些文章含金量极高,不仅及时跟进了RocketMQ4.3,0之后的新特性:消息轨迹、AC1.主从切换等机制,更是发表了数篇实战类文章,详细指出在生产环境下一些使用误区,更是输出了几篇生产环境真实故障与解决方案。最终于2019年RocketMQ官方社区授予我优秀布道师荣誉称号。RocketMQ成就了我,我也会继续努力,为传播RocketMQ尽一份力所能及的力量。2020年,继续努力。二、如何成为开源项目的Committer有一些粉丝在问我,您对RockctMQ研究的这么深入,为什么不考虑贡献代码,成为一名Committer呢?这是因为参与开源项目需要具备一些基本条件,当下我的实际情况不符合,那成为一个开源项目的Committer有些什么条件呢?1 .扎实的JaVa基础功底一个开源项目的底层都会涉及到存储,这就要求具备一定的数据结构基础,JAVA集合框架中的类自然成为了我们突破数据结构最好的老师,其次是java并发,即多线程、并发容器、锁等课题,这方面可以好好学习一下JUC框架。最后最好是具备一些网络方面的知识,例如NlO、Nettyo2 .持续输出能力成为一个开源项目的contributions非常容易,提交一个PR并被通过即可,甚至于提交一个文档被接受也同样可以,难的是持续贡献,最终被开源项目的PMC认为对该项目有着突出贡献。我比较“苦逼”,在带娃方面我的资源只有我老婆,父母在老家无法分身,故下班后我没有连续的空闲时间专心投入一项任务中,而开源最需要的是精益求精,不只是需要完成功能,而是要编写结构优良的代码,设计所占据的时间比代码开发时间要多的多,故我个人认为我暂时不方便走代码贡献这条道路。但我零碎时间还是充足的,故现阶段我会好好利用这些零碎时间,继续通过写文章的方式为开源项目贡献自己的一份力量。接下来我们回到本节的主题,那如何参与一个开源项目呢?在参与一个开源项目之前,我觉得第一个最基本的步骤还是要打牢基础,这里的基础至少要包括JAVA集合、JAVA并发(JUC)这两项,只是最最基本的,至少要阅读其源码,理解其设计理念,至于NlO,Netty这些可以后续在需要使用时再去专门学习,有针对性的学习,有使用需求,或许学习动力更强劲,学习效率更高效。当具备一定的基础后,如何从零开始参与进开源项目呢?通常有如下几个方法:.看看官方文档,特别是设计手册,从整体上把握其设计理念。写写源码分析类文章,从整体上把控这个框架,这个花费时间较多,如果框架正在起步。阶段,不建议该方法;如果框架比较成熟,非常建议采用该方法。.尝试看看开源项目中的issues,看能不能解决,从问题入手,快速融入该项目。.尝试谢谢单元测试用例,测试驱动开发,借此学习该框架。后面的事情就是坚持不懈,朝着目标不断前进,中途可以放慢速度,但千万别放弃,因为只有坚持,才能胜利,只要前进,就离目标更近。参与开源,一个最基本的条件是拥有大量的连续时间,想要成为一个开源框架的Commder,唯有坚持不懈,持续投入,持续产出。最后再次感谢RocketMQ社区对我的认可,我会尽努力做出更大的贡献,也希望广大读者朋友们,积极参与开源社区,贡献一份自己的力量,同事打造自身影响力,助力职场步步高升。1.1RocketMQ核心概念扫盲篇在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RockotMQ核心概念,为大家学习RQCkelMQ打下牢固的基础。-、RocketMQ部署架构在RockelMQ主要的组件如下:1. NameserverNameserver集群,topic的路由注册中心,为客户端根据Topic提供路由服务,从而引导客户端向BrC)ker发送消息。NameSerVer之间的节点不通信。路由信息在Nameserver集群中数据一致性采取的最终一致性。2. Broker消息存储服务器,分为两种角色:Master与Slave,上图中呈现的就是2主2从的部署架构,在RockeiMQ中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,从服务器可以承担读服务(消息消费)。所有Broker,包含Slave服务器每隔30s会向Nameserver发送心跳包,心跳包中会包含存在在Broker上所有的topic的路由信息。3. Client消息客户端,包括ProdUCer(消息发送者)和CQnSUmer(消费消费者).客户端在同一时间只会连接一台nameserver,只有在连接出现异常时才会向尝试连接另外一台。客户端每隔30s向Nameserver发起topic的路由信息查询。温馨提示:Nameserver是在内存中存储Topic的路由信息,持久化Topic路由信息的地方是在RrOker中,即$ROCKETMQ_HOME)/store/config/topics.jsono在RocketMQ4.5.0版本后引入了多副本机制,即一个复制组(m-s)可以演变为基于raft协议的复制组,复制组内部使用raft协议保证broker节点数据的强一致性,该部署架构在金融行业用的比较多。二、消息订阅模型在RocketMQ的消息消费模式采用的是发布与订阅模式。topic:一类消息的集合,消息发送者将一类消息发送到一个主题中,例如订单模块将订单发送到OrdeJlopic中,而用户登录时,将登录事件发送到user_.og:ntopic中。consumegroup:消息消费组,一个消费单位的“群体”,消费组首先在启动时需要订阅需要消费的topic0一个topic可以被多个消费组订阅,同样一个消费组也可以订阅多个主题。一个消费组拥有多个消费者。术语解释起来有点枯燥晦涩,接下来我举例来阐述。例如我们在开发一个订单系统,其中有一个子系统:Order-SerViCe-app,在该项目中会创建一个消费组Order_consumer来订阅Ordor_tOPic,并且基于分布式部署,order-service-app的部署情况如下:ordcr-servicc-appordcr-scrvicc-appordcr-servicc-app192.168.1.16192.168.1.17192.168.1.18orderconsumerorderconsumerorderconsumer即order-service-app部署了3台服务器,每一个jvm进程可以看做是消费组order_consumer消费组的其中一个消费者。1 .消费模式那这三个消费者如何来分工来共同消费Orderjopic中的消息呢?在R。CketMQ中支持广播模式与集群模式。广播模式:一个消费组内的所有消费者每一个都会处理topic中的每一条消息,通常用于刷新内存缓存。集群模式:一个消费组内的所有消费者共同消费个topic中的消息,即分工协作,一个消费者消费一部分数据,启动负载均衡,集群模式是非常普遍的模式,符合分布式架构的基本理念,即横向扩容,当前消费者如果无法快速及时处理消息时,可以通过增加消费者的个数横向扩容,快速提高消费能力,及时处理挤压的消息。2 .消费队列负载算法与重平衡机制那集群模式下,消费者是如何来分配消息的呢?例如上面实例中order_topic有16个队列,那一个拥有3个消费者的消费组如何来分配队列中。在MQ领域有一个不成文的约定:同一个消费者同一时间可以分配多个队列,但一个队列同一时间只会分配给一个消费者。RocketMQ提供了众多的队列负载算法,其中最常用的两种平均分配算法。AnocateMessageQueueAveragely-平均分配ABocatcMessageQueuoAveragelyByCircle.轮流平均分配为了说明这两种分配算法的分配规则,现在对16个队列,进行编号,用qql5表示,消费者用COc2表示。A.ocatcMossagoQueueAveragely分配算法的队列负载机制如下:cO:qqlq2q3q4q5cl:q6q7q8q9qlc2:qllql2ql3ql4ql5其算法的特点是用总数除以消费者个数,余数按消费者顺序分配给消费者,故c会多分配一个队列,而且队列分配是连续的。ADocateMessageQueueAverageIyByCircle分配算法的队列负载机制如下:cO:qq3q6q9ql2ql5cl:qlq4q7qlql3该分配算法的特点就是轮流一个一个分配。温馨提示:如果topic的队列个数小于消费者的个数,那有些消费者无法分配到消息。在RocketMQ中一个topic的队列数直接决定了最大消费者的个数,但topic队列个数的增加对RocketMQ的性能不会产生影响。在实际过程中,对主题进行扩容(增加队列个数)或者对消费者进行扩容、缩容是一件非常寻常的事情,那如果新增一个消费者,该消费者消费哪些队列呢?这就涉及到消息消费队列的重新分配,即消费队列重平衡机制。在RocketMQ客户端中会每隔20s去查询当前topic的所有队列、消费者的个数,运用队列负载算法进行重新分配,然后与上一次的分配结果进行对比,如果发生了变化,则进行队列重新分配;如果没有发生变化,则忽略。例如采取的分配算法如下图所示,现在增加一个消费者c3,那队列的分布情况是怎样的呢?AlIoCateMeSSageQUeUeAVeragely分配算法的队列负载机制如下:c:qq1q2q3q4q5d:q6q7q8q9q10c2:q11q12q13q14q15根据新的分配算法,其队列最终的情况如下:cO:qqlq2q3cl:q4q5q6q7c2:q8q9qlqllc3:ql2ql3ql4ql5上述整个过程无需应用程序干预,由RocketMQ完成。大概的做法就是将将原先分配给自己但这次不属于的队列进行丢弃,新分配的队列则创建新的拉取任务。3 .消费进度消费者消费一条消息后需要记录消费的位置,这样在消费端重启的时候,继续从上一次消费的位点开始进行处理新的消息。在RocketMQ中,消息消费位点的存储是以消费组为单位的。集群模式下,消息消费进度存储在broker端,$ROCKETMQHOME)storeconfigconsumerffsel.json是其具体的存储文件,其中内容截图如下:"OffsetTable":“TopicTest部IeaSerenameuniquegroupname4";0:1,1:1,2:1,3:2)."%RETRY%please_ren<)in_unlque_roup_nane_4§please_rename_unique_group_naine_4":0可见消费进度的Key为:topicCOnSUmeGrOup,然后每一个队列一个偏移量。广播模式的消费进度文件存储在用户的主目录,默认文件全路劲名:$(UserjiomE)/.rocketmq_offsetSo4 .消费模型RocketMQ提供了并发消费、顺序消费两种消费模型。并发消费:对一个队列中消息,每一个消费者内部都会创建一个线程池,对队列中的消息多线程处理,即偏移量大的消息比偏移量小的消息有可能先消费。顺序消费:在某一项场景,例如MySQ1.binlog场景,需要消息按顺序进行消费。在RockotMQ中提供了基于队列的顺序消费模型,即尽管一个消费组中的消费者会创建一个多线程,但针对同一个Queue,会加锁。温馨提示:并发消费模型中,消息消费失败默认会重试16次,每一次的间隔时间不样;而顺序消费,如果条消息消费失败,则会直消费,直到消费成功。故在顺序消费的使用过程中,应用程序需要区分系统异常、业务异常,如果是不符合业务规则导致的异常,则重试多少次都无法消费成功,这个时候定要告警机制,及时进行人为干预,否则消费会积压。三、事务消息事务消息并不是为了解决分布式事务,而是提供消息发送与业务落库的一致性,其实现原理就是次分布式事务的具体运用,请看如下示例:publicstaticvoidmain(Stringargs)OrderServiceorderservice;DefaultMQProducermqProducer;/存储订单OpertaorlOrderService.SaveOrCreateOrder(null);/发送消息grtaor2mqProducer.send(;<null);)上述伪代码中,将订单存储关系型数据库中和将消息发送到MQ这是两个不同介质的两个操作,如果能保证消息发送、数据库存储这两个操作要么同时成功,要么同时失败,RocketMQ为了解决该问题引入了事务消息。温馨提示,本节主要的目的是让大家知晓各个术语的概念,由于事务消息的使用,将在该专栏的后续文章中详细介绍。四、定时消息开源版本的R。CketMQ目前并不支持任意精度的定时消息。所谓的定时消息就是将消息发送到Broker,但消费端不会立即消费,而是要到指定延迟时间后才能被消费端消费。RocketMQ目前支持指定级别的延迟,其延迟级别如下:Is5sIOs30sIm2m3m4m5m6m7m8m9mIOm20m30mIh2h五、消息过滤消息过滤是指消费端可以根据某些条件对一个topic中的消息进行过滤,即只消费一个主题下满足过滤条件的消息。RocketMQ目前主要的过滤机制是基于tag的过滤与基于消息属性的过滤,基于消息属性的过滤支持SQ1.92表达式,对消息进行过滤。六、小结本文的主要目的是介绍RocketMQ常见的术语,例如nameserver>broker>主题、消费组、消费者、队列负载算法、队列重平衡机制、并发消费、顺序消费、消费进度存储、定时消息、事务消息、消息过滤等基本概念,为后续的实战系列打下坚实基础。从下一篇开始,将正式开始R。CketMQ之旅,开始学习消息发送。1.2生产环境中,autoCreateTopicEnable为什么不能设置为true、现象很多网友会问,为什么明明集群中有多台Broker服务器,autoCreateTopicEnabie设置为true,表示开启Topic自动创建,但新创建的Topic的路由信息只包含在其中一台Broker服务器上,这是为什么呢?期望值:为了消息发送的高可用,希望新创建的Tope在集群中的每台BrOker上创建对应的队列,避免Broker的单节点故障。现象截图如下:Broker集群信息自动创建的iopicTest5的路由信息:IOPiCTeSt5只在broker-a服务器上创建了队列,并没有在broker-b服务器创建队列,不符合期望。默认读写队列的个数为4o我们再来看一下RocketMQ默认topic的路由信息截图如下:从图中可以默认Topic的路由信息为broker-a.broker-b上各8个队列。二、思考默认Topic的路由信息是如何创建的?.Topic的路由信息是存储在哪里?Nameserver?broker?RocketMQTopic默认队列个数。1 .ROeketMQ基本路由规则Broker在启动时向Nameserver注册存储在该服务器上的路由信息,并每隔30s向Narneserver发送心跳包,并更新路由信息。Nameserver每隔IoS扫描路由表,如果检测到BrOker服务宕机,则移除对应的路由信息。消息生产者每隔30s会从Nameserver重新拉取Topic的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向Nameserver拉取该主题的消息。回到本文的主题:autoCreateTopicEnable,开启自动创建主题,试想一下,如果生产者向一个不存在的主题发送消息时,上面的任何一个步骤都无法获取一个不存在的主题的路由信息,那该如何处理这种情况呢?在RocketMQ中,如果autOCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MiXAl1.AUTOCREATETOPICKEYTOPIC),此时消息发送者得到的路由信息为:但问题就来了,默认TOPiC在集群的每一台Broker上创建8个队列,那问题来了,为啥新创建的Topic只在一个Broker上创建4个队列?2 .探究autoCreateTopicEnable机制默认TOPiC路由创建时机温馨提示:本文不会详细跟踪整个整个源码创建过程,只会点出代码的关键入口点,如想详细了解NameServer路由消息、消息发送高可用的实现原理,建议查阅笔者的书籍RoCketMQ技术内幕第二、三章。Stepl:在BrOker启动流程中,会构建ToPiCConf&Manager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向top:CCOnfigTable中添加默认主题的路由信息。TopicConfigManager构造方法:/7MixAll.ACTO_CREATEJroPIJKEYJTOPICif(this.brokerController.getBrokerConfig().iSAutoCreateTopicEnabIe()Stringtopic=MixAl1.AUT0_CREATE_T0PIC_KEY_TOPIC,TopicConfigtopicConfig=newTopicConfig(topic):this.SysteiaTopic1.ist.add(topic);topicConfig.SetReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums():topicConfig.SetWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums();intperm=PermName.PERM_INHERITIPermName.PERM_READPermName.PERM_WRITE.topicConfig.setPerm(perm):this.topicConfigTable.put(topicConfig.getTopicName(),topicConfig);)备注:该topicConf'gTable中所有的路由信息,会随着Broker向Nameserver发送心跳包中,NarneSOrVer收到这些信息后,更新对应Topic的路由信息表。注意:BrokerConfig的defau.TOpicQueueNurri默认为8。两台Broker服务器都会运行上面的过程,故最终NanleSerVer中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。Step2:生产者寻找路由信息生产者首先向NameServer查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,R。CkelMQ会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer会向生产者返回默认主题的路由信息。然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?答案是会的,其代码如下:MQciientlnstancettupdateTopicRoutelnfoFrorriNcirneServer一.,;upsatvJpy2cRoutcaFxcu>avSex/-tricetopic.,.;DCfaUlI.OofenltMQPrGcvcordcfn11itMQPrc三ecr;!fUh门.1比kN39xy.?二6dg1.TgJrT1.Jfdg,S.TleeVait.WIUiSEC0XifITopicXouKeOataIopicItouxeDataifUiDefeullkkdefaultMQPradiiCtfi?-.)I100icRooDaQ.'<*"T'.gelDrf«u!tTy:cRouicTrfoFrc-X.ccServrrdcfaukVCFrodiccrgrxCrrXeToicKr;1000*3)i(topicltouteDtJ三11uUlI(,CQceDtdftt:tQp<Rtftc9a<.get>QucucDtaO>(ia:CuCKUaC-Math.4rj1144vfau2tM0Pxoducc.etDcfaultTopicQeucKuss<data.;>otK«adQueu«Nuat<>,daA.zerXrAdQuoutf,dxa-tU""Qu-N-<UiieNuiisI温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProduCerttdefaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaUKMQProducerdefaultTopicQueucNums默认值为4,故自动创建的主题,其队列数量默认为4。Step3:发送消息DefaultMQproduccrImplftsendKerneJmpSendMessaeeReauettRefiderreQuestHeader三11eSendMessageRequestHeaderO:rpquvxtHpAdcrr.JiProducPrGroup:r,:,;J,ZiJPr<<i,re3:PrcxinrcrGroupCi.rPQU÷ctHd÷r.÷*opten<.g*Top;<,rgwcH6*r."iDe!e,Top:r:?v-getCreateTopKryO).rustH÷ader.sD*fanltTopeQuNnns卜c”.MmgetDeHH81eQeeMaM()requestHeader.5et<ueucldQq.<etQuceId(1).requetHeadtfi.setSysFaB,.i*sflla/requst½adr.setorn7iBctoBp,Sstn.eurrntJi3yillii)qquoKtHoadvx.sctFlf(bsk.etFlag,»,AquoctHc»r.Propr>UcaOcodaragf0Prr:ti*c<::requejtHeader.sxReconsuDeT;»5<0*.9QU9stK9adQ.SQtVDXtModeithis.isVnxtM0d9<).在消息发送时的请求报文中,设置默认t。PC名称,消息发送toPC名称,使用的队列数量为DefaultMQproducerftdefaultTopicQueueNurns,即默认为4。Step4:Broker端收到消息后的处理流程服务端收到消息发送的处理器为:SendMcssageProcessor,在处理消息发送时,会调用SUPer.msgCheck方法:AbstractSendMessageProcessor#rnsgCheckTopicfonfietA>piC¢nf=I1.1-.j:).:.SexTopicContgManaerO.SelecxToplcContigCrecuesiKeader.etopic().:?Gl二二tcmcConf;/I!1»tODIESv-FldM三0;if(redustHacer.XsUnitMode)iif(xequeMHeader.getTopic<)-=twt三Wth(Mixl1.RETRGROUP_TOPIC_PSEFTJCi>IopicSysFlaM-TopicSysFleH.buildSysFlfelcthasUr.11Sb-)else(iop>icSysFlaiTopicSysFles.buiidSvsF19MUnltire.fol$e).,.<.vrn(t:ou.:”!,::.rui.:-:rcquctHedor.grtTcpic(J,ctx.channel().re11otodc!rc5s,1;',1.iXCQaiig::,-二-1;.getTopicConf-gmagcx<>.crcateTopicInSendXc53gcllethodrcauestHeader.getTDic(>lrequest11eder.getDefaultTopic(),RwotingHelper.ParSmCneIR受E>eAddretx.channel(>).reqiiestHender.e*tDefxltToPiUQUou&Xuq,().top;c5ysFleg)if(nj.7KaucCftnfW;i(reuesHeadcr.getTopc().startsVth(MxAli.RETR1.GRO1.P_)(CooicCcmf”三t1.,j,I。,'1r.CctTop1.cConfisMnnger()CreatrTopicTnScn<Wr:<$aeBncxUeIhod,"requestH¢ader.getTopic(.旧IDbBUlI8RuwdnI1.clPeBiQMPWoTEPernNanetOpicSybFlae)在Broker端,首先会使用TopicConfigManagor根据topic查询路由信息,如果Broker端不存在该主题的路由配置(路由信息),此时如果Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在BrOker创建新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另一方面会有一个定时任务,定时存储在broker端,具体路径为$ROCKETTQMEstoreCOnfig/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。广大读者朋友,跟踪到这一步的时候,大家应该对启用自动创建主题机制时,新主题是的路由信息是如何创建的,为了方便理解,给出创建主题序列图:ProducerBrokerNameServer每30s向NameSerVer拉取topic的最新路由信息创建系统Topic.如果允许自动创建Topic,则创J建默认主题的路由信息加教t。PiCSJSOn中J的路由配置信息定时任务(30s),汇报路由配消总发送从本地缓存路由表中查询路由信息更新路由信息如果本地未查询到,则根据t。PiC查找路由信息Il如果未查询到,则根据默认t。PiC查找路由信息如果为登询到,则抛出没有J找到路由信息错误-1如果找到,对队列数按条件改变,并选择一个队列,向QBroker发送消息发送消息-1如果不存在该t。PiC的路由信息,则尝试查找默认t。PiC的路由信息,如果能找到,则根掂请求中内队列数,创建对应的T。PiC路由信息,注意,此时还只存与与Broker的内存中现象分析经过上面自动创建路由机制的创建流程,我们可以比较容易的分析得出如下结论:因为开启了自动创建路由信息,消息发送者根据Topc去NameServer无法得到路由信息,但接下来根据默认Topic从NamoServer是能拿到路由信息(在每个Broker中,存在8个队列),因为两个Broker在启动时都会向NameServer汇报路由信息。此时消息发送者缓存的路由信息是2个Broker,每个Broker默认4个队列(原因见3.2.1:Step2的分析)°消息发送者然后按照轮询机制,发送第一条消息选择(broker-a的rnessageQueue:O),向Broker发送消息,BrOker服务器在处理消息时,首先会查看自己的路由配置管理器(T。PiCConfigManager)中的路由信息,此时不存在对应的路由信息,然后尝试查询是否存在默认TOPiC的路由信息,如果存在,说明启用了HUtoCreateTopicEnable,则在TopicConfigManager中创建新TOPiC的路由信息,此时存在与BrokerW务端的内存中,然后本次消息发送结束。此时,在NamCSerVer中还不存在新创建的Topic的路由信息。这里有三个关键点:1. 启用autoCreateTopicEnable创建主题时,在Broker端创建主题的时机为,消息生产者往Broker端发送消息时才会创建。2. 然后Broker端会在一个心跳包周期内,将新创建的路由信息发送到NameServer,于此同时,Broker端还会有一个定时任务,定时将内存中的路由信息,持久化到Broker端的磁盘上。3. 消息发送者会每隔30s向NameServer更新路由信息,如果消息发送端一段时间内未发送消息,就不会有消息发送集群内的第二台Broker,那么NamcServer中新创建的Topic的路由信息只会包含Broker-a,然后消息发送者会向NameServer拉取最新的路由信息,此时就会消息发送者原本缓存了2个broker的路由信息,将会变为一个Broker的路由信息,则该Topic的消息永远不会发送到另外一个Broker,就出现了上述现象。原因就分析到这里了,现在我们还可以的大

    注意事项

    本文(2022Apache RocketMQ 从入门到实战.docx)为本站会员(夺命阿水)主动上传,课桌文档仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知课桌文档(点击联系客服),我们立即给予删除!

    温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。




    备案号:宁ICP备20000045号-1

    经营许可证:宁B2-20210002

    宁公网安备 64010402000986号

    课桌文档
    收起
    展开