原创

Hadoop基础知识

相关

大数据

概念

  • 不能使用一台机器进行处理的数据

  • 大数据的核心是样本=总体

特性

  • 大量性(volume): 一般在大数据里,单个文件的级别至少为几十,几百GB以上
  • 快速性(velocity): 反映在数据的快速产生及数据变更的频率上
  • 多样性(variety): 泛指数据类型及其来源的多样化,进一步可以把数据结构归纳为结构化(structured),半结构化(semi-structured),和非结构化(unstructured)
  • 易变性: 伴随数据快速性的特征,数据流还呈现一种波动的特征。不稳定的数据流会随着日,季节,特定事件的触发出现周期性峰值
  • 准确性: 又称为数据保证(data assurance)。不同方式,渠道收集到的数据在质量上会有很大差异。数据分析和输出结果的错误程度和可信度在很大程度上取决于收集到的数据质量的高低
  • 复杂性: 体现在数据的管理和操作上。如何抽取,转换,加载,连接,关联以把握数据内蕴的有用信息已经变得越来越有挑战性

    关键技术

1.数据分布在多台机器

  • 可靠性:每个数据块都复制到多个节点

  • 性能:多个节点同时处理数据

2.计算随数据走

  • 网络IO速度 << 本地磁盘IO速度,大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行)

  • 代码向数据迁移,避免大规模数据时,造成大量数据迁移的情况,尽量让一段数据的计算发生在同一台机器上

3.串行IO取代随机IO

  • 传输时间 << 寻道时间,一般数据写入后不再修改

Hadoop简介

Hadoop可运行于一般的商用服务器上,具有高容错、高可靠性、高扩展性等特点

特别适合写一次,读多次的场景

适合场景

  • 大规模数据
  • 流式数据(写一次,读多次)
  • 商用硬件(一般硬件)

不适合场景

  • 低延时的数据访问
  • 大量的小文件
  • 频繁修改文件(基本就是写1次)

Hadoop架构

  • HDFS: 分布式文件存储
  • YARN: 分布式资源管理
  • MapReduce: 分布式计算
  • Others: 利用YARN的资源管理功能实现其他的数据处理方式

内部各个节点基本都是采用Master-Woker架构

Hadoop HDFS

简介

Hadoop Distributed File System,分布式文件系统

架构

hdfs-architecture

  • Block数据块;

    1. 基本存储单位,一般大小为64M(配置大的块主要是因为:
      1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;
      2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;
      3)对数据块进行读写,减少建立网络的连接成本)
    2. 一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小
    3. 基本的读写单位,类似于磁盘的页,每次都是读写一个块
    4. 每个块都会被复制到多台机器,默认复制3份

HDFS2.x以后的block默认128M

  • NameNode

    1. 存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小
    2. 一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件
    3. 数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)
    4. NameNode失效则整个HDFS都失效了,所以要保证NameNode的可用性
  • Secondary NameNode

    1. 定时与NameNode进行同步(定期合并文件系统镜像和编辑日志,然后把合并后的传给NameNode,替换其镜像,并清空编辑日志,类似于CheckPoint机制),但NameNode失效后仍需要手工将其设置成主机
  • DataNode

    1. 保存具体的block数据
    2. 负责数据的读写操作和复制操作
    3. DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息
    4. DataNode之间会进行通信,复制数据块,保证数据的冗余性

    Hadoop写文件

1.客户端将文件写入本地磁盘的 HDFS Client 文件中

2.当临时文件大小达到一个 block 大小时,HDFS client 通知 NameNode,申请写入文件

3.NameNode 在 HDFS 的文件系统中创建一个文件,并把该 block id 和要写入的 DataNode 的列表返回给客户端

4.客户端收到这些信息后,将临时文件写入 DataNodes

  • 4.1 客户端将文件内容写入第一个 DataNode(一般以 4kb 为单位进行传输)
  • 4.2 第一个 DataNode 接收后,将数据写入本地磁盘,同时也传输给第二个 DataNode
  • 4.3 依此类推到最后一个 DataNode,数据在 DataNode 之间是通过 pipeline 的方式进行复制的
  • 4.4 后面的 DataNode 接收完数据后,都会发送一个确认给前一个 DataNode,最终第一个 DataNode 返回确认给客户端
  • 4.5 当客户端接收到整个 block 的确认后,会向 NameNode 发送一个最终的确认信息
  • 4.6 如果写入某个 DataNode 失败,数据会继续写入其他的 DataNode。然后 NameNode 会找另外一个好的 DataNode 继续复制,以保证冗余性
  • 4.7 每个 block 都会有一个校验码,并存放到独立的文件中,以便读的时候来验证其完整性

    5.文件写完后(客户端关闭),NameNode 提交文件(这时文件才可见,如果提交前,NameNode 垮掉,那文件也就丢失了。fsync:只保证数据的信息写到 NameNode 上,但并不保证数据已经被写到DataNode 中)

    Rack aware(机架感知)

    通过配置文件指定机架名和 DNS 的对应关系

    假设复制参数是3,在写入文件时,会在本地的机架保存一份数据,然后在另外一个机架内保存两份数据(同机架内的传输速度快,从而提高性能)

    整个 HDFS 的集群,最好是负载平衡的,这样才能尽量利用集群的优势

