Hadoop大数据基础与应用(习题答案).docx
第1章Hadoop技术概述1 .Hadoop2.0包含哪些核心组件?MapReduce>HDFS、YARN2 .Hadoop包含哪些优势?方便、弹性、健壮、简单3 .Hadoop有哪些应用领域?运营商、电子商务、在线旅游、欺诈检测、医疗保健、能源开采、金融、直播、在线教育等等4 .Hadoop有几种运行模式?单机模式、伪分布模式、完全分布式模式5 .Had。P伪分布集群包含哪些守护进程?DataNOde、NodeManager>ResourceManager>SecondaryNameNodeNameNode第2章HadOOP分布式文件系统(HDFS)1 .简述HDFS的设计理念?HDFS的设计理念来源于非常朴素的思想:即当数据文件的大小超过单台计算机的存储能力时,就有必要将数据文件切分并存储到由若干台计算机组成的集群中,这些计算机通过网络进行连接,而HDFS作为一个抽象层架构在集群网络之上,对外提供统一的文件管理功能,对于用户来说就感觉像在操作一台计算机一样,根本感受不到HDFS底层的多台计算机,而且HDFS还能够很好地容忍节点故障且不丢失任何数据。2 .简述FSImage和EditLog的合并过程?FSlmage和EditLog合并的详细步骤如下所示。(1) SecondaryNameNode(即从元数据节点)引导NameNode(即元数据节点)滚动更新EditLog,并开始将新的EditLog写进edits.newo(2)SeCondaryNameNOde将NameNode的FSlmage(fsimage)和EditLOg(edits)复制到本地的检查点目录。(3) SeCOndaryNameNode将FSlmage(fsimage)导入内存,并回放EditLOg(edits),将其合并到FSImage(fsimage.ckpt),并将新的FSImage(fsimage.ckpt)压缩后写入磁盘。(4) SeCOndaryNameNode将新的FSlmage(fsimage.ckpt)传回NameNodeo(5) NameNOde在接收新的FSImage(fsimage.ckpt)后,将fsimage.ckpt替换为fsimage,然后直接加载和启用该文件。(6) NameNOde将新的EditLOg(即edits.new)更名为EditLog(即edits)<>默认情况下,该过程1小时发生一次,或者当EditLog达到默认值(如64MB)也会触发,具体控制参数可以通过配置文件进行修改。3.简述HDFS的数据读写流程?HDFS读取数据流程主要包括以下几个步骤。1 .客户端通过调用FiIeSystem对象的open()方法来打开希望读取的文件,对于HDFS来说,这个对象是DiStribUtedFiIeSyStem的一个实例。数据节点数据节点数据节点图2-4客户端读取HDFS中的数据2 .DistributedFiIeSystem通过RPC获得文件的笫一批块的位置信息(LoCatiOns),同一个块按照重复数会返回多个位置信息,这些位置信息按照HadOoP拓扑结构排序,距离客户端近的排在前面。3 .前两步会返回一个文件系统数据输入流(FSDatalnPUtStream)对象,该对象会被封装为分布式文件系统输入流(DFSInputStream)对象,DFSInputStream可以方便地管理DataNode和NameNode数据流。客户端调用read()方法,DFSInputStream会找出离客户端最近的DataNode并连接O4 .数据从DataNode源源不断地流向客户端。5 .如果第一个块的数据读完了,就会关闭指向第一个块的DataNode的连接,接着读取下一个块。这些操作对客户端来说是透明的,从客户端的角度来看只是在读一个持续不断的数据流。6 .如果笫一批块全部读完了,DFSInputStream就会去NameNode拿下一批块的位置信息,然后继续读。如果所有的块都读完了,这时就会关闭所有的流。HDFS的写入数据流程主要包括以下几个步躲。1.客户端通过调用DistributedFiIeSystem的Createo方法创建新文件。数据节点数据节点数据节点图2-5HDFS的写数据流程Z-DistributedFiIeSystem通过RPC调用NameNode去创建一个没有块关联的新文件。在文件创建之前,NameNode会做各种校验,比如文件是否存在,客户端有无权限去创建等。如果校验通过,NameNode就会创建新文件,否则就会抛出I/O异常。3.前两步结束后,会返回文件系统数据输出流(FSDataOUtPUtStream)的对象,与读文件的时候相似,FSDataOutputStream被封装成分布式文件系统数据输出流(DFSOUtPlJtStream),DFSOutputStream可以协调NameNOde和DataNOde。客户端开始写数据到DFSOutputStream,DFSOutputStream会把数据切成一个个小的数据包(PaCket),然后排成数据队列(dataquene)4.接下来,数据队列中的数据包首先传输到数据管道(多个数据节点组成数据管道)中的第一个DataNode中(写数据包),第一个DataNode又把数据包发送到第二个DataNode中,依次类推。SQFSOutputStream还维护着一个响应队列(ackquene),这个队列也是由数据包组成,用于等待DataNode收到数据后返回响应数据包,当数据管道中的所有DataNode都表示已经收到响应信息的时候,这时ackquene才会把对应的数据包移除掉。6 .客户端写数据完成后,会调用dose()方法关闭写入流。7 .客户端通知NameNode把文件标记为己完成,然后NameNode把文件写成功的结果反馈给客户端。此时就表示客户端己完成了整个HDFS的写数据流程。4 .简述HDFS的副本存储策略?新版本的副本存放策略的基本思想如下。副本1存放在Client所在的节点上(假设Client不在集群的范围内,则第一个副本存储节点是随机选取的,当然系统会尝试不选择那些太满或者太忙的节点)。副本2存放在与第一个节点不同机架中的一个节点中(随机选择)。副本3和副本2在同一个机架,随机放在不同的节点中。假设还有很多其他的副本,那么剩余的副本就随机放在集群的各个节点中。5 .简述HDFS的高可用原理?HDFS集群中通常由2台独立的机器来配置NameNode角色,无论在任何时候,集群中只能有一个NameNode是Active状态,而另一个NameNode是Standby状态。Active状态的NameNode作为主节点负责集群中所有客户端操作,Standby状态的NameNOde仅仅扮演一个备用节点的角色,以便于在ActiveNameNode挂掉时能第一时间接替它的工作成为主节点,从而使得NameNode达到一个热备份的效果。为了让主备NameNode的元数据保持一致,它们之间的数据同步通过JournaINode集群完成。当任何修改操作在主NameNode上执行时,它会将EditLog写到半数以上的JoUrnalNode节点中。当备用NameNode监测到JournaINode集群中的EditLog发生变化时,它会读取JournaINode集群中的EditLog,然后同步到FSImage中。当发生故障造成主NameNode宕机后,备用NameNode在选举成为主NameNOde之前会同步JC)UrnaINode集群中所有的EditLog,这样就能保证主备NameNode的FSImage一致。新的ActiveNameNode会无缝接替主节点的职责,维护来自客户端的请求并接受来自DataNOde汇报的块信息,从而使得NameNode达到高可用的目的。为了实现主备NameNode故障自动切换,通过ZKFC对NameNode的主备切换进行总体控制。每台运行NameNOde的机器上都会运行一个ZKFC进程,ZKFC会定期检测NameNode的健康状况。当ZKFC检测到当前主NameNode发生故障时,会借助Zookeeper集群实现主备选举,并自动将备用NameNode切换为Active状态,从而接替主节点的工作对外提供服务。第3章HadOOP资源管理系统(YARN)1 .简述YARN解决了哪些问题?YARN解决了MaPRedUCeLo扩展性差、资源利用率低、通用性差、单点故障问题。2 .简述YARN的基本架构与工作原理?YARN主要是由资源管理器(ReSOUrCeManager),节点管理器(NodeManager)、应用程序管理器(AppIicationMaster)和相应的容器(Container)构成的。YARN的详细工作原理如下所示。(1)客户端(Client)向ResourceManager提交一个作业,作业包括AppIicationMaster程序、启动AppIicationMaster的程序和用户程序(如M叩RedUCe)。(2) ReSoUrCeManager会为该应用程序分配一个Container,它首先会跟NOdeManager进行通信,要求它在这个容器中启动应用程序的AppIicationMastero(3) AppIicationMaster一旦启动以后,它首先会向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源并监控它们的运行状态,直到任务运行结束。它会以轮询的方式通过RPC协议向ResourceManager申请和领取资源,一旦APPliCatiOnMaSter申请到资源,它会与NOdeManager进行通信,要求它启动并运行任务。(4)各个任务通过RPC协议向APPIiCatiOnMaSter汇报自己的状态和进度,这样会让AppIicationMaster随时掌握各个任务的运行状态,一旦任务运行失败,AppIicationMaster就会重启该任务,重新申请资源。应用程序运行完成后,AppIicationMaster就会向ResourceManager注销并关闭。在应用程序整个运行过程中,可以通过RPC协议向AppIicationMaster查询应用程序当前的运行状态,当然在YARN的Web界面也可以看到整个作业的运行状态。3 .简述YARN是如何实现容错的?YARN通过以下几个方面来保障容错性。I-ResourceManager的容错性保障ReSoUrCeManager存在单点故障,但是可以通过配置实现ResourceManager的HA(高可用),当主节点出现故障时,可以切换到备用节点继续对外提供服务。2.NodeManager的容错性保障NodeManager失败之后,ResourceManager会将失败的任务通知对应的AppIicationMaster,由AppIicationMaster来决定如何去处理失败的任务。S-AppIicationMaster的容错性保障APPIiCationMaSter失败后,由ReSoUrCeManager负责重启即可。其中,AppIicationMaster需要处理内部任务的容错问题。ResourceManager会保存已经运行的任务,重启后无须重新运行。4 .简述YARN的高可用原理?ResourceManagerHA由-对ACtiVe,Standby结点构成,ReSoUrCeManager它有个基于Zookeeper的选举算法,来决定哪个ResourceManager是active状态,哪个ResourceManager处于Standby状态。ZKFC是ReSoUrCeManager进程的一个服务,非独立存在,跟HDFS中的不太一样,而HDFS中Zkfc作为一个独立的进程存在(a)监控ResourceManager的健康状态(b)向ZK定期发送心跳。ResourceManager是通过RMStateStore存储内部数据和主要应用的数据及标记,目前支持的可替代的RMStateStore实现有:(a)基于内存的MemoryRMStateStore,(b)基于文件系统的FiIeSystemRMStateStore,(c)STzookeeperfiZKRMStateStoreo两个ResourceManager的数据共享通过RMStateStore来实现,保持信息状态的致当Active状态ResourceManager关掉了,它会将另外一个ResourceManager状态变为Active,并提供服务。ResourceManagerHA的架构模式同NameNodeHA的架构模式基本一致。5 .简述YARN的各种调度器原理及使用场景?先进先出调度器:将应用放置在一个队列中,然后按照提交的顺序(先进先出)运行应用。首先为队列中第一个应用的请求分配资源,第一个应用的请求被满足后再依次为队列中下一个应用服务。容量调度器:不会通过强行中止来抢占容器(container),因此,如果一个队列一开始资源够用,然后随着需求增长,资源开始不够用时,那么这个队列就只能等着其他队列释放容器资源。缓解这种情况的方法是,为队列设置一个最大容量限制,这样这个队列就不会过多侵占其他队列的容量了。当然,这样做是以牺牲队列弹性为代价的,因此需要在不断尝试和失败中找到一个合理的折中。公平调度器:假设有两个用户A和B,分别拥有自己的队列queueA和queueB。A启动一个作业jobl,在B没有需求时A会分配到全部可用资源;当A的作业仍在运行时B启动一个作业job2,一段时间后,按照我们先前看到的方式,每个作业都用到了一半的集群资源。这时,如果B启动第二个作业job3且其他作业仍在运行,那么job3和job2共享资源,因此B的每个作业将占用四分之一的集群资源,而A仍继续占用一半的集群资源。最终的结果就是资源在用户之间实现了公平共享。总的来说,如果应用场景需要先提交的Job先执行,那么就使用FIFoScheduler;如果所有的Job都有机会获得到资源,就得使用CapacitySChedUler和FairSChedUler,CapacityScheduler不足的地方就是多个队列资源不能相互抢占,每个队列会提前分走资源,即使队列中没有Job,所以一般情况下都选择使用FairSChedUler;FIFOSChedUIer一般不会单独用,公平调度支持在某个队列内部选择FairSChedUler还是FIFOSChedUIer,可以认为FairSChedUler是一个混合的调度器。第4章HadOoP分布式计算框架(M叩RedUCe)1 .简述MapReduce的基本设计思想?分而治之、抽象成模型、上升到架构2 .简述M叩RedUCe的优缺点?(1)有点M叩RediJCe易于编程、良好的扩展性、高容错性、适合PB级以上数据集的离线处理(2)缺点不适合实时计算、不适合流式计算、不适合DAG(有向无环图)计算3 .简述MapReduce的ShUffIe过程?M叩RedUCe的ShUffIe过程如下。(1) map端map任务开始输出中间结果时,并不是直接写入磁盘,而是利用缓冲的方式写入内存,并出于效率的考虑对输出结果进行预排序。每个map任务都有一个环形内存缓冲区,用于存储任务输出结果。默认情况下,缓冲区的大小为100MB,这个值可以通过mapreduce.task.io.sort.mb属性来设置。一旦缓冲区中的数据达到阈值(默认为缓冲区大小的80%),后台线程就开始将数据刷到磁盘。在数据刷写磁盘过程中,m叩任务的输出将继续写到缓冲区,但是如果在此期间缓冲区被写满了,那么m叩会被阻塞,直到写磁盘过程完成为止。在缓冲区数据刷写磁盘之前,后台线程首先会根据数据被发送到的reducer个数,将数据划分成不同的分区(partition)。在每个分区中,后台线程按照key在内存中进行排序,如果此时有一个combiner函数,它会在排序后的输出上运行。运行8mbiner函数可以减少写到磁盘和传递到reducer的数据量。每次内存缓冲区达到溢出阈值,就会刷写一个溢出文件,当map任务输出最后一条记录之后会有多个溢出文件。在m叩任务完成之前,溢出文件被合并成一个己分区且已排序的输出文件。默认如果至少存在3个溢出文件,那么输出文件写到磁盘之前会再次运行COmbiner。如果少于3个溢出文件,那么不会运行combiner,因为map输出规模太小不值得调用Combiner带来的开销。在map输出写到磁盘的过程中,还可以对输出数据进行压缩,加快磁盘写入速度,节约磁盘空间,同时也减少了发送给reducer的数据量。(2) reduce端map输出文件位于运行map任务的NodeManager的本地磁盘,现在NodeManager需要为分区文件运行reduce任务,而且reduce任务需要集群上若干个map任务的m叩输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。这就是reduce任务的更制阶段。默认情况下,reduce任务有5个更制线程,因此可以并行获取map输出。如果map输出结果比较小,数据会被复制到reduce任务的JVM内存中,否则,m叩输出会被复制到磁盘中。一旦内存缓冲区达到阈值大小,数据合并后会刷写到磁盘。如果指定了combiner,在合并期间可以运行COmbiner,从而减少写入磁盘的数据量。随着磁盘上的溢出文件增多,后台线程会将它们合并为更大的、已排序的文件,这样可以为后续的合并节省时间。复制完所有m叩输出后,reduce任务进入排序阶段,这个阶段将m叩输出进行合并,保持其顺序排序。这个过程是循环进行的:比如,如果有50个map输出,默认合并因子为10,那么需要进行5次合并,每次将10个文件合并为一个大文件,因此最后有5个中间文件。在最后的reduce阶段,直接把数据输入reduce函数,从而节省了一次磁盘往返过程。因为最后一次合并,并没有将这5个中间文件合并成一个已排序的大文件,而是直接合并到reduce作为数据输入。在reduce阶段,对已排序数据中的每个key调用reduce函数进行处理,其输出结果直接写出到文件系统,这里一般为HDFS。4 .简述MapReduce的运行机制?M叩RedUCe作业完整的运行过程如下。(1)提交MaPRedUCe作业客户端通过调用Job对象的WaitFOrCOmPIetiOn()方法来提交MapReduce作业。(2)初始化MaPRedUCe作业applicationmaster会对MapReduce作业进行初始化。(3)分配任务applicationmaster为作业中的map任务和reduce任务向ResourceManager请求容器。(4)执行任务ResourceManager的调度器为任务分配容器,执行map任务和reduce任务。(5)更新进度和状态任务通过这个接口向自己的applicationmaster报告进度和状态。(6)完成作业当applicationmaster收到作业最后一个任务己完成的通知后,就会将作业的状态设置为“成功二5 .简述MapReduce作业容错机制在作业运行过程中,我们需要考虑的容错实体包括:任务、applicationmasterNodeManager和ResourceManagero(1)任务容错当applicationmaster被告知一个任务尝试失败后,它将重新调度该任务的执行。(2) applicationmaster容错applicationmaster向ResourceManager发送周期性的心跳,当applicationmaster失败时,ResourceManager将检测到该失败,并在一个新的容器中重新启动一个applicationmaster实例。(3) NOdemanager容错对于出现故障的NodeManager节点,那么曾经在其上运行且成功完成的map任务,如果属于未完成的作业,那么applicationmaster会安排它们重新运行。(4) ResourceManagerISResourceManager出现故障是比较严重的,因为没有ResourceManager,作业和任务容器将无法启动。为了实现高可用(HA),有必要以一种active-standby配置模式运行一对ResourceManagero第5章Zookeeper分布式协调服务1 .Zookeeper的应用场景有哪些?分布式锁服务、配置管理服务、分布式消息队列、分布式通知与协调服务等。2 .简述Zookeeper监听机制?znode以某种方式发生变化时,观察(WatCher)机制(观察机制:一个WatCher事件是一个一次性的触发器,当被设置了WatCher的Znode发生了改变时,服务器将这个改变发送给设置了WatCher的客户端,以便通知它们)可以让客户端得到通知。可以针对Zookeeper服务的操作来设置观察,该服务的其他操作可以触发观察。在Zookeeper中,引入了WatCher机制来实现分布式的通知功能。Zookeeper允许客户端向服务端注册一个watcher监视器,当服务端的一些特定事件触发了这个watcher监视器之后,就会向指定客户端发送一个异步事件通知来实现分布式的通知功能。这种机制称为注册与异步通知机制。3 .Zookeeper包含哪些特性?最终一致性、可靠性、实时性、等待无关、原子性、顺序性。4 .Zookeeper有L部署方式?单机模式、伪分布式模式、分布式模式5 .简述Zookeeper工作原理?Zookeeper的核心就是原子广播,该原子广播就是对Zookeeper集群上所有主机发送数据包,通过这个机制保证了各个服务端之间的数据同步。那么实现这个机制在ZookeePer中有一个内部协议,这个协议有两种模式,一种是恢复模式,一种是广播模式。当服务刚启动或者主节点崩溃后,这个协议就进入了恢复模式,当主节点再次被选举出来,且大多数服务端完成了和主节点的状态同步以后,恢复模式就结束了,状态同步保证了主节点和服务端具有相同的系统状态。一旦主节点已经和多数的从节点(也就是服务端)进行了状态同步后,它就可以开始广播消息即进入广播状态。在广播模式下,服务端会接受客户端请求,所有的写请求都被转发给主节点,再由主节点发送广播给从节点。当半数以上的从节点完成数据写请求之后,主节点才会提交这个更新,然后客户端才会收到一个更新成功的响应。第6章Hadoop分布式集群搭建与管理1 .Hadoop包含几种运行模式?单机模式、伪分布模式、分布式模式2 .Hadoop集群包含哪些守护进程?其作用是什么?NameNode:管理文件系统的命名空间,维护着整个文件系统的目录树以及目录树中的所有子目录和文件。DataNode:在NameNode的指导下完成数据的1/0操作。ZKFailoverControIIer:对NameNode的主备切换进行总体控制。JournaINode:存储和管理EditLOg日志。ResourceManager:负责整个系统的资源管理和调度。NodeManager:是整个作业运行的一个执行者,是每个节点上的资源和任务管理器。3 .Hadoop集群如何利用ZOOkeePer实现高可用?Hadoop集群包含HDFS集群和YARN集群。HDFS中的NameNode存在单点故障,通过NameNode的主备为了实现高HDFS的高可用,NameNode的主备自动切换又利用了Zookeeper的锁机制。YARN中的ResourceManager也是同样的道理。4 .Hadoop集群各角色的启动JlI好?HDFS集群启动顺序:NameNode一DataNode一JournalNodeZKFailoverControIIerYARN集群启动顺序:ResourceManager一NodeManager5 .如何在Hadoop集群中动态地增加或删除节点?简单的说,HadooP集群扩容只需要新增节点纳入include文件进行管理即可,HadOOP集群缩容只需要删除节点纳入exclude文件进行管理即可。第7章HiVe数据仓库工具1 .简述Hive和关系型数据库的异同?Hive和关系型数据库都有数据库和表的概念,都可以使用SQL来处理数据。但是Hive中的表是逻辑表不是物理表,数据存储在HDFS之上,作业的运行依赖YARN集群,可以基于Hadoop集群处理海量数据。2 .简述Hive的运行机制?Hive的运行机制包含以下几个步骤:(1)用户通过用户接口连接HiVe,发布HQL。(2) HiVe解析查询并制定查询计划。(3) HiVe将查询转换成MaPRedUCe作业。(4) Hive在Hadoop上执行MapReduce作业。3 .简述Hive内部表和外部表的区别与使用?(1)区别删除HiVe内部表时,表结构和数据都会被删除。删除HiVe外部表时,只删除表结构,数据不会被删除。(2)使用场景根据实际工作经验,一般会遵循一个经验法则:如果所有数据处理都由HiVe来完成,应该选择使用内部表。如果同一个数据集需要由HiVe和其他工具同时来处理,应该选择使用外部表。一般的做法是将存放在HDFS中的初始数据集使用外部表进行处理,然后使用Hive的转换操作将数据移到Hive的内部表。4 .简述ORDERBY和SORTBY在使用上的区别和联系?(1)联系orderby和sortby都可以对数据进行排序。(2)区别orderby排序出来的数据是全局有序的,但是只能有一个PartitiOrlsortby排序出来的数据是局部有序的,但是全局无序。即partition内部是有序的,但是partition与partition之间的数据时没有顺序关系的。5 .简述Hive性能调优的常见手段?FetChTaSk的优化、本地模式执行的优化、JVM的优化、task并行度优化等等第8章HBaSe分布式数据库根据下面给出的表格,用HBaSeSheIl模式设计SCore成绩表,并对表进行操作。Score成绩表namescoreChineseMathEnglishLucy959086Lily907688Jack8592781 .查看Score表结构。# 创建表create,Score,z'score,# 插入数据put'Score','Lucy','score:ChineSe','95,put'Score','Lucy','score:Math','90'put'Score','Lucy','score:EngIiSh','86'put'Score','Lily','score:ChineSe','90'put,Score',LilyscorerMath76'put'Score','Lily','score:EngIiSh','88'put'Score','Jack','score:ChineSe','85'put'Score7Jack,'scorerMath,'92'put'Score','Jack','score:EngliSh','78'# 查询表结构describe'Score'2 .查询JaCk同学的ChineSe成绩。get,Score'Jack,scoreChinese,3 .将Lucy同学的English成绩修改为90分。put'Score',Lucy,z,scoreEnglish790'第9章Had。P生态圈其他常用开发技术1 .如何提高Sqoop导入导出的并发度?SqoOP通过参数-m可以指定作业的并发度。2 .Flume如何保证数据不丢失?Flume中的Event在系统流动过程中,通过事务的方式保证不丢失。EVent在Flume节点存储时,可以选择type的值为file将数据持久化磁盘确保数据不丢失。3 .Kafka的Partition为什么需要副本?Kafka中的一个Topic可以包含多个Partition,为了防止Kafka集群节点宕机或者磁盘损坏,Partition需要副本机制来实现容错,确保在硬件故障的情况下数据不丢失。4 .简述Kafka的优势?Kafka具备高吞吐量、低延迟、持久性、可靠性、容错性和高并发的特点和优势。5 .Spark和FIink编程题目假设日志数据如下所示,格式为:网站ID、访客ID、访问时间Sitel,USerl,2021-10-2002:12:22sitel,user2,2021-10-2804:41:23sitelzuser3,2021-10-2011:46:32sitelzuser3z2021-10-2311:02:11site2zuser4,2021-10-2015:25:22site3,user5z2021-10-2908:32:54site3,user6z2021-10-2208:08:26site4,user7,2021-10-2010:35:37site4,user7,2021-10-2411:54:58现在要对近7天的日志进行统计,统计结果格式如下。Key:(Date(日期),Hour(时段),Site(网站)。ValUe:(PV(访问次数),UV(独立访问人数,相同访客id去重)。分别使用Spark和Flink编写执行代码,并将统计结果保存到HBase数据库。Spark示例代码:packagecom.bigdataimportorg.apache.spark.SparkCotSparkContextJobjectSparkTestdefmain(args:ArrayString):Unit=获取SparkContextvalconf=newSparkConf().setAppName("SparkTest").setMaster("local2")valsc=newSparkContext(conf)读取日志文件valrdd=sc.textFile("G:learndatauser.log").map(t=>解析数据valarray=t.split("7')valday=array(2).split("s+")(0)valhour=array(2).split("s+")(l).substring(O,2)(array(0)zarray(l),day,hour)/.filter(t=>/过滤出最近7天的数据/TimeUtils.getDiffDay(currentDayzt.-3)<7/)统计PV访问次数valPVRDD=rdd.groupBy(t=>(t._3,t._4,t._l).map(t=>(t._l,t._2.size)print(pvRDD.collect().toBuffer)插入HBase省略统计UV独立访问人数.先按照用户去重,然后再聚合统计valuvRDDrdd.groupBy(t=>t).map(t=>(t.1.3zt14,t.1.1)2.size).reduceByKey(+)print(uvRDD.collect().toBuffer)插入HBase省略sc.stop();)Flink示例代码:packagecom.bigdata;mon.functions.FilterFunction;mon.functions.FlatMapFunction;mon.functions.MapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.util.Collector;publicclassFIinkTestpublicstaticvoidmain(Stringargs)throwsException获取ExecutionEnvironmentExecutionEnvironmentbenv=ExecutionEnvironment.getExecutionEnvironment();读取数据DataSet<String>ds=benv.readTextFile("G:learndatauser.log");统计pv访问次数DataSet<Tuple2<String,lnteger>>pvDS=ds.map(newMapFunctiorKStringzTuple2<StringJnteger>>()OverridepublicTuple2<StringJnteger>map(Strings)throwsExceptionStringarray=s.split("J);Stringday=array2.split("s+")0;Stringhour=array2.split(',s+")lsubstring(0,2);returnnewTuple2(day+""+hour+""+array0zl);).filter(newFilterFunction<Tuple2<String,lnteger>>()0VerTidepublicbooleanfilter(Tuple2<String,lnteger>t)throwsException/过滤出最近7天的数据/TimeUtils.getDiffDay(currentDay,t.-3)<7returntrue;).groupBy(0).sum(l);pvDS.print();插入HBaSe省略System.out.println("");统计UV独立访问人数.先按用户去重,然后再聚合统计DataSet<Tuple2<String,lnteger>>uvlDS=ds.map(newMapFunction<StringzTuple2<StringJnteger>>()OverridepublicTuple2<StringJnteger>m