大数据基础平台及其框架

前言

 通过了这一段时间的学习,趁着新的一年,整理了一下自己的学习笔记以及理解,将这些内容发到自己的博客上,希望能对新入门的同学有所帮助,也希望有老司机能够指出理解不到位的地方,帮助我们我以及其他新司机一起不断进步。

Hadoop基础

首先说下Hadoop的特点:

  • 扩容能力(Scalable):能可靠地(reliably)存储和处理千兆字节(PB)数据。
  • 成本低(Economical):可以通过普通机器组成的服务器群来分发以及处理数据。这些服务器群总计可达数千个节点。
  • 高效率(Efficient):通过分发数据,hadoop可以在数据所在的节点上并行地(parallel)处理它们,这使得处理非常的快速。
  • 可靠性(Reliable):hadoop能自动地维护数据的多份副本,并且在任务失败后能自动地重新部署(redeploy)计算任务。

hadoop1.0与hadoop2.0的对比:

had

可以从图中看出,Hadoop1.0时代,MapReduce的功能很复杂,既要负责集群的资源管理又要处理数据,耦合度很高,到了2.0时代,Hadoop框架将原先的MapReduce的庞大功能模块细分出来,让YARN来管理集群的资源调度,MapReduce就专注与处理数据逻辑,这样不仅仅解耦合,YARN还支持结合其他的数据处理组件,大大增强了框架的可复用性。   

  其他什么介绍之类虚的东西就不说了,直接进入正题,首先是Hadoop平台的搭建,创建的用户名是hadoop:

  1. 准备Linux环境:

  因为没有条件组成真正的分布式集群,所以只有用虚拟机来实现伪分布式集群的搭建。这里我使用的虚拟机是VMware,Linux是CentOS6.5发行版。安装Linux虚拟机就不赘述了,这里说一下虚拟机连接外面实体机网络的设置,首先到VM这软件的虚拟网络编辑器中设置VMnet8的子网IP和子网掩码,桥接方式我这边选的是NAT模式,如下图所示:

h1

然后回到Windows下,打开网络和共享中心 -> 更改适配器设置,如下图所示:

h2

  这样就设置好虚拟机联网问题了。emmm…. 刚好有NAT模式为啥这样配置的原理图,这里贴上来,但是就不解释说明了,大家应该都懂!!

y1

  1. 修改虚拟机主机名:

  vim /etc/sysconfig/network

  NETWORKING=yes

  HOSTNAME=hdp-node-01 ### 这里修改主机名,永久修改,重启后生效

  1. 修改IP:

  因为要搭建集群,所以IP不能使用动态获取方式,而要设置为静态IP。这里有两种修改方式:

  第一种:通过Linux图形界面进行修改

  进入Linux图形界面 -> 右键点击右上方的两个小电脑 -> 点击Edit connections -> 选中当前网络System eth0 -> 点击edit按钮 -> 选择IPv4 -> method选择为manual -> 点击add按钮 -> 添加IP:192.168.33.101 子网掩码:255.255.255.0 网关:192.168.33.1 -> apply

  第二种:修改配置文件方式

  vim /etc/sysconfig/network-scripts/ifcfg-eth0

  BOOTPROTO=”static” ##这里要改成静态方式

  IPADDR=”192.168.3.101”

  NETMASK=”255.255.255.0”

  GATEWAY=”192.168.33.1”

以上列出的是要修改的,其他可以不去动它。如果发现没有ifcfg-eth0 配置文件的话,原因是centos6改用NetworkManager方式管理网络了,可以运行如下命令进行确认:

  chkconfig --list | grep -i netw

结果如下:

  NetworkManager 0:off 1:off 2:on 3:on 4:on 5:on 6:off //不同级别登陆的状态

  network 0:off 1:off 2:off 3:off 4:off 5:off 6:off

  这里可以看到,NetworkManager是开机启动状态,network是关闭状态。解决办法就是关闭NetworkManager,用传统的network方式来管理网络,并补充上ifcfg-eth0文件即可

  修复步骤如下:

  #关闭NetworkManager服务:$ service NetworkManager stop

  #关闭NetworkManager开机启动:$ chkconfig NetworkManager off

  #添加 /etc/sysconfig/network-scripts/ifcfg-eth0 文件

  DEVICE=eth0 #描述网卡对应的设备别名

  BOOTPROTO=static #设置网卡获得IP地址方式,可选的选项为static、DHCP或bootp,分别对应静态指定的IP地址,通过DHCP协议获得IP地址,通过bootp协议获得IP地址

  IPADDR=146.175.139.13 #IP地址

  NETMASK=255.255.255.0 #网卡对应的网络掩码

  GATEWAY=146.175.139.255 #网关IP

  HWADDR=00:25:90:81:5e:64 #MAC地址

  NM_CONTROLLED=no #表示该接口将通过该配置文件进行设置,而不是通过网络管理器进行管理

  ONBOOT=yes #表示系统将在启动时开启该接口

  TYPE=Ethernet

  IPV6INIT=no

  注:文件内容的值根据实际情况修改

  #开机启动network:$ chkconfig network on

  #开启network服务:

    service network start

    service network restart

这样就可以生效了

  1. 修改主机名和IP的映射关系

  192.168.33.101 hdp-node-01

  192.168.33.102 hdp-node-02

  192.168.33.103 hdp-node-03

  这里我配置了三台虚拟机,所以一次性全写上了。

  1. 关闭防火墙

  #查看防火墙状态:service iptables status

  #关闭防火墙:service iptables stop

  #查看防火墙开机启动状态:chkconfig iptables –list

  #关闭防火墙开机启动:chkconfig iptables off

可以使用 setup 命令来启动文字模式配置实用程序,来关闭防火墙

  1. 给普通用户添加sudo权限

  一般不介意直接使用root用户来进行集群上的操作,所以需要创建一个普通用户,如hadoop用户,给其分配权限。步骤如下:

  #进入root用户模式:su -

  #添加用户:adduser hadoop

  #修改密码:passwd hadoop

  #添加sudoers文件写权限:[root@localhost ~]# chmod u+w /etc/sudoers

  #添加hadoop用户到sudoers文件里:在”root ALL=(ALL) ALL”这行下添加”hadoop ALL=(ALL) ALL”.

[root@localhost ~]# vim /etc/sudoers
.........................
.........................
root    ALL=(ALL)   ALL
hadoop    ALL=(ALL)   ALL   <-----添加到这里. :wq保存退出
.........................
.........................

  #撤销sudoers写权限,记得撤销写权限:[root@localhost ~]# chmod u-w /etc/sudoers

  1. 配置SSH免密登陆:

  因为集群需要从master机器自动帮我们从slaves机器中启动东西,所以需要配置一下SSH免密登陆。

#生成ssh免登陆密钥
#进入到我的home目录
cd ~/.ssh

ssh-keygen -t rsa (四个回车,都表示默认)
执行完这个命令后,会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
将公钥拷贝到要免登陆的机器上
ssh-copy-id localhost

  网上找了一个免密登陆的机制原理图,感觉画的不错:

SSH

  1. 安装JDK:

  #先将jdk安装包上传到虚拟机中,上传工具使用xshell附带的xftp就行

  #解压jdk

    · 创建文件夹:mkdir /home/hadoop/apps

    · 解压:tar -zxvf jdk1.8.0_144 -C /home/hadoop/apps

  #将Java添加到环境变量中

vim /etc/profile

#在文件最后添加
export JAVA_HOME=/home/hadoop/apps/jdk1.8.0_144
export PATH=$PATH:$JAVA_HOME/bin

#刷新配置文件,使环境变量生效
. /etc/profile
  1. 安装hadoop:

  先上传hadoop的安装包到服务器上去

  #配置hadoop的环境变量

vi /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_144
export HADOOP_HOME=/itcast/hadoop-2.8.1
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

#刷新配置文件
. /etc/profile

  伪分布式需要修改5个配置文件($HADOOP_HOME/etc/hadoop/下)

  #第一个:hadoop-env.sh

vi hadoop-env.sh
#添加Java的环境变量路径
export JAVA_HOME=${JAVA_HOME}

  #第二个:core-site.xml

<!-- 指定HADOOP所使用的文件系统schema(URI),HDFS的老大(NameNode)的地址 -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://weekend-1206-01:9000</value>
</property>
<!-- 指定hadoop运行时产生文件的存储目录 -->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hadoop/hadoop-2.4.1/tmp</value>
</property>

  #第三个:hdfs-site.xml(mv hdfs-default.xml hdfs-site.xml)

<!-- 指定HDFS副本的数量 -->
<property>
    <name>dfs.replication</name>
    <value>3</value>
</property>
<!-- 指定集群文件存储路径 -->
<property>
    <name>dfs.name.dir</name>
    <value>/home/hadoop/apps/hadoop-2.4.1/ClusterData</value>
</property>

  #第四个:mapred-site.xml(mv mapred-site.xml.template mapred-site.xml)

<!-- 指定mr运行在yarn上 -->
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

  #第五个:yarn-site.xml

<!-- 指定YARN的老大(ResourceManager)的地址 -->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hdp-node-01</value>
</property>
<!-- reducer获取数据的方式 -->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>

  最后在修改一下 slaves 文件,这个文件是表示hadoop集群中从结点的机器是哪些,master机器启动时会查看此文件,依次帮我们去启动这些从节点作为datanode,只要在这个文件里面加入需要作为datanode的主机名即可,如:

vi slaves

hdp-node-01
hdp-node-02
hdp-node-03

  在master上配置好后,需要将hadoop复制到其他结点的机器上,这样就省的再配置一次了:scp -r hadoop-2.8.1 hadoop@hdp-node-02:/home/hadoop/apps

  配置好后,首次启动需要格式化namenode,对namenode进行初始化:hdfs namenode -format

  初始化成功后,启动hadoop:

#启动HDFS
start-dfs.sh

#启动YARN
start-yarn.sh

  验证是否成功:使用 jps 命令验证,如果有NameNode、Jps、SecondaryNameNode、NodeManager、ResourceManager、DataNode这些进程,就说明启动成功。可以通过 http://192.168.33.101:50070 (HDFS管理界面)和 http://192.168.33.101:8088 (MR管理界面)访问UI管理界面,其中50070和8088是默认端口号。

HDFS Shell

#查看帮助
hadoop fs -help <cmd>

#上传
hadoop fs -put <linux上文件> <hdfs上的文件>

#查看文件内容
hadoop fs -cat <hdfs上的路径>

#下载文件
hadoop fs -get <hdfs上的路径> <linux上文件>

#HDFS Shell 基本使用和linux指令类似,这里就不过多赘述了

hadoop核心

  • HDFS:Hadoop Distributed File System 分布式文件系统
  • YARN:Yet Another Resource Negotiator 资源管理调度系统
  • MapReduce:分布式运算框架

HDFS

HDFS的架构:

HDFS

  • 主从结构

  主节点:NameNode/Secondary NameNode

  从节点,有很多个:datanode

  • namenode负责:

  接收用户操作请求

  维护文件系统的目录结构

  管理文件与block之间关系,block与datanode之间关系

  • datanode负责:

  存储文件

  文件被分成block存储在磁盘上

  为保证数据安全,文件会有多个副本

DataNode

  • datanode是提供真实文件数据的存储服务
  • 文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128M,以一个256MB文件为例,这个文件共有 256/128=2个Block。可以在dfs.block.size属性中设置

  存储DataBlock的机器叫做DataNodes。在读写过程中,DataNodes负责直接把用户读取的文件block传给client,也负责直接接收用户写的文件。

NameNode

  • NameNode是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的元信息和每个文件对应的数据块列表。接收用户的操作请求。
  • 文件包括:

  1.fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息

  2.edits:操作日志文件

  3.fstime:保存最近一次checkpoint的时间

以上数据保存在服务器的文件系统中(也就是hdfs-site.xml的dfs.name.dir所设置的路径下)

  • NameNode元数据管理机制

meta

namenode中会记录一个文件在HDFS上的名称,在集群上有几份,分别在哪几台DataNode上等信息。在HDFS里,NameNode保存了整个文件系统信息,包括文件和文件夹的结构,与Linux文件系统很类似,HDFS把文件和文件夹表示为inode,每个inode都有自己的所有者,权限,创建和修改时间等等信息。HDFS可以存很大的文件,所以每个文件都被分成一些DataBlock,存在不同机器上,NameNode就负责记录一个文件有哪些DataBlock,以及这些DataBlock分别存放在哪些机器上。NameNode还负责管理文件系统常用的操作,如创建一个文件,重命名一个文件,创建一个文件夹等等。当我们通过HDFS client向HDFS读取或写文件时,所有的读写请求都是先发给NameNode,它负责创建或查询一个文件,然后再让HDFS Client和DataNodes联系具体的数据传输,当Client获取了需要的数据存放在哪里后,就直接和DataNode进行交互而不再需要NameNode了。NameNode也知道每个DataNode所在的Rack。

NameNode的工作特点

  • Namenode始终在内存中保存metadata,用于处理“读请求”
  • 有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
  • Hadoop会维护一个fsimage文件,也就是namenode中metadata的镜像,但是fsimage不会随时与namenode内存中的metadata保持一致,而是每隔一段时间通过合并edits文件来更新内容。SecondaryNamenode就是用来合并fsimage和edits文件来更新NameNode的metadata的。

hbjz

SecondaryNameNode是HA的一个解决方案,但是不支持热备份,合并流程如上图

  • secondary通知namenode切换edits文件,此时主节点产生edits.new
  • secondary通过http get方式从namenode获得fsimage和edits文件(在secondaryNamenode的current同级目录下可见到temp.check-point或者previous-checkpoint目录,这些目录中存储着从namenode拷贝来的镜像文件)
  • secondary将fsimage开始合并获取的上述两个文件,产生一个新的fsimage文件fsimage.ckpt
  • secondary将新的fsimage通过http post方式发送回NameNode
  • namenode将fsimage.ckpt与edits.new文件分别重命名为fsimage与edits,然后更新fstime,整个checkpoint过程到此结束。

什么时候checkpoint

  • fs.checkpoint.period指定两次checkpoint的最大时间间隔,默认3600秒
  • fs.checkpoint.size 规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否达到最大时间间隔。默认大小是64M。

上述配置可在core-site.xml中配置

HDFS读过程

read

  1. 初始化FileSystem,然后客户端(client)用FileSystem的open()函数打开文件
  2. FileSystem用RPC调用元数据节点,得到文件的数据块信息,对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。
  3. FileSystem返回FSDataInputStream给客户端,用来读取数据,客户端调用stream的read()函数开始读取数据。
  4. DFSInputStream连接保存此文件第一个数据块的最近的数据节点,data从数据节点读到客户端(client)
  5. 当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。
  6. 当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。
  7. 在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。
  8. 失败的数据节点将被记录,以后不再连接。

  总的来说,当HDFS Client读取一个文件时,它首先联系NameNode从NameNode获取这个文件所有的blocks和每个block的所有备份所在的机器位置。然后HDFS Client联系DataNodes,进行具体的读写操作。当client开始读取block时,client会选择从网络延迟最短的一台机器读取备份,如果第一个备份出现问题,那client就从第二个备份读,以此类推。在读写一个文件时,当我们从NameNodes得知应该向哪些DataNodes读写之后,Client就直接和DataNode打交道,不再通过NameNodes了。

reader

HDFS写过程

write

  1. 初始化FileSystem,客户端调用create()来创建文件
  2. FileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件,元数据节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。
  3. FileSystem返回DFSOutputStream,客户端用于写数据,客户端开始写入数据。
  4. DFSOutputStream将数据分成块,写入data queue。data queue由Data Streamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块)。分配的数据节点放在一个pipeline里。Data Streamer将数据块写入pipeline中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。
  5. DFSOutputStream为发出去的数据块保存了ack queue,等待pipeline中的数据节点告知数据已经写入成功。
  6. 当客户端结束写入数据,则调用stream的close函数。此操作将所有的数据块写入pipeline中的数据节点,并等待ack queue返回成功。最后通知元数据节点写入完毕。
  7. 如果数据节点在写入的过程中失败,关闭pipeline,将ack queue中的数据块放入data queue的开始,当前的数据块在已经写入的数据节点中被元数据节点赋予新的标示,则错误节点重启后能够察觉其数据块是过时的,会被删除。失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点。元数据节点则被通知此数据块是复制块数不足,将来会再创建第三份备份。

  总的来说,当Client写文件创建新的block后,NameNode会为这个block创建一个整个HDFS Cluster里独有的block ID,并且决定哪些DataNodes来存储这个block的所有备份。这些被选中的DataNodes组成一个队列,Client向队列的第一个DataNode写,那么第一个DataNode除了把数据存在自己的硬盘上以外,还要把数据传给队列里的下一个DataNode,以此类推,直到最后一个DataNode接收数据完毕。

DataPipeline

hadoop中的RPC框架机制

rpc

  其实就是,客户端与服务端都实现同一个接口,然后通过socket来进行通信,利用Java的动态反射来拿到实例对象进行远程过程调用,这里由于要进行socket传输信息,所以要进行序列化封装。序列化(Serialization)是指把结构化对象转化为字节流,方便网络传输,而反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。

  • RPC——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
  • RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
  • hadoop的整个体系结构就是构建在RPC之上的