Hadoop 读文件

img

  1. 客户端向NameNode发送读取请求
  2. NameNode返回文件的所有block和这些block所在的DataNodes(包括复制节点)
  3. 客户端直接从DataNode中读取数据,如果该DataNode读取失败(DataNode失效或校验码不对),则从复制节点中读取(如果读取的数据就在本机,则直接读取,否则通过网络读取)

Hadoop 可靠性

HDFS 的可靠性主要有以下几点:

  • 冗余副本策略
  • 机架策略
  • 心跳机制
  • 安全模式
  • 效验和
  • 回收站
  • 元数据保护
  • 快照机制

冗余副本策略

  可以在 hdfs-site.xml 中设置复制因子指定副本数量

  所有数据块都可副本

  DataNode 启动时,遍历本地文件系统,产生一份 HDFS 数据块和本地文件的对应关系列表 (blockreport) 汇报给 Namenode

机架策略

  HDFS 的"机架感知",通过节点之间发送一个数据包,来感应它们是否在同一个机架

  一般在本机架放一个副本,在其他机架再存放一个副本,这样可以防止机架失效时丢失数据,也可以提高带宽利用率

心跳机制

  NameNode 周期性从 DataNode 接受心跳信息和块报告

  NameNode 根据块报告验证元数据

  没有按时发送心跳的 DataNode 会被标记为宕机,不会再给他任何 I/O 请求

  如果 DataNode 失效造成副本数量下降,并且低于预先设定的值,NameNode 会检测出这些数据库,并在合适的时机重新复制

  引发重新复制的原因还包括数据副本本身损坏,磁盘错误,复制因子被增大等

安全模式

  NameNode 启动时会先经过一个 "安全模式" 阶段

  安全模式阶段不会产生数据写

  在此阶段 NameNode 收集各个 DataNode 的报告, 当数据块达到最小副本数以上时,会被认为是"安全"的

  在一定比例(可设置) 的数据块被确定为"安全" 后 ,在过若干时间,安全模式结束

  当检测到副本数不足的数据块时,该块会被复制,直到达到最小副本数

效验和  

  在文件创立时,每个数据块都产生效验和

  效验和会作为单独一个隐藏文件保存在命名空间下

  客户端获取数据时可以检查效验和是否相同,从而发现数据块是否损坏

  如果正在读取的数据块损坏,则可以继续读取其他副本

回收站

  删除文件时,其实是放入回收站 /trash

  回收站里的文件是可以快速恢复的

  可以设置一个时间值,当回收站里文件的存放时间超过了这个值,就被彻底删除,并且释放占用的数据块

元数据保护

  映像文件和事物日志是 NameNode 的核心数据.可以配置为拥有多个副本

  副本会降低 NameNode 的处理速度,但增加安全性

  NameNode 依然是单点,如果发生故障要手工切换

Hadoop 命令工具

HDFS - 命令工具

  • fsck: 检查文件的完整性

  • start-balancer.sh: 重新平衡HDFS

  • hdfs dfs -copyFromLocal 从本地磁盘复制文件到HDFS

Hadoop YARN

旧的MapReduce架构

yarn-old-mapreduce

  • JobTracker: 负责资源管理,跟踪资源消耗和可用性,作业生命周期管理(调度作业任务,跟踪进度,为任务提供容错)
  • TaskTracker: 加载或关闭任务,定时报告任务状态

此架构会有以下问题:

  1. JobTracker 是 MapReduce 的集中处理点,存在单点故障
  2. JobTracker 完成了太多的任务,造成了过多的资源消耗,当 MapReduce job 非常多的时候,会造成很大的内存开销。这也是业界普遍总结出老 Hadoop 的 MapReduce 只能支持 4000 节点主机的上限
  3. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM
  4. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot , 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就集群资源利用的问题

总的来说就是单点问题资源利用率问题

YARN架构

yarn-architecture

yarn-architecture-physical

YARN就是将 JobTracker 的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理和一个每个作业的管理(ApplicationMaster) ResourceManager 和 NodeManager 提供了计算资源的分配和管理,而 ApplicationMaster 则完成应用程序的运行

  • ResourceManager: 全局资源管理和任务调度
  • NodeManager: 单个节点的资源管理和监控
  • ApplicationMaster: 单个作业的资源管理和任务监控
  • Container: 资源申请的单位和任务运行的容器

