欢迎来到课桌文档! | 帮助中心 课桌文档-建筑工程资料库
课桌文档
全部分类
  • 党建之窗>
  • 感悟体会>
  • 百家争鸣>
  • 教育整顿>
  • 文笔提升>
  • 热门分类>
  • 计划总结>
  • 致辞演讲>
  • 在线阅读>
  • ImageVerifierCode 换一换
    首页 课桌文档 > 资源分类 > DOCX文档下载  

    《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx

    • 资源ID:1242638       资源大小:60.77KB        全文页数:7页
    • 资源格式: DOCX        下载积分:5金币
    快捷下载 游客一键下载
    会员登录下载
    三方登录下载: 微信开放平台登录 QQ登录  
    下载资源需要5金币
    邮箱/手机:
    温馨提示:
    用户名和密码都是您填写的邮箱或者手机号,方便查询和重复下载(系统自动生成)
    支付方式: 支付宝    微信支付   
    验证码:   换一换

    加入VIP免费专享
     
    账号:
    密码:
    验证码:   换一换
      忘记密码?
        
    友情提示
    2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
    3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
    4、本站资源下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。
    5、试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓。

    《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx

    课题读取电影评分数据创建DStream课时2课时(90min)教学目标知识技能目标:(1)熟悉基础数据源(2)熟悉高级数据源(3)掌握读取数据创建DStream的方法素质目标:培养自我学习和持续学习能力,能够及时掌握新技术和工具,并将其应用到实际项目中教学重难点教学重点:基础数据源、高级数据源教学难点:读取数据创建DS(ream教学方法案例分析法、问答法、讨论法、讲授法教学用具电脑、投影仪、多媒体课件、教材教学过程主要教学内容及步骤课前任务【教师】布置课前任务,和学生负责人取得联系,让其提醒同学通过APP或其他学习软件,完成课前任务请大家了解什么是数据源,什么是DSlream.【学生】完成课前任务考勤【教师】使用APP进行签到【学生】班干部报请假人员及原因问题导入(5min)【教师】提出以下问题:什么是数据源?数据源可分为哪些类型?【学生】思考、举手回答传授新知【教师】通过学生的回答引入新知,介绍基础数据源和高级数据源的相关知识一、基础数据源【教师】介绍基础数据源的概念和类型在SparkStreaming中,基础数据源指的是可以用来读取实时数据并创建DStream的常见数据源。这些数据源已经被广泛使用和测试,并且被集成到了SparkStreaming框架中,用户只需调用相应的API即可读取数据。基础数据源包括文件流、套接字流和RDD队列流等。1.文件流在SparkStreaming中,文件流(filestream)是一种可以从本地文件系统或分布式文件系统(如HDFS)中读取数据的输入流。它允许将一个目录视为一个数据源,并读取目录中实时生成或更新的文件。在SParkStreaming中,可以使用textFileStream()方法创建DStream定义一个输入流用于监视HadOOP兼容的文件系统中的新文件,并将其作为文本文件读取。文件必须通过同一文件系统中的另一个位置移动到监视目录中。该方法的基本格式如下。(extFileStream(directory)其中,参数directory表示指定的目录。读取不同文件流创建DStream的参考示例如下。ssc=SIreamingCOnIeXl(SC,10)#读取本地文件流dstream_(ext=ssc.IexiFileSlream("file:/spark_dstream")曦取HDFS文件流dstream-hdfs=ssc.textFileStream("hdfs:/spark_dstream")【教师】通过例子,帮助学生掌握文件流的应用【例4-1以读取HDFS文件为例,编写SparkStreaming应用程序实时监视HDFS文件目录,当发现新文件到达后,输出文件中的数据。打开第1个终端,执行以下命令,启动HDFS服务并创建"spark_dstream”目录.hadoopbogon$Cdusrlocalhadoopsbinhadoop(三)bogonsbin$./start-dfs.sh#在HDFS上新建一个Hspark_dstreamH目录hadoo(3)bogonsbin$Cdusrlocalhadoopbinhadoopbogonbin$hdfsdfs-mkdirspark-dstream在"usrlocalsparkmycodeDSIream”目录下新建3个文件,分别为filel.txl、file2.lxt和file3.ixl,其内容如图4-9所示。耽3,Bfi,elttt保存三*HRio1,aME夕存二X11(O),B3保存三JM-IloveSpark10veHddOoPIloveDStrean1amIearnllgSMra】nlearningHadoopXanlearningDStrcanSparkXSverySiNIeHadoopisverysinpleOStreamisverySinple女本8-«3fi,»215114人文本8我符3L8。第3行,第7列福人文本.帮表为贡度:8淤3行,第8外Ja入图4-93个文件的内容打开第2个终端,执行以下命令,进入PySpark交互式执行环境,编写代码,监视HDFS文件目录。SparkStreaming实时计算启动后,还未接收到数据时,终端显示的信息如图4-10所示。hadoopbogon$pyspark»>frompyspak.streamingimportStreamingContext舱(J建StreamingContext对象,设置批处理时间间隔为20秒»>ssc=StreamingContext(sc,20)跄J建DStream,监视HDFS文件目录»>dstream=ssc.textFileStream("hdfs:/spark_dstream")# 打印监懒!1的瘫»>dstream.pprint()# 启动StreamingContext对象»>ssc.start()# 等待StreamingContexi对象终止»>ssc.awaitTermina(ion()图4-10未接收至媵煽时的终端显示信息在第1个终端上执行以下命令,将filel.txt.file2.txt和file3.txt文件依次上传到HDFS的wspark-dstreamw目录下。hadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile1.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile2.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile3.txtspark-dstream在第2个终端可以监视到HDFS不断有数据流入,并输出结果,如图4-11所示。hadoopbogon:X文件(F)嫡辑(E)查看M搜索终端(T)帮助(三)Time:2023-08-0314:02:40IloveSparkIamlearningSparkSparkisverysimpleTime:2023-08-0314:03:00IloveHadoopIamlearningHadoopHadoopisverysimpleTime:2023-08-0314:03:20IloveDStreamIamlearningDStreamDStreamisverysimple图4-11SparkStreaming监视HDFS并输出结果2.套接字流套接字流(socketstream)是SParkStreaming中用于从网络套接字接收数据的输入流。它可以连接到指定的主机和端口,并实时接收通过套接字发送的数据。在SparkStreaming中,可以使用SOCkelTeXlStream()方法读取套接字流创建DStreame该方法的基本格式如下.socketTextStream(hostname,port,StorageLevel)其中,参数的含义如下.(1)hostname:表示要连接的主机名或IP地址。(2)port:表示要连接的端口号。(3)StOrageLeVeI(可选):表示流数据的存储级别,常见的存储级别包括MEMORY-ONLY.MEMoRY_AND_DISK、MEMORY_ONLY_SER和MEMORY_AND_DISK_SER等,默认值为MEMORY_AND_DISK_SER_2e(详见教材)3RDD队列流RDD队列流由一个RDD队列构成,其中每个RDD包含作为输入源数据的批次内容。在SparkStreaming中,可以使用queueStream()方法创建基于RDD队列的DStreame该方法的基本格式如下。queueStream(rdds,OneAtATime,deult)其中,参数的含义如下.(1)rdds:RDD队列。(2)OneAtATime(可选):每次选取一RDD还是一次性选取所有RDDe默认值为True,即每次只选取一个RDDe(3)default(可选):当队列为空时返回的默认值。如果队列中没有可用的RDD时,返回此默认值。(详见教材)【教师】通过例子,帮助学生掌握RDD队列流的应用【例4-2读取RDD队列流创建DStreame首先创建一个RDD队列作为数据源,然后使用queueStream()方法创建DStream定义一介输入流inputstream,SparkStreaming每两秒从RDD队列中获取一批数据,最后输出RDD队列流中的数据,如图4/2所示.hadoopbogon$pyspark»>frompyspark.StreamingimportStreamingContext»>ssc=StreamingContext(sc,2)舱U建一个空的RDD队列»>rddQueue=|»>foriinrange(5):rddQueue.append(sc.parallelize(range(1.1001),10)跄J建DStream,定义输入流生成RDD队列流»>inputStream=ssc.queueStream(rddQueue)#打印RDD队列流中的数据»>inputStream.pprint()»>ssc.start()»>ssc.awaitTermination()»>inputstream.pprint()»>ssc.start()Time:2023-08-0314:35:5035678910Time:2023-08-0314:35:52234567810图4/2输出RDD队列流中的嫡二、高级数据源【教师】介绍高级数据源的类型和应用除了文件流、套接字流f口RDD队列流等基础数据源外,SparkStreaming还支持KaHa和Kinesis等高级数据源。SparkSlreaming可以让高级雌源产生的数据发送给应用程序,应用程序再对接收到的数据进行实时处理,从而完成一个典型的实时计算过程。1 .KafkaKatla最初由LinkedIn开发,是一种高性能、分布式的消息传递系统。它支持水平伸缩,可以通过添加更多的代理服务器来增加处理能力。此外,Kaki还具有许多特性,使得它在实时数据处理和大数据场景下的应用非常灵活.因此,KaRa被广泛应用于实时流处理、日志收集、大数据分析等领域.在Spark中,可以读取Kafka数据源,实现方法是先使用SparkSession对象的readStream属性返回DaIaSIreanIReader对象;然后使用该对象的formal。方法指定数据源类型为Kafka;接着使用OPtionO方法设置Kafka的相关选项,如Kafka服务器地址和端口(kafka.btstrap.servers),以及要订阅的主题名称(subscribe)等;最后使用IOadO方法加载流数据,并返回一个DataFramee参考示例如下。dstream=SparkSession.readStream.format("kafka").option("kafka.bootstrap.servers","kafka_serverl:port,kafka_server2:port.kafka-server3:port").oPtion("subscribe","IoPijname").load()2 .KinesisKinesis是一项托管的流式数据处理服务,它可用于实时收集、处理和分析大规模数据流。Kinesis可以帮助用户轻松地处理和存储来自各种数据源(如传感器、应用程序日志、网站点击流等)持续生成的数据。在Spark中,可以读取Kinesis数据源,实现方法与读取Kafka数据源类l以。不同的是,format。方法指定的数据源类型为Kinesis;oPtion()方法用于设置Kinesis的相关选项,如Kinesis流的名称、Kinesis数据流所在的AWS区域和Kinesis服务的终端URL参考示例如下。kinesis_df=SParkSeSSion.readSlream.fonat("kinesis").OptionCstreamName","stream_name").option("region",',region-name").option("endpointUrl","httpsz',).load()【学生】聆听、思考、理解、记录【教师】介绍“读取电影评分数据创建DStream”的大概流程,安排学生扫描微课二维码观看视频“读取电影评分数据创建DStream“(详见教材),并要求学生进行相应操作电影评分数据存放在"usrlocalSPark/mycode/DSiream/movie_daia.ixl”文件中,该文件中包含用户ID、电影名称和电影评分3个字段,如图4-13所示.打开(。)amovie_data保存=X;JIJusrocalsparkIIUSer1,我不是药神,9.0心©2,红海行动,7.8第63,烈火英雄,7.6”4,红海行动,7.7user%功夫,8.2心63,我不是药神,9.3user*南京哺京!,8.6USerl,筑影长城,7.7心©4,中国合伙人,8.2心2烈火英雄,7.8user4,西游记之大圣归来,8.0图4-13°movie_data.txtM文件的瘫(部分)课堂实践为实现更好的案例效果,我们先以电影评分数据为基础设置数据源的自动生成方式.具体实现方法是设置每隔】()秒从Mmovie_data.txt/,文件中随机荻取IO行数据并写入新的日志文件中,新生成的日志文件存放在新建的"usrlocalSParkmycodeDStream/movie”目录下。接下来,使用SparkStreaming监视,7usr/local/spark/mycode/DStream/niovieH目录,每隔10秒读取新产生的日志文件,并输出读取到的日志文件内容。1 .自动生成数据源打开PyCharm,在"DStream"目录下新建"MsgProducepy"文件,并在该文件中编写应用程序,实现自动生成数据源。实现步骤如下。步骤IA步骤I定义generate_log_file()函数.步骤2A在函数内部,定义两个变量log_file_directory和data_file_path,分别表示存储日志文件的目录和数据文件路径。步骤3A在函数内部,使用无限循环whileTrue不断生成日志文件。(详见教材)【参考代码】importtimeimportrandomdefgenerate_log_file():#存储日志文件的目录和数据文件路径logfiledirectory='7usrlocalsparkmycodeDStreammovie"data_file_path='7usr/local/spark/mycode/DStream/movie_data.txt"whileTrue:# 获取当前时间的时间戳timestamp=int(time.time()# 构建日志文件路径log_file_path=f'log_file_directory/log_timestamp.txt"# 打印时间戳print(timestamp)# 读取数据文件的所有行withopen(data_file_path,"r")asdata-file:lines=data-file.readlines()# 打乱列表中的元素顺序random.shuffle(lines)# 获取列表中的前10个元素SelectedJines=lines:10# 将获取到的数据写入日志文件withopen(log_file_path,"w")aslog-file:Iog_file.writelines(selectedjines)# 程序执行暂停】()秒time.sleep(10)if_name_="_main_":#调用generaieog_file。函数开始生成日志文件generate_log_file()【运行结果】在PyCharm中运行代码,控制台显示时间戳提示信息,如图4-14所示.,7usrlocalsparkmycodeDStreammovie,目录下生成包含电影评分数据的日志文件含10行数据,如图4-15所示。,每个日志文件中包169147558mycodeDStreammovieQ三=三×16910475681691047578169104758811log_log.log.log.log.169147598169104755169104756169104757169104758169104759169147688.txt8.txt8.txt8.txt8.txt1691047618169104762816Q10476389-o9-09-IOg-o9-1691047601691047611691047621691047631691047641691476488.txt8.txt8.txt8.t×t8.txt图4-14时间戳提示信息图4-15包含电影评分数据的日志文件(部分)2.读取电影评分数据创建DStream打开PyCharm,在"DSlream"目录下新建"MsgReadpy"文件,并在该文件中编写应用程序,监视“usrIOCalsparkmycodeDStream/movie”目录读取电影评分数据。实现步骤如下。步骤I创建SparkContext对象。步骤2A创建StreamingComext对象并将批处理时间间隔设置为10秒。步骤3A定义监视目录directory.(详见教材)【参考代码】frompysparkimportSparkContextfromPySPark.streamingimportStreamingContext船0建SparkContext对象sc=SparkContext(,local2","MovieStreamingApp")#创建StreamingContext对象并将处理间隔设置为IO秒ssc=StreamingContext(sc,10)#定义监视新文件的目录directory="file:/usr/local/spark/mycode/DStream/movie/"# 创建DStreamRljstream=ssc.textFileStream(directorj,)# 打印监视目录中新文件的数据file_stream.pprint()# 启动StreamingContext对象ssc.start()# 等待StreamingContext对象终止ssc.awaitTermination()【运行结果】在PyCharm中运行代码,控制台每10秒显示一次监视目录中新文件的数据,如图4-16Tiae:223-8-315:26:46心。2,夺冠,7.6269,飞第人生,7.58。06,飞第人生,7.3USer5,西游记之大圣归来,8.1USeM2,寻龙诀,7.7USerI7,康熙王朝,9.0USerl9,流浪地坳,7.1824,一代宗师,7.8USer34,中国合伙人,7.9USer36,大鱼海臬,8.1Tiee:2623-08-8315:26:56USer8,大鱼海案,8.4user33,三缺好人,8.USeM。,中国机长,8.4866,功夫,7.8USer8,西游记之大至归来,8.1USer24,Tt宗fli,7.8USerI,湄公河行动,6.8USerl7,大鱼海棠,8.31»566,飞驰人生,7.38。r3,看王别姬,9.6图4-16监视目录中新文件的数据【学生】自行扫码现看配套微课,按照要求进行操作,如遇问题可询问老师【教师】巡堂辅导,及时解决学生遇到的问题课堂小结【教师】简要总结本节课的要点基础数据源高级数据源【学生】总结回顾知识点作业布置教学反思【教师】布置课后作业(1)完成项目四项目实训中与本课相关的习题;(2)根据课堂知识,课后自己尝试读取数据创建DStreame完成课后任务

    注意事项

    本文(《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx)为本站会员(夺命阿水)主动上传,课桌文档仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知课桌文档(点击联系客服),我们立即给予删除!

    温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。




    备案号:宁ICP备20000045号-1

    经营许可证:宁B2-20210002

    宁公网安备 64010402000986号

    课桌文档
    收起
    展开