Hadoop的序列化:Writable接口

  1. Writable接口,是根据DataInput和DataOutput实现的简单、有效的序列化对象
  2. MR的任意Key和Value必须实现Writable接口
  3. MR的任意key必须实现WritableComparable接口

xlh

所以如果需要在MapReduce中的key、value使用自定义的Bean,则此Bean对象需要实现Writable接口。

YARN

  由于第一版的Hadoop框架中,JobTracker是map-reduce的集中处理点,存在单点故障,并且JobTracker完成了太多的任务,造成了过多的资源消耗,当map-reduce的job非常多的时候,会造成很大的内存开销,存在JobTracker fail的风险,而在TaskTracker端,以map/reduce task的数目作为资源的表示过于简单,没有考虑到CPU/内存的占用情况,如果两个很大内存消耗的task被调度到了一块,就很容易出现Out Of Memory。在TaskTracker端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费。

  基于以上旧机制的缺点,新版本的Hadoop中将MapReduce框架完全重构了,将原先的资源管理和任务调度的功能从MapReduce中分离出来形成了现在的YARN框架。

yarn

  重构根本的思想是将JobTracker两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度/监控。新的资源管理组件管理着所有应用程序计算资源的分配每一个任务都由MRAppMaster负责相应的调度和协调。同时,重构后的框架也解决了单点故障问题,可靠性随之增强,并且正如一开始的图所示,分离出来的YARN组件不仅可以接收来自MarReduce的任务,还能分配从其他组件中提交的任务,减少了框架的耦合度,增加了可复用性。

新旧对比

  首先客户端不变,其调用API及接口大部分保持兼容,但是原框架中核心的 JobTracker 和 TaskTracker 不见了,取而代之的是 ResourceManager, ApplicationMaster 与 NodeManager 三个部分。

  ResourceManager 是一个中心的服务,它做的事情是调度、启动每一个 Job 所属的 ApplicationMaster、另外监控 ApplicationMaster 的存在情况。ResourceManager 负责作业与资源的调度。接收 JobSubmitter 提交的作业,按照作业的上下文 (Context) 信息,以及从 NodeManager 收集来的状态信息,启动调度过程,分配一个 Container 作为 MRAppMaster。

  NodeManager 功能比较专一,就是负责 Container 状态的维护,并向 RM 保持心跳。

  ApplicationMaster 负责一个 Job 生命周期内的所有工作,类似老的框架中 JobTracker。但注意每一个 Job(不是每一种)都有一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。

YARN框架的优势:

  1. 大大减小了JobTracker(即现在的ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
  2. 在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
  3. 老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。

MapReduce

MapReduce概述

  • MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题
  • MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单
  • Map和Reduce这两个函数的形参是key、value对,表示函数的输入信息

MapReduce原理

mr

map任务处理
  • 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
  • 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出
  • 每次读取一行的数据,以文本偏移量作为key,这一行的内容作为value,组合成一个k-v对作为Map的输入
reduce任务处理
  • 在reduce之前,有一个shuffle的过程对多个map任务的输出进行合并、排序。
  • 写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
  • 把reduce的输出保存到文件中
map、reduce键值对格式
函数 输入键值对 输出键值对
map() <k1,v1> <k2,v2>
reduce() <k2,{v2…} <k3,v3>
MR过程各个角色的作用
  • jobClient:是用户作业与JobTracker交互的主要接口。负责提交作业、启动、跟踪任务执行、访问任务状态和日志等
  • JobTracker:负责接收用户提交的作业,负责启动、初始化任务、跟踪任务执行,分配作业,TaskTracker与其进行通信,协调监控整个作业
  • TaskTracker:定期与JobTracker通信,执行Map和Reduce任务
  • HDFS:保存作业的数据、配置、jar包、结果

job job1

  当我们写好mapreduce程序打包成jar包后,在集群的中,使用 hadoop jar 主类名 指令来提交任务,程序中最后的job.waitforcompletion() 才是真正提交任务,这段代码之前是配置信息,配置完成后通过JobClient提交,与JobTracker通信得到一个jar的存储路径和JobId,然后进行输入输出路径检查,将jobjar拷贝到HDFS,计算输入分片,将分片信息写入到job.split中,然后会写一个job.xml文件,这些操作做完才真正提交作业。提交作业的时候会由一个RunJar进程来提交,这个进程在将作业提交完成后就会注销,可以在提交过程中,不断使用jps指令来查看进程变化。

  客户端提交作业后,RM的JobTracker会将作业加入到队列,然后进行调度,默认是FIFO方式。NM的TaskTracker与JobTracker之间的会通过心跳机制来进行通信和任务分配,TaskTracker会主动定期向JobTracker发送心跳信息,询问是否有任务要做,如果有,就会申请到任务,然后RM会分配给他们运行资源,RM会启动一个NM作为MRAppMaster,由它来调度map或reduce的任务进程(yarnChild),多少个任务进程,jsp的时候就可以看到几个yarnChild进程。在map任务完成后,会通知MRAppMaster,由它启动reduce任务,完成任务后,会将结果数据写入到HDFS中,MRAppMaster会向RM注销自己,RM会回收资源。一次任务完成。

  当然,Task会定期向TaskTracker汇报执行情况,TaskTracker会定期收集所在集群上的所有Task的信息,并向JobTracker汇报,JobTracker会根据所有TaskTracker汇报上来的信息进行汇总。如果TaskTracker出错了,就会停止向JobTracker发送心跳信息,那么JobTracker会将TaskTracker从等待的任务池中移除,并将该任务转移到其他的地方执行。同样的,如果一个任务执行很长时间还没有结果,JobTracker会起另外一个TaskTracker来执行同一个任务,哪个完成的快就采用哪个的结果,取消另一个的执行。

shuffle3

Shuffle

shuffle shuffle2

  1. 每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性设置),一旦达到阀值0.8(io.sort.spill.percent可设置),一个后台线程就把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件
  2. 写磁盘前,会进行分组(partition)和排序(sort)。如果设定了合并(combiner)程序,则进行合并再将数据分组排序。
  3. 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件
  4. Reducer通过http方式得到输出文件的分区,每个Reduce拉取属于自己要处理的分组数据
  5. TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出
  6. 排序阶段合并map输出。然后走Reduce阶段

以上过程就包含了shuffle的执行。从流程中可以看出,我们可以自己控制排序和分组的过程。在map和reduce阶段进行排序,默认情况下,比较的是key的值,value是不参与排序比较的。如果要想让value也参与排序,需要把key与value组装成新的类,作为key,才能参与比较。同样,分组时默认也是按照key进行比较的。

Partitioner && Combiners

  • Partitioner

  Partitioner<K,V>是partitioner的基类,如果需要定制partitioner需要继承该类。(public class PartitionerArea<KEY, VALUE> extends Partitioner<KEY, VALUE>{})Partition所处的位置如下所示:

part

  Partition主要作用就是将map的结果发送到相应的reduce,所以要求partition要做到负载均衡,尽量的将工作均匀的分配给不同的reduce,以及保持效率,分配速度一定要快。HashPartitioner是mapreduce的默认partitioner。计算方法是 reducer = =(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,以此来得到当前的目的reducer。

partitioner

  使用自定义分区需要在job中设置要使用的分区类:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumArea.class);
//设定自定义分区定义
job.setPartitionerClass(PartitionerArea.class);
//应该要等于分区的个数,如果大于分区个数,多的部分会没数据,如果少于分区数,会报错,只设置为1个任务则没问题
job.setNumReduceTasks(6);

job.setMapperClass(FlowSumAreaMapper.class);
job.setReducerClass(FlowSumAreaReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true)?0:1);
  • Combiners

  每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reduce的数据量。combiner最基本的实现是实现本地key的归并,combiner具有类似本地的reduce功能。如果不使用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝对不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

Hadoop整体框架

hadoopall

  上图显示了一个MR程序的完整流程,可以看到这个前后有一个InputFormat和OutFormat组件,由这两个组件来与数据的输入和输出有关(FileInputFormat.setInputPaths(job, new Path(args[0]))与FileOutputFormat.setOutputPath(job, new Path(args[1])))。InputFormat与OutputFormat都只是接口,将Map的数据输入与真实的数据读取之间相隔开,这样就可以适应不同的文件系统来使用Map程序,同样OutputFormat将Reduce的数据输出与真实的文件系统相隔开,使之适应于不同的文件系统。这样就将MapReduce框架与数据的读写完全解耦合。InputFormat与OutputFormat都有一个默认的实现类,分别为TextInputFormat与TextOutputFormat,也就是读取文本文件,和写文本文件。如果要处理其他类型的数据或者换一种方法读取文本数据,则需要自己去实现一种读取写入的类。

  InputFormat会将数据分成很多切片(split),然后每个切片会用一个RecordReaders类来读取数据封装成k-v对给map接收,一个切片就对应一个map进程。同样,OutputFormat输出的时候也是由一个RecordWriters来写数据磁盘文件。

  在写mapreduce程序的时候,可以把mapred-site.xml、yarn-site.xml等配置文件复制到项目中,省去conf单独配置太多配置项的麻烦。

MapReduce的输入输出处理类

fs

  • FileInputFormat

  FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是由不同的子类(如TextInputFormat)进行实现的。

  InputFormat负责处理MR的输入部分:

InputFormat

其起到以下作用:

  1. 验证作业的输入是否规范
  2. 把输入文件切分成InputSplit
  3. 提供RecordReader的实现类,把InputSplit读到Mapper中进行处理
  • InputSplit

  在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个记录(key-value对),map会依次处理每一个记录。

  FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或是这个文件中的一部分。如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理小文件的效率高的原因。

  当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当作一个split并分配一个mao任务,导致效率低下。

  比如:一个1GB的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。

  • TextInputFormat

  TextInputFormat是默认的处理类,处理普通文本文件,文件中每一行作为一个记录,它将每一行在文件中的起始偏移量作为key,每一行的内容作为value;它默认以 \n 或回车键作为一条记录。当然,TextInputFormat继承了FileInputFormat。

  InputFormat类的层次结构如下所示:

InputFormat2

  • 其他输入类 & 自定义输入格式

  Hadoop的输入类有很多,下面就简单的列举几个:

  1. CombineFileInputFormat:相对于大量的小文件来说,hadoop更适合处理少量的大文件。CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
  2. KeyValueTextInputFormat:当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。
  3. NLineInputFormat:NLineInputformat可以控制在每个split中数据的行数。
  4. SequenceFileInputFormat:当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。
  5. 自定义输入格式:如果要自定义输入格式的话,需要自己写一个类,继承FileInputFormat基类,并重写里面的getSplits(JobContext context)方法,以及重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。
  • MapReduce的输出

  输出方面简单的说几个输出类,大致和输入类似:

  1. TextOutputFormat:默认的输出格式,key和value中间值用tab隔开的。
  2. SequenceFileOutputFormat:将key和value以sequencefile格式输出。
  3. SequenceFileAsOutputFormat:将key和value以原始二进制的格式输出。
  4. MapFileOutputFormat:将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
  5. MultipleOutputFormat:默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。

Hadoop的HA机制 —— zookeeper

什么是zookeeper

  Zookeeper是Google的Chubby的一个开源的实现,是Hadoop的分布式协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。这里不做过多介绍。

zookeeper1

  通过使用zookeeper集群,应用就可以不要自己持有配置信息等内容,而通过共享的方式来达到数据的同步,并且即使zookeeper集群中一个节点挂掉了,它可以立马切换到另一个节点继续提供服务,这样就大大增加了可靠性,这里面的实现过程全由zookeeper来自动完成。zookeeper集群功能其实主要就是提供少量数据的存储和管理,以及提供对数据节点的监听功能,zookeeper集群最好是搭建奇数个节点。

为什么使用Zookeeper

大部分分布式应用需要一个主控、协调器或控制器来管理物理分布的子进程(如资源、任务分配等)

目前,大部分应用需要开发私有的协调程序,缺乏一个通用的机制

协调程序的反复编写浪费,且难以形成通用、伸缩性好的协调器

Zookeeper可以提供通用的分布式锁服务,用以协调分布式应用

在Hadoop2.0中,使用Zookeeper的事件处理可以确保整个集群只有一个活跃的NameNode、存储配置信息等,极大增加了可用性。

HBase使用Zookeeper的事件处理确保整个集群只有一个HMaster,察觉HRegionServer联机和宕机,存储访问控制列表等。

Not HA

  如果Hadoop没有使用高可用架构的话,虽然有SN来保证数据的同步,但是如果正在使用的NN节点挂掉了,那么客户端再次访问的话,整个集群就不知道该去找哪个NN来处理访问,这样不仅会导致数据丢失,也会使服务无法进行,所以没有使用HA架构的Hadoop集群,只能保证元数据的可靠性,但是没办法保证服务的可靠性

zookeeper集群中的角色

zookeeper1

  zookeeper集群上的角色通常讲的比较多的是follower和leader,从名字中就可以看出,leader是一个主节点,相当于一个管理节点,所有数据的写操作都是由leader来实现的(Zab协议),任何一个客户端向集群写数据的时候,要想保证每个节点上的数据的一致性,都要通过leader来实现,leader先把数据更新了,然后会通知follower节点来更新数据,并且leader会认为只要集群中有超过一半的节点更新数据成功,便认为此数据更新成功。

  leader和follower并不是在集群启动前由配置文件决定谁是leader,谁是follower,没有事先的分配。集群启动时大家都是普通的zkServer,启动过程中会通过一种选举机制来选举leader,一旦leader选举出来,其他的就都变成follower。其中leader负责进行投票的发起和决议,更新系统状态等。当然,集群中其实还有一种角色————observer。它和follower都属于learner角色,但是follower用于接受客户端请求并向客户端返回结果,在选举过程中参与投票,而observer可以接受客户端连接,将写请求转发给leader,但是observer不参加投票过程,只同步leader的状态。所以,observer的目的是为了扩展系统,提高读取速度。client客户端就是请求发起方。

Leader的选举

  Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举(Paxos协议)。进行leader选举,则至少需要两台机器,以3台机器为例。

  首先先了解一下以下概念:

  服务器状态:

  • LOOKING:寻找Leader状态。当服务器处于该状态时,它会认为当前集群中没有Leader,因此需要进入Leader选举状态。

  • FOLLOWING:跟随者状态。表明当前服务器角色是Follower。

  • LEADING:领导者状态。表明当前服务器角色是Leader。

  • OBSERVING:观察者状态。表明当前服务器角色是Observer。

  投票数据结构:

  每个投票中包含了两个最基本的信息,所推举服务器的SID和ZXID,投票(Vote)在zookeeper中包含字段如下:

  • id:被推举的Leader的SID

  • zxid:被推荐的Leader事务ID

  • electionEpoch:逻辑时钟,用来判断多个投票是否在同一轮选举周期中,该值在服务端是一个自增序列,每次进入新一轮的投票后,都会对该值进行加1操作。

  • peerEpoch:被推举的Leader的epoch。

  • state:当前服务器的状态。

  1. 服务器初始化启动时

  在集群初始化阶段,当有一台服务器Server1启动时,其单独无法进行和完成Leader选举,当第二台服务器Server2启动时,此时两台机器可以互相通信,每台机器都试图找到Leader,于是进入Leader选举过程。选举过程如下:

  • 每个Server发出一个投票。由于是初始情况,每个server都会将自己作为Leader服务器来进行投票,每次投票会包含说推举的服务器的myid和ZXID,使用(myid,ZXID)来表示,此时server1的投票为(1,0),server2的投票为(2,0),然后各自将这个投票发给集群中其他机器。
  • 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。
  • 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:
      · 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
      · 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。
    

      对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

  • 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。

  • 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。
  1. 服务器运行期间无法和Leader保持连接时

  在Zookeeper运行期间,Leader与非Leader服务器各司其职,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下

  • 变更状态。Leader挂后,余下的非Observer服务器都会讲自己的服务器状态变更为LOOKING,然后开始进入Leader选举过程。

  • 每个Server会发出一个投票。在运行期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123),(3, 122),然后各自将投票发送给集群中所有机器。

  • 接收来自各个服务器的投票、处理投票、统计投票、改变服务器的状态。这些都与启动时过程相同。

  选完leader以后,zk就进入状态同步过程,follower连接leader,将最大的zxid发送给leader,leader根据follower的zxid确定同步点,完成同步后通知follower已经成为uptodate状态,follower收到uptodate消息后,又可以重新接受client的请求进行服务了。

  由上面的过程可知,通常哪台服务器上的数据越新(zxid会越大),其成为leader的可能性也越大,也就越能保证数据的恢复。如果zxid相同,则sid越大机会越大。

select1 select2

  由于observer不参与选举,所以当集群从正常工作状态转变为寻找leader状态的时候,observer结点就会处于等待状态,直到选出leader,才与leader继续保持通信。

zookeeper的数据模型

  Zookeeper的数据模型结构类似标准的文件系统,但这个文件系统中没有文件和目录,而是统一使用节点(node)的概念,称为znode。Znode作为保存数据的容器(限制在1mb以内),也构成了一个层次化的命名空间。

znode