架构对比

hadoop-different

YARN架构下形成了一个通用的资源管理平台和一个通用的应用计算平台,避免了旧架构的单点问题和资源利用率问题,同时也让在其上运行的应用不再局限于 MapReduce 形式

YARN基本流程

yarn-process

yarn-process-status-update

1. Job submission

从ResourceManager 中获取一个Application ID 检查作业输出配置,计算输入分片 拷贝作业资源(job jar、配置文件、分片信息)到 HDFS,以便后面任务的执行

2. Job initialization

ResourceManager 将作业递交给 Scheduler(有很多调度算法,一般是根据优先级)Scheduler 为作业分配一个 Container,ResourceManager 就加载一个 application master process 并交给 NodeManager。

管理 ApplicationMaster 主要是创建一系列的监控进程来跟踪作业的进度,同时获取输入分片,为每一个分片创建一个 Map task 和相应的 reduce task Application Master 还决定如何运行作业,如果作业很小(可配置),则直接在同一个 JVM 下运行

3. Task assignment

ApplicationMaster 向 Resource Manager 申请资源(一个个的Container,指定任务分配的资源要求)一般是根据 data locality 来分配资源

4. Task execution

ApplicationMaster 根据 ResourceManager 的分配情况,在对应的 NodeManager 中启动 Container 从 HDFS 中读取任务所需资源(job jar,配置文件等),然后执行该任务

5. Progress and status update

定时将任务的进度和状态报告给 ApplicationMaster Client 定时向 ApplicationMaster 获取整个任务的进度和状态

6. Job completion

Client定时检查整个作业是否完成 作业完成后,会清空临时文件、目录等

Hadoop ResourceManager

负责全局的资源管理和任务调度,把整个集群当成计算资源池,只关注分配,不管应用,且不负责容错

资源管理

  1. 以前资源是每个节点分成一个个的Map slot和Reduce slot,现在是一个个Container,每个Container可以根据需要运行ApplicationMaster、Map、Reduce或者任意的程序
  2. 以前的资源分配是静态的,目前是动态的,资源利用率更高
  3. Container是资源申请的单位,一个资源申请格式:, resource-name:主机名、机架名或*(代表任意机器), resource-requirement:目前只支持CPU和内存
  4. 用户提交作业到ResourceManager,然后在某个NodeManager上分配一个Container来运行ApplicationMaster,ApplicationMaster再根据自身程序需要向ResourceManager申请资源
  5. YARN有一套Container的生命周期管理机制,而ApplicationMaster和其Container之间的管理是应用程序自己定义的

任务调度

  1. 只关注资源的使用情况,根据需求合理分配资源
  2. Scheluer可以根据申请的需要,在特定的机器上申请特定的资源(ApplicationMaster负责申请资源时的数据本地化的考虑,ResourceManager将尽量满足其申请需求,在指定的机器上分配Container,从而减少数据移动)

内部结构

yarn-resource-manager

  • Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)
  • Adaminstration Service: 队列、节点、Client权限管理
  • ApplicationMasterService: 注册、终止ApplicationMaster, 获取ApplicationMaster的资源申请或取消的请求,并将其异步地传给Scheduler, 单线程处理
  • ApplicationMaster Liveliness Monitor: 接收ApplicationMaster的心跳消息,如果某个ApplicationMaster在一定时间内没有发送心跳,则被任务失效,其资源将会被回收,然后ResourceManager会重新分配一个ApplicationMaster运行该应用(默认尝试2次)
  • Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息
  • NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的Container都标记成无效,也不会调度任务到该节点运行
  • ApplicationManager: 管理应用程序,记录和管理已完成的应用
  • ApplicationMaster Launcher: 一个应用提交后,负责与NodeManager交互,分配Container并加载ApplicationMaster,也负责终止或销毁
  • YarnScheduler: 资源调度分配, 有FIFO(with Priority),Fair,Capacity方式
  • ContainerAllocationExpirer: 管理已分配但没有启用的Container,超过一定时间则将其回收

Hadoop NodeManager

Node节点下的Container管理

  1. 启动时向ResourceManager注册并定时发送心跳消息,等待ResourceManager的指令
  2. 监控Container的运行,维护Container的生命周期,监控Container的资源使用情况
  3. 启动或停止Container,管理任务运行时的依赖包(根据ApplicationMaster的需要,启动Container之前将需要的程序及其依赖包、配置文件等拷贝到本地)

内部结构

