欢迎来到课桌文档! | 帮助中心 课桌文档-建筑工程资料库
课桌文档
全部分类
  • 党建之窗>
  • 感悟体会>
  • 百家争鸣>
  • 教育整顿>
  • 文笔提升>
  • 热门分类>
  • 计划总结>
  • 致辞演讲>
  • 在线阅读>
  • ImageVerifierCode 换一换
    首页 课桌文档 > 资源分类 > DOCX文档下载  

    2022Apache Flink 十大技术难点实战.docx

    • 资源ID:1421964       资源大小:665.26KB        全文页数:110页
    • 资源格式: DOCX        下载积分:5金币
    快捷下载 游客一键下载
    会员登录下载
    三方登录下载: 微信开放平台登录 QQ登录  
    下载资源需要5金币
    邮箱/手机:
    温馨提示:
    用户名和密码都是您填写的邮箱或者手机号,方便查询和重复下载(系统自动生成)
    支付方式: 支付宝    微信支付   
    验证码:   换一换

    加入VIP免费专享
     
    账号:
    密码:
    验证码:   换一换
      忘记密码?
        
    友情提示
    2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
    3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
    4、本站资源下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。
    5、试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓。

    2022Apache Flink 十大技术难点实战.docx

    ApacheFlink十大技术难点买战I目录102万行代码,1270个问题,Flink新版发布了什么?4从开发到生产上线,如何确定集群规划大小?11Demo:基于FIinkSQ1.构建流式应用22FlinkCheckpoint问题排查实用指南37如何分析及处理Flink反压?48FlinkonYARN(±):一张图轻松掌握基础架构与启动流程56FlinkonYARN(下):常见问题与排查思路64ApacheFlink与ApacheHive的集成72FlinkBatchSQ1.1.10实践83如何在PyFlink1.10中自定义PythonUDF?90107Flink1.10NativeKUberneteS原理与实践102万行代码,1270个问题,Flink新版发布了什么?导读:ApacheFlink是公认的新一代开源大数据计算引擎,可以支持流处理、批处理和机器学习等多种计算形态,也是Apache软件基金会和GitHub社区最为活跃的项目之一。2019年1月,阿里巴巴实时计算团队宣布将经过双十一历练和集团内部业务打磨的Blink引擎进行开源并向ApacheFiink贡献代码,此后的一年中,阿里巴巴实时计算团队与ApacheFlink社区密切合作,持续推进FIink对Blink的整合。2月12日,ApacheFlink1.10.0正式发布,在Flink的第一个双位数版本中正式完成了Blink向FIink的合并。在此基础之上,Flink1.10版本在生产可用性、功能、性能上都有大幅提升。本文将详细为大家介绍该版本的重大变更与新增特性。Flink1.10是迄今为止规模最大的一次版本升级,除标志着Blink的合并完成外,还实现了Flink作业的整体性能及稳定性的显著优化、对原生Kubernetes的初步集成以及对Python支持(PyFlink)的重大优化等。综述Flink1.10.0版本一共有218名贡献者,解决了1270个JIRAissue,经由2661个Comrnii总共提交了超过102万行代码,多项数据对比之前的几个版本都有所提升,印证着Flink开源社区的蓬勃发展。1.7.0版本1.8.0版本1.9.0版本1.10.0版本解决的问题数量4284229771270代码提交次数969109419642661贡献者人数112140190218其中阿里巴巴实时计算团队共提交64.5万行代码,超过总代码量的60%,做出了突出的贡献。CommitsCode1.ines在该版本中,FIink对SQ1.的DD1.进行了增强,并实现了生产级别的Batch支持和Hive兼容,其中TPC-DSIOT的性能更是达到了Hive3.0的7倍之多。在内核方面,对内存管理进行了优化。在生态方面,增加了PythonUDF和原生Kubernetes集成的支持。后续章节将在这些方面分别进行详细介绍。内存管理优化在旧版本的Flink中,流处理和批处理的内存配置是割裂的,并且当流式作业配置使用RocksDB存储状态数据时,很难限制其内存使用,从而在容器环境下经常出现内存超用被杀的情况。在1.10.0中,我们对TaskExecutor的内存模型,尤其是受管理内存(ManagedMemory)进行了大幅度的改进(F1.IP-49),使得内存配置对用户更加清晰:PressMemoryFBnkMemoryFramewortcHeapMemoryFrameworkOff-HeapMemoryTaskHeapMemoryTaskOff-HeapMemoryNetworkMemoryManagedMemoryOn-HeapOff-HeapJVMMetaspaceJVMOverhead此外,我们还将RocksDBstatebackend使用的内存纳入了托管范畴,同时可以通过简单的配置来指定其能使用的内存上限和读写缓存比例(F1.INK-7289)o如下图所示,在实际测试当中受控前后的内存使用差别非常明显。受控前的内存使用情况(share-slot)IaftaaeUeFaMW;3j、l1MI.HMwm%小心>A山山儿»1»nwAM«0»MMUMi.IMWMrUIMMVUW/MU>1me”jna<uocm(HttaM>*vyIMWIMWBW.I2VS623Z>4,Wh,-BrrwHr”扁版MMhuU¼MM曲WMAO»W30»«>M0*««*-*M*l-K-wmwMetoVMteM"5j'Fv<4>jM<kUtMUvlVMl>4WUM<MiH/IlCMNR.mm>(c*)m受控后的内存使用情况(share-slot)Batch兼容Hive且生产可用Flink从1.9.0版本开始支持Hive集成,但并未完全兼容。在1.10.0中我们对Hive兼容性做了进一步的增强,使其达到生产可用的标准。具体来说,FEnk1.10.0中支持: Meta兼容-支持直接读取Hivecatalog,覆盖Hive1.x2.x3.X全部版本 数据格式兼容-支持直接读取Hve表,同时也支持写成Hve表的格式;支持分区表 UDF兼容-支持在FlinkSQ1.内直接调用Ilive的UDF,UDTF和UDAF与此同时,1.10.0版本中对batch执行进行了进一步的优化(F1.INK-14133),主要包括: 向量化读取ORC(F1.INK-14135) 基于比例的弹性内存分配(F1.IP-53) Shuffle的压缩(F1.INK-14845) 基于新调度框架的优化(F1.INKT4735)在此基础上将Flink作为计算引擎访问Hive的meIa和数据,在TPC-DSIOTbenchmark下性能达到Hive3.0的7倍以上。TPC-DSBenchmarkFlink7XFasterthanHiveSQ1.DD1.增强Flink1.10.0支持在SQ1.建表语句中定义watermark和计算列,以watermark为例:CREATETAB1.Etablejame(WATERMARKFORCOlUmnNameAS<watermark_strategy_expression>)WITH()除此之外,Flink1.10.0还在SQ1.中对临时函数/永久函数以及系统/目录函数进行了明确区分,并支持创建目录函数、临时函数以及临时系统函数:CREATE(TEMPORARYTEMPORARYSYSTEMFUNCTIONIFNOTEXISTScatalog_name.db_name.function_nameASidentifier1.ANGUAGEJAVASCA1.APythonUDF支持Flink从1.9.0版本开始增加了对Python的支持(PyFlink),但用户只能使用Java开发的User-defined-function(UDF),具有一定的局限性。在1.10.0中我们为PyFlink增加了原生UDF支持(F1.IP-58),用户现在可以在TableAPI/SQ1.中注册并使用自定义函数,如下图所示:,ifos.path.ex1sts(s1nk_path): ifos.path.Isf1le(s1nk-path):,os.renove(s1nk-path) else: Shutll.r«tree(stnk_path) senv.setParalIeIlSla(D t-三st_env.fronelements(l.'hi,.hello').(2.hi'.,hello')1.I'a,.'b'.,c) st_env.connect(F1leSysten().path(sink_path),.wlth-fornat(OldCsv() 一fielddel1n1ter(',*).field(a-.DataTypes.BIGINT(),.f1eldCb",DataTypes.STRING(),.field(c-,DataTypes.STRIMG(),.w1th_schema(Scheaa(),.fieldCa,DataTypes.BIGINT().f1eld(-b-,DataTypes.STRING(),.f1eld(c-,DataTypes1STRINGO),.re|ister_table_sink("strean_s1nk-).,t.select("a1.b.c,).1nsert_1nto("strea«_s1nk'),st_env.execute("streaB_job")>>>I同时也可以方便的通过pip安装PyFlink:pipinstallapache-flink更多详细介绍,请参考:hltRs:"enjoyment.cool20200219Deep-dive-how-tc-SUPPort-Python-UDF-in-Apache-F1ink-ITO/原生Kubernetes集成Kubernetes(K8S)是目前最为流行的容器编排系统,也是目前最流行的容器化应用发布平台。在旧版本当中,想要在K8S上部署和管理一个Flink集群比较复杂,需要对容器、算子及kubectl等K8S命令有所了解。在Flink1.10中,我们推出了对K8S环境的原生支持(F1.lNK-9953),Flink的资源管理器会主动和Kubernetes通信,按需申请pod,从而可以在多租户环境中以较少的资源开销启动Flink,使用起来也更加的方便。更多内容,参考1.10.0版本发布日志:htips";ci.apache.Org/projects;f!:nk£ink-docs-stab沦/release-notes;flinkT.IOhtml结语2019年1月,阿里巴巴实时计算团队宣布Blink开源。整整一年之后,Flink1.10.0版本的发布宣告Flink和Blink的整合正式完成。我们践行着自己的诺言,开放源码,更相信社区的力量,相信社区是开源协作精神与创新的摇篮。我们也衷心希望有更多的志同道合的小伙伴加入我们,一起把APaCheFIink做的越来越好!I从开发到生产上线,如何确定集群规划大小?在Flink社区中,最常被问到的问题之一是:在从开发到生产上线的过程中如何确定集群的大小。这个问题的标准答案显然是“视情况而定”,但这并非一个有用的答案。本文概述了一系列的相关问题,通过回答这些问题,或许你能得出一些数字作为指导和参考。计算并建立一个基线第一步是仔细考虑应用程序的运维指标,以达到所需资源的基线。需要考虑的关键指标是: 每秒记录数和每条记录的大小 已有的不同键(key)的数量和每个键对应的状态大小 状态更新的次数和状态后端的访问模式最后,一个更实际的问题是与客户之间围绕停机时间、延迟和最大吞吐量的服务级别协议(sla),因为这些直接影响容量规划。接下来,根据预算,看看有什么可用的资源。例如: 网络容量,同时把使用网络的外部服务也纳入考虑,如Kafka、HDFS等。 磁盘带宽,如果您依赖于基于磁盘的状态后端,如RocksDB(并考虑其他磁盘使用,如Kafka或HDFS) 可用的机器数量、CPU和内存基于所有这些因素,现在可以为正常运行构建一个基线,外加一个资源缓冲量用于恢复追赶或处理负载尖峰。建议您在建立基线时也考虑检查点期间(checkpoim-ing)使用的资源情况。示例:数据说明当前在假设的集群上计划作业部署,将建立资源使用基线的过程可视化。这些数字是粗略的值,它们并不全面在文章的最后将进一步说明在进行计算过程中遗漏的部分。Flink流计算作业和硬件示例Flink流计算作业拓扑示例在本案例中,我将部署一个典型的Flmk流处理作业,该作业使用Fiink的Kafka数据消费者从Kafka消息源中读取数据。然后使用带键的总计窗口运穿符(windowoperator)进行转换运算。窗口运算符在时间窗口5分钟执行聚合。由于总是有新的数据,故符把窗口配置为1分钟的滑动窗口(slidingwindow)o这意味着将在每分钟更新过去5分钟的聚合量。流计算作业为每个用户id创建一个合计量。从Kafka消息源消费的每条消息大小(平均)为2kbo假设吞吐量为每秒100万条消息。要了解窗口运算符(WindOWOPerator)的状态大小,需要知道不同键的数目。在本例中,键(keys)是用户id的数量,即500000000个不同的用户。对于每个用户,需要计算四个数字,存储为长整形(8字节)。总结一下工作的关键指标: 消息大小:2KB 吞吐量:100OOOomSg/秒 不同键数量:500000000(窗1.l聚合:每个键4个长整形) Checkpointing:每分钟一次。Hardware:5machines10gigabitEthernet,EachmachinerunningaFIinkTaskManager,DisksareattachedviathenetworkKafkaisseparate假定的硬件设置如上图所示,共有五台机器在运行作业,每台机器运行一个Flink任务管理器(Flink的工作节点)。磁盘是通过网络相互连接的(这在云设置中很常见),从主交换机到运行TaskManager的每台计算机都由一个10千兆位以太网连接。Kafka缓存f三(brokers)在不同的机器上分开运行。每台机器有16个CPU核。为了简化处理,不考虑CPU和内存需求。但实际情况中,根据应用程序逻辑和正在使用的状态后端,我们需要注意内存。这个例子使用了一个基于R。CkSDB的状态后端,它稳定并且内存需求很低。从单独的一台机器的视角要了解整个作业部署的资源需求,最容易的方法是先关注一台计算机和一个TaskManager中的操作。然后,可以使用一台计算机的数字来计算总体资源需求量。默认情况下(如果所有运算符具有相同的并行度并且没有特殊的调度限制),流作业的所有运算符都在每一台计算机上运行。在这种情况下,Kafka源(或消息消费者)、窗口运算符和Kafka发送端(或消息生产者)都在这五台机器上运行。TaskManagernKafkaSourcekeyBywindowKafkaSink机器视角图-TaskManagcrn从上图来看,keyBy是一个单独运算符,因此计算资源需求更容易。实际上,keyBy是一个API构造,并转换为Kafkasource和窗口运算符(WindoWoperator)之间连接的配置属性。以下将自上而下地分析(上图)这些运算符,了解他们的网络资源需求。TheKafkasource要计算单个Karka源(SOUrCe)接收的数据量,我们首先计算Kafka的合计输入。这些source每秒接收1000000条消息,每条消息大小为2KBo2KBX1,000,000/s=2GB/s将2GB/s除以机器数(5)得到以下结果:2GB/s÷5台机器=400MB/s群集中运行的5个Kafka源中的每一个都接收平均吞吐量为400MB/s的数据结果。10GigebitEthernet(FullDuplex)In:1250MB/sKaftea:400MB/s2KB*1f000f000=2GBs2GBs/5machines400MB/s10GgbitEthernet(FullDuplex)Out:1250MB/sKafkasource的计算过程TheShuffle/keyBy接下来,需要确保具有相同键(在本例中为用户id)的所有事件都在同一台计算机上结束。正在读取的Kafka消息源的数据(在Kafka中)可能会根据不同的分区方案进行分区。Shuffle过程将具有相同键的所有数据发送到一台计算机,因此需要将来自Kafka的400MB/s数据流拆分为一个userid分区流:400MB/s÷5台机器=80MB/s平均而言,我们必须向每台计算机发送80MBs的数据。此分析是从一台机器的角度进行的,这意味着某些数据已经在指定的目标机器运行了,因此减去80MB/s即可:400MB/s-80MB=320MB/s可以得到结果:每台机器以320MB/s的速率接收和发送用户数据。IOGigabitEther110UFullDuplex)In:1250MB/sKafa:400MB/s2KB*1,000,0002GBs2GBs/5machines=400MB/sShuffle:320MB/s10GigabftEthernet(FullDuplex)Out:1250MB/s400MBs/5receivers三80MBs1receiverislocal,4remote:4*80=320MB/sout4ShUff加:320MB/sTheshuffle的计算过程Window窗口输出和Kafka发送下一个要问的问题是窗口运算符发出多少数据并发送到Kafka接收器。答案是67MB/s,我们来解释一下我们是怎么得到这个数字的。窗口运算符为每个键(key)保留4个数字(表示为长整形)的聚合值。运算符每分钟发出一次当前聚合总值。每个键从聚合中发出2个整形(userid,window_ts)和4个长整形:(2x4字节)+(4x8字节)=每个键40字节然后将键的总数(500000000除以机器数量)计算在内:100000000个keysX40个字节=4GB(从每台机器来看)然后计算每秒大小:4GB/W÷60=67MB/秒(由每个任务管理器发出)这意味着每个任务管理器平均从窗口运算符发出67MBZs的用户数据。由于每个任务管理器上都有一个Kafka发送端(和窗口运算符在同一个任务管理器中),并且没有进一步的重新分区,所以这得到的是FIink向Kafka发送的数据量。10GigabitEthernetFtillDuplex)In:1250MB/sKafka:4MB/s2KB*1,000,000=2GBs2GBs/5machines-400MB/sShuffle:320MB/s10GgabrtEthernet(FullDuplex)Out:1250MB/s400MBs5receivers=80MBs1receiverislocal,4remote:41f80320MB/soutShUffIe:320MB/s4GB/minute=>67MB/second(onaverage)Kafa:67MB/s用户数据:从Kafka,分发到窗口运算符并返回到Kafka窗口运算器的数据发射预计将是“突发”的,因为它们每分钟发送一次数据。实际上,运算符不会以67mb/s的恒定速率给客户发送数据,而是每分钟内将可用带宽最大化几秒钟。这些总计为:数据输入:每台机器720MB/s(400+320)数据输出:每台机器387MB/s(320+67)IOGigabitEthernet(FullDuplex)In:1250MB/sKafka:400MB/sShuffle:320MB/sTotalIn:720MB/sIOGigabnEthernet(FullDuplex)Out:1250MB/sTotalOut:387MB/s 状态访问和检查点这不是全部的(内容)。到目前为止,我只查看了Fink正在处理的用户数据。在实际情况中需要计入从磁盘访问的开销,包括到RocksDB的存储状态和检查点。要了解磁盘访问成本,请查看窗口运算符(WindoWoperator)如何访问状态。Kafka源也保持一定的状态,但与窗IJ运算符相比,它可以忽略不计。要了解窗IJ运算符(windowoperator)的状态大小,需要从不同的角度进行查看。FEnk正在用1分钟的滑动窗口计算5分钟的窗口量。Flink通过维护五个窗口来实现滑动窗口,每次滑动都对应一个1分钟的窗口。如前所述,当使用窗口实现即时聚合时,将为每个窗口中的每个键(key)维护40字节的状态。对于每个传入事件,首先需要从磁盘检索当前聚合值(读取40字节),更新聚合值,然后将新值写回(写入40字节)。管口状态这意味着:40字节状态X5个窗口X每台计算机200000msg/s=40MBs即需要的每台计算机的读或写磁盘访问权限。如前所述,磁盘是网络相互连接的,因此需要将这些数字添加到总吞吐量计算中。现在总数是: 数据输入:760MB/s(400MB/s数据输入+320MB/s随机播放+40MB/s状态)数据输出:427MB/s(320MB/s随机播放+67MB/s数据输出+40MB/s状态)10GabtEthernet(FullDuplex)In:1250MB/sTotalIn:760MB/sKafka:400MB/s】MiwShuffle:320MB/s】DiSkread:40MB/sShuffle:320MB/s1Kafka:67MB/s】TotalOut:427MB/s10GgabtEthernet(FullDuplex)Out:1250MBZs上述考虑是针对状态访问的,当新事件到达窗1.l运算符时,状态访问会持续进行,还需要容错启用检查点。如果机器或其他部分出现故障,需要恢复窗口内容并继续处理。检查点设置为每分钟一个检查点,每个检查点将作业的整个状态复制到网络连接的文件系统中。让我们一起来看看每台计算机上的整个状态有多大:40字节状态X5个窗口X100000000个keys=20GB并且,要获得每秒的值:20GB÷60=333MB/秒与窗口运算类似,检查点是突发的,每分钟一次,它都试图将数据全速发送到外部存储器。Checkpointing引发对RocksDB的额外状态访问(在本案例中,RocksDB位于网络连接的磁盘上)。自Flinkl.3版本以来,RocksDB状态后端支持增量CheCkPoinI,概念上通过仅发送自上一个CheCkPOinl以来的变化量,减少了每个CheCkPOint上所需的网络传输,但本例中不使用此功能。这会将总数更新为: 数据输入:760MB/s(400+320+40) 数据输出:760MB/s(320+67+40+333)10GigabrtEthernet(FullDuplex)Out:1250MB/sDiskwrite:40MB/sTotalOut:70MB/s10GigabitEthernet(FullDuplex)In:1250MB/sKafa:400MB/sShuffle:320M8sTotalIn:760MB/sDiskread:40MB/s这意味着整个网络流量为:760+760x5+400÷2335=10335MB/秒400是5台机器上80MB状态访问(读写)进程的总和,2335是集群上Kafka输入和输出进程的总和。这大概是上图所示硬件设置中可用网络容量的一半以上。Overallnetworktraffic:联网要求补充一点,这些计算都不包括协议开销,例如来自Flink.Kafka或文件系统的TCP、Ethernet和RPC调用。但这仍然是一个很好的出发点,可以帮助您了解工作所需的硬件类型,以及性能指标。扩展方法基于以上分析,这个例子,在一个5节点集群的典型运行中,每台机器都需要处理760个Mb/s的数据,无论是输入还是输出,从1250Mb/s的总容量来看,它保留了大约40%的网络容量因为部分被主观所简化的复杂因素,例如网络协议开销、从检查点恢复事件重放期间的重我,以及由数据歪斜引起的跨集群的负载不平衡。对于40%的净空是否合适,没有一个一刀切的答案,但是这个算法应该是一个很好的起点。尝试上面的计算,更换机器数量、键(keys)的数量或每秒的消息数,选择要考虑的运维指标,然后将其与您的预算和运维因素相平衡。IDemo:基于FlinkSQ1.构建流式应用上周四在Flink中文社区钉钉群中直播分享了Dem。:基于FlinkSQ1.构建流式应用,直播内容偏向实战演示。这篇文章是对直播内容的一个总结,并且改善了部分内容,比如除Flink外其他组件全部采用DockerCompose安装,简化准备流程。读者也可以结合视频和本文一起学习。完整分享可以观看视频回顾:h-WWWFlink1.10.0于近期刚发布,释放了许多令人激动的新特性。尤其是FlinkSQ1.模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用FlinkSQ1.如何快速构建流式应用。本文将基于Kafka,MySQ1.,Elasticsearch,Kibana,使用FIinkSQ1.构建一个电商用户行为的实时分析应用。本文所有的实战演练都将在FlinkSQ1.C1.I上执行,全程只涉及SQ1.纯文本,无需一行Java/SCaIa代码,无需安装IDEo本实战演练的最终效果图:准备一台装有Docker和Java8的1.inUX或MaCoS计算机。使用DockerCompose启动容器本实战演示所依赖的组件全都编排到了容器中,因此可以通过docker-com-pose一键启动。你可以通过wget命令自动下载该docker-compose.yml文件,也可以手动下载。mkdirflink-demo;cdflink-demo;wget该DockerCompose中包含的容器有:.DataGen:数据生成器。容器启动后会自动开始生成用户行为数据,并发送到Kafka集群中。默认每秒生成1000条数据,持续生成约3小时。也可以更改docker-compose.yml中datagen的speedup参数来调整生成速率(重启dockercompose才能生效)。 MySQ1.:集成了MySQ1.5.7,以及预先创建好了类目表(category),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。 Kafka:主要用作数据源。DataGen组件会自动将数据灌入这个容器中。 Zookeeper:Kafka容器依赖。 Elasticsearch:主要存储FlinkSQ1.产出的数据。 Kibana:可视化Elasticsearch中的数据。在启动容器前,建议修改Docker的配置,将资源调整到4GB以及4核。启动所有的容器,只需要在d。Cker-compose.yml所在目录下运行如下命令。docker-composeup-d该命令会以detached模式自动启动DockerCompose配置中定义的所有容器。你可以通过dockerps来观察上述的五个容器是否正常启动了。也可以访问http:I:<.Ihos:;601来查看Kibana是否运行正常。另外可以通过如下命令停止所有的容器:docker-composedown下载安装Flink本地集群我们推荐用户手动下载安装Flink,而不是通过Docker自动启动Flinko因为这样可以更直观地理解Flink的各个组件、依赖、和脚本。1 .下载Flink1.10.0安装包并解压(解压目录flink-1.10.0):hltp”:apache,org/dist/flink/flinkT.10.O/flink1.10.0binscala2.I1.tgz2 .进入fIinkT.10.0目录:cdflink-1.10.03 .通过如下命令下载依赖jar包,并拷贝到Iib/目录下,也可手动下载和拷贝。因为我们运行时需要依赖各个connector实现。wget-P./Iib/https:/repol.maven.org/maven2/org/apache/flink/flink-json/1.1。.Q/f:1.ink-json-1.10.。.jarwget-P.libhttps:/!rep。1.InaVen.orgmaven2orgapacheflinkfIink-sql-connector-kafka_2.11/1.10.O/flink-sql-connector-kafka_2.11-1.10.0.jarwget-P./lib/:.:;:>:;<:sql-connector-elasticsearch6_2.11/1.10.0flink-sql-connector-elasticsearch6_2.11-1.10.0.jarwget-P.libhttpszrepol.maven.org/:raacheflinkfIink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jarwget-P.libhttps:/repol.maven.org/;:,sql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar4 .conf/flink-conf.yaml中的taskmanager.numberfTaskSlots修改成10,因为我们会同时运行多个任务。5 .执行binstart-cluster.sh,启动集群。运行成功的话,可以在http:l-l:8081访问到FlinkWebUIo并且可以看到可用Slots数为10个。6.执行bin/sql-client.shembedded启动SQ1.C1.1.便会看到如下的松鼠欢迎界面。使用DD1.创建Kafka表Datagen容器在启动后会往Kafka的user_behaviortopic中持续不断地写入数据。数据包含了2017年11月27日一天的用户行为(行为包括点击、购买、加购、喜欢),每一行表示一条用户行为,以JSON的格式由用户ID、商品ID、商品类目ID、行为类型和时间组成。该原始数据集来自阿里n;天池公开数据集,特此鸣谢。我们可以在docker-compose.yml所在目录下运行如下命令,查看Kafka集群中生成的前10条数据。docker-composeexeckafkabash-c,kafka-console-consumer.sh-topicuser_behavior-bootstrap-serverkafka:9094from-beginning-max-messages10'"user-id":"952483"z',item-id":"310884","category-id":"4580532","behavior":"pv",ntsn:112017-ll-27T00t00r00Z,user-id":"794777","item_id":"5119439","category_id":"982926”,"behavior":,'pv,"ts":,2017-ll-27T00:00:00Z"有了数据源后,我们就可以用DD1.去创建并连接这个Kafka中的topic了。在FlinkSQ1.C1.I中执行该DD1.oCREATETAB1.EUSeJbehaVior(user_idBIGINT,item_idBIGINTjcategory-idBIGINTzbehaviorSTRING,tsTIMESTAMP(3)zproctimeasPROCTIME(),一通过计算列产生一个处埋时间列WATERMARKFORtsasts-INTERVA1.'5,SECOND在ts上定义watermark,ts成为事件时间列)WITH(,connector.type*-*kafka,-使用kafkaconnector,connector.version'=,universal',-kafka版本,universal支持0.11以上的版本,connector.topic,=,user-behavior,一kafkatopic,connector.startup-mode,='earliest-offset1,从起始OffSet开始读取,connector.properties.zookeeper.connect,=,localhost:2181,一zookeeper地址,connector.properties.bootstrap.servers,",localhost:9092',-kafkabroker地址,format.type'=,json,-数据源格式为json);如上我们按照数据的格式声明了5个字段,除此之外,我们还通过计算列语法和Proctimeo内置函数声明了一个产生处理时间的虚拟列。我们还通过WATERMARK语法,在ts字段上声明了watermark策略(容忍5秒乱序),ts字段因此也成了事件时间列。关于时间属性以及DD1.语法可以阅读官方文档了解更多:时间属性:hllps:ci.apne,'.nix(心11inf1ink-docsrclCaSIT.10/dev/tableStreaming/timeattributes,htmlDD1.:https:"ci.apache.Org/PrOjeCtS/fEnk/flink-docs-release-1.10devIabIesqlcreale.hlinl#CreaIeTable在SQ1.C1.l中成功创建Kafka表后,可以通过showtables;和describeuser_behavior;来查看目前已注册的表,以及表的详细信息。我们也可以直接在SQ1.C1.I中运行SE1.ECT*I;ROMuSCrbehaViOr:预览下数据(按q退出)。接下来,我们会通过三个实战场景来更深入地了解FlinkSQ1.o统计每小时的成交量使用DD1.创建Elasticsearch表我们先在SQ1.C1.I中创建一个ES结果表,根据场景需求主要需要保存两个数据:小时、成交量。CREATETAB1

    注意事项

    本文(2022Apache Flink 十大技术难点实战.docx)为本站会员(夺命阿水)主动上传,课桌文档仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知课桌文档(点击联系客服),我们立即给予删除!

    温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。




    备案号:宁ICP备20000045号-1

    经营许可证:宁B2-20210002

    宁公网安备 64010402000986号

    课桌文档
    收起
    展开