znode

  zookeeper目录中的每一个节点对应着一个znode,每个znode维护着一个属性结构,它包含数据的版本号、时间戳等信息。Zookeeper就是通过这些属性来实现它特定的功能。每当znode的数据改变时,相应的版本号会增加,每当客户端查询、更新和删除数据时,也必须提供要被操作的znode版本号,如果所提供的数据版本号与实际的不匹配,那么将会操作失败。

节点属性如下:

znode1

  Znode有四种形式的目录节点:PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMRAL、EPHEMERAL_SEQUENTIAL。

  zookeeper的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为session,如果znode是临时节点,这个session失效了,znode也就删除了;持久化目录节点存储的数据不会丢失

Znode是客户端访问zookeeper的主要实体,它包含了以下主要特征:

Watch

  • Znode状态发生改变时(增删改等操作),watch(监视器)机制可以让客户端得到通知,并且仅仅只会触发一次watch。当某个目录节点中存储的数据被修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应达到实时的信息同步。

数据访问控制

  • 每个znode创建时都会有一个ACL列表(Access Control List 访问控制列表),用于决定谁可以执行哪些操作。每个ACL都是身份验证模式、符合该模式的一个身份和一组权限的组合。

临时节点

  • zookeeper节点类型有两种,短暂的(ephemeral)和持久的(persistent)。短暂znode是临时节点,一旦创建这个znode的客户端会话结束与服务器失去联系时,zookeeper会将该znode删除,短暂znode不可以有子节点。持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除。

顺序节点

  • 当创建znode时设置顺序标识,那么该znode路径之后会附加一个单调递增的计数,这个计数器由父节点维护,在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

zookeeper工作原理

  Zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式和广播模式。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。

  一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个server加入zookeeper服务中,它会在恢复模式下启动,发现leader,并和leader进行状态同步。待到同步结束,它也参与消息广播。Zookeeper服务一直维持在Broadcast状态,直到leader崩溃了或者leader失去了大部分的followers支持。

  广播模式需要保证proposal被按顺序处理,因此zk采用了递增的事务id号(zxid)来保证。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64为的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch。低32位是个递增计数。

  当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的server都恢复到一个正确的状态。

  Broadcast模式极其类似于分布式事务中的2pc(two-phrase commit 两阶段提交),即leader提起一个决议,由于followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过则执行该决议(事务),否则什么也不做。

HA架构

HA

  要搭建Hadoop的高可用架构,首先要考虑以下几个问题:

  1. 能否让两个NN都正常响应客户端请求:

  不能,应该让两个NN节点在某个时间只能有一个节点正常响应客户端请求,响应请求的必须为ACTIVE状态的那一台

  1. standby状态的节点必须能够快速无缝的切换为active状态:

  当处于active的NN节点挂掉了,不能处理客户端请求的时候,处于standby状态的NN节点必须能够快速无缝的切换为active状态,代替原先的节点来处理客户端请求,这意味着两个NN必须时刻保持元数据的一致。

  1. 如何避免状态切换时发生brain split现象

  brain split叫做“脑裂”,也就是切换时发送原先那台NN节点没死,导致两台机子都处于active状态。解决办法:fencing(隔离)机制:就是保证在任何时候只有一个主NN,包括三个方面:

  • 共享存储fencing,确保只有一个NN可以写入edits
  • 客户端fencing,确保只有一个NN可以响应客户端请求
  • DataNode fencing,确保只有一个NN可以向DN下发命令,譬如删除块,复制块,等等。

  所以,为了解决以上的问题,可以将edits文件放到zookeeper集群中管理,不要让任何一个NN节点单独管理edits文件,这样就可以保证元数据的一致性。而为了使放在集群的元数据可靠,不会出现NN没挂掉,元数据挂掉的情况,Hadoop的其中一种HA架构中,使用了基于zookeeper的qjournal元数据存储管理系统,每个存放edits的节点就称为journalnode节点,它和zookeeper的znode有一样的特性,数据之间保持一致,即使挂了几台,只要有半数以上的节点还存活着就能正常运行,这样就保证了极大的可靠性。

  每一个NN都有一个状态监控进程叫zkfc(zookeeper failover controller),监控各自节点的状态,zkfc进程会一直访问zk来写入和获取NN的状态,当处于active状态的NN节点运作不正常的时候,监控它的zkfc会将不正常的信号写入到zk中,另一个监控standby节点的zkfc读取到这个信息后,会先确保原先的NN关闭,然后将自己监控那台节点状态从standby状态变成active状态,它会用两种方式来确保原先的NN节点是真的挂掉,而不是不稳定或假死但是还活着的“脑裂”状况。原先监控standby节点的进程将NN变成active接替原先的NN处理客户端请求的时候,为了防止“脑裂”的情况发生,会首先通过ssh发送一个关闭原先NN节点的指令,如果一定时间内没有收到回应,就会调用自定义的shell脚本直接关闭原先的NN节点,确保同一时刻只有一个节点处于active状态。

  为什么不采用NN与zookeeper进行心跳等信息同步而采用zkfc进程来监控NN节点来确保节点切换等功能呢?最简单的原因是一次FullGC就可以让NN挂起十几分钟,所以,必须要有一个独立的短小精悍的watchdog来专门负责监控。同时这也是一个松耦合的设计,便于扩展或更改,目前版本里是用zookeeper来做同步锁,但用户可以方便的把这个zookeeper FailoverController(zkfc)替换为其他的HA方案或leader选举方案。

  Hadoop2.x版本以后才有这种高可用机制,并且还有一个Federation架构

hdfs-federation

  Federation能够快速的解决大部分单Namenode的问题。为了水平扩展namenode,federation使用了多个独立的namenode/namespace。这些namenode之间是联合的,也就是说他们之间互相独立不需要相互协调,各自分工,管理自己的区域。一个federation里面有多个namenode,即使一个挂了也不会影响其他namenode,并且访问时可以访问federation的命名空间来访问不同的数据,这样可以扩大集群的规模。如上上图的hdfs://ns1/aa与hdfs://ns2/aa就是访问命名空间为ns1的aa和命名空间为ns2的aa。

yarnHA

  同样的,yarn集群也可以采用这种集群的方式实现高可用性。但是namenode即使客户端在访问时。如上传数据,下载文件时,宕机了,这些操作依旧可以成功,但是由于提交任务到集群时,涉及很复杂的资源调度问题,所以如果任务提交,运行在一半的时候resourceManager节点挂了,那这个任务会报错失败。

zookeeper HA的部署

  1. 上传zk安装包(zookeeper的环境变量配置和Hadoop类似)
  2. 解压到apps目录:tar -xvzf ……
  3. 配置(先在一台节点上配置)

  zookeeper的默认配置文件为zookeeper/conf/zoo_sample.cfg,需要将其修改为zoo.cfg。其中各配置项的含义,解释如下:

  1. tickTime:CS通信心跳时间。Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。默认 tickTime=2000
  2. initLimit:LF初始通信时限。集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。默认 initLimit=5 也就是说最长可以 5*2000 毫秒不联系。
  3. syncLimit:LF同步通信时限。集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)。默认 syncLimit=2
  4. dataDir:数据文件目录。Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里。dataDir=/home/hadoop/apps/zookeeper-3.4.5/data
  5. clientPort:客户端连接端口。客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。默认 clientPort=2181
  6. 服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)。这个配置项的格式比较特殊,规则如下:

  server.N=YYYY:A:B

如:

server.1=hdp-node-01:2888:3888

server.2=hdp-node-02:2888:3888

server.3=hdp-node-03:2888:3888

配置文件修改完之后,需要在dataDir=/home/hadoop/apps/zookeeper-3.4.5/data 路径下创建一个myid文件,里面内容是 server.N 中的N:echo 1 > myid

  然后将配置好的zk拷贝到其他需要安装zk的结点:scp -r …

  注意:在其他节点上一定要修改myid的内容

  当然,因为要实现HA机制,所以原先的Hadoop集群需要修改一些配置文件,这里就贴一下大致的配置文件,不做太详细的说明了。注意,这部只用了三台机子配置所有的,实际中最好能做一些区分,不该配置在一台的就分开配置。网上搜罗了一下,看他人的这样分配挺可以的。

集群规划:
	主机名		IP		安装的软件			运行的进程
	weekend01	192.168.1.201	jdk、hadoop			NameNode、DFSZKFailoverController(zkfc)
	weekend02	192.168.1.202	jdk、hadoop			NameNode、DFSZKFailoverController(zkfc)
	weekend03	192.168.1.203	jdk、hadoop			ResourceManager
	weekend04	192.168.1.204	jdk、hadoop			ResourceManager
	weekend05	192.168.1.205	jdk、hadoop、zookeeper		DataNode、NodeManager、JournalNode、QuorumPeerMain
	weekend06	192.168.1.206	jdk、hadoop、zookeeper		DataNode、NodeManager、JournalNode、QuorumPeerMain
	weekend07	192.168.1.207	jdk、hadoop、zookeeper		DataNode、NodeManager、JournalNode、QuorumPeerMain

修改core-site.xml

<configuration>
    <!-- 指定hdfs的nameservice为ns1 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://ns1/</value>
    </property>
    <!-- 指定hadoop临时目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/app/hadoop-2.8.1/data</value>
    </property>
    
    <!-- 指定zookeeper地址 -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>dap-node-01:2181,hdp-node-02:2181,hdp-node-03:2181</value>
    </property>
</configuration>

修改hdfs-site.xml

<configuration>
    <!--指定hdfs的nameservice为ns1,需要和core-site.xml中的保持一致 -->
    <property>
        <name>dfs.nameservices</name>
        <value>ns1</value>
    </property>
    <!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
    <property>
        <name>dfs.ha.namenodes.ns1</name>
        <value>nn1,nn2</value>
    </property>
    <!-- nn1的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.ns1.nn1</name>
        <value>hdp-node-01:9000</value>
    </property>
    <!-- nn1的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.ns1.nn1</name>
        <value>hdp-node-01:50070</value>
    </property>
    <!-- nn2的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.ns1.nn2</name>
        <value>hdp-node-02:9000</value>
    </property>
    <!-- nn2的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.ns1.nn2</name>
        <value>hdp-node-02:50070</value>
    </property>
    <!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hdp-node-01:8485;hdp-node-02:8485;hdp-node-03:8485/ns1</value>
    </property>
    <!-- 指定JournalNode在本地磁盘存放数据的位置 -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/hadoop/app/hadoop-2.8.1/journaldata</value>
    </property>
    <!-- 开启NameNode失败自动切换 -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <!-- 配置失败自动切换实现方式 -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns1</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>
    <!-- 使用sshfence隔离机制时需要ssh免登陆 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/hadoop/.ssh/id_rsa</value>
    </property>
    <!-- 配置sshfence隔离机制超时时间 -->
    <property>
        <name>dfs.ha.fencing.ssh.connect-timeout</name>
        <value>30000</value>
    </property>
</configuration>

修改yarn-site.xml

<configuration>
    <!-- 开启RM高可用 -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!-- 指定RM的cluster id -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>yrc</value>
    </property>
    <!-- 指定RM的名字 -->
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <!-- 分别指定RM的地址 -->
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>hdp-node-02</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>hdp-node-03</value>
    </property>
    <!-- 指定zk集群地址 -->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

启动zookeeper集群:分别在每个节点上执行:zkServer.sh start,如果没有配置zookeeper的环境变量,则需要在zookeeper安装路径下的bin或sbin目录中执行此指令。启动后可用 jps 指令查看进程是否已经起来,也可以使用 zkServer.sh status 查看各个zookeeper的状态以及身份

启动journalnode:hadoop-daemon.sh start journalnode,然后可以使用jps查看是否多了JournalNode进程。

格式化HDFS,在master节点机器上执行:hdfs namenode -format,格式化后会根据core-site.xml中的hadoop.tmp.dir配置生成一个文件夹,将这个文件夹拷贝到作为namenode的节点机器上

格式化ZKFC,master机器上执行就可以:hdfs zkfc -formatZK

启动HDFS、启动YARN

注意,zookeeper启动成功的条件是至少有配置集群的一半以上数量的节点还存活,这样才可以选出leader,zookeeper集群才能正常运行。

  顺便附上HDFS_HA的Java代码

package cn.zhxiqi.hadoop.hdfs;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFS_HA {
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://ns1");
		conf.set("dfs.nameservices", "ns1");
		conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
		conf.set("dfs.namenode.rpc-address.ns1.nn1", "hdp-node-01:9000");
		conf.set("dfs.namenode.rpc-address.ns1.nn2", "hdp-node-02:9000");
		//conf.setBoolean(name, value);
		conf.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
		FileSystem fs = FileSystem.get(new URI("hdfs://ns1"), conf, "hadoop");
		InputStream in =new FileInputStream("c://eclipse.rar");
		OutputStream out = fs.create(new Path("/eclipse"));
		IOUtils.copyBytes(in, out, 4096, true);
	}
}

datanode节点超时时间设置

datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。

HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:

timeout  = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval

而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
需要注意的是hdfs-site.xml 配置文件中的
heartbeat.recheck.interval的单位为毫秒,
dfs.heartbeat.interval的单位为秒。

所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。

在hdfs-site.xml中的参数设置格式:

<property>
    <name>heartbeat.recheck.interval</name>
    <value>2000</value>
</property>
<property>
    <name>dfs.heartbeat.interval</name>
    <value>1</value>
</property>

HDFS冗余数据块的自动删除

在日常维护hadoop集群的过程中发现这样一种情况: 某个节点由于网络故障或者DataNode进程死亡,被NameNode判定为死亡,HDFS马上自动开始数据块的容错拷贝;当该节点重新添加到集群中时,由于该节点上的数据其实并没有损坏,所以造成了HDFS上某些block的备份数超过了设定的备份数。通过观察发现,这些多余的数据块经过很长的一段时间才会被完全删除掉,那么这个时间取决于什么呢?该时间的长短跟数据块报告的间隔时间有关。Datanode会定期将当前该结点上所有的BLOCK信息报告给Namenode,参数dfs.blockreport.intervalMsec就是控制这个报告间隔的参数。

hdfs-site.xml文件中有一个参数:

<property>
    <name>dfs.blockreport.intervalMsec</name>
    <value>10000</value>
    <description>Determines block reporting interval in milliseconds.</description>
</property>

其中3600000为默认设置,3600000毫秒,即1个小时,也就是说,块报告的时间间隔为1个小时,所以经过了很长时间这些多余的块才被删除掉。通过实际测试发现,当把该参数调整的稍小一点的时候(60秒),多余的数据块确实很快就被删除了。

hbase & hive

HBase

hadoopHbase

  • Hbase —— Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用Hbase技术可在廉价PC Server上搭建起大规模结构化存储集群。HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具。

HBase集群中的角色

hbaseJG

  • Zookeeper集群

zookeeperinHBase

  提供HMaster的HA以及分布式配置管理服务,Master和所有RegionServer都向zookeeper注册。

  • HMaster启动的时候会将HBase系统表 -ROOT- 加载到 zookeeper cluster,通过zookeeper cluster 可以获取当前系统表 .META. 的存储所对应的RegionServer信息
  • 保证任何时候,集群中只有一个master
  • 存贮所有Region的寻址入口
  • 实时监控RegionServer的状态,将RegionServer的上线和下线信息实时通知给Master
  • 存储HBase的schema,包括有哪些table,每个table有哪些columnFamily

  zookeeper是HBase集群的“协调器”。由于zookeeper的轻量级特性,因此我们可以将多个HBase集群共用一个zookeeper集群,以节约大量的服务器。多个HBase集群共用zookeeper集群的方法是使用同一组IP,修改不同HBase集群的“zookeeper.znode.parent”属性,让它们使用不同的根目录。比如cluster1使用/hbase-c1,cluster2使用/hbase-c2,等等。

  • HMaster

HMaster是HBase主/从集群架构中的中央节点。通常一个HBase集群存在多个HMaster节点,其中一个为Active Master,其余为Backup Master.

Hbase每时每刻只有一个HMaster主服务器程序在运行,HMaster将region分配给region服务器,协调region服务器的负载并维护集群的状态。HMaster不会对外提供数据服务,而是由region服务器负责所有regions的读写请求及操作。

由于HMaster只维护表(table的元数据信息保存在zookeeper上)和region的元数据,而不参与数据的输入/输出过程,HMaster失效仅仅会导致所有的元数据无法被修改,但表的数据读/写还是可以正常进行的。

HMaster的作用:

  • 管理HRegionServer,实现其负载均衡。
  • 管理和分配HRegion,为RegionServer分配region,发现失效的RegionServer并重新分配其上的Region。比如在HRegion split时分配新的HRegion;在HRegionServer退出时迁移其内部的HRegion到其他HRegionServer上。
  • 实现DDL操作(Data Definition Language,namespace和table的增删改,column family的增删改等)。
  • 管理namespace和table的元数据(实际存储在HDFS上),管理HDFS上的垃圾文件回收
  • 权限控制(ACL)
  • HRegionServer
  • 存放和管理本地HRegion,读写HDFS,管理Table中的数据
  • 维护master分配给它的region,处理对这些region的io请求,Client直接通过HRegionServer读写数据(从HMaster中获取元数据,找到RowKey所在的HRegion/HRegionServer后)
  • 负责切分正在运行过程中变的过大的region