yarn-node-manager

  • NodeStatusUpdater: 启动向ResourceManager注册,报告该节点的可用资源情况,通信的端口和后续状态的维护

  • ContainerManager: 接收RPC请求(启动、停止),资源本地化(下载应用需要的资源到本地,根据需要共享这些资源)

    PUBLIC: /filecache

    PRIVATE: /usercache//filecache

    APPLICATION: /usercache//appcache//(在程序完成后会被删除)

  • ContainersLauncher: 加载或终止Container

  • ContainerMonitor: 监控Container的运行和资源使用情况

  • ContainerExecutor: 和底层操作系统交互,加载要运行的程序

Hadoop ApplicationMaster

单个作业的资源管理和任务监控

具体功能描述:

  1. 计算应用的资源需求,资源可以是静态或动态计算的,静态的一般是Client申请时就指定了,动态则需要ApplicationMaster根据应用的运行状态来决定
  2. 根据数据来申请对应位置的资源(Data Locality)
  3. 向ResourceManager申请资源,与NodeManager交互进行程序的运行和监控,监控申请的资源的使用情况,监控作业进度
  4. 跟踪任务状态和进度,定时向ResourceManager发送心跳消息,报告资源的使用情况和应用的进度信息
  5. 负责本作业内的任务的容错

ApplicationMaster可以是用任何语言编写的程序,它和ResourceManager和NodeManager之间是通过ProtocolBuf交互,以前是一个全局的JobTracker负责的,现在每个作业都一个,可伸缩性更强,至少不会因为作业太多,造成JobTracker瓶颈。同时将作业的逻辑放到一个独立的ApplicationMaster中,使得灵活性更加高,每个作业都可以有自己的处理方式,不用绑定到MapReduce的处理模式上

如何计算资源需求

一般的MapReduce是根据block数量来定Map和Reduce的计算数量,然后一般的Map或Reduce就占用一个Container

如何发现数据的本地化

数据本地化是通过HDFS的block分片信息获取的

Hadoop Container

  1. 基本的资源单位(CPU、内存等)
  2. Container可以加载任意程序,而且不限于Java
  3. 一个Node可以包含多个Container,也可以是一个大的Container
  4. ApplicationMaster可以根据需要,动态申请和释放Container

Hadoop Failover

失败类型

  1. 程序问题
  2. 进程崩溃
  3. 硬件问题

失败处理

任务失败

  1. 运行时异常或者JVM退出都会报告给ApplicationMaster
  2. 通过心跳来检查挂住的任务(timeout),会检查多次(可配置)才判断该任务是否失效
  3. 一个作业的任务失败率超过配置,则认为该作业失败
  4. 失败的任务或作业都会有ApplicationMaster重新运行

ApplicationMaster失败

  1. ApplicationMaster定时发送心跳信号到ResourceManager,通常一旦ApplicationMaster失败,则认为失败,但也可以通过配置多次后才失败
  2. 一旦ApplicationMaster失败,ResourceManager会启动一个新的ApplicationMaster
  3. 新的ApplicationMaster负责恢复之前错误的ApplicationMaster的状态(yarn.app.mapreduce.am.job.recovery.enable=true),这一步是通过将应用运行状态保存到共享的存储上来实现的,ResourceManager不会负责任务状态的保存和恢复
  4. Client也会定时向ApplicationMaster查询进度和状态,一旦发现其失败,则向ResouceManager询问新的ApplicationMaster

NodeManager失败

  1. NodeManager定时发送心跳到ResourceManager,如果超过一段时间没有收到心跳消息,ResourceManager就会将其移除
  2. 任何运行在该NodeManager上的任务和ApplicationMaster都会在其他NodeManager上进行恢复
  3. 如果某个NodeManager失败的次数太多,ApplicationMaster会将其加入黑名单(ResourceManager没有),任务调度时不在其上运行任务

ResourceManager失败

  1. 通过checkpoint机制,定时将其状态保存到磁盘,然后失败的时候,重新运行
  2. 通过zookeeper同步状态和实现透明的HA

可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和zookeeper来ֹ实现HA

Hadoop MapReduce

简介

一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组

Pattern

img

map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)

Map输出格式和Reduce输入格式一定是相同的

基本流程

MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中

img

详细流程

img

多节点下的流程

img

主要过程

img

Map Side

Record reader

记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内.

Map

在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择.

Shuffle and Sort

ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。

Reduce

reduce采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当reduce功能完成,就会发送0个或多个键值对。

输出格式

输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。类似记录读取,自定义输出格式不在本书范围。

Hadoop 读取数据

MapReduce - 读取数据

通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map

InputFormat

决定读取数据的格式,可以是文件或数据库等

功能

  1. 验证作业输入的正确性,如格式等
  2. 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的Map任务
  3. 提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用

方法

List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题

RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题

类结构

img

TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容

KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为(\t)字符。

NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1

SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。

InputSplit

代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法

Split内有Location信息,利于数据局部化

一个InputSplit给一个单独的Map处理

public abstract class InputSplit {
      /**
       * 获取Split的大小,支持根据size对InputSplit排序.
       */
      public abstract long getLength() throws IOException, InterruptedException;

      /**
       * 获取存储该分片的数据所在的节点位置.
       */
      public abstract String[] getLocations() throws IOException, InterruptedException;
}

RecordReader

将InputSplit拆分成一个个对给Map处理,也是实际的文件读取分隔对象

问题

大量小文件如何处理

CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)

怎么计算split的

通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度

通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小

如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取

如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)

先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
            // 获取该文件所有的block信息列表[hostname, offset, length]
              BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
              if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // 计算分片大小
                // 即 Math.max(minSize, Math.min(maxSize, blockSize));
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                // 循环分片。
                // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                      splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                      bytesRemaining -= splitSize;
                }
                // 处理余下的数据
                if (bytesRemaining != 0) {
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
                }
            } else {
                // 不可split,整块返回
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
            }
        } else {
            // 对于长度为0的文件,创建空Hosts列表,返回
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }

    // 设置输入文件数量
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
}

分片间的数据如何处理

split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split

img

解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    // initialize函数即对LineRecordReader的一个初始化
    // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // 打开文件,并定位到分片读取的起始位置
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        boolean skipFirstLine = false;
        if (codec != null) {
            // 文件是压缩文件的话,直接打开文件
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            // 只要不是第一个split,则忽略本split的第一行数据
            if (start != 0) {
                skipFirstLine = true;
                --start;
                // 定位到偏移位置,下次读取就会从偏移位置开始
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            // 忽略第一行数据,重新定位start
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);// key即为偏移量
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // 读取的数据长度为0,则说明已读完
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            // 读取的数据长度小于最大行长度,也说明已读取完毕
            if (newSize < maxLineLength) {
                break;
            }
            // 执行到此处,说明该行数据没读完,继续读入
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}

Hadoop Mapper

主要是读取InputSplit的每一个Key,Value对并进行处理

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    /**
     * 预处理,仅在map task启动时运行一次
     */
    protected void setup(Context context) throws  IOException, InterruptedException {
    }

    /**
     * 对于InputSplit中的每一对<key, value>都会运行一次
     */
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

    /**
     * 扫尾工作,比如关闭流等
     */
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    /**
     * map task的驱动器
     */
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() {
        return split;
    }

    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }
}

Hadoop Shuffle

对Map的结果进行排序并传输到Reduce进行处理 Map的结果并不是直接存放到硬盘,而是利用缓存做一些预排序处理 Map会调用Combiner,压缩,按key进行分区、排序等,尽量减少结果的大小 每个Map完成后都会通知Task,然后Reduce就可以进行处理

img

Map端

当Map程序开始产生结果的时候,并不是直接写到文件的,而是利用缓存做一些排序方面的预处理操作

每个Map任务都有一个循环内存缓冲区(默认100MB),当缓存的内容达到80%时,后台线程开始将内容写到文件,此时Map任务可以继续输出结果,但如果缓冲区满了,Map任务则需要等待

写文件使用round-robin方式。在写入文件之前,先将数据按照Reduce进行分区。对于每一个分区,都会在内存中根据key进行排序,如果配置了Combiner,则排序后执行Combiner(Combine之后可以减少写入文件和传输的数据)

每次结果达到缓冲区的阀值时,都会创建一个文件,在Map结束时,可能会产生大量的文件。在Map完成前,会将这些文件进行合并和排序。如果文件的数量超过3个,则合并后会再次运行Combiner(1、2个文件就没有必要了)

如果配置了压缩,则最终写入的文件会先进行压缩,这样可以减少写入和传输的数据

一旦Map完成,则通知任务管理器,此时Reduce就可以开始复制结果数据

Reduce端

Map的结果文件都存放到运行Map任务的机器的本地硬盘中

如果Map的结果很少,则直接放到内存,否则写入文件中

同时后台线程将这些文件进行合并和排序到一个更大的文件中(如果文件是压缩的,则需要先解压)

当所有的Map结果都被复制和合并后,就会调用Reduce方法

Reduce结果会写入到HDFS中

调优

一般的原则是给shuffle分配尽可能多的内存,但前提是要保证Map、Reduce任务有足够的内存

对于Map,主要就是避免把文件写入磁盘,例如使用Combiner,增大io.sort.mb的值