具体的Region切分细节可以参考这个博客:HBase原理 – 所有Region切分的细节都在这里了

  HRegionServer一般和DataNode在同一台机器上运行,实现数据的本地性。HRegionServer包含多个HRegion,由WAL(HLog)、BlockCache、MemStore、HFile组成。

RegionServer

  BlockCache是一个读缓存,即“引用局部性”原理。

put1 put2

  当客户端发起一个Put请求时,首先它从HBase:meta表中查出该Put数据最终需要去的HRegionServer。然后客户端将Put请求发送给相应的HRegionServer,在HRegionServer中它首先会将该Put操作写入WAL日志文件中,写完WAL日志文件后,HRegionServer根据Put中的TableName和RowKey找到对应的HRegion,并根据ColumnFamily找到对应的Store,并将Put写入到该Store的MemStore中。此时写成功,并返回通知客户端。

  • HRegion

  所有的数据库数据一般是保存在Hadoop分布式系统上面的,用户通过一系列HRegion服务器获取这些数据。一台机器上一般只运行一个HRegion服务器,而且每一分区段的HRegion也只会被一个HRegion服务器维护。HRegion服务器包含两大部分:HLog部分和HRegion部分。其中HLog用来存储数据日志,采用的是先写日志的方式。HRegion部分由很多的HRegion组成,存储的是实际的数据。每个HRegion又由很多的Store组成,每一个Store存储的实际上是一个列簇(ColumnFamily)下的数据。此外,在每一个Store中又包含一块MemStore。MemStore驻留在内存中,数据到来时首先更新到MemStore中,当到达阈值之后再更新到对应的HFile中。每一个Store包含了多个HFile,HFile负责的是实际数据存储,为HBase中最小的存储单元。

  • Store

  一个HRegion中存在多个Store,Store可以认为与表的ColumnFamily对应,也就是不同的ColumnFamily进行分别存储,以便针对不同的ColumnFamily采用不同的压缩算法进行压缩。Store由MemStore和一系列的StoreFile组成,用户的数据首先写入到MemStore中,当MemStore满了以后会Flush成一个StoreFile(底层实现是HFile)

  • MemStore

  MemStore是一个In Memory Sorted Buffer,在每个HStore中都有一个MemStore,即它是一个HRegion的一个Column Family对应一个实例。它的排列顺序以RowKey、ColumnFamily、Column的顺序以及Timestamp的倒序,如下所示:

MemStore

  每一次Put/Delete请求都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile,即一个Store(ColumnFamily)可以有0个或多个StoreFile。有三种情况会触发MemStore的Flush动作,不过MemStore的最小Flush单元是HRegion而不是MemStore。

  1. 当一个MemStore的大小超过了hbase.hregion.memstore.flush.size的大小,默认128MB。此时当前的HRegion中所有的MemStore会Flush到HDFS中。
  2. 当全局MemStore的大小超过了hbase.regionserver.global.memstore.upperLimit的大小,默认40%的内存使用量。此时当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush顺序是MemStore大小的倒序(一个HRegion中所有MemStore总和作为该HRegion的MemStore的大小还是选取最大的MemStore作为参考?有待考证),直到总体的MemStore使用量低于hbase.regionserver.global.memstore.lowerLimit,默认38%的内存使用量。
  3. 当前HRegionServer中WAL的大小超过了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush使用时间顺序,最早的MemStore先Flush直到WAL的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs。

  在MemStore Flush过程中,还会在尾部追加一些meta数据,其中就包括Flush时最大的WAL sequence值,以告诉HBase这个StoreFile写入的最新数据的序列,那么在Recover时就直到从哪里开始。在HRegion启动时,这个sequence会被读取,并取最大的作为下一次更新时的起始sequence。

MemStore1

  • StoreFile

  一个Store中包含一个或多个StoreFile,是将MemStore中的数据Flush到磁盘上的存储文件

  • HLog(WAL)

HLog

  HBase中WAL(Write Ahead Log)的存储格式,物理上是Hadoop的Sequence File。

  Client访问HBase时,首先将操作日志写入到HLog中,成功之后才会真正的更新MemStore,最后写入HFile。采用这种模式,可以保证HRegionServer宕机后,我们依然可以从该Log文件中读取数据。假如写HRegion时,此时的Hregion失败导致写失败,此时可以从HLog中重新读取写失败的数据。

  • HFile

  HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级包装。在MemStore的Flush过程中生成HFile,由于MemStore中存储的Cell遵循相同的排列顺序,因而Flush过程是顺序写。

关于HFile文件格式可以参考这个:HFile文件格式与HBase读写

HFile

  • KeyValue

NoSQLTable

  在NoSQL领域,数据表的模样如上所示,行由看似“杂乱无章”的列组成,行与行之间也无须遵循一致的定义,这种定义恰好符合半结构化数据或非结构化数据的特点。HBase就属于该流派的一个典型代表。这些“杂乱无章”的列所构成的多行数据,被称之为一个“稀疏矩阵”,图中的每一个“黑块”,在HBase中称之为一个KeyValue。每一行中的每一列数据,都被包装成独立的拥有特定结构的KeyValue,KeyValue中包含了丰富的自我描述信息:

数据模型

RowKey

  用来表示唯一一行记录的主键,HBase的数据是按照RowKey的字典顺序进行全局排序的,所有的查询都只能依赖于这一个排序维度。访问HBase table中的行时有三种方式:

  1. 通过单个row key访问
  2. 通过row key的range
  3. 全表扫描

Region

  将HBase中拥有的数亿行的一个大表,横向切割成一个个“子表”,这一个个“子表”就是Region,Region是HBase中负载均衡的基本单元,当一个Region增长到一定大小以后,会自动分裂成两个。

Region

ColumnFamily

  如果将Region看成是一个表的横向切割,那么,一个Region中的数据列的纵向切割,称之为一个ColumnFamily。每一个列,都必须归属于一个ColumnFamily,这个归属关系是在写数据时指定的,而不是建表时预先定义。

ColumnFamily

  列族在创建表的时候声明,一个列族可以包含多个列,列中数据都是以二进制形式存在,没有数据类型。

timestamp

  HBase中通过row和columns确定的一个存贮单元称为cell。每个cell都保存着同一份数据的多个版本。版本通过时间戳来索引。

HBase表结构

  HBase中有两张特殊的Table,-ROOT- 和 .META.

  -ROOT-:记录了 .META. 表的Region信息,-ROOT-只有一个Region

  .META.:记录了用户创建的表的Region信息,.META. 可以有很多个Region

  zookeeper中记录了 -ROOT- 表的location,Client访问用户数据之前需要首先访问zookeeper,然后访问 -ROOT- 表,接着访问 .META. 表,最后才能找到用户数据的位置去访问。

read1

  这种结构,大大增加了HBase可存储数据的容量,不过对于数据量不是很大的公司来说,完全没有必要实现这种三级分表的机制

  HBase几乎不支持什么事务操作,但是它能存储超大量的数据,只需在普通的硬件服务器上就行。

  HBase在建表的时候,不需要指定表中有哪些列(字段),只需要指定有哪些列族,插入数据时,列族中可以存储任意多个列(存的是KeyValue,列名:列值),HBase不在乎冗余,只为了查询方便,不需要满足结构化数据库的三大范式。不过表中的行键是唯一的。HBase去更改字段值的时候,不会删除历史值,而是以版本来区分,也就是说一个value可以有多个版本,通过版本号来区分(时间戳)。要查询某一个具体字段的值,需要指定的坐标有:表名 —> 行键 —> 列族(ColumnFamily):列名(Qualifier) —> 版本,一个单元格子又叫一个cell,一个cell中可以存储多个版本的数据。

HBaseTable

HBase与普通RDBMS的区别

  1. RDBMS

  RDBMS有以下特点:

  面向视图:RDBMS表使用固定的视图,表中的数据类型也会事先定义。表的视图在创建时就已经定义,并且不容易修改。向视图中添加元素的操作会以新建表的形式实现,这一操作会在原始表和新建表中建立一对一关系。这也限制了RDBMS的使用场景,RDBMS适合高度结构化的用例场景,比如金融数据存储。

  标准化数据:RDBMS通常存储着高度标准化的数据,但是数据仓库中可能存在非标准化的数据。数据仓库通常针对的是报表型用例,而标准化数据存储则是针对事务型用例。

  薄表:RDBMS表中通常不会包含太多列,并且最大只支持几百个列。这导致RDMBS通常会使用多个表,并且会在这些表间建立各种关系,比如一对一、一对多、多对多。

  1. HBase(NoSQL数据库)

  HBase有以下特点:

  弱视图:HBase是一种高效的映射嵌套。用户可以在运行时定义列,每一行都有属于自己的列。HBase将解释数据值的任务交给应用程序。因此,HBase非常适合数据结构灵活的应用。每一个列中都可以存储任意多的字段,需要就设定,不需要就放空。

  非标准化数据:从HBase表中检索出的行是用例视角的信息全集,这能极大减少服务器的执行周期并支持大量的并发请求。

  表冗余:HBase可以存储上千万行,几十亿列,数据会很冗余,不能进行事务操作,但是对于查询很方便。

HBase集群搭建

  1. 首先要确保zookeeper集群已经搭建成功,然后配置HBase集群,要修改3个文件(注意:要把Hadoop的hdfs-site.xml个core-site.xml 放到hbase/conf下)
  • 修改hbase-env.sh
export JAVA_HOME=${JAVA_HOME}
#告诉hbase使用外部的zookeeper
export HBASE_MANAGES_ZK=false
  • 修改hbase-site.xml
<configuration>
    <!-- 指定hbase在HDFS上存储的路径 -->
    <property>
            <name>hbase.rootdir</name>
            <value>hdfs://ns1/hbase</value>
    </property>
    <!-- 指定hbase是分布式的 -->
    <property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
    </property>
    <!-- 指定zk的地址,多个用“,”分割 -->
    <property>
            <name>hbase.zookeeper.quorum</name>
            <value>hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181</value>
    </property>
</configuration>
  • 修改regionservers文件
hdp-node-01
hdp-node-02
hdp-node-03
  1. 将配置好的HBase拷贝到其他节点:用scp - r 指令,并同步时间

  2. 启动所有的hbase

  分别启动zk:./zkServer.sh start

  启动hbase集群:start-dfs.sh

  启动hbase,在主节点上运行:start-hbase.sh

  1. 通过浏览器访问hbase管理页面:192.168.33.101:60010

  2. 如果有条件的话,为保证集群的可靠性,要启动多个HMaster,在另外的作为Master节点的机子上:hbase-daemon.sh start master

基本的HBase Shell

进入hbase命令行
./hbase shell

显示hbase中的表
list

创建user表,包含info、data两个列族
create 'user', 'info1', 'data1'
create 'user', {NAME => 'info', VERSIONS => '3'}

向user表中插入信息,row key为rk0001,列族info中添加name列标示符,值为zhangsan
put 'user', 'rk0001', 'info:name', 'zhangsan'

向user表中插入信息,row key为rk0001,列族info中添加gender列标示符,值为female
put 'user', 'rk0001', 'info:gender', 'female'

向user表中插入信息,row key为rk0001,列族info中添加age列标示符,值为20
put 'user', 'rk0001', 'info:age', 20

向user表中插入信息,row key为rk0001,列族data中添加pic列标示符,值为picture
put 'user', 'rk0001', 'data:pic', 'picture'

获取user表中row key为rk0001的所有信息
get 'user', 'rk0001'

获取user表中row key为rk0001,info列族的所有信息
get 'user', 'rk0001', 'info'

获取user表中row key为rk0001,info列族的name、age列标示符的信息
get 'user', 'rk0001', 'info:name', 'info:age'

获取user表中row key为rk0001,info、data列族的信息
get 'user', 'rk0001', 'info', 'data'
get 'user', 'rk0001', {COLUMN => ['info', 'data']}

get 'user', 'rk0001', {COLUMN => ['info:name', 'data:pic']}

获取user表中row key为rk0001,列族为info,版本号最新5个的信息
get 'user', 'rk0001', {COLUMN => 'info', VERSIONS => 2}
get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5}
get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5, TIMERANGE => [1392368783980, 1392380169184]}

获取user表中row key为rk0001,cell的值为zhangsan的信息
get 'people', 'rk0001', {FILTER => "ValueFilter(=, 'binary:图片')"}

获取user表中row key为rk0001,列标示符中含有a的信息
get 'people', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}

示例:
put 'user', 'rk0002', 'info:name', 'fanbingbing'
put 'user', 'rk0002', 'info:gender', 'female'
put 'user', 'rk0002', 'info:nationality', '中国'
get 'user', 'rk0002', {FILTER => "ValueFilter(=, 'binary:中国')"}


查询user表中的所有信息
scan 'user'

查询user表中列族为info的信息
scan 'user', {COLUMNS => 'info'}
scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 5}
scan 'persion', {COLUMNS => 'info', RAW => true, VERSIONS => 3}
查询user表中列族为info和data的信息
scan 'user', {COLUMNS => ['info', 'data']}
scan 'user', {COLUMNS => ['info:name', 'data:pic']}


查询user表中列族为info、列标示符为name的信息
scan 'user', {COLUMNS => 'info:name'}

查询user表中列族为info、列标示符为name的信息,并且版本最新的5个
scan 'user', {COLUMNS => 'info:name', VERSIONS => 5}

查询user表中列族为info和data且列标示符中含有a字符的信息
scan 'user', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}

查询user表中列族为info,rk范围是[rk0001, rk0003)的数据
scan 'people', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}

查询user表中row key以rk字符开头的
scan 'user',{FILTER=>"PrefixFilter('rk')"}

查询user表中指定范围的数据
scan 'user', {TIMERANGE => [1392368783980, 1392380169184]}

删除数据
删除user表row key为rk0001,列标示符为info:name的数据
delete 'people', 'rk0001', 'info:name'
删除user表row key为rk0001,列标示符为info:name,timestamp为1392383705316的数据
delete 'user', 'rk0001', 'info:name', 1392383705316


清空user表中的数据
truncate 'people'


修改表结构
首先停用user表(新版本不用)
disable 'user'

添加两个列族f1和f2
alter 'people', NAME => 'f1'
alter 'user', NAME => 'f2'
启用表
enable 'user'


###disable 'user'(新版本不用)
删除一个列族:
alter 'user', NAME => 'f1', METHOD => 'delete' 或 alter 'user', 'delete' => 'f1'

添加列族f1同时删除列族f2
alter 'user', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}

将user表的f1列族版本号改为5
alter 'people', NAME => 'info', VERSIONS => 5
启用表
enable 'user'


删除表
disable 'user'
drop 'user'

示例:
get 'person', 'rk0001', {FILTER => "ValueFilter(=, 'binary:中国')"}
get 'person', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}
scan 'person', {COLUMNS => 'info:name'}
scan 'person', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}
scan 'person', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}

scan 'person', {COLUMNS => 'info', STARTROW => '20140201', ENDROW => '20140301'}
scan 'person', {COLUMNS => 'info:name', TIMERANGE => [1395978233636, 1395987769587]}
delete 'person', 'rk0001', 'info:name'

alter 'person', NAME => 'ffff'
alter 'person', NAME => 'info', VERSIONS => 10

get 'user', 'rk0002', {COLUMN => ['info:name', 'data:pic']}

HBase Java demo