对于Reduce,主要是把Map的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘。默认情况下,所有的内存都是分配给Reduce方法的,如果Reduce方法不怎么消耗内存,可以mapred.inmem.merge.threshold设成0,mapred.job.reduce.input.buffer.percent设成1.0

在任务监控中可通过Spilled records counter来监控写入磁盘的数,但这个值是包括map和reduce的

对于IO方面,可以Map的结果可以使用压缩,同时增大buffer size(io.file.buffer.size,默认4kb)

配置

属性 默认值 描述
io.sort.mb 100 映射输出分类时所使用缓冲区的大小.
io.sort.record.percent 0.05 剩余空间用于映射输出自身记录.在1.X发布后去除此属性.随机代码用于使用映射所有内存并记录信息.
io.sort.spill.percent 0.80 针对映射输出内存缓冲和记录索引的阈值使用比例.
io.sort.factor 10 文件分类时合并流的最大数量。此属性也用于reduce。通常把数字设为100.
min.num.spills.for.combine 3 组合运行所需最小溢出文件数目.
mapred.compress.map.output false 压缩映射输出.
mapred.map.output.compression.codec DefaultCodec 映射输出所需的压缩解编码器.
mapred.reduce.parallel.copies 5 用于向reducer传送映射输出的线程数目.
mapred.reduce.copy.backoff 300 时间的最大数量,以秒为单位,这段时间内若reducer失败则会反复尝试传输
io.sort.factor 10 组合运行所需最大溢出文件数目.
mapred.job.shuffle.input.buffer.percent 0.70 随机复制阶段映射输出缓冲器的堆栈大小比例
mapred.job.shuffle.merge.percent 0.66 用于启动合并输出进程和磁盘传输的映射输出缓冲器的阀值使用比例
mapred.inmem.merge.threshold 1000 用于启动合并输出和磁盘传输进程的映射输出的阀值数目。小于等于0意味着没有门槛,而溢出行为由 mapred.job.shuffle.merge.percent单独管理.
mapred.job.reduce.input.buffer.percent 0.0 用于减少内存映射输出的堆栈大小比例,内存中映射大小不得超出此值。若reducer需要较少内存则可以提高该值.

Hadoop 编程

处理

  1. select:直接分析输入数据,取出需要的字段数据即可
  2. where: 也是对输入数据处理的过程中进行处理,判断是否需要该数据
  3. aggregation:min, max, sum
  4. group by: 通过Reducer实现
  5. sort
  6. join: map join, reduce join

Third-Party Libraries

export LIBJARS=$MYLIB/commons-lang-2.3.jar, hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS
hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner The dependent libraries are now included inside the application JAR file

一般还是上面的好,指定依赖可以利用Public Cache,如果是包含依赖,则每次都需要拷贝

Hadoop IO

  1. 输入文件从HDFS进行读取.
  2. 输出文件会存入本地磁盘.
  3. Reducer和Mapper间的网络I/O,从Mapper节点得到Reducer的检索文件.
  4. 使用Reducer实例从本地磁盘回读数据.
  5. Reducer输出- 回传到HDFS.

序列化

序列化是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。反序列化是指将字节流转回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储

在Hadoop中,系统中多个节点进程间的通信是通过“远程过程调用”(RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下:

1.紧凑

紧凑格式能充分利用网络带宽(数据中心最稀缺的资源)

2.快速

进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。

3.可扩展

为了满足新的需求,协议不断变化。所以在控制客户端和服务期的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增加新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。

4.支持互操作

对于系统来说,希望能够支持以不同语言写的客户端与服务器交互,所以需要设计一种特定的格式来满足这一需求。

Writable 接口

Writable 接口定义了两个方法:一个将其状态写入 DataOutput 二进制流,另一个从 DataInput二进制流读取状态。

BytesWritable

BytesWritable 是对二进制数据数组的封装。它的序列化格式为一个指定所含数据字节数的整数域(4字节),后跟数据内容的本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03和05)

NullWritable

NullWritable 是 writable 的特殊类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符。

ObjectWritable和GenericWritable

ObjectWritable 是对 java 基本类型(String,enum,Writable,null或这些类型组成的数组)的一个通用封装。它在 Hadoop RPC 中用于对方法的参数和返回类型进行封装和解封装。

Wriable集合类

io 软件包共有6个 Writable 集合类,分别是 ArrayWritable,ArrayPrimitiveWritable,TwoDArrayWritable,MapWritable,SortedMapWritable以及EnumMapWritable

ArrayWritable 和 TwoDArrayWritable 是对 Writeble 的数组和两维数组(数组的数组)的实现。ArrayWritable 或 TwoDArrayWritable 中所有元素必须是同一类的实例。ArrayWritable 和 TwoDArrayWritable 都有get() ,set() 和 toArray()方法,toArray() 方法用于新建该数组的一个浅拷贝。

ArrayPrimitiveWritable 是对 Java 基本数组类型的一个封装。调用 set() 方法时,可以识别相应组件类型,因而无需通过继承该类来设置类型。

序列化框架

尽管大多数 Mapreduce 程序使用的都是 Writable 类型的键和值,但这并不是 MapReduce API 强制要求使用的。事实上,可以使用任何类型,只要能有一个机制对每个类型进行类型与二进制表示的来回转换就可以。

为了支持这个机制,Hadoop 有一个针对可替换序列化框架的 API 。序列化框架用一个 Serialization 实现来表示。Serialization 对象定义了从类型到 Serializer 实例(将对象转换为字节流)和 Deserializer 实例(将字节流转换为对象)的映射方式。

序列化IDL

还有许多其他序列化框架从不同的角度来解决问题:不通过代码来定义类型,而是使用接口定义语言以不依赖与具体语言的方式进行声明。由此,系统能够为其他语言生成模型,这种形式能有效提高互操作能力。它们一般还会定义版本控制方案。

两个比较流行的序列化框架 Apache Thrift 和Google的Protocol Buffers,常常用作二进制数据的永久存储格式。Mapreduce 格式对该类的支持有限,但在 Hadoop 内部,部分组件仍使用上述两个序列化框架来实现 RPC 和数据交换。

基于文件的数据结构

对于某些应用,我们需要一种特殊的数据结构来存储自己的数据。对于基于 Mapreduce 的数据处理,将每个二进制数据大对象单独放在各自的文件中不能实现可扩展性,所以 Hadoop 为此开发了很多更高层次的容器。

关于 SequenceFile 。

考虑日志文件,其中每一行文本代表一条日志记录。纯文本不适合记录二进制类型的数据。在这种情况下,Hadoop 的 SequenceFile 类非常合适,为二进制键值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键,以及值可以是 Writable 类型。

SequenceFile 也可以作为小文件的容器。HDFS和Mapreduce 是针对大文件优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。

SequnceFile的写操作

通过 createWriter()静态方法可以创建 SequenceFile 对象,并返回 SequnceFile.Writer 实例。该静态方法有多个重载版本,但都需要制定待写入的数据流,Configuration 对象,以及键和值的类型。存储在 SequenceFIle 中的键和值并不一定是 Writable 类型。只要能够被 Sertialization 序列化和反序列化,任何类型都可以。

SequenceFile的读操作

从头到尾读取顺序文件不外乎创建 SequenceFile.reader 实例后反复调用 next() 方法迭代读取记录。读取的是哪条记录与使用的序列化框架有关。如果使用的是 Writable 类型,那么通过键和值作为参数的 next() 方法可以将数据流的下一条键值对读入变量中。

1.通过命令行接口显示 SequenceFile。

hadoop fs 命令有一个 -text 选项可以以文本形式显示顺序文件。该选项可以查看文件的代码,由此检测出文件的类型并将其转换为相应的文本。该选项可以识别 gzip 压缩文件,顺序文件和 Avro 数据文件;否则,假设输入为纯文本文件。

\2. SequenceFile 的排序和合并。

Mapreduce 是对多个顺序文件进行排序(或合并)最有效的方法。Mapreduce 本身是并行的,并且可由你制定使用多少个 reducer 。例如,通过制定一个 reducer ,可以得到一个输出文件。

3.SequenceFile 的格式。

顺序文件由文件头和随后的一条或多条记录组成。顺序文件的前三个字节为 SEQ,紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值的名称,数据压缩细节,用户定义的元数据以及同步标识。同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中,位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识。

关于MapFile

MapFile 是已经排过序的 SequenceFile ,它有索引,所以可以按键查找。索引本身就是一个 SequenceFile ,包含 map 中的一小部分键。由于索引能够加载进内存,因此可以提供对主数据文件的快速查找。主数据文件则是另一个 SequenceFIle ,包含了所有的 map 条目,这些条目都按照键顺序进行了排序。

其他文件格式和面向列的格式

顺序文件和 map 文件是 Hadoop 中最早的,但并不是仅有的二进制文件格式,事实上,对于新项目而言,有更好的二进制格式可供选择。

Avro 数据文件在某些方面类似顺序文件,是面向大规模数据处理而设计的。但是 Avro 数据文件又是可移植的,它们可以跨越不同的编程语言使用。顺序文件,map 文件和 Avro 数据文件都是面向行的格式,意味着每一行的值在文件中是连续存储的。在面向列的格式中,文件中的行被分割成行的分片,然后每个分片以面向列的形式存储:首先存储每行第一列的值,然后是每行第2列的值,如此以往。

压缩