package cn.zhxiqi.bigdata.hbase;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class HbaseDemo {

	private Configuration conf = null;
	
	@Before
	public void init(){
		conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "weekend05,weekend06,weekend07");
	}
	
	@Test
	public void testDrop() throws Exception{
		HBaseAdmin admin = new HBaseAdmin(conf);
		admin.disableTable("account");
		admin.deleteTable("account");
		admin.close();
	}
	
	@Test
	public void testPut() throws Exception{
		HTable table = new HTable(conf, "person_info");
		Put p = new Put(Bytes.toBytes("person_rk_bj_zhang_000002"));
		p.add("base_info".getBytes(), "name".getBytes(), "zhangwuji".getBytes());
		table.put(p);
		table.close();
	}
	
	@Test
	public void testGet() throws Exception{
		HTable table = new HTable(conf, "person_info");
		Get get = new Get(Bytes.toBytes("person_rk_bj_zhang_000001"));
		get.setMaxVersions(5);
		Result result = table.get(get);
		List<Cell> cells = result.listCells();
		
//			result.getValue(family, qualifier);  可以从result中直接取出一个特定的value
		
		//遍历出result中所有的键值对
		for(KeyValue kv : result.list()){
			String family = new String(kv.getFamily());
			System.out.println(family);
			String qualifier = new String(kv.getQualifier());
			System.out.println(qualifier);
			System.out.println(new String(kv.getValue()));	
		}
		table.close();
	}
	
	/**
	 * 多种过滤条件的使用方法
	 * @throws Exception
	 */
	@Test
	public void testScan() throws Exception{
		HTable table = new HTable(conf, "person_info".getBytes());
		Scan scan = new Scan(Bytes.toBytes("person_rk_bj_zhang_000001"), Bytes.toBytes("person_rk_bj_zhang_000002"));
		
		//前缀过滤器----针对行键
		Filter filter = new PrefixFilter(Bytes.toBytes("rk"));
		
		//行过滤器
		ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_rk_bj_zhang_000001"));
		RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);
		
		/**
         * 假设rowkey格式为:创建日期_发布日期_ID_TITLE
         * 目标:查找  发布日期  为  2014-12-21  的数据
         */
        rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2014-12-21_"));
		
		//单值过滤器 1 完整匹配字节数组
		new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes());
		//单值过滤器2 匹配正则表达式
		ByteArrayComparable comparator = new RegexStringComparator("zhang.");
		new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);

		//单值过滤器2 匹配是否包含子串,大小写不敏感
		comparator = new SubstringComparator("wu");
		new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator);

		//键值对元数据过滤-----family过滤----字节数组完整匹配
        FamilyFilter ff = new FamilyFilter(
                CompareOp.EQUAL , 
                new BinaryComparator(Bytes.toBytes("base_info"))   //表中不存在inf列族,过滤结果为空
                );
        //键值对元数据过滤-----family过滤----字节数组前缀匹配
        ff = new FamilyFilter(
                CompareOp.EQUAL , 
                new BinaryPrefixComparator(Bytes.toBytes("inf"))   //表中存在以inf打头的列族info,过滤结果为该列族所有行
                );
		
        
       //键值对元数据过滤-----qualifier过滤----字节数组完整匹配
        
        filter = new QualifierFilter(
                CompareOp.EQUAL , 
                new BinaryComparator(Bytes.toBytes("na"))   //表中不存在na列,过滤结果为空
                );
        filter = new QualifierFilter(
                CompareOp.EQUAL , 
                new BinaryPrefixComparator(Bytes.toBytes("na"))   //表中存在以na打头的列name,过滤结果为所有行的该列数据
        		);	
        //基于列名(即Qualifier)前缀过滤数据的ColumnPrefixFilter
        filter = new ColumnPrefixFilter("na".getBytes());
        
        //基于列名(即Qualifier)多个前缀过滤数据的MultipleColumnPrefixFilter
        byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")};
        filter = new MultipleColumnPrefixFilter(prefixes);
        //为查询设置过滤条件
        scan.setFilter(filter);
		scan.addFamily(Bytes.toBytes("base_info"));
		ResultScanner scanner = table.getScanner(scan);
		for(Result r : scanner){
			/**
			for(KeyValue kv : r.list()){
				String family = new String(kv.getFamily());
				System.out.println(family);
				String qualifier = new String(kv.getQualifier());
				System.out.println(qualifier);
				System.out.println(new String(kv.getValue()));
			}
			*/
			//直接从result中取到某个特定的value
			byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
			System.out.println(new String(value));
		}
		table.close();
	}
	
	@Test
	public void testDel() throws Exception{
		HTable table = new HTable(conf, "user");
		Delete del = new Delete(Bytes.toBytes("rk0001"));
		del.deleteColumn(Bytes.toBytes("data"), Bytes.toBytes("pic"));
		table.delete(del);
		table.close();
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
//		conf.set("hbase.zookeeper.quorum", "weekend05:2181,weekend06:2181,weekend07:2181");
		HBaseAdmin admin = new HBaseAdmin(conf);
		
		TableName tableName = TableName.valueOf("person_info");
		HTableDescriptor td = new HTableDescriptor(tableName);
		HColumnDescriptor cd = new HColumnDescriptor("base_info");
		cd.setMaxVersions(10);
		td.addFamily(cd);
		admin.createTable(td);
		admin.close();
	}
}

Hive

  • 什么是Hive

  Hive是建立在Hadoop上的数据仓库基础架构。它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。Hive定义了简单的类SQL查询语言,称为HQL,它允许熟悉SQL的用户查询数据。同时,这个语言也允许熟悉MapReduce开发的开发者开发自定义的mapper和reducer来处理内建的mapper和reducer无法完成的复杂的分析工作。

  Hive是SQL解析引擎,它将SQL语句转译成M/R Job,然后在Hadoop执行

  Hive的表其实就是HDFS的目录/文件,按表名把文件夹分开。如果是分区表,则分区值是子文件夹,可以直接在M/R Job里使用这些数据。

  夸张点说,Hive就是一个翻译工具,将SQL语句翻译成MapReduce程序去执行。当然,Hive不是一个关系型数据库、不是一个设计用于联机事务处理(OLTP)、不是用于实时查询和行级更新的,它是专为OLAP设计的。

  • 为什么选择Hive
  • 基于Hadoop的大数据的计算/扩展能力
  • 支持SQL Like查询语言
  • 统一的元数据管理
  • 简单编程
  • Hive的系统架构

Hive1

  这是一个简版的Hive架构图,该组件组件架构中包含了不同的单元:

单元名称 操作
用户接口 Hive是一个数据仓库基础工具软件,可以创建用户和HDFS之间互动。Hive支持Hive的Web UI,Hive命令行等用户界面
元存储 Hive选择各自的数据库服务器,用以储存表,数据库,列模式或元数据表,它们的数据类型可以是HDFS文件的映射。
HiveQL HiveQL处理引擎,就是将用户编写的SQL语言解析、编译成MapReduce程序进行处理
执行引擎 HiveQL处理引擎和MapReduce的结合部分是由Hive执行引擎。执行引擎处理查询并产生结果和MapReduce的结果一样。它采用MapReduce方法。
HDFS或HBase Hadoop的分布式文件系统或者HBASE数据存储技术是用于将数据存储到文件系统。

Hive

  这是一张更为详细的Hive架构图,从此图中可以看出,Hadoop和mapreduce是hive架构的根基。Hive架构包括如下组件:CLI(Command line interface)、JDBC/ODBC、Thrift Server、WEB GUI、metastore和Driver(Complier、Optimizer和Executor),这些组件可以分为两大类:服务端组件和客户端组件。

  服务端组件:

  Driver组件:该组件包括Complier、Optimizer和Executor,它的作用是将我们写的HiveQL(类SQL)语句进行解析、编译优化,生成执行计划,然后调用底层的mapreduce计算框架。生成的查询计划存储在HDFS中,并在随后有MapReduce调用执行。

  Metastore组件:元数据服务组件,这个组件存储hive的元数据,hive的元数据存储在关系数据库里,hive支持的关系数据库有derby、mysql。元数据对于hive十分重要,因此hive支持把metastore服务独立出来,安装到远程的服务器集群里,从而解耦hive服务和metastore服务,保证hive运行的健壮性。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。

  Thrift服务:Thrift是facebook开发的一个软件框架,它用来进行可扩展且跨语言的服务的开发,Hive集成了该服务,能让不同的编程语言调用Hive的接口。

  客户端组件:

  CLI:Command Line Interface,命令行接口,即Hive Shell命令行

  JDBC/ODBC:连接MySQL/Oracle数据库的驱动接口

  Thrift客户端:上面的架构图里表明Thrift客户端,但是Hive架构的许多客户端接口是建立在Thrift客户端之上,包括JDBC和ODBC接口。

  WEBGUI:Hive客户端提供了一种通过网页的方式访问Hive所提供的服务。这个接口对应Hive的HWI(Hive Web Interface)组件,使用前要启动HWI服务。

  • Hive的执行流程

HiveExecutor

  此图很明了的表明了Hive的执行流程。Hive的数据存储在HDFS上,大部分的查询由MapReduce完成(包含*的查询,比如select * from table不会生成MapReduce任务)。Driver调用编译器(compiler)处理HiveQL字串时,这些字串可能是一条DDL、DML或查询语句,编译器将字串转化为策略(plan),策略仅由元数据操作和HDFS操作组成,元数据操作只包含DDL语句,HDFS操作只包含LOAD语句,对插入和查询而言,策略由map-reduce任务中的有向无环图(DAG)组成。

  • Hive的数据模型

  Hive的metastore:metastore是Hive元数据的集中存放地。metastore默认使用内嵌的derby数据库作为存储引擎,但是derby引擎有个缺点,就是一次只能1打开一个会话,也就是在哪一个目录下启动的Hive Shell进行操作,这些操作后的数据都会存储在这个目录下的derby_db文件夹下,但是如果下次在另外一个目录启动Hive,就又是一个新的derby_db,所以这不适合多用户访问,所以一般都会使用MySQL作为外置存储引擎,以方便多用户同时访问。要使用MySQL作为存储引擎,需要将mysql的连接驱动包放到Hive安装目录的jar包目录下,然后hive-site.xml中配置对应的信息,然后初始化。

  Hive的数据存储基于Hadoop HDFS,没有专门的数据存储格式,存储结构主要包括:数据库、文件、表、视图,Hive默认可以直接加载文本文件(TextFile),还支持sequence file、RC file,在创建Hive表的时候,只要指定Hive数据的列分隔符与行分隔符,Hive就可以解析数据

  Hive中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性:列分隔符(通常为空格、”\t”、”\x001”)、行分隔符(“\n”)以及读取文件数据的方法(Hive中默认有三个文件格式TextFile,SequenceFile以及RCFile)。由于在加载数据的过程中不会对数据本身进行任何修改,而只是将数据内容复制或移动到相应的HDFS目录中。而在数据库中,不同的数据库有不同的存储引擎,定义了自己的数据格式,所有数据都会按照一定的组织存储,因此,数据库加载数据的过程会比较耗时。

  由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不支持对数据的改写和添加,所有的数据都是加载的时候确定好的。Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。

  Hive的数据类型:

  · 基本数据类型:tinyint/smallint/int/bigint/float/double/boolean/string

  · 复杂数据类型:Array/Map/Struct,没有date/datetime类型

  1. 内部表

  Hive的内部表与数据库中的Table在概念上是类似的。每一个Table在Hive中都有一个相应的目录存储数据。例如在Hive Shell中创建一个test表,载入了一些数据,它在HDFS中的路径就是/hdp/test,其中hdp是在hive-site.xml中由 ${hive.metastore.warehouse.dir} 指定的数据仓库的目录,所有的Table数据(不包括External Table)都会保存在这个目录中,也就是说载入表数据的时候,即使指定其他路径下的数据,但是只要不是创建的外部表(External Table),Hive就会将数据移动到指定的数据仓库路径对应表的目录下。删除表时,元数据与数据都会被删除。

内部表简单示例:

创建数据文件:test_inner_table.txt
创建表:create table test_inner_table (key string)
加载数据(LOCAL指定数据存放在HDFS本地)LOAD DATA LOCAL INPATH filepath INTO TABLE test_inner_table
查看数据:select * from test_inner_table;  select count(*) from test_inner_table
删除表:drop table test_inner_table

create table trade_detail(id bigint, account string, income double, expenses double, time string) row format delimited fields terminated by '\t';
  1. 外部表

  外部表是指向已经在HDFS文件系统中存在的数据,可以创建Partition。它和内部表在元数据的组织上是相同的,而实际数据的存储则有较大的差异。内部表的创建过程和数据加载过程这两个过程可以分别独立完成也可以在同一个语句中完成,在加载数据过程中,实际数据会被移动到数据仓库目录中;之后对数据的访问将会直接在数据仓库目录中完成。删除表时,表中的数据和元数据将会被同时删除。而外部表只有一个过程,加载数据和创建表同时完成(CREATE EXTERNAL TABLE … LOCATION …),实际数据是存储在LOCATION后面指定的HDFS路径中,并不会移动到数据仓库目录中。当删除一个External Table时,仅删除该链接而已。

示例:

创建数据文件:test_external_table.txt
创建表:create external table test_external_table (key string)
加载数据:LOAD DATA INPATH filepath INTO TABLE test_inner_table
查看数据:select * from test_external_table;  select count(*) from test_external_table
删除表:drop table test_external_table

CREATE EXTERNAL TABLE page_view
( viewTime INT, 
  userid BIGINT,
  page_url STRING, 	
 referrer_url STRING, 							
  ip STRING COMMENT 'IP Address of the User',
  country STRING COMMENT 'country of origination‘
)
    COMMENT 'This is the staging page view table'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '44' LINES 	TERMINATED BY '12'
    STORED AS TEXTFILE
    LOCATION 'hdfs://centos:9000/user/data/staging/page_view';
  1. 分区表

  Partition对应于数据库中的Partition列的密集索引,但是Hive中Partition的组织方式和数据库中的很不相同。在Hive中,表中一个Partition对应于HDFS下的一个目录,所有的Partition的数据都存储在对应的目录中。例如test表中包含ds和city两个Partition,则对应于ds = 20180330,city = China的HDFS子目录为 /hdp/test/ds=20180330/city=China;对应与ds = 20180401,city = US的HDFS子目录为/hdp/test/ds=20180401/city/US。可以使用任意字段作为分区,即使表中没有定义的字段。这样就可以只针对某一分区来做查询等操作,而不需要查询整个表。

示例:

创建数据文件:test_partition_table.txt
创建表:create table test_partition_table (key string) partitioned by (dt string)
加载数据:LOAD DATA INPATH filepath INTO TABLE test_partition_table partition (dt=2018)
查看数据:select * from test_partition_table;  select count(*) from test_partition_table
删除表:drop table test_partition_table

CREATE TABLE tmp_table #表名
(
title   string, # 字段名称 字段类型
minimum_bid     double,
quantity        bigint,
have_invoice    bigint
)COMMENT '注释:XXX' #表注释
PARTITIONED BY(pt STRING) #分区表字段(如果你文件非常之大的话,采用分区表可以快过滤出按分区字段划分的数据)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'   #字段是用什么分割开的
STORED AS SEQUENCEFILE; #用哪种方式存储数据,SEQUENCEFILEhadoop自带的文件压缩格式
  1. 桶表

  Buckets是将表的列通过Hash算法进一步分解成不同的文件存储。它对指定列计算hash,根据hash值切分数据,目的是为了并行,每一个Buckets对应一个文件。例如将user列分散至32个bucket,首先对user列的值计算hash,对应hash值为0的HDFS目录为/hdp/test/ds=20180330/city=China/part-00000;hash值为20的HDFS目录为/hdp/test/ds=20180401/city=US/part-00020.如果想应用很多的Map任务,这种表结构是个不错的选择。

简单示例:

创建数据文件:test_bucket_table.txt
创建表:create table test_bucket_table (key string) clustered by (key) into 20 buckets
加载数据:LOAD DATA INPATH filepath INTO TABLE test_bucket_table
查看数据:select * from test_bucket_table;  set hive.enforce.bucketing = true;
  1. Hive表结构

HiveTable

  有兴趣的可以去研究一下每一个表所代表的意思,这里就只了解了几个:

  TBLS:即Tables,记录的Hive中创建的表名,不同表在HDFS中为对应数据库下的一个文件夹,表中的数据之类的都会在此表文件下

TBLS

  其中:TBL_ID是表示创建的第几个表(包括删除了的);TBL_TYPE是表类型,有MANAGED_TABLE和EXTERANL_TABLE,默认是MANAGED_TABLE类型。具体表中有哪些字段在COLUMNS表中,每个字段有创建时的顺序。

  DBS:即DataBases,记录Hive中创建了哪些数据库,有个默认的default数据库

DBS

  可以看到这里指出了这个数据库的各种文件存放的位置,默认的会将数据库文件直接放在warehouse文件夹下,自己创建的数据库会在warehouse目录下有一个数据库名.db的文件夹,在那个数据库中创建的各种信息会反正这个文件夹下。数据以 xxx.data 的命名存放在表名的文件夹下,所以,如果我们自己将符合格式的数据以这种命名存放存放到这个表文件夹中,在Hive中的这个表中也能查到这些数据,即使有的文件的数据缺字段,也能查到,会在缺的字段显示NULL,且NULL只会显示在最后,也就是说缺字段的会导致显示的时候字段与其他对应不上,多的字段会被丢弃不显示

  如果有分区,则其信息可以在PARTITION开头的表中查找,有设置索引,则在INDEX开头的表中查找相关信息,还有一些数据清洗的参数可以在SKEWED开头的表中查询等。

  因为Hive查询是调用MapReduce来执行的,启动MapReduce会花一些时间,所以这个不适合小数据的查询,比较时候那种大数据量的查询。

  • UDF 用户自定义函数

  UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容,编写UDF函数的时候需要注意以下几点:

  1. 自定义UDF需要继承org.apache.hadoop.hive.ql.UDF
  2. 需要实现evaluate函数,evaluate函数支持重载

  使用步骤:

  1. 把程序打包放到目标机器上去
  2. 进入Hive客户端,添加jar包:hive>add jar /hadoop/Downloads/udf_test.jar
  3. 创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS ‘hive.udf.Add’;

  4. 使用函数查询HQL语句: ```sql #显示所有函数 hive>show functions;

#查看函数用法 hive>describe function substr;

SELECT add_example(8, 9) FROM scores; SELECT add_example(scores.math, scores.art) FROM scores; SELECT add_example(6, 7, 8, 6.8) FROM scores;

5. 销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;

注意:UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF

示例:
```java
package cn.zhxiqi.bigdata;
import java.util.HashMap;
import org.apache.hadoop.hive.ql.exec.UDF;

public class PhoneNbrToArea extends UDF{
	private static HashMap<String, String> areaMap = new HashMap<>();
	static {
		areaMap.put("1388", "beijing");
		areaMap.put("1399", "tianjin");
		areaMap.put("1366", "nanjing");
	}
	//一定要用public修饰才能被hive调用
	public String evaluate(String pnb) {
		String result  = areaMap.get(pnb.substring(0,4))==null? (pnb+"    huoxing"):(pnb+"  "+areaMap.get(pnb.substring(0,4)));			
		return result;
	}
}

利用Java来编写Hive查询示例:

Hive远程服务启动
#hive --service hiveserver >/dev/null  2>/dev/null &

JAVA客户端相关代码
Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
Connection con = DriverManager.getConnection("jdbc:hive://192.168.1.102:10000/wlan_dw", "", "");
Statement stmt = con.createStatement();
String querySQL="SELECT * FROM wlan_dw.dim_m order by flux desc limit 10";

ResultSet res = stmt.executeQuery(querySQL);  

while (res.next()) {
System.out.println(res.getString(1) +"\t" +res.getLong(2)+"\t" +res.getLong(3)+"\t" +res.getLong(4)+"\t" +res.getLong(5));
}
  • Hive的安装
  1. 配置环境变量,安装MySQL
  2. 配置Hive

主要是配置hive-site.xml文件

只留以下内容便可以跑起来:
<property>
	<name>javax.jdo.option.ConnectionURL</name>
	<value>jdbc:mysql://weekend01:3306/hive?createDatabaseIfNotExist=true</value>
	<description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
	<name>javax.jdo.option.ConnectionDriverName</name>
	<value>com.mysql.jdbc.Driver</value>
	<description>Driver class name for a JDBC metastore</description>
</property>

<property>
	<name>javax.jdo.option.ConnectionUserName</name>
	<value>root</value>
	<description>username to use against metastore database</description>
</property>

<property>
	<name>javax.jdo.option.ConnectionPassword</name>
	<value>root</value>
	<description>password to use against metastore database</description>
</property>
  1. 安装Hive和MySQL完成后,将MySQL的连接jar包拷贝到$HIVE_HOME/lib目录下,如果出现没有权限问题,在MySQL里授权下
    mysql -uroot -p
    #(执行下面的语句  *.*:所有库下的所有表   %:任何IP地址或主机都可以连接)
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123' WITH GRANT OPTION;
    #刷新权限
    FLUSH PRIVILEGES;
    
  • Hive QL

与SQL语法很类似,所以只展示一下,不解释了,详情可以参考这里Apache Hive LanguageManual

  • Hive和普通关系数据库的异同
  Hive RDBMS
查询语言 HQL SQL
数据存储 HDFS Raw Device or Local FS
索引
执行 MapReduce Excutor
执行延迟
处理数据规模

未解决:hive QL 什么情况下才会转换为MapReduce程序????

#一些相关指令
SHOW TABLES; # 查看所有的表
SHOW TABLES '*TMP*'; #支持模糊查询
SHOW PARTITIONS TMP_TABLE; #查看表有哪些分区
DESCRIBE TMP_TABLE; #查看表结构

set hive.cli.print.header=true;

CREATE TABLE page_view(viewTime INT, userid BIGINT,
     page_url STRING, referrer_url STRING,
     ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
STORED AS SEQUENCEFILE;   TEXTFILE

//sequencefile
create table tab_ip_seq(id int,name string,ip string,country string) 
    row format delimited
    fields terminated by ','
    stored as sequencefile;
insert overwrite table tab_ip_seq select * from tab_ext;

//create & load
create table tab_ip(id int,name string,ip string,country string) 
    row format delimited
    fields terminated by ','
    stored as textfile;
load data local inpath '/home/hadoop/ip.txt' into table tab_ext;

//external
CREATE EXTERNAL TABLE tab_ip_ext(id int, name string,
     ip STRING,
     country STRING)
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 STORED AS TEXTFILE
 LOCATION '/external/hive';
 
// CTAS  用于创建一些临时表存储中间结果
CREATE TABLE tab_ip_ctas
   AS
SELECT id new_id, name new_name, ip new_ip,country new_country
FROM tab_ip_ext
SORT BY new_id;

//insert from select   用于向临时表中追加中间结果数据
create table tab_ip_like like tab_ip;

insert overwrite table tab_ip_like
    select * from tab_ip;

//CLUSTER 这个比较高级
create table tab_ip_cluster(id int,name string,ip string,country string)
clustered by(id) into 3 buckets;

load data local inpath '/home/hadoop/ip.txt' overwrite into table tab_ip_cluster;
set hive.enforce.bucketing=true;
insert into table tab_ip_cluster select * from tab_ip;

select * from tab_ip_cluster tablesample(bucket 2 out of 3 on id); 

//PARTITION
create table tab_ip_part(id int,name string,ip string,country string) 
    partitioned by (part_flag string)
    row format delimited fields terminated by ','; 
load data local inpath '/home/hadoop/ip.txt' overwrite into table tab_ip_part
     partition(part_flag='part1'); 
load data local inpath '/home/hadoop/ip_part2.txt' overwrite into table tab_ip_part
     partition(part_flag='part2');

select * from tab_ip_part;

select * from tab_ip_part  where part_flag='part2';
select count(*) from tab_ip_part  where part_flag='part2';


alter table tab_ip change id id_alter string;
ALTER TABLE tab_cts ADD PARTITION (partCol = 'dt') location '/external/hive/dt';

show partitions tab_ip_part;

//write to hdfs
insert overwrite local directory '/home/hadoop/hivetemp/test.txt' select * from tab_ip_part where part_flag='part1';    
insert overwrite directory '/hiveout.txt' select * from tab_ip_part where part_flag='part1';

//array 
create table tab_array(a array<int>,b array<string>)
row format delimited
fields terminated by '\t'
collection items terminated by ',';

示例数据
tobenbrone,laihama,woshishui     13866987898,13287654321
abc,iloveyou,itcast     13866987898,13287654321

select a[0] from tab_array;
select * from tab_array where array_contains(b,'word');
insert into table tab_array select array(0),array(name,ip) from tab_ext t; 

//map
create table tab_map(name string,info map<string,string>)
row format delimited
fields terminated by '\t'
collection items terminated by ';'
map keys terminated by ':';

示例数据:
fengjie			age:18;size:36A;addr:usa
furong	    age:28;size:39C;addr:beijing;weight:180KG

load data local inpath '/home/hadoop/hivetemp/tab_map.txt' overwrite into table tab_map;
insert into table tab_map select name,map('name',name,'ip',ip) from tab_ext; 

//struct
create table tab_struct(name string,info struct<age:int,tel:string,addr:string>)
row format delimited
fields terminated by '\t'
collection items terminated by ','

load data local inpath '/home/hadoop/hivetemp/tab_st.txt' overwrite into table tab_struct;
insert into table tab_struct select name,named_struct('age',id,'tel',name,'addr',country) from tab_ext;

//cli shell
hive -S -e 'select country,count(*) from tab_ext' > /home/hadoop/hivetemp/e.txt  
有了这种执行机制,就使得我们可以利用脚本语言(bash shell,python)进行hql语句的批量执行

select * from tab_ext sort by id desc limit 5;

select a.ip,b.book from tab_ext a join tab_ip_book b on(a.name=b.name);

//UDF 用户自定义函数
select if(id=1,first,no-first),name from tab_ext;

hive>add jar /home/hadoop/myudf.jar;
hive>CREATE TEMPORARY FUNCTION my_lower AS 'org.dht.Lower';
select my_upper(name) from tab_ext;  
  • Hive与传统数据库以及HBase的比较

Comp

  Hive vs HBase

  • 区别:
  1. HBase:Hadoop database的简称,也就是基于Hadoop数据库,是一种NoSQL数据库,主要适用于海量数据的随机快速查询,如日志明细、交易清单、轨迹行为等。
  2. Hive:Hive是Hadoop数据仓库,严格来说,不是数据库,主要是让开发人员能够通过SQL来计算和处理HDFS上的结构化数据,使用于离线的批量数据计算。通过元数据来描述HDFS上的结构化文本数据,通俗点来说,就是定义一张表来描述HDFS上的结构化文本,包括各列数据名称,数据类型是怎么样的等等,方便我们处理数据,当前很多SQL ON Hadoop的计算引擎均用的是Hive的元数据,如Spark SQL等。Hive会将HiveQL翻译为MapReduce来处理数据。
  • 关系:

  在大数据架构中,Hive和HBase是协作关系,数据流一般如下:

  1. 通过ETL工具将数据源抽取到HDFS存储;
  2. 通过Hive清洗、处理和计算原始数据;
  3. Hive清洗处理后的结果,如果是面向海量数据随机查询场景的可存入HBase
  4. 数据应用从HBase查询数据;

HiveVSHbase

together

  1. MapReduce程序计算KPI
  2. HBASE详单查询
  3. HIVE数据仓库多维分析

更加详细的内容可以参考hbase和hive的差别是什么,各自适用在什么场景中?

storm & spark

Storm

什么是Apache Storm

  Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache Zookeeper管理分布式环境和集群状态。它可以并行的对实时数据执行各种操作。在数据处理时间和方式上,Storm与Hadoop MapReduce基本上是两个对立面,而这两个技术具备整合可能性极大程度该归结于YARN这个集群管理层。一般Storm都会结合一个消息队列组件和数据库来配合使用。它自己不存储任何数据。一般消息队列做其数据源,数据库做其数据结果输出目的地。通俗的来说,storm流处理就比如是一个自来水工厂,原始的水进来,经过大分子沉淀,然后在进入下一步骤进行消毒等等,最后输出。

  无状态:没有上下文关系,第二次访问不会记得前一次访问的数据,即每次访问都是独立的,它的访问结果与前面的访问是无直接关系的,不会受前面的请求应答情况直接影响,也不会直接影响后面的请求应答情况。叫做 人生若只如初见???

Storm的特点

  Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理数据流。被称作“实时的Hadoop”。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个节点每秒可以处理数以百万计的消息)。Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程语言来开发应用。

  Storm集群主机之间并发,主机之内有多进程的并发,进程之内有多线程的并发,线程可执行多个task实例。

stormm

编程模型简单

  在大数据处理方面Hadoop大家应该都耳熟能详,基于Google Map/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常简单和优美。同样,Storm也为大数据的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助开发人员快速、高效的开发应用。

可扩展

  在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可以创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,开发过程中的spout、bolt就是作为一个或者多个任务的方式执行的。因此,计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。

高可靠性

  Storm可以保证spout发出的每条消息都能被“完全处理”,这也是直接区别于其他实时系统的地方。spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。

高容错性

  如果在消息处理过程中出了一些异常,Storm会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。当然,如果处理单元中存储了中间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。

Storm集群与Hadoop集群

  Storm集群和Hadoop集群表面上看很类似。Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology);Hadoop擅长分布式离线批处理,而Storm设计为支持分布式实时计算;Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中;Hadoop新的spark组件提供了在Hadoop平台上运行storm的可能性。

Topology 与 MapReduce

  一个关键的区别是:一个MapReduce job最终会结束,而一个topology永远会运行(除非手动kill)

Nimbus 与 ResoureManager

  在Storm的集群里面有两种节点:控制节点(Master Node)和工作节点(Worker Node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。

Supervisor(Worker进程)与NodeManager(YarnChild)

  每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。

结构 Hadoop Storm
主节点 JobTracker Nimbus
从节点 TaskTracker Supervisor
应用程序 Job Topology
工作进程名称 Child Worker
计算模型 Map/Reduce Spout/Bolt

Hadoop&Storm

Storm体系架构

StormOne

Storm中的Nimbus和Supervisor

  Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面,要么在本地磁盘上。这也就意味着你可以用kill -9 来强制杀死Numbus和Supervisor进程,然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常稳定。

Storm的基本概念

  在深入了解Storm之前,需要了解以下这些概念:

  Topologies:拓扑,也俗称一个任务

  Spouts:拓扑的消息源

  Bolts:拓扑的处理逻辑单元

  Tuple:消息元组

  Streams:流

  Stream groupings:流的分组策略

  Tasks:任务处理单元

  Executor:工作线程

  Workers:工作进程

  Configuration:topology的配置

Storm集群架构

  Storm集群也是采用的主从架构,主节点是Nimbus,从节点是Supervisor,有关调度相关的信息存储在Zookeeper集群中,架构如下所示:

Storm

Nimbus

  Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。

Supervisor

  Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。

Zookeeper

  用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology。Nimbus会第一时间感知到,并重新分配Topology到其他可用的Supervisor上运行。

Storm2

  一个Topology的Spout/Bolt对应多个Task可能分布在多个Supervisor的多个Worker内部。而每个Worker内部又存在多个Executor,根据实际对Topology的配置在运行时进行计算并分配。

  • Topology:Storm对一个分布式计算应用程序的抽象,目的是通过一个实现Topology能够完整地完成一件事情(从业务角度来看)。一个Topology是由一组静态程序组件(Spout/Bolt)、组件关系Streaming Groups这两部分组成。换句话说,topology是storm中运行的一个实时应用程序的名称。将Spout、Bolt整合起来的拓扑图。定义了spout和bolt的结合关系、并发数量、配置等等。
  • Spout:描述了数据是如何从外部系统(或者组件内部直接产生)进入到Storm集群,并由该Spout所属的Topology来处理,通常是从一个数据源读取数据,也可以做一些简单的处理(为了不影响数据连续地、实时地、快速地进入到系统,通常不建议把复杂处理逻辑放在这里去做)。也就是说,在一个topology中,spout是获取源数据流的组件(一个topology里面的消息生产者),通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据(tuple消息)。spout可以是可靠的也可以是不可靠的:如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发送一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。消息源可以发送多条消息流stream:使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发送指定的stream。
  • Bolt:这里是描述与业务相关的处理逻辑,所有的消息处理逻辑被封装在bolts里面。bolt作为接收数据然后执行处理的组件,用户可以在其中执行自己想要的操作。Bolts可以做很多事情,如过滤数据、聚合数据、查询数据等,它也可以简单的做消息流的传递,也可以通过多级Bolts的组合来完成复杂的消息流处理,如求TopN等。Bolts可以发送多条消息流:使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发送的stream。Bolts的主要方法是execute,它以一个tuple作为输入,使用OutputCollector来发送tuple,通过调用OutputCollector的ack方法,以通知这个tuple的发射者spout。Bolts一般的流程:处理一个输入tuple,发射0个或多个tuple,然后调用ack通知storm自己已经处理过这个tuple(storm提供了一个IBasicBolt会自动调用ack)。

  在我们编写完成一个Topology之后,上面的组件都以静态的方式存在,在提交Topology运行以后,会产生下面的动态组件:

Task

  worker中每一个spout/bolt的线程称为一个task,也就是Spout/Bolt在运行时所表现出来的实体,都称为Task,一个Spout/Bolt在运行时可能对应一个或多个Spout/Bolt Task,与实际在编写Topology的时候进行的配置有关。在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。

  每一个spout和bolt会被当作很多task在整个集群里执行,每一个executor对应到一个线程,在这个线程上运行多个task,stream grouping则是定义怎么从一堆task发送tuple到另一堆task的,可以调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个task)

Worker

  运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。所以,Worker是运行时Task所在的一级容器,Executor运行于Worker中,一个Worker对应于Supervisor上创建的一个JVM实例。

  一个topology可能会在一个或多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程大约会处理其中的6个tasks,storm会尽量均匀的分配工作给所有的worker。

worker

Executor

  Executor是运行时Task所在的直接容器,在Executor中执行Task的处理逻辑,一个或多个Executor实例可以运行在同一个Worker进程中,一个或多个Task可以运行于同一个Executor中,在Worker进程并行的基础上,Executor可以并行,进而Task也能够基于Executor实现并行计算。

Topology

  如上图,这整个流程是一个Topology,而storm在运行中可以分为spout与bolt两个组件,其中,spout从外部读取数据,类似于Hadoop的inputFormat一样,数据源从spout开始,数据以tuple的方式发送到bolt,多个bolt可以串连起来,一个bolt也可以接入多个spout/bolt。Spout和Bolt的作用前面解释过了。

Tuple:是一次消息传递的基本单元,理解为一组消息就是一个Tuple。

Stream:Tuple的集合,表示数据的流向。

Storm的Topology

  在Storm中,一个实时应用的计算任务被打包作为Topology发布,这与Hadoop的MapReduce任务相似。但是有一点不同的是,在Hadoop中,MapReduce任务最终会执行完成后结束,而在Storm中,Topology任务一旦提交后永远不会结束,除非认为的去停止任务。计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。一个Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:

  (1)Worker(作为一个进程)

  (2)Executor(作为一个线程)

  (3)Task

下图简要描述了这3者之间的关系:

Topology2

  1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务的情况)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt),因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成。

  executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task,task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例。

  task是最终运行spout或bolt中代码的单元,其中一个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法。topology启动后,1个component(spout或bolt)的数目是固定不变的,但该component使用的executor线程数可以动态调整,如1个executor线程可以执行该component的1个或多个task实例。这就说明对于1个component存在线程数小于等于task数目这种条件。默认情况下task数目等于executor线程数目,也就是1个executor线程只运行1个task。

总体的topology处理流程图如下:

Topology3

storm集群中,依旧是使用zookeeper作为监控各个组件是否正常运作的模块,Nimbus、Supervisor和Worker都会向zookeeper汇报心跳,来证明它的存活,Nimbus与Supervisor之间没有直接的交互,状态都是保存在zookeeper上。