能够减少磁盘的占用空间和网络传输的量,并加速数据在网络和磁盘上的传输。

Hadoop 应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小,格式和所用的工具有关。比较各种压缩算法的压缩比和性能(从高到低):

\1. 使用容器文件格式,例如顺序文件, Avro 数据文件。 ORCF 了说 Parquet 文件

\2. 使用支持切分的压缩格式,例如 bzip2 或者通过索引实现切分的压缩格式,例子如LZO。

\3. 在应用中将文件中切分成块,并使用任意一种他所格式为每个数据块建立压缩文件(不论它是否支持切分)。在这种情况下,需要合理选择数据大小,以确保压缩后的数据块的大小近似于HDFS块的大小。

\4. 存储未经压缩的文件。

重点:压缩和拆分一般是冲突的(压缩后的文件的 block 是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时 Map 任务就无法处理,所以对于这些压缩,Hadoop 往往是直接使用一个 Map 任务处理整个文件的分析)。对大文件不可使用不支持切分整个文件的压缩格式,会失去数据的特性,从而造成 Mapreduce 应用效率低下。

Map 的输出结果也可以进行压缩,这样可以减少 Map 结果到 Reduce 的传输的数据量,加快传输速率。

在 Mapreduce 中使用压缩

FileOutputFormat.setCompressOutput(job,true);
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);

如果输出生成顺序文件,可以设置 mapreduce.output.fileoutputformat.compress.type 属性来控制限制使用压缩格式。默认值是RECORD,即针对每条记录进行压缩。如果将其改为BLOCK,将针对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。

完整性

  • 检测数据是否损坏的常见措施是,在数据第一次引入系统时计算校验和并在数据通过一个不可靠的通道进行传输时再次计算校验和,这样就能发现数据是否损坏,如果计算所得的新校验和和原来的校验和不匹配,我们就认为数据已损坏。但该技术并不能修复数据。常见的错误检测码是 CRC-32(32位循环冗余检验),任何大小的数据输入均计算得到一个32位的整数校验和。
  • datanode 负责在收到数据后存储该数据及其校验和之前对数据进行验证。它在收到客户端的数据或复制其他 datanode 的数据时执行这个操作。正在写数据的客户端将数据及其校验和发送到由一系列 datanode 组成的管线,管线中最后一个 datanode 负责验证校验和。如果 datanode 检测到错误,客户端就会收到一个 IOException 异常的子类。
  • 客户端从 datanode 读取数据时,也会验证校验和,将它们与 datanode 中存储的校验和进行比较。每个datanode均持久保存有一个验证的校验和日志,所以它知道每个数据块的最后一次验证时间。客户端成功验证一个数据块后,会告诉这个 datanode , datanode 由此更新日志。保存这些统计信息对于检测损坏的磁盘很有价值。
  • 不只是客户端在读取数据块时会验证校验和,每个 datanode 也会在一个后台线程中运行一个 DataBlockScanner ,从而定期验证存储在这个 datanode 上的所有数据块。该项措施是解决物理存储媒体上位损坏的有力措施。
  • 由于 HDFS 存储着每个数据块的复本,因此它可以通过数据复本来修复损坏的数据块,进而得到一个新的,完好无损的复本。基本思路是,客户端在读取数据块时,如果检测到错误,首先向 namenode 报告已损坏的数据块及其正在尝试读取操作的这个 datanode ,再抛出 ChecksumException 异常。namenode 将这个数据块复本标记为已损坏,这样它不再将客户端处理请求直接发送到这个节点,或尝试将这个复本复制到另一个 datanode 。之后,它安排这个数据块的一个复本复制到另一个 datanode ,这样一来,数据块的复本因子又回到期望水平。此后,已损坏的数据块复本便被删除。
  • Hadoop的LocalFileSystem 执行客户端的校验和验证。这意味着在你写入一个名为 filename 的文件时,文件系统客户端会明确在包含每个文件快校验和的同一个目录内新建一个 filename.crc 隐藏文件。文件块的大小作为元数据存储在.crc文件中,所以即使文件块大小的设置已经发生变化,仍然可以正确读回文件。在读取文件时需要验证校验和,并且如果检测到错误,LocalFileSystem 还会抛出一个 ChecksumException 异常。

Hadoop 测试

Hadoop 测试

MRUnit单元测试Mapper和Reducer类在内存上独立运行, PipelineMapReduceDriver单线程运行.

LocalJobRunner单线程运行, 且仅有一个 Reducer能够启动conf.set("mapred.job.tracker", "local"); conf.set("fs.default.name", "file:////"); FileSystem fs = FileSystem.getLocal(conf);

MiniMRCluster, MiniYarnCluster, MiniDFSCluster 多线程

正文到此结束