storm3

Storm的流数据分组策略

grouping

  一个topology是spouts和bolts组成的图,通过stream groupings将spouts和bolts连接起来。

Storm中的Stream

  消息流stream是storm里的关键抽象,一个消息流是一个没有边界的tuple序列,而这些tuple序列会以一种分布式的方式并行地创建和处理,通过对stream中tuple序列中每个字段命名来定义stream,在默认的情况下,tuple的字段类型可以是:integer、long、short、byte、string、double、float、boolean和byte array,当然也可以自定义类型,只要实现相应的序列化器。

stream

Storm中的Stream groupings

  定义一个topology的关键的一个步骤是定义每个bolt接收什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配数据给bolts的,Storm里面有7种类型的stream grouping策略:

Shuffle Grouping —— 随机分组,随机派发stream里面的tuple,Storm保证每个bolt接收到的tuple数目大致相同

Shuffle Grouping

Fields Grouping —— 按字段分组,比如按userid来分组,具有相同userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

Field Grouping

All Grouping —— 广播发送,对于每一个tuple,所有的bolts都会收到

All Grouping

Global Grouping —— 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

Global Grouping

Non Grouping —— 不分组,这个分组的意思是表示stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle Grouping是一样的效果,有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

Direct Grouping —— 直接分组,指定分组,由tuple的发送单元直接决定tuple将发给哪个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发送的tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发送tuple。消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)

Local or shuffle grouping —— 如果目标bolt有一个或多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。

Own Stream Grouping —— 当然,Storm还提供了用户自定义Streaming Grouping接口,如果上述Streaming Grouping都无法满足实际的业务需求,我们也可以自己实现一个分组策略,只需要实现backtype.storm.grouping.CustomStreamGrouping接口,该接口重定义了如下方法:List chooseTasks(int taskid, List values)。

storm的消息可靠性处理机制

  Storm内部通过一种巧妙的异或算法判断每个tuple是否被正确完整的处理。

  1. Spout的一个Task创建一个Tuple时,也就是在Spout的nextTuple()方法中实现从特定数据源读取数据的处理逻辑中,会与Acker进行通信,向Acker发送消息,Acker保存该Tuple对应的信息.

  2. Bolt在emit一个新的子Tuple时,会保存与父Tuple的关系,在Bolt中进行ack时,会计算出父Tuple与由该父Tuple新生成的所有子Tuple的一个异或值,将该值发送给Acker(计算异或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ^ … ^ child-tuple-idN))。从这个公式中可以看出,Bolt并没有把所有生成的子Tuple发送给Acker,这要比发送一个异或值大的多,只发送一个异或值大大降低了Bolt与Acker之间网络通信的开销。

  3. Acker收到Bolt发送的异或值,与当前保存的task-id对应的初始ack-val做异或,tuple-id与ack-val相同,异或结果为0,但是子Tuple的child-tuple-id等并不相同,只有等所有的子Tuple的child-tuple-id都执行ack回来,最后ack-val就为0,表示整个Tuple树处理成功。无论成功与否,最后都要从Acker维护的队列中移除。

  4. 最后,Acker会向产生该原始父Tuple的Spout对应的Task发送通知,成功或失败,回调Spout的ack或fail方法。如果我们在实现Spout时,重写了ack和fail方法,处理回调就会执行这里的逻辑。

Tuple Tree

  这种异或算法存在1/2^64概率的误差,可以忽略不计。在开发中,对于那些不允许丢失的消息可以在发送时对tuple指定messageID并进行锚定,告诉Tuple Tree这里新增加了一个节点,保证消息的可靠性。继承BaseBasicBolt实现的API本身就是可靠的,不需要自己进行锚定发送和调用ack以及fail方法。

collector.emit(tuple, messageId); //可靠消息
collector.emit(tuple); //不可靠消息
collector.emit(tuple, new Values(word)); //锚定发送,可靠消息
collector.emit(new Values(word)); //非锚定发送,不可靠消息

Storm的容错机制

集群节点(机器):

  • Storm集群中的节点故障,此时Nimbus会将此机器上所有正在运行的任务转移到其他可用的机器上运行
  • Zookeeper集群中的节点故障。Zookeeper保证少于半数的机器宕机系统仍可正常运行,及时修复故障机器即可。

任务槽(slot)故障:

  • Nimbus失败。Nimbus是无状态和快速失败的,因此Nimbus的失败不会影响当前正在运行的任务,但是当Nimbus失败时,无法提交新的任务,只要及时将它修复重启即可加入集群。
  • Supervisor失败。Supervisor是无状态(所有的状态都保存在Zookeeper或磁盘上)和快速失败(每当遇到任何意外的情况,进程自动毁灭)的,因此Supervisor的失败不会影响当前正在运行的任务,只要及时将他们重新启动即可。
  • Worker失败。每个Worker中包含数个Bolt(或Spout)任务。Supervisor负责监控这些任务,当worker失败后会尝试在本机重启它,如果在它启动时连续失败了一定的次数,无法发送心跳信息到Nimbus,Nimbus将在另一台主机上重新分配worker。

任务级容错:

  • Bolt任务崩溃(crash)引起的消息未被应答。此时,acker中所有与此Bolt任务关联的消息都会因为超时而失败,对应的Spout的fail方法将被调用。
  • acker任务失败。如果acker任务本身失败了,它在失败之前持有的所有消息都将超时而失败。Spout的fail方法将被调用。
  • Spout任务失败。在这种情况下,与Spout任务对接的外部设备(如消息队列、数据库等)负责消息的完整性。例如,当客户端异常时,kestrel队列会将处于pending状态的所有消息重新放回队列中。

Topology并行度设置

以字母大小写转换并添加后缀的例子来解释如何设置。

设置代码:

TopologyBuilder builder = new TopologyBuilder();

//将我们的spout组件设置到topology中去,第三个参数为设置并发度为4,即用4个excutor来执行这个组件
//.setNumTasks(8) 设置的是该组件执行时的并发task数量,也就是意味着1个excutor会运行2个task
builder.setSpout("randomspout", new RandomWordSpout(), 4);

//将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
//.shuffleGrouping("randomspout") 指定分组策略为随机分组,分组策略很多,包含两层含义:
//1.upperbolt组件接收的tuple消息一定来自于randomspout组件
//2.randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组
builder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout");

//将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
builder.setBolt("suffixbolt", new SuffixBolt(),4).shuffleGrouping("upperbolt");

//用builder来创建一个topology
StormTopology demotop = builder.createTopology();

//配置一些topology在集群中运行时的参数
Config conf = new Config();
//这里设置的是整个topology所占用的槽位数,也就是worker的数量(整个集群)
conf.setNumWorkers(4);
//设置日志输出
conf.setDebug(true);
//设置事务,0表示不需要事务
conf.setNumAckers(0);

//将这个topology提交给storm集群运行
StormSubmitter.submitTopology("demotopo", conf, demotop);

那么,下面我们看Storm是如何计算一个Topology运行时的并行度,并分配到4个Worker中的:

  • 计算Task总数:41 + 41 + 4*1 = 12(总共创建12个Task实例)
  • 计算运行时Topology并行度:12/4=3(每个Worker对应3个Executor)
  • 将12个Task分配到4个Worker中的3*4个Executor中:应该是每个Worker上3个Executor,将3个Task分配到3个Executor中
  • 每个Worker中分配3个Task,这里应该是3个相同类型的Task
  • Storm内部优化:会把同类型的Task尽量放到同一个Executor中运行
  • 分配过程:从Task个数最少的开始,这里的设置是每个Executor分配一个Task

Storm编程接口

  • Spouts

  Spout是Stream的消息产生源,Spout组件的实现可以通过继承BaseRichSpout类或者其他 *Spout 类来完成,也可以通过实现IRichSpout接口来实现。需要根据情况实现Spout类中重要的几个方法有:

open方法:当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
	this._collector = collector; 
}

getComponentConfiguration方法:此方法用于声明针对当前组件的特殊的Configuration配置。示例如下:

public Map<String, Object> getComponentConfiguration() { 
      if(!_isDistributed) {
	Map<String, Object> ret = new HashMap<String, Object>(); 
 	ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 
 	return ret;
      } else {
	return null;
     }
}
//这里便是设置了Topology中当前Component的线程数量上限

nextTuple方法:这是Spout类中最重要的一个方法。发送一个Tuple到Topology都是通过这个方法来实现的。示例如下:

public void nextTuple() { 
     Utils.sleep(100); 
     final String[] words = new String[] {"twitter","facebook","google"}; 
     final Random rand = new Random(); 
     final String word = words[rand.nextInt(words.length)];
     _collector.emit(new Values(word));
} 
//这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology。

declareOutputFields方法:此方法用于声明当前Spout的Tuple发送流。Stream流的定义是通过OutputFieldsDeclare.declare方法完成的,其中的参数包含了发送的域Fields。示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
	declarer.declare(new Fields("word"));
 }

除了上述几个方法之外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法。这两个方法在BaseRichSpout等类中已经被隐式的实现了。

  • Bolts

  Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。Bolt类需要实现的主要方法有:

prepare方法:此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是在execute中。示例如下:

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
	 this. _collector = collector;
}

getComponentConfiguration方法:和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。示例如下:

public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> conf = new HashMap<String, Object>();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,      
     emitFrequencyInSeconds);
     return conf;
 } 
//此处定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。

execute方法:这是Bolt中最关键的一个方法。对于Tuple的处理都可以放到此方法中进行。具体的发送也是在execute中通过调用emit方法来完成的。

有两种情况,一种是emit方法中有两个参数,另一种是有一个参数。

(1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是往下游Bolt发送的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple,保证Tuple处理的可靠性。

declareOutputFields方法:此方法用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
 	declarer.declare(new Fields("obj", "count", 	"actualWindowLengthInSeconds"));
 } 
//此例说明当前Bolt类发送的Tuple包含了三个字段:"obj","count","actualWindowLengthInSeconds"

Topology运行机制

  1. Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件。

  2. 在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task执行。worker在哪个supervisor节点上运行是由storm本身决定的。

  3. 任务分配好之后,Nimbus节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,存储当前Topology的所有worker进程的心跳信息。

  4. Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行。

  5. 一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。除非手动结束Topology,否则此步会不间断的执行。

Topology中的Stream处理时的方法调用过程如下:

TheProcessOfTopology

有几点需要说明的地方:

  1. 每个组件(Spout或Bolt)的构造方法和declareOutputFields方法都只被调用一次。

  2. open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。

  3. nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发送Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。相当于线程的run方法。

  4. 在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节点)。在每一个任务上反序列化component。

  5. Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的

  6. 在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。

  • 终止Topology:通过在Nimbus节点利用如下命令来终止一个Topology的运行
./bin/storm kill topologyName

  kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失。

Storm安装部署

  部署Storm集群需要依次完成的安装步骤:

  1. 安装jdk

  2. 搭建zookeeper

  3. 安装Storm依赖库

  4. 下载并解压Storm发布版本

  5. 修改storm.yaml配置文件

  6. 启动Storm各个后台进程

  前面2步之前就已经安装好了,这里直接从Storm的环境开始讲起。Storm发行版解压目录下有一个conf/storm.yaml文件,用于配置Storm。conf/storm.yaml中的配置选项将覆盖defaults.yaml中的默认配置。

storm.zookeeper.root:Storm在zookeeper集群中的根目录,默认是"/"

topology.workers:每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖

storm.zookeeper.servers:Storm集群使用的Zookeeper集群地址,其格式如下:
-"111.222.333.444"
-"555.666.777.888"

#Nimbus所在的主机名
nimbus.host:"hdp-node-01"

#定义可以在本机上运行的工作进程的数量,每个工作进程分配一个通信端口,也就是Supervisor节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设定了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有就再次给新提交的Topology分配
supervisor.slots.ports:
    -6700
    -6701
    -6702
    -6703

如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项

storm.local.dir:Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给予足够的访问权限,然后在storm.yaml中配置该目录,如:storm.local.dir:"/home/hadoop/storm/workdir"

#Storm集群的UI地址端口号,默认是8080
ui.port

更加详细的配置信息可以参考此贴:Storm配置详解

Storm常用命令

  1. 启动nimbus后台程序,命令格式:storm nimbus

  2. 启动supervisor后台程序,命令格式:storm supervisor

  3. 启动ui服务,命令格式:storm ui

  4. 提交Topologies,命令格式:storm jar [jar路径] [拓扑包名.拓扑类名] [stormIP地址] [storm端口] [拓扑名称] [参数]

例如:storm jar /home/storm/storm-starter.jar storm.starter.WordCountTopology wordcountTop; 提交storm-starter.jar到远程集群,并启动wordcountTop拓扑。

  1. 停止Topologies,查看当前运行的yopo:storm list;然后使用 storm kill [拓扑名称] 停止此Topology

示例代码

package com.itheima.bigData.storm;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

//首先需要继承BaseRichSpout
public class RandomWordSpout extends BaseRichSpout{

	private SpoutOutputCollector collector;
	//模拟一些数据
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};

	//不断地往下一个组件发送tuple消息
	//这里面是该spout组件的核心逻辑
	@Override
	public void nextTuple() {
		//可以从Kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
		Random random = new Random();
		int index = random.nextInt(words.length);

		//通过随机数拿到一个商品名
		String goodName = words[index];

		//将商品名封装成tuple,发送消息给下一个组件,可以封装成流
		collector.emit(new Values(goodName));
		//每发送一个消息,休眠500ms
		Utils.sleep(500);
	}

	//初始化方法,在spout组件实例化时调用一次
	@Override
	public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
		// TODO Auto-generated method stub
		this.collector = collector;
	}

	//声明本spout组件发送出去的tuple中的数据的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		//传递字段名称,可以传递多个,要和nextTuple中的emit里面的数据对应,可以声明为流
		declarer.declare(new Fields("orignname"));
	}

}

package com.itheima.bigData.storm;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt{

	private FileWriter filewriter = null;


	//在bolt组件运行过程中只会被调用一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			filewriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	//该bolt组件的核心处理逻辑
	//每收到一个tuple消息,就会被调用一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		// 先拿到上一个组件发送过来的商品名称
		String upper_name = tuple.getString(0);
		String suffix_name = upper_name + "_itisok";


		//为上一个组件发送过来的商品名称添加后缀
		try {
			filewriter.write(suffix_name);
			filewriter.write("\n");
			filewriter.flush();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	//本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		// TODO Auto-generated method stub

	}

}

package com.itheima.bigData.storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
 * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止的执行,除非人为或异常退出
 * @author Ace Z.Swift
 *
 */
public class TopoMain {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		TopologyBuilder builder = new TopologyBuilder();

		//将我们的spout组件设置到topology中去,第三个参数为设置并发度为4,即用4个excutor来执行这个组件
		//.setNumTasks(8) 设置的是该组件执行时的并发task数量,也就是意味着1个excutor会运行2个task
		builder.setSpout("randomspout", new RandomWordSpout(), 4);

		//将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
		//.shuffleGrouping("randomspout") 指定分组策略为随机分组,分组策略很多,包含两层含义:
		//1.upperbolt组件接收的tuple消息一定来自于randomspout组件
		//2.randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组
		builder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout");

		//将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
		builder.setBolt("suffixbolt", new SuffixBolt(),4).shuffleGrouping("upperbolt");

		//用builder来创建一个topology
		StormTopology demotop = builder.createTopology();

		//配置一些topology在集群中运行时的参数
		Config conf = new Config();
		//这里设置的是整个topology所占用的槽位数,也就是worker的数量(整个集群)
		conf.setNumWorkers(4);
		//设置日志输出
		conf.setDebug(true);
		//设置事务,0表示不需要事务
		conf.setNumAckers(0);

		//将这个topology提交给storm集群运行
		StormSubmitter.submitTopology("demotopo", conf, demotop);
	}
}

package com.itheima.bigData.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpperBolt extends BaseBasicBolt{

	//业务处理逻辑
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		// 先获取到上一个组件传递过来的数据,数据在tuple里面,参数的数值代表封装的时候的第几个参数
		String goodName = tuple.getString(0);
		//将商品名转换成大写
		String goodName_upper = goodName.toUpperCase();
		//将转换完成的商品名发送出去,Values就是继承自tuple
		collector.emit(new Values(goodName_upper));
	}

	//声明该bolt组件要发出去的tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 定义字段名,以便于在下一个环节查值(可以根据字段名拿值,也可以根据角标拿值)
		declarer.declare(new Fields("uppername"));
	}

}

Spark

关于Spark

  Spark是类Hadoop MapReduce的通用的并行计算框架,基于Map/Reduce算法实现的分布式计算,是一个用来实现快速而通用的集群计算平台。Spark拥有Hadoop MapReduce所具有的优点,但不同于MapReduce的是,Spark是能够在内存中进行计算的,它的Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好的适用于数据挖掘与机器学习等需要迭代map reduce的算法。

  Spark的核心是一个对由很多计算任务组成的运行在多个工作机器或者是计算集群上的应用进行调度、分发以及监控的计算引擎,其各个组件如下图所示:

Spark

  • Spark Core:Spark Core实现了Spark的基本功能,包含任务调度、内存管理、错误恢复与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed Dataset,RDD)的API定义。RDD表示分布在多个计算节点上可以并行操作的元素集合,是Spark主要的编程抽象。
  • Spark SQL:Spark SQL是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据
  • Spark Streaming:Spark Streaming是Spark提供的对实时数据进行流式计算的组件。从底层设计来看,Spark Streaming支持与Spark Core同级别的容错性、吞吐量以及可伸缩性。
  • MLlib:是Spark提供的一个机器学习程序库,包含多种机器学习算法
  • GraphX:是用来操作图的程序库,可进行并行的图计算。与Spark Streaming和Spark SQL类似,GraphX也扩展了Spark的RDD API,能用来创建一个顶点和边都包含任意属性的有向图。
  • 集群管理器:Spark自带的一个独立调度器,同时也可以运行在Hadoop YARN、Apache Mesos等集群调度器

  Spark从一开始就是为交互式查询和迭代算法设计的,同时还支持内存式存储和高效的容错机制。Spark不仅可以将任何Hadoop分布式文件系统(HDFS)上的文件读取分为分布式数据集,也可以支持其他支持Hadoop接口的系统,比如本地文件、亚马逊S3、Cassandra、Hive、HBase等。所以,Hadoop并非Spark的必要条件,Spark支持任何实现了Hadoop接口的存储系统;Spark支持的Hadoop输入格式包括文本文件、SequenceFile、Avro、Parquet等。

  Spark的适用场景:Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小。由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对那种增量修改的应用模型不适合。

Spark vs Hadoop

  Spark的中间数据放到内存中,对于迭代运算效率更高(基于内存的分布式计算引擎);

  Spark更适合迭代运算比较多的ML和DM运算。因为在Spark里,有RDD的抽象概念;

  Spark比Hadoop更通用,Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作,Spark拥有如map、filter、flatMap、sample、groupByKey、reduceByKey、union、join、cogroup、mapValues、sort、partitionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供count、collect、reduce、lookup、save等多种actions操作。这些多种多样的数据集操作类型,给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的DataShuffle一种模式。用户可以命名、物化、控制中间结果的存储、分区等,可以说编程模型比Hadoop更灵活。Hadoop MapReduce会被新一代的大数据处理平台替代是技术发展的趋势,而在新一代的大数据处理平台中,Spark目前得到了最广泛的认可和支持。Spark是站在Hadoop And DataBase这两个巨人肩膀上的。

Spark生态及运行原理

Spark1

Spark特点

  1. 运行速度快 => Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。

  2. 适用场景广泛 => Spark可用于大数据分析统计,实时数据处理,图计算及机器学习

  3. 易用性 => 编写简单,支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在多种集群中

  4. 容错性高 => Spark引进了弹性分布式数据集RDD(Resilient Distributed Dataset)的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即允许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过Checkpoint来实现容错,而Checkpoint有两种方式:Checkpoint Data和Logging The Updates,用户可以控制采用哪种方式来实现容错。

Spark核心概念

Spark基础运行架构:

Spark-Core

  从上层来看,每个Spark应用都由一个驱动器程序(Driver Program)来发起集群上的各种并行操作。驱动器程序包含应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。驱动器程序通过一个SparkContext对象来访问Spark。这个对象代表对计算集群的一个连接。Spark Shell启动时已经自动创建了一个SparkContext对象,是一个叫作sc的变量。一旦有了SparkContext,我们就可以用它来创建RDD。如使用sc.textFile()就是来创建一个代表文件中各行文本的RDD。我们可以在这些行上进行各种操作,如count()。

  要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。不同的节点会统计文件的不同部分的行数。Spark会自动将函数发到各个执行器节点上,这样就可以在单一的驱动器程序中编程,并且让代码自动运行在多个节点上。

  除了使用Spark Shell交互式运行之外,Spark也可以在Java、Scala或Python的独立程序中被连接使用,这与在Shell中使用的主要区别在于你需要自行初始化SparkContext。在Java和Scala中,只需要给你的应用添加一个对于Spark-core工件的Maven依赖便可以连接到Spark,在Python中,可以先把应用写出Python脚本,然后用Spark自带的 spark-submit 脚本来运行,spark-submit脚本会帮我们引入Python程序的Spark依赖。

SparkWithYarn

spark运行流程:

  Spark架构和之前的一样,依旧是采用分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。

  • Master作为整个集群的控制器,负责整个集群的正常运行
  • Worker相当于计算节点,接收主节点命令与进行状态汇报
  • Executor负责任务的执行
  • Client作为用户的客户端负责提交应用
  • Driver负责控制一个应用的执行

  Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。Driver程序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理计算机节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。

  1. Executor/Task每个程序都有,不同程序互相隔离,task多线程并行

  2. 集群对Spark透明,Spark只要获取相关节点和进程

  3. Driver与Executor保持通信,协作处理

基本概念:

  • Application => Spark的应用程序,包含一个Driver program和若干Executor
  • SparkContext => Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor
  • Driver program => 运行Application的main()函数并创建SparkContext
  • Executor => 是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务
  • Cluster Manager => 在集群上获取资源的外部服务(例如:Standalone、Mesos、Yarn)
  • Worker Node => 集群中任何可以运行Application代码的节点,运行一个或多个Executor进程
  • Task => 运行在Executor上的工作单元
  • Job => SparkContext提交的具体Action操作,常和Action对应
  • Stage => 每个Job会被拆分成很多组Task,每组任务被称为Stage,也称TaskSet
  • RDD => 是Resilient distributed dataset(弹性分布式数据集)的简称,是Spark最核心的模块和类
  • DAGScheduler => 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler
  • TaskScheduler => 将Taskset提交给Worker node集群运行并返回结果
  • Transformations => 是Spark API的一种类型,Transformation返回值还是一个RDD,所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的
  • Action => 是Spark API的一种类型,Action返回值不是一个RDD,而是一个Scala集合,计算只有在Action被提交的时候计算才被触发

RDD

RDD

  作为Spark对数据的核心抽象,RDD其实就是分布式的元素集合,在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值。Spark会自动将RDD中的数据分发到集群上,并操作并行化执行。

  Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分成多个区,这些分区运行在集群中的不同节点上。RDD可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。

  用户可以使用两种方法创建RDD:读取一个外部数据集(sc.textFile()),或在驱动器程序里分发驱动器程序中的对象集合(如list和set)。创建出来后,RDD支持两种类型的操作:转化操作(transformation)和行为操作(action)。转化操作会由一个RDD生成一个新的RDD。另一方面,行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中。

  转化操作和行动操作的区别在于Spark计算RDD的方式不同。虽然可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD。这些RDD只有在第一次行动操作中用到时,才会真正计算。如行动操作first()中,Spark只需要扫描文件直到找到第一个匹配的行为为止,而不需要读取整个文件。

  默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算,如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。我们可以让Spark把数据持久化到许多不同的地方,在第一次对持久化的RDD计算之后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把RDD缓存到磁盘上而不是内存中。

注:在任何时候都能进行重算是我们为什么把RDD描述为“弹性”的原因。当保存RDD数据的一台机器失败时,Spark还可以使用这种特性来重算出丢掉的分区。

创建RDD

  Spark提供了两种创建RDD的方式:读取外部数据集,以及在驱动程序中对一个集合进行并行化

  1. 对集合进行并行化

  把程序中一个已有集合传给SparkContext的paralleliz()方法,不过需要注意的是,除了开发原型和测试时,这种方式用的并不多,毕竟这种方式需要把你的整个数据集先放到一台机器的内存中

  1. 从外部存储中读取数据:即SparkContext对象调用textFile()方法

RDD操作

  1. 转化操作:返回一个新的RDD操作,即转化操作返回的是RDD。转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算。有些转化操作每次只会操作RDD中的一个元素。

例:筛选出log.txt中的错误信息:

#Python实现filter()转化操作:
inputRDD = sc.textFile("log.txt")
errorRDD = inputRDD.filter(lambda x:"error" in x)
//Java实现filter()转化操作:
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorRDD = inputRDD.filter(new Function<String,Boolean>(){
	public Boolean call(String x){ return x.contains("error"); }
});

注意:filter()操作不会改变已有的inputRDD中的数据。union()与filter()的不同点在于它操作两个RDD而不是一个。转化操作可以操作任意数量的输入RDD。

  通过转化操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph)来记录这些不同的RDD之间的依赖关系。Spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久的RDD丢失部分数据时恢复所丢失的数据,图如下:

lineageGraph

  1. 行动操作:向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算(行动操作返回的是其他数据类型),由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的RDD的转化操作。

  行动操作中,take()方法获取RDD中的少量元素;collect()函数获取整个RDD中的数据。记住,只有当整个数据集在单台机器的内存中放得下时,才能用collect(),因此collect()不能用在大规模数据集上。

  将数据写到诸如HDFS这样的分布式存储系统中:使用saveAsTextFile()、saveAsSequenceFile(),或任意的其他行动操作来把RDD的数据内容以各种自带的格式保存起来。

  每当我们调用一个新的操作时,整个RDD都会从头开始计算,要避免这种低效的行为,用户可以将中间结果持久化。

  1. 惰性求值

  不应该把RDD看作存放特定数据的数据集,而最好把每个RDD当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。虽然转化操作是惰性求值的,但是还是可以随时通过运行一个行动操作来强制Spark执行RDD的转化操作,并且在Spark中写一个非常复杂的映射来达到一步完成并不见得能比使用很多简单的连续操作获得好很多的性能。

  1. 向Spark传递函数

  当你传递的对象是某个对象的成员,或者包含了对某个对象的一个字段的引用时(例如self.field),Spark就会把整个对象发到工作节点上,这样传递的东西就太多了

FuncWithPython

解决办法是:只要你把所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量。(有时,如果传递的类里面包含Python不知道如何序列化传输的对象,也会导致你的程序失败)

TheKeyOfQuestion

  在Java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递

InterfaceOfJavaFunc

在Java中使用匿名内部类进行函数传递:

RDD<String> errors = lines.filter(new Function<String,Boolean>(){
	public Boolean call(String x){ return x.contains("error"); }
});

在Java中使用具名类进行函数传递:

class ContainsError implements Function<String,Boolean>(){
	public Boolean call(String x){ return x.contains("error"); }
}
RDD<String> errors = lines.filter(new ContainsError());

在Java中使用lambda表达式进行函数传递:

RDD<String> errors = lines.filter(s -> s.contains("error"));

匿名内部类和lambda表达式都可以引用方法中封装的任意final变量,因此可以像在Python和Scala中一样把这些变量传递给Spark

针对各个元素的转化操作:

DiffRDDOP

RDD的flatMap()和map()的区别:

DiffMap

因为转化操作返回的是RDD,所以不像普通的编程一样需要用字符串数组去接收切割的返回值

伪集合操作:以下四种集合操作都要求操作的RDD是相同数据类型的

4RDD

  • distinct()操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份
  • union(other)是最简单的集合操作,它会返回一个包含两个RDD中所有元素的RDD。如果输入的RDD中有重复数据,Spark的union()操作也会包含这些重复数据
  • intersection(other)方法,只返回两个RDD中都有的元素。intersection()在运行时也会去掉所有重复的元素(单个RDD内的重复元素也会一起移除,需要数据混洗)
  • subtract(other)函数接收另一个RDD作为参数,返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD(需要数据混洗)
  • cartesian(other)转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则来自另一个RDD(计算两个RDD的笛卡儿积)

Cartesian

常见的RDD转化操作:

对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

tranRDD1

对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

tranRDD2

RDD行动操作:RDD的一些行动操作会以普通集合或者值的形式将RDD的部分或全部数据返回驱动器程序中。

对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作:

actionRDD

其中,take(n)返回RDD中的n个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合,collect()和take(n)这些操作返回元素的顺序与预期的可能不一样

在不同RDD类型间转换:Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD,接口如下:

JavaInterface

也可以调用RDD上的一些别的函数(不能只是创建出一个DoubleFunction然后把它传给map())。当需要一个DoubleRDD时,我们应当调用mapToDouble()来替代map(),跟其他所有函数所遵循的模式一样。例如,用Java创建DoubleRDD:

JavaDoubleRDD result = rdd.mapToDouble(
	new DoubleFunction<Integer>(){
		public double call(Integer x){
			return (double)x*x;
		}
	}
);
System.out.println(result.mean());

持久化(缓存):

  • Spark RDD是惰性求值的,当需要多次使用同一个RDD时,如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它所有的依赖,这在迭代算法中消耗格外大。为了避免多次计算同一个RDD,可以让Spark对数据进行持久化
  • 当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。如过一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
  • 根据不同的目的,将RDD分为不同的持久化级别(如下图)。在Scala和Java中,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。当我们把数据写到磁盘或堆外存储上时,也总是使用序列化后的数据。

RDDpersist

注:persist()调用本身不会触发强制求值。如果缓存的数据太多,内存放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。RDD还有一个方法叫做unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除。

键值对RDD

  键值对RDD通常用来进行聚合计算,我们一般要先通过一些初始ETL(抽取、转化、装载)操作来将数据转化为键值对形式。而分区,是用来让用户控制键值对RDD在各节点上分布情况的高级特性。Spark为包含键值对类型的RDD提供了一些专有的操作,这些RDD被称为pair RDD。pair RDD提供reduceByKey()方法,可以分别归约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合到一起,合并为一个RDD。很多存储键值对的数据格式会在读取时直接返回有其键值对数据组成的pair RDD,当需要把一个普通的RDD转为pair RDD时,可以调用map()函数来实现,传递的函数需要返回键值对。

#在Python中使用第一个单词作为键创建出一个pair RDD
pairs = lines.map(lambda x:(x.split(" ")[0],x))

  Java没有自带的二元组类型,因此Spark的Java API让用户使用scala.Tuple2类来创建二元组。Java用户可以通过new Tuple2(elem1,elem2)来创建一个新的二元组,并且可以通过 ._1() 和 ._2() 方法访问其中的元素。

PairFunction<String,String,String> keyData = new PairFunction<String,String,String>(){
	public Tuple2<String,String> call(String x){
		return new Tuple2(x.split(" ")[0],x);
	}
};
JavaPairRDD<String,String> pairs = lines.mapToPair(keyData);

  当用Scala和Python从一个内存中的数据集创建pair RDD时,只需要对这个由二元组组成的集合调用SparkContext.parallelize()方法。而要使用Java从内存数据集创建pair RDD的话,则需要使用SparkContext.parallelizePairs()。pair RDD可以使用所有标准RDD上的可用的转化操作。由于pair RDD中包含二元组,所以需要传递的函数应当操作二元组而不是独立的元素。

Pair RDD的转化操作(以键值对集合{(1,2),(3,4),(3,6)}为例)

Pair RDD

针对两个pair RDD的转化操作(rdd={(1,2),(3,4),(3,6)} other={(3,9)})

Pair RDD2

只访问pair RDD的值部分的话,Spark提供了mapValues(func)函数,功能类似与map{case(x,y):(x,func(y))}

rdd.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))

mapValues

reduceByKey()与reduce()类似、foldByKey()与fold()类似。调用reduceByKey()和foldByKey()会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并

#用Python实现单词计数
rdd = sc.textFile("hdfs://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x:(x,1).reduceByKey(lambda x,y:x+y))
//用Scala实现单词技术
val input = sc.textFile("hdfs://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x,1).reduceByKey(x,y) => x+y)
//用Java实现单词计数
JavaRDD<String> input = sc.textFile("hdfs://...");
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String,String>(){
	public Iterable<String> call(String x){ return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String,Integer> result = words.mapToPair(
	new PairFunction<String,String,Integer>(){
		public Tuple2<String,Integer> call(String x){ return new Tuple2(x,1); }
	}).reduceByKey(
		new Function2<Integer,Integer,Integer>(){
			public Integer call(Integer a,Integer b){ return a+b; }
		});

可以使用countByValue()函数,以便更快地实现单词计数:result = rdd.flatMap(lambda x:x.split(“ “)).countByValue()。

Kafka

JPDA

  Sun Microsystem 的 Java Platform Debugger Architecture (JPDA) 技术是一个多层架构,使您能够在各种环境中轻松调试 Java 应用程序。JPDA 由两个接口(分别是 JVM Tool Interface 和 JDI)、一个协议(Java Debug Wire Protocol)和两个用于合并它们的软件组件(后端和前端)组成。它的设计目的是让调试人员在任何环境中都可以进行调试。更详细的介绍,可以参考使用 Eclipse 远程调试 Java 应用程序。

JDWP 设置

JVM本身就支持远程调试,Eclipse也支持JDWP,只需要在各模块的JVM启动时加载以下参数:

dt_socket       # 表示使用套接字传输。
address=8000
# JVM在8000端口上监听请求,这个设定为一个不冲突的端口即可。
server=y
# y表示启动的JVM是被调试者。如果为n,则表示启动的JVM是调试器。
suspend=y
# y表示启动的JVM会暂停等待,直到调试器连接上才继续执行。suspend=n,则JVM不会暂停等待。

需要在 $HADOOP_HOME/etc/hadoop/hadoop-env.sh 文件的最后添加你想debug的进程

#远程调试namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"

#远程调试datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"

#远程调试RM
export YARN_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"

#远程调试NM
export YARN_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