HBase2.5.3
星期日, 3月 16, 2025 | 111分钟阅读

关于HBase2.5.3的学习。
基本概念
简介
Apache HBase(Hadoop DataBase)是一个开源的、高可靠性、高性能、面向列(这里指列族,非列式存储)、可伸缩、实时读写的分布式数据库,其设计思想来源于 Google 的 BigTable 论文。利用 Hadoop HDFS 作为其文件存储系统,利用 ZooKeeper 作为其分布式协同服务。主要用来存储非结构化和半结构化的松散数据(列式存储 NoSQL 数据库)。
HBase 良好的分布式架构设计为海量数据的快速存储、随机访问提供了可能,基于数据副本机制和分区机制可以轻松实现在线扩容、缩容和数据容灾,是大数据领域中 Key-Value 数据结构存储最常用的数据库方案。
注意:HBase 是列族数据库(Column-Family Database),不是列式数据库(Column-Oriented Database)。
总结:HBase 是运行在 HDFS 之上的面向列(列族)的数据库管理系统。
特点
易扩展
HBase 的扩展性主要体现在两个方面,一个是基于运算能力(HRegionServer) 的扩展,通过增加 HRegionSever 节点的数量,提升 HBase 上层的处理能力;另一个是基于存储能力的扩展(HDFS),通过增加 DataNode 节点数量对存储层的进行扩容,提升 HBase 的数据存储能力。
容量大
HBase 单表可以有十亿行、百万列,数据矩阵横向和纵向两个维度所支持的数据量级都非常具有弹性。HBase 的主要作用就是面向 PB 级别数据的实时入库和快速随机访问。这主要源于上述易扩展的特点,使得 HBase 通过扩展来存储海量的数据。
面向列
HBase 是根据列族来存储数据的,列族下面可以有非常多的列。列式存储的最大好处就是,其数据在表中是按照某列存储的,这样在查询只需要少数几个字段时,能大大减少读取的数据量。还可以动态增加列,可单独对列进行各方面的操作。
多版本
HBase 的每个列的数据存储支持多个 Version,比如住址列,可能有多个变更。
稀疏性
为空的列并不占用存储空间,表可以设计的非常稀疏。不必像关系型数据库那样需要预先知道所有列名然后再进行null 填充。
高可靠
WAL(Write Ahead Log)日志先行机制,保证数据写入的时候不会因为集群异常而导致写入数据丢失。Replication 机制,保证了集群在出现严重问题的时候,数据不会发生丢失或者损坏。HBase 底层使用 HDFS,本身也有备份。
高性能
HBase 底层采用 LSM 树(Log-Structured Merge-Tree)数据结构进行存储,底层的 LSM 树数据结构和 RowKey 有序排列等架构上的独特设计,使得 HBase 写入性能非常高。HRegion 切分、主键索引、缓存机制使得 HBase 在海量数据下具备一定的随机读取性能,该性能针对 RowKey 的查询能够到达毫秒级别。LSM 树属于树形结构,最末端的子节点是以内存的方式进行存储的,内存中的小树会 Flush 到磁盘中(当子节点达到一定阈值以后,会放到磁盘中,且存入的过程会进行实时 Merge 成一个主节点,然后磁盘中的树定期会做 Merge 操作,合并成一棵大树,以优化读性能)。
应用
HBase 是一种 NoSQL 数据库,这意味着它不像传统的 RDBMS 数据库那样支持 SQL 作为查询语言。HBase 是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多 RDBMS 系统的特性,比如列类型,辅助索引,触发器和高级查询语言等。
HBase 不适合解决所有的问题,首先数据量要足够大,如果有十亿或百亿行数据,那么 HBase 是一个很好的选择,如果只有几百万行甚至不到的数据量,RDBMS 是一个很好的选择。因为数据量小的话,真正能工作的机器量少,剩余的机器都处于空闲的状态。其次,如果你不需要辅助索引,静态类型的列,事务等特性可以考虑 HBase。但是一个已经用 RDBMS的系统想要切换到 HBase,则需要重新设计系统。最后,保证硬件资源足够,每个 HDFS 集群在少于 5 个节点的时候,都不能表现的很好。因为 HDFS 默认的复制数量是 3,再加上一个 NameNode。其实 HBase 在单机环境下也能运行,但是请在开发环境中进行使用。
适合 HBase 的应用:
-
存储业务数据:车辆 GPS 信息,司机点位信息,用户操作信息,设备访问信息。
-
存储日志数据:架构监控数据(登录日志,中间件访问日志,推送日志,短信邮件发送记录),业务操作日志信息。
-
存储业务附件:UDFS 系统(去中心化文件系统)存储图像,视频,文档等附件信息。
HBase 和 RDBMS 的区别:
属性 | HBase | RDBMS |
---|---|---|
数据类型 | 只有字符串 | 丰富的数据类型 |
数据操作 | 增删改查,不支持JOIN | 各种各样的函数与表连接 |
存储模式 | 列式存储 | 表结构和行式存储 |
数据保护 | 更新后仍然保留l旧版本 | 替换 |
可伸缩性 | 轻易增加节点 | 需要中间层,牺牲性能 |
数据模型
在 HBase 表中,一条数据拥有一个全局唯一的主键(RowKey)和任意数量的列(Column Qualifier),每个列的数据存储支持多个版本(Version),一列或多列组成一个列族(Column Family),同一个列族中列的数据在物理上都存储在同一个 HFile 中。这样基于列存储的数据结构有利于数据缓存和查询。
所以,在 HBase 中定位一条数据需要通过:RowKey → Column Family → Column Qualifier → Version。
HBase 表中的数据是疏松地存储的,因此用户可以动态地为数据定义各种不同的列。HBase 中的数据按主键排序(字典序),同时 HBase 会将表按主键划分为多个 HRegion 存储在不同的 HRegionServer 上,以完成数据的分布式存储和读取。
NameSpace
命名空间类似于关系型数据库中的数据库的概念,他其实是表的逻辑分组。这种抽象为多租户相关功能奠定了基础。命名空间是可以管理维护的,可以创建,删除或更改命名空间。HBase 有两个特殊预定义的命名空间:
- default:没有明确指定命名空间的表将自动落入此命名空间
- hbase:系统命名空间,用于包含 HBase 的内部表和元数据表
Table
Table 和关系型数据库中的表一个意思,由行和列组成。
RowKey
RowKey 的概念与关系型数据库中的主键相似,是一行数据的唯一标识。RowKey 可以是任意字符串(最大长度是64KB,实际应用中长度一般为 10-100 Bytes),RowKey 以字节数组保存。存储数据时,数据会按照 RowKey 的字典序排序存储,所以设计 RowKey 时,要充分利用排序存储这个特性,将经常一起读取的行存放到一起。
访问 HBase 数据的方式有三种:
-
基于 RowKey 的单行查询;
-
基于 RowKey 的范围查询;
-
全表扫描查询。
Column Family
Column Family 即列族,HBase 基于列划分数据的物理存储,同一个列族中列的数据在物理上都存储在同一个 HFile中。一个列族可以包含任意多列,一般同一类的列会放在一个列族中,每个列族都有一组存储属性:
-
是否应该缓存在内存中;
-
数据如何被压缩或行键如何编码等。
HBase 在创建表的时候就必须指定列族。HBase 的列族不是越多越好,官方推荐一个表的列族数量最好小于或者等于三,过多的列族不利于 HBase 数据的管理和索引。
Column Qualifier
列族的限定词,理解为列的唯一标识。但是列标识是可以改变的,因此每一行可能有不同的列标识。使用的时候必须 列族:列
,列可以根据需求动态添加或者删除,同一个表中不同行的数据列都可以不同。
Timestamp
Timestamp 是实现 HBase 多版本的关键。在 HBase 中,使用不同的 Timestamp 来标识相同 RowKey 对应的不同版本的数据。相同 RowKey 的数据按照 Timestamp 倒序排列,默认查询的是最新的版本,当然用户也可以指定 Timestamp 的值 来读取指定版本的数据。
HBase 通过 RowKey 和 Column Family,Column Qualifier 来确定一个存贮单元,然后再通过时间戳来进行索引。时间戳的类型是 64 位整型,时间戳默认是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。 为了避免数据存在过多版本而造成管理(包括存贮和索引)负担,HBase 提供了两种数据版本回收方案:
-
一是保存数据的最后 n 个版本
-
二是保存最近一段时间内的版本(比如最近七天)
Cell
Cell 由 Row,Column Family,Column Qualifier,Version 组成。Cell 中的数据是没有类型的,全部使用字节码形式存贮,因为 HDFS 上的数据都是字节数组。
架构模型
HBase 可以将数据存储在本地文件系统,也可以存储在 HDFS 文件系统。在生产环境中,HBase 一般运行在 HDFS上,以 HDFS 作为基础的存储设施。用户通过 HBase Client 提供的 Shell 或 Java API 来访问 HBase 数据库,以完成数据的写入和读取。HBase 集群主要由 HMaster、HRegionServer 和 ZooKeeper 组成。
ZooKeeper
HBase 通过 ZooKeeper 来完成选举 HMaster、监控 HRegionServer、维护元数据集群配置等工作。主要工作职责如下:
- 选举 HMaster:保证任何时候,集群中只有一个 HMaster。实现 HMaster 主从节点的 Failover;
- 监控 HRegionServer(节点探活):实时监控 HRegionServer 的状态,将 HRegionServer 的上下线信息实时报告给HMaster;
- 维护元数据和集群配置:存放整个 HBase 集群的元数据以及集群的状态信息,包括:存储所有 HRegion 的寻址入口(hbase:meta 元数据表),存储所有的的元数据信息;存储 HBase 的 Schema,包括有哪些 Table,每个 Table 有哪些 Column Family。
Client
HBase Client 为用户提供了访问 HBase 的接口,可以通过元数据表(客户端负责发送请求到数据库)来定位到目标数据的 HRegionServer。客户端连接的方式有很多种:
- HBase Shell
- Java API
发送的请求主要包括:
- DDL:数据库定义语言(表的建立,删除,添加删除列族,控制版本)
- DML:数据库操作语言(增删改)
- DQL:数据库查询语言(查询,全表扫描,基于主键,基于过滤器)
Client 维护着一些 Cache 来加快对 HBase 的访问,比如 HRegione 的位置信息。
HMaster
HMaster 是 HBase 集群的主节点,负责整个集群的管理工作,HMaster 可以实现高可用(Active 和 Backup),通过ZooKeeper 来维护主备节点的切换。HMaster 主要工作职责如下:
-
管理分配:管理和分配 HRegion,负责启动的时候分配 HRegion 到具体的 HRegionServer,又或者在分割 HRegion 时关于新 HRegion 的分配。管理用户对 Table 结构的 DDL(创建,删除,修改)操作。
- 表的元数据信息存储在 ZooKeeper
- 表的数据存储在 HRegionServer 上(实际存储在 HDFS 上)
-
负载均衡:一方面负责将用户的数据均衡地分布在各个 HRegionServer 上,防止 HRegionServer 数据倾斜过载。另一方面负责将用户的请求均衡地分布在各个 HRegionServer 上,防止 HRegionServer 请求过热;
-
维护数据:发现失效的 HRegion,并将失效的 HRegion 分配到正常的 HRegionServer 上。当某个 HRegionServer 下线时迁移其内部的 HRegion 到其他 HRegionServer 上。
-
权限控制。
HRegionServer
HRegionServer 直接对接用户的读写请求,是真正干活的节点,属于 HBase 具体数据的管理者。主要工作职责如下:
- 实时和 HMaster 保持心跳,汇报当前节点的信息;
- 当接收到 HMaster 的命令创建表时,会分配一个 HRegion 对应一张表;
- 负责切分在运行过程中变得过大的 HRegion;
- 当 HRegionServer 意外关闭的时候,当前节点的 HRegion 会被其他 HRegionServer 管理;
- 维护 HMaster 分配给它的 HRegion,处理对这些 HRegion 的 IO 请求;
- 当客户端发送 DML 和 DQL 操作时,HRegionServer 负责和客户端建立连接;
- WAL:Write Ahead Log 日志先行。记录了数据写入、更新日志,它被用来做故障恢复;
- MemStore:写缓存,数据首先会被写入到 MemStore 中。每个 HRegion 的每个 Column Family 都会有一个MemStore。
- 负责与底层的 HDFS 交互,存储数据(HLog、HFile)到 HDFS。
- BlockCache:读缓存,在内存中存储了最常访问的数据,采用 LRU 机制进行淘汰。
当某个 HRegionServer 宕机后,ZooKeeper 会通知 HMaster 进行失效备援。下线的 HRegionServer 所负责的 HRegion 暂时停止对外提供服务,HMaster 会将该 HRegionServer 所负责的 HRegion 转移到其他 HRegionServer 上,并且会对下线的HRegionServer 进行日志重放,将 MemStore 中还未持久化到磁盘中的数据进行恢复。
当某台 HRegionServer Failover 的时候,整个过程中 HRegion 中的数据是不可用的,因为它是缺失的。因此,HBase 属于 CP 架构,降低了可用性,具备强一致性读/写。设想一下,如果 Redo 过程中的 HRegion 能够响应请求,那么可用性提高了,则必然返回不一致的数据(因为 Redo 可能还没完成),那么 HBase 的一致性就降低了。
HRegion
一个 HRegionServer 包含了多个 HRegion。HBase 将表中的数据基于 RowKey 的不同范围划分到不同 HRegion 上,每个HRegion 都负责一定范围的数据存储和访问。
HRegion 是 HBase 中分布式存储和负载均衡的最小单元,不同的 HRegion 可以分布在不同的 HRegionServer 上。每个表一开始只有一个 HRegion,随着数据不断插入表,HRegion 不断增大,当增大到指定阀值(10G)的时候,HRegion 就会等分成两个 HRegion,切分后其中一个 HRegion 会被转移到其他的 HRegionServer 上,实现负载均衡。
当 Table 中的行不断增多,就会有越来越多的 HRegion。为了防止前期数据的处理都集中在一个 HRegionServer,我们可以根据自己的业务进行预分区。
这样即使有一张百亿条数据的表,由于数据被划分到不同的 HRegion上,每个 HRegion 都可以独立地进行读写,HBase 读写数据的时候还可以与多 HRegion 分布式并发操作,所以访问速度并不会有太大的降低。
负载均衡
Split
在 HBase 中 Split 是一个很重要的功能,HBase 是通过把数据分配到一定数量的 HRegion 来达到负载均衡的。一个Table 会被分配到一个或多个 HRegion 中,这些 HRegion 会被分配到一个或者多个 HRegionServer 中。在自动 Split 策略中,当一个 HRegion 达到一定的大小就会自动 Split 成两个 HRegion。Table 在 HRegion 中是按照 RowKey 来排序的,并且一个 RowKey 所对应的行只会存储在一个 HRegion 中,这一点保证了 HBase 的强一致性。
当一个 Table 刚被创建的时候,HBase 默认的分配一个 HRegion 给 Table。也就是说这个时候,所有的读写请求都会访问到同一个 HRegionServer 的同一个 HRegion 中,这个时候就达不到负载均衡的效果了,集群中的其他 HRegionServer 可能处于比较空闲的状态。解决这个问题可以用 pre-splitting 在创建 Table 时提前生成多个 HRegion。
在 Table 初始化的时候如果不配置的话,HBase 是不知道如何去 Split HRegion 的,因为 HBase 不知道应该把哪个RowKey 作为 Split 的开始点。如果我们可以大概预测到 RowKey 的分布,我们可以使用 pre-spliting 来帮助我们提前 SplitHRegion。
如果我们的预测不是特别准确,还是会导致某个 HRegion 过热被集中访问,不过还好我们还有 auto-split,默认按 10G自动切分。但是如果文件到达 9G 后迟迟未到 10G 此时对于 HBase 来说是比较难受的。最好的办法就是首先预测 Split 的切分点,做 pre-splitting,后面再交给 auto-split 来处理。
HBase 在每次数据合并之后都会针对相应 HRegion 生成一个 requestSplit 请求,requestSplit 首先会执行 checkSplit,检测 FileSize 是否达到阈值,如果超过阈值,就进行切分。
HBase 自带了两种 pre-split 的算法,分别是 HexStringSplit
和 UniformSplit
。如果我们的 RowKey
是十六进制的字符串作为前缀的,就比较适合用 HexStringSplit 作为 pre-split 的算法。例如,我们使用 HexHash(prefix) 作为 RowKey的前缀,其中 HexHash 为得到十六进制字符串的 hash 算法。我们也可以用我们自己的 Split 算法。
当一个 HRegion 达到一定的大小,他会自动 Split 成两个 HRegion。如果我们的 HBase 版本是 0.94 之后,那么默认的有三种自动 Split 的策略,ConstantSizeRegionSplitPolicy,IncreasingToUpperBoundRegionSplitPolicy 还有KeyPrefixRegionSplitPolicy。
在 0.94 版本之前 ConstantSizeRegionSplitPolicy 是默认和唯一的 Split 策略。当某个 Store(对应一个 Column Family)的大小大于配置值 hbase.hregion.max.filesize 的时候(默认 10G)HRegion 就会自动分裂。
而 0.94 版本之后 IncreasingToUpperBoundRegionSplitPolicy 是默认的 Split 策略。这个策略中,最小的分裂大小和Table 的某个 HRegionServer 的 HRegion 个数有关,当 StoreFile 的大小大于以下公式得出的值的时候就会 Split。公式如下:
# R 为同一个 Table 中在同一个 HRegionServer 中的 HRegion 的个数
Min(R^2 * "hbase.hregion.memstore.flush.size", "hbase.hregion.max.filesize")
例如:
- hbase.hregion.memstore.flush.size 默认值 128MB。
- hbase.hregion.max.filesize 默认值为 10GB。
- 如果初始时
R=1
,那么Min(128MB, 10GB)=128MB
,也就是说在第一个 Flush 的时候就会触发分裂操作。 - 当
R=2
的时候Min(2*2*128MB, 10GB)=512MB
,当某个 StoreFile 大小达到 512MB 的时候,就会触发分裂。 - 如此类推,当
R=9
的时候,StoreFile 达到 10GB 的时候就会分裂,也就是说当R>=9
的时候,StoreFile 达到 10GB 的时候就会分裂。
KeyPrefixRegionSplitPolicy 可以保证相同的前缀的 RowKey 保存在同一个 HRegion 中。指定 RowKey 前缀位数划分HRegion,通过读取 KeyPrefixRegionSplitPolicy.prefix_length 属性,该属性为数字类型,表示前缀长度,在进行 Split 时,按此长度对 SplitPoint 进行截取。此种策略比较适合固定前缀的 RowKey。当 Table 中没有设置该属性,指定此策略效果等同与使用 IncreasingToUpperBoundRegionSplitPolicy。
我们可以通过配置 hbase.regionserver.region.split.policy 来指定 Split 策略,也可以写我们自己的 Split 策略。
Store
一个 HRegion 由多个 Store 组成,每个 Store 都对应一个 Column Family,Store 包含 1 个 MemStore 和 0 或多个StoreFile 组成。
-
MemStore:作为 HBase 的内存数据存储,数据的写操作会先写到 MemStore 中,当 MemStore 中的数据增长到指定阈值(默认 128M)后,HRegionServer 会启动 FlushCache 进程将 MemStore 中的数据写入 StoreFile 持久化存储,每次写入后都形成一个单独的 StoreFile。当客户端检索数据时,先在 MemStore 中查找,如果 MemStore 中不存在,则会在StoreFile 中继续查找。
-
StoreFile:MemStore 中的数据写到文件后就是 StoreFile,StoreFile 底层是以 HFile 格式保存的。HBase 以 StoreFile 的大小来判断是否需要切分 HRegion。当一个 HRegion 中所有 StoreFile 的大小和数量都增长到超过指定阈值时,HMaster会把当前 HRegion 分割为两个,切分后其中一个 HRegion 会被转移到其他的 HRegionServer 上,实现负载均衡。
-
HFile:HFile 和 StoreFile 是同一个文件,只不过站在 HDFS 的角度称这个文件为 HFile,站在 HBase 的角度就称这个文件为 StoreFile。是 HBase 在 HDFS 中存储数据的格式,它包含多层的索引,这样在 HBase 检索数据的时候就不用完全的加载整个文件。
HFile
HFile 小节内容基于 HBase官网和《HBase原理与实践》。
基本概念
官方文档:https://hbase.apache.org/book.html#hfile
StoreFile(HFile) 是 HBase 最终存储数据的介质,这里需要了解几个相关的名词:
- Block:每个 HFile 由 N 个 Block 组成。
- KeyValue:每个 Block 又是由多个 KeyValue 数据组成,KeyValue 对象是数据存储的核心,KeyValue 包装了一个字节数组,同时将偏移量 offsets 和 lengths 放入数组中,这个数组指定从哪里开始解析数据内容。
注意:KeyValue 对象不跨 Block 存储,假如这里有一个 KeyValue 的大小为 8M,即使 Block-Size=64KB,当读取该KeyValue 的时候也是以一个连贯的 Block 进行读取。
KeyValue 包装的字节数组的格式如下:
-
Key Length:存储 Key 的长度,4 个字节。
-
Value Length:存储 Value 的长度,4 个字节。
-
Key(还可以被拆解):存储数据的 Key,由 Row Key Length,Row Key,Column Family Length,Column Family,Column Qualifier,TimeStamp,Key Type 组成。
-
Value:存储 Column Qualifier 的值(用户写入的实际数据)。
而 Key 又可以拆解成以下部分:
- Row Length:存储 RowKey 的长度,2 个字节。
- Row (i.e. the RowKey):存储 RowKey 的实际内容,其大小为 Row Length。
- Column Family Length:存储 Column Family 的长度,1 个字节。
- Column Family:存储 Column Family 的实际内容,大小为 Column Family Length。
- Column Qualifier:存储 Column Qualifier 的实际内容,既然 Key 中其他所有字段的大小都知道了,整个 Key 的大小也就知道了,那么这个 Column Qualifier 大小也是明确的了,无需再存储其 Length。
- Timestamp:存储时间戳 TimeStamp,8 个字节。
- KeyType(e.g. Put, Delete, DeleteColumn, DeleteFamily):存储 Key 类型 KeyType,1 个字节。Type 分为 Put、Delete、DeleteColumn、DeleteFamilyVersion、DeleteFamily 等类型,标记这个 KeyValue 的类型。
看完以上内容,当在同一个 Row 插入 2 个 Column Qualifier 的数据时。
- Put #1:
rowkey=row1, cf:attr1=value1
- Put #2:
rowkey=row1, cf:attr2=value2
即使这是相同的 Row,KeyValue 实例也是为每个 Column Qualifier 单独创建。
Key portion for Put #1:
rowlength -----------→ 4
row -----------------→ row1
columnfamilylength --→ 2
columnfamily --------→ cf
columnqualifier -----→ attr1
timestamp -----------→ server time of Put
keytype -------------→ Put
Key portion for Put #2:
rowlength -----------→ 4
row -----------------→ row1
columnfamilylength --→ 2
columnfamily --------→ cf
columnqualifier -----→ attr2
timestamp -----------→ server time of Put
keytype -------------→ Put
由 Data Block 的结构可以看出,HBase 中数据在最底层是以 KeyValue 的形式存储的,其中 Key 是一个比较复杂的复合结构。因为任意 KeyValue 中都包含 Rowkey、Column Family 以及 Column Qualifier,因此这种存储方式实际上比直接存储 Value 占用更多的存储空间。这也是 HBase 系统在表结构设计时经常强调 Rowkey、Column Family 以及 ColumnQualifier 尽可能设置短的根本原因。假如这些描述所占字节越长,那么这个 KeyValue 越大,磁盘消耗就变大。
文件结构
HFile V1
官方文档:https://hbase.apache.org/book.html#_hfile_format_2
截止目前,HFile 一共有 3 个版本,要深入了解 HFile 的话,还得从第一个版本开始看起。
详细结构如下:
-
Data Block
:保存表中的数据,这部分可以被压缩。 -
Meta Block
:(可选)保存用户自定义的 KV 属性对,可以被压缩。 -
File Info
:HFile 的 Meta 元信息,不被压缩,定长。主要记录了文件的一些统计元信息,比较重要的是AVG_KEY_LEN 和 AVG_VALUE_LEN,分别记录了该文件中所有 Key 和 Value 的平均长度。 -
Data Block Index
:Data Block 的索引。每个 Data 块的起始点 -
Meta Block Index
:(可选)Meta Block 的索引,每个 Meta 块的起始 -
Trailer Block
:记录了 HFile 的版本信息,各个部分的偏移值和寻址信息。定长
读取一个 HFile 时,会首先读取 Trailer,由 Trailer 指向数据块的 Data Block Index。Trailer 保存了每个段的起始位置(段的 Magic Number 用来做安全 Check),然后 Data Block Index 会被读取到内存中,这样,当检索某个 Key 时,不需要扫描整个 HFile,只需要从内存中找到 Key 所在的 Block,通过一次磁盘 IO 将整个 Block 读取到内存中,再找到需要的Key 即可。
**Data Block 是 HBase I/O 的基本单元,为了提高效率,HRegionServer 提供了基于 LRU 的 Block Cache 机制。**每个Block Data 块的大小可以在创建 Table 的时候通过 BLOCKSIZE 参数指定(建议为 1024 的整数倍,默认为 65536 Byte,也就是 64KB)。
create 'my_table', {NAME => 'my_cf', BLOCKSIZE => '65536'}
**大号的 Block 有利于顺序 Scan,小号的 Block 利于随机查询。**每个 Data 块除了开头的 Magic 以外就是一个个KeyValue 拼接而成,Magic 内容就是一些随机数字,目的是防止数据损坏。
**HFile 的 Data Block,Meta Block 通常采用压缩的方式存储。**具体使用方式如下。
create 'my_table', {NAME => 'my_cf', COMPRESSION => 'SNAPPY'}
alter 'my_table', {NAME => 'my_cf', COMPRESSION => 'SNAPPY'}
第一版的 Block Index 非常简单,只有两个:一个是 Data Block Index,一个是 Meta Block Index。
Block Index 包括了以下几点:
- Offset (long)
- Uncompressed size (int)
- Key (a serialized byte array written using Bytes.writeByteArray)
- Key length as a variable-length integer (VInt)
- Key bytes
Block Index 的数量会存放在 Trailer,这样才能读取到 Block Index 的数据。这个版本的 Block Index 有一个缺点,无法知道 Block 压缩后的数据大小,这在之后的解压过程中是必要的。因此,HFile 读取器必须根据块之间的偏移量差来推断这个压缩的大小。所以在版本 2,解决了这个问题,在 Block Index 中增加了实际存储 Block 大小的数据(压缩后的数据大小)。
HFile V2/V3
接下来介绍 HFile V2 和 V3 版本,其中 V2 在 0.92 引入,V3 在 0.98 引入。HFile V1 版本除了刚才提到的问题以外,还发现在实际使用过程中占用内存较多,HFile V2 版本针对此也进行了优化,HFile V3 版本基本和 V2 版本相同,只是在Cell 层面添加了 Tag 数组的支持(增加安全方面特性,为 Cell 级别增加 ACL)。
HFile V2 在接口层面做了兼容,在读 HFile 的时候,支持 v1 和 v2;在写 HFile 的时候,只会写 v2 版本的 HFile。
HFile V3 不和 V1、V2 兼容,因为在存储 KeyValue 时,会额外的存储 Tags,用于控制 ACL。
另一方面,V2 还用到了 load-on-open
这个概念,意思是说,在打开 HFile 的时候,加载那些必要的信息,包括Trailer,Trailer 里记录了 HFile 的必要信息。而其他数据就可以在用到的时候,通过 Trailer 再解析出来。第 2 版 HFile 的结构如下:
HFile V2 主要分为四个部分:
-
Scanned Block Section
:**顺序扫描 HFile 时,会被读取的数据块。**这个部分包含 3 种数据块:Data Block,LeafIndex Block 以及 BloomBlock。- 其中 Data Block 中存储用户的 KeyValue 数据,
- Leaf Index Block 中存储索引树的叶子节点数据,
- Bloom Block 中存储布隆过滤器相关数据。
-
Non-Scanned Block Section
:顺序扫描 HFile 时,不会被读取的数据块。主要包括 Meta Block 和 IntermediateLevel Data Index Blocks 两部分。 -
Load-On-Open-Section
:**这部分数据会在 HRegionServer 打开 HFile 时直接加载到内存中,**包括 Root Data Index、Meta Index Block、FileInfo 和布隆过滤器 MetaBlock。- FileInfo 是固定长度的数据块,主要记录了文件的一些统计元信息,比较重要的是 AVG_KEY_LEN 和AVG_VALUE_LEN,分别记录了该文件中所有 Key 和 Value 的平均长度。
- Root Data Index 表示 Data Blocke、Meta Block 和 Bloom Filter 的根索引。
- 布隆过滤器 Metadata 记录了 HFile 中布隆过滤器的相关元数据。
-
Trailer
:这部分主要记录了 HFile 的版本信息、其他各个部分的偏移值和寻址信息。
HFile 物理结构如下图所示。
如上图所示, HFile 文件由各种不同类型的 Block 构成,虽然这些 Block 的类型不同,但却拥有相同的数据结构。
每个 Block 的大小可以在创建表列族的时候通过参数BLOCKSIZE => '65536'
进行指定(建议为 1024 的整数倍),默认为64k 。大号的 Block 有利于顺序 Scan,小号的 Block 利于随机查询,因而需要权衡。
所有 Block 块都拥有相同的数据结构,如上图左侧所示,HBase 将 Block 块抽象为一个统一的 HFileBlock。HFileBlock支持两种类型,一种类型支持 Checkum,一种不支持。为方便讲解,下图选用不支持 Checkum 的 HFileBlock 内部结构:
上图所示 HFileBlock 主要包括两部分:BlockHeader 和 BlockData。其中 BlockHeader 主要存储 Block 元数据,BlockData 用来存储具体数据。Block 元数据中最核心的字段是 BlockType 字段,用来标示该 Block 块的类型。HBase 中定义了 8 种 BlockType,每种 BlockType 对应的 Block 都存储不同的数据内容,有的存储用户数据,有的存储索引数据,有的存储 Meta 元数据。对于**任意一种类型的 HFileBlock,都拥有相同结构的 BlockHeader,但是 BlockData 结构却不尽相同。**下表罗列了最核心的几种 BlockType。
Trailer Block
**Trailer Block 主要记录了 HFile 的版本信息、各个部分的偏移值和寻址信息,**下图为 Trailer 内存和磁盘中的数据结构,其中只显示了部分核心字段:
**HRegionServer 在打开 HFile 时会加载所有 HFile 的 Trailer 部分以及 load-on-open 部分到内存中。实际加载过程会先解析 Trailer Block,然后再进一步加载 load-on-open 部分的数据,**具体步骤如下:
-
加载 HFile Version 版本信息,HBase 中 Version 包含 majorVersion 和 minorVersion 两部分,前者决定了 HFile 的主版本 V1、V2 还是 V3;后者在主版本确定的基础上决定是否支持一些微小修正,比如是否支持 Checksum 等。不同的版本使用不同的文件解析器对 HFile 进行读取解析。
-
**HBase 会根据 Version 信息计算 Trailer Block 的大小(不同 Version 的 Trailer Block 大小不同),再根据 Trailer Block大小加载整个 HFile Trailer Block 到内存中。**Trailer Block 中包含很多统计字段,例如,
- TotalUncompressedBytes 表示 HFile 中所有未压缩的 KeyValue 总大小。
- NumEntries 表示 HFile 中所有 KeyValue 总数目。
- CompressionCodec 表示该 HFile 所使用的压缩算法,[HBase 中压缩算法](Apache HBase® Reference Guide)主要有 BZIP2、LZ4、LZO、SNAPPY、ZSTD 等,默认为 NONE,表示不使用压缩。可以在创建表列族的时候通过参数
COMPRESSION => 'SNAPPY'
进行指定。
-
**Trailer Block 中另两个重要的字段是 LoadOnOpenDataOffset 和LoadOnOpenDataSize,前者表示 load-on-openSection 在整个 HFile 文件中的偏移量,后者表示 load-on-open Section 的大小。**根据此偏移量以及大小,HBase 会在启动后将 load-on-open Section 的数据全部加载到内存中。load-on-open 部分主要包括 FileInfo 模块、Root Data Index模块以及布隆过滤器 Metadata 模块。
-
FileInfo 是固定长度的数据块,主要记录了文件的一些统计元信息,比较重要的是 AVG_KEY_LEN 和AVG_VALUE_LEN,分别记录了该文件中所有 Key 和 Value 的平均长度。
-
Root Data Index 表示 Data Blocke、Meta Block 和 Bloom Filter 的根索引。
-
布隆过滤器 Metadata 记录了 HFile 中布隆过滤器的相关元数据。
-
Data Block
Data Block 是 HBase 中文件读取的最小单元。Data Block 中主要存储用户的 KeyValue 数据,而 KeyValue 结构是HBase 存储的核心。HBase 中所有数据都是以 KeyValue 结构存储在 Data Block 中。
内存和磁盘中的 Data Block 结构如图所示。
**KeyValue 由 4 个部分构成,分别为 Key Length、Value Length、Key 和 Value。其中,Key Length 和 Value Length 是两个固定长度的数值,Value 是用户写入的实际数据,**Key 是一个复合结构,由多个部分构成:Rowkey、Column Family、Column Qualifier、TimeStamp 以及 KeyType。其中,KeyType 有四种类型,分别是 Put、Delete、DeleteColumn 和DeleteFamily。
由 Data Block 的结构可以看出,HBase 中数据在最底层是以 KeyValue 的形式存储的,其中 Key 是一个比较复杂的复合结构。因为任意 KeyValue 中都包含 Rowkey、Column Family 以及 ColumnQualifier,因此这种存储方式实际上比直接存储Value 占用更多的存储空间。这也是 HBase 系统在表结构设计时经常强调 Rowkey、Column Family 以及 ColumnQualifier 尽可能设置短的根本原因。
布隆过滤器相关的 Block
布隆过滤器对 HBase 的数据读取性能优化至关重要。HBase 是基于 LSM 树结构构建的数据库系统,数据首先写入内存,然后异步 Flush 到磁盘形成文件。这种架构天然对写入友好,而对数据读取并不十分友好,因为随着用户数据的不断写入,系统会生成大量文件,用户根据 Key 获取对应的 Value,理论上需要遍历所有文件,在文件中查找指定的 Key,这无疑是很低效的做法。
使用布隆过滤器可以对数据读取进行相应优化,对于给定的 Key,经过布隆过滤器处理就可以知道该 HFile 中是否存在待检索 Key,如果不存在就不需要遍历查找该文件,这样就可以减少实际 IO 次数,提高随机读性能。布隆过滤器通常会存储在内存中,所以布隆过滤器处理的整个过程耗时基本可以忽略。
HBase 会为每个 HFile 分配对应的位数组,KeyValue 在写入 HFile 时会先对 Key 经过多个 Hash 函数的映射,映射后将对应的数组位置为 1,get 请求进来之后再使用相同的 Hash 函数对待查询 Key 进行映射,如果在对应数组位上存在 0,说明该 get 请求查询的 Key 肯定不在该 HFile 中。当然,如果映射后对应数组位上全部为 1,则表示该文件中有可能包含待查询 Key,也有可能不包含,需要进一步查找确认(布隆过滤器的缺点)。
可以想象,HFile 文件越大,里面存储的 KeyValue 值越多,位数组就会相应越大。一旦位数组太大就不适合直接加载到内存了,因此 HFile V2 在设计上将位数组进行了拆分,拆成了多个独立的位数组(根据 Key 进行拆分,一部分连续的 Key 使用一个位数组)。这样,一个 HFile 中就会包含多个位数组,根据 Key 进行查询时,首先会定位到具体的位数组,只需要加载此位数组到内存进行过滤即可,从而降低了内存开销。
**在文件结构上每个位数组对应 HFile 中一个 Bloom Block,因此多个位数组实际上会对应多个 Bloom Block。为了方便根据 Key 定位对应的位数组,HFile V2 又设计了相应的索引 Bloom Index Block。**对应的内存和逻辑结构如图所示。
整个 HFile 中仅有一个 Bloom Index Block 数据块,位于 load-on-open 部分。Bloom Index Block 从大的方面看由两部分内容构成,其一是 HFile 中布隆过滤器的元数据基本信息,其二是构建了指向 Bloom Block 的索引信息。
Bloom Index Block 结构中 TotalByteSize 表示位数组大小,NumChunks 表示 Bloom Block 的个数,HashCount 表示Hash 函数的个数,HashType 表示 Hash 函数的类型,TotalKeyCount 表示布隆过滤器当前已经包含的 Key 的数目,TotalMaxKeys 表示布隆过滤器当前最多包含的 Key 的数目。
Bloom Index Entry 对应每一个 Bloom Block 的索引项,作为索引分别指向 Scanned block 部分的 Bloom Block,BloomBlock 中实际存储了对应的位数组。Bloom Index Entry 的结构见上图中间部分,其中 BlockKey 是一个非常关键的字段,表示该 Index Entry 指向的 Bloom Block 中第一个执行 Hash 映射的 Key。BlockOffset 表示对应 Bloom Block 在 HFile 中的偏移量。
因此,一次 get 请求根据布隆过滤器进行过滤查找需要执行以下三步操作:
- 首先根据待查找 Key 在 Bloom Index Block 所有的索引项中根据 BlockKey 进行二分查找,定位到对应的 Bloom IndexEntry。
- 再根据 Bloom Index Entry 中 BlockOffset 以及 BlockOndiskSize 加载该 Key 对应的位数组。
- 对 Key 进行 Hash 映射,根据映射的结果在位数组中查看是否所有位都为 1,如果不是,表示该文件中肯定不存在该Key,否则有可能存在。
索引相关的 Block
**根据索引层级的不同,HFile 中索引结构分为两种: single-level
和 multi-level
,前者表示单层索引,后者表示多级索引,一般为两级或三级。**HFile V1 版本中只有 single-level 一种索引结构,V2 版本中引入多级索引。
之所以引入多级索引,是因为随着 HFile 文件越来越大,Data Block 越来越多,索引数据也越来越大,已经无法全部加载到内存中,多级索引可以只加载部分索引,从而降低内存使用空间。同布隆过滤器内存使用问题一样,这也是 V1 版本升级到 V2 版本最重要的因素之一。
V2 版本 Index Block 有两类:Root Index Block 和 NonRoot Index Block。NonRoot Index Block 又分为 IntermediateIndex Block 和 Leaf Index Block 两种。
**HFile 中索引是树状结构,Root Index Block 表示索引数根节点,Intermediate Index Block 表示中间节点,Leaf IndexBlock 表示叶子节点,叶子节点直接指向实际 Data Block,**如图所示。
需要注意的是,这三种 Index Block 在 HFile 中位于不同的部分,
- Root Index Block 位于“load-on-open”部分,会在 HRegionServer 打开 HFile 时加载到内存中。
- Intermediate Index Block 位于“Non-Scanned block”部分,Leaf Index Block 位于“Scanned block”部分,
- Leaf Index Block 直接指向实际 Data Block。
HFile 中除了 Data Block 需要索引之外,Bloom Block 也需要索引,Bloom 索引结构实际上采用了单层结构,BloomIndex Block 就是 Root Index Block 其中一种。
**对于 Data Block,由于 HFile 刚开始数据量较小,索引采用单层结构,只有 Root Index Block 一层索引,直接指向Data Block。当数据量慢慢变大,Root Index Block 大小超过阈值之后,索引就会分裂为多级结构,由一层索引变为两层,根节点指向叶子节点,叶子节点指向实际 Data Block,如果数据量再变大,索引层级就会变为三层。**如下图。
索引流程
基本的索引流程为:由 Root Data Index 索引到 Intermediate Block Index,再由 Intermediate Block Index 索引到 LeafIndex Block,最后由 Leaf Index Block 查找到对应的 Data Block。在实际场景中,Intermediate Block Index 基本上用不到。因此,索引逻辑被简化为:**由 Root Data Index 直接索引到 Leaf Index Block,再由 Leaf Index Block 查找到的对应的 DataBlock。**如下图。
下面针对 Root index Block 和 NonRoot index Block 两种结构进行解析(Intermediate Index Block 和 Leaf Index Block 在内存和磁盘中存储格式相同,都为 NonRoot Index Block 格式)。
Root Index Block
**Root Index Block 表示索引树根节点索引块,既可以作为 Bloom Block 的直接索引,也可以作为 Data Block 多极索引树的根索引。**对于单层和多级这两种索引结构,对应的 Root Index Block 结构略有不同,单层索引结构是多级索引结构的一种简化场景。本节以多级索引结构中的 Root Index Block 为例进行分析,图为 Root Index Block 的结构图。
图中,Index Entry 表示具体的索引对象,每个索引对象由 3 个字段组成:Block Offset 表示索引指向 Data Block 的偏移量,BlockDataSize 表示索引指向 Data Block 在磁盘上的大小,BlockKey 表示索引指向 Data Block 中的第一个 Key。
除此之外,还有另外 3 个字段用来记录 MidKey 的相关信息,这些信息用于在对 HFile 进行 Split 操作时,快速定位HFile 的切分点位置。需要注意的是单层索引结构和多级索引结构相比,仅缺少与 MidKey 相关的这三个字段。
Root Index Block 位于整个 HFile 的“load-on-open”部分,因此会在 HRegionServer 打开 HFile 时直接加载到内存中。此处需要注意的是,在 Trailer Block 中有一个字段为 DataIndexCount,表示 Root Index Block 中 Index Entry 的个数,只有知道 Entry 的个数才能正确地将所有 Index Entry 加载到内存。
NonRoot Index Block
当 HFile 中 Data Block 越来越多,单层结构的根索引会不断膨胀,超过一定阈值之后就会分裂为多级结构的索引结构。多级结构中根节点是 Root Index Block。而索引树的中间层节点和叶子节点在 HBase 中存储为 NonRoot IndexBlock,但从 Block 结构的视角分析,无论是中间节点还是叶子节点,其都拥有相同的结构,如图所示。
和 Root Index Block 相同,NonRoot Index Block 中最核心的字段也是 IndexEntry,用于指向叶子节点块或者 DataBlock。不同的是,NonRoot Index Block 结构中增加了 Index Entry 的内部索引 Entry Offset 字段,Entry Offset 表示IndexEntry 在该 Block 中的相对偏移量(相对于第一个 Index Entry),用于实现 Block 内的二分查找。通过这种机制,所有非根节点索引块(包括 Intermediate Index Block 和 Leaf Index Block)在其内部定位一个 Key 的具体索引并不是通过遍历实现,而是使用二分查找算法,这样可以更加高效快速地定位到待查找 Key。
Why Fast
分层索引
无论是 Data Block Index 还是 Bloom Filter,都采用了分层索引的设计。
Data Block 的索引,在 HFile V2 中最多可支持三层索引:
- 最底层的 Data Block Index 称之为 Leaf Index Block,可直接索引到 Data Block;
- 中间层称之为 Intermediate Index Block,
- 最上层称之为 Root Data Index,Root Data index 存放在 Load-on-open Section 区域,会在 HRegionServer 打开 HFile时加载到内存中。
基本的索引流程为:由 Root Data Index 索引到 Intermediate Block Index,再由 Intermediate Block Index 索引到 LeafIndex Block,最后由 Leaf Index Block 查找到对应的 Data Block。在实际场景中,Intermediate Block Index 基本上用不到。因此,索引逻辑被简化为:由 Root Data Index 直接索引到 Leaf Index Block,再由 Leaf Index Block 查找到的对应的 DataBlock。
交叉存储
**HFile V1 的设计导致了 HRegionServer 启动时间过长,需要加载很大的数据量,比如大量的 Bloom Filter,大量的Block Index。为了解决这个问题,V2 使得 HFile 增加了新特性,把 Bloom Filter 和 Block Index 打散,写到多个 Block 中去,这样就减少了 HFile 写入时候的内存 Offset 记录。**并且这些打散的 Block Index 会有预定的长度。
Bloom Filter 包含 Bloom 元数据(Hash 函数类型,Hash 函数个数等)与位数组(Bloom Data),为了避免每一次读取时加载所有的 Bloom Data,HFile V2 中将 Bloom Data 部分分成了多个小的 Bloom Block。
Bloom Data 数据也被当成一类 Inline Block,与 Data Block、Leaf Index Block 交叉存在,而关于 Bloom Filter 的元数据与多个 Bloom Block 的索引信息,也被存放在 Load-On-Open Section 部分。
但需要注意的是,关于 Bloom Filter 的配置类型信息,保存在 FileInfo 部分,可以在创建表列族的时候通过参数BLOOMFILTER => 'ROW'
进行指定。共包含三种类型:
- 不启用;
- 基于 Row 构建 Bloom Filter;
- 基于 Row + Column 构建 Bloom Filter。
混合了 Bloom Filter Block 以后的 HFile 构成如下图所示:
按需读取
无论是 Data Block 的索引数据,还是 Bloom Filter 数据,都被拆成了多个 Block,基于这样的设计,无论是索引数据,还是 Bloom Filter,都可以按需读取,避免在读取阶段一次读入大量的数据,有效降低时延。
总结:
HFile v2/v3
HFile2版本有一个缺点,无法知道数据解压后的大小。
HFile2版本进行了内存优化,添加了一个属性,可以知道解压后数据的大小。
HFile3版本在V2版本的基础上,添加了细粒度的cell级别的权限控制。
此时,版本比较多了,为了解决版本相关问题,添加了Trailer Block,用来记录版本这些必要的信息。
HLog
一个 HRegionServer 只有一个 HLog 文件。负责记录数据的操作日志,当 HBase 出现故障时可以进行日志重放、故障恢复。例如磁盘掉电导致 MemStore 中的数据没有持久化存储到 StoreFile,这时就可以通过 HLog 日志重放来恢复数据。
HLog 文件就是一个普通的 Hadoop Sequence File,Sequece File 的 Key 是 HLogKey 对象,Sequece File 的 Value 是HBase 的 KeyValue 对象,即本次的操作。
HLogKey 中记录了写入数据的归属信息,除了 Table 和 HRegion 名称外,同时还包括 Sequence Number 和Timestamp:
- Timestamp:写入时间。
- Sequence Number:起始值为 0,或者是最近一次存入文件系统中的 Sequence Number。
数据被写入 WAL 后,会被加入到 MemStore 即写缓存。然后服务端就可以向客户端返回 ack 表示写数据完成。
HDFS
HDFS 为 HBase 提供底层数据存储服务,同时为 HBase 提供高可用支持。HBase 将 HLog 存储在 HDFS 上,当服务器发生异常宕机时,可以重放 HLog 来恢复数据。
比如:MySQL 数据直接落入磁盘,HBase 数据落入 HDFS,HDFS 数据落入磁盘。
BlockCache
众所周知,提升数据库读取性能的一个核心方法是,尽可能将热点数据存储到内存中,以避免昂贵的 IO 开销。现代系统架构中,诸如 Redis 这类缓存组件已经是体系中的核心组件,通常将其部署在数据库的上层,拦截系统的大部分请求,保证数据库的“安全”,提升整个系统的读取效率。
同样为了提升读取性能,HBase 也实现了一种读缓存结构 BlockCache。**客户端读取某个 Block,首先会检查该 Block是否存在于 BlockCache,如果存在就直接加载出来,如果不存在就去 HFile 文件中加载,加载出来后再放到 BlockCache中,后续同一请求或者邻近数据查找请求可以直接从内存中获取,以避免昂贵的 IO 开销。**从字面意思可以看出来,BlockCache 主要用来缓存 Block。
HBase 在实现中提供了两种缓存结构:MemStore 和 BlockCache。MemStore 作为 HBase 的写缓存,保存着数据的最近一次更新;BlockCache 作为 HBase 的读缓存,保存着最近被访问的数据块。
需要关注的是, Block 是 HBase 中最小的数据读取单元,即数据从 HFile 中读取都是以 Block 为最小单元执行的。
BlockCache 是 RegionServer 级别的,一个 RegionServer 只有一个 BlockCache,在 RegionServer 启动时完成BlockCache 的初始化工作。
BlockCache 名称中的 Block 指的是 HBase 的 Block,分别有以下几种:
- DATA
- ENCODED_DATA
- LEAF_INDEX
- BLOOM_CHUNK
- META
- INTERMEDIATE_INDEX
- RO0T_INDEX
- FILE_INFO
- GENERAL_BLOOM_META
- DELETE_FAMILY_BLOOM_META
- TRAILER
- INDEX_V1
- ……
到目前为止,HBase 先后实现了 3 种 BlockCache 方案:
LRUBlockCache
:最早的实现方案,也是默认的实现方案(LRUBlockCache + BucketCache);SlabCache
:HBase 0.92 版本实现的第二种方案,参见 HBase-4027;BucketCache
:HBase 0.96 之后官方提供的另一种可选方案,参见 HBASE-7404。
这 3 种方案的不同之处主要在于内存管理模式,其中 LRUBlockCache 是将所有数据都放入 JVMHeap 中,交给 JVM进行管理。而后两种方案采用的机制允许将部分数据存储在堆外。
这种演变本质上是因为 LRUBlockCache 方案中 JVM 垃圾回收机制经常导致程序长时间暂停,而采用堆外内存对数据进行管理可以有效缓解系统长时间 GC。
BlockCache 默认是开启的,不需要做额外的事情去开启 BlockCache。如果想让某个列族不使用 BlockCache,可以通过以下命令关闭它:
create 'my-table', {NAME => 'my-cf', BLOCKCACHE => 'false'}
alter 'my-table', CONFIGURATION => {NAME => 'my-cf', BLOCKCACHE => 'false'}
LRUBlockCache
LRUBlockCache 是完全基于 JVM heap 的 LRU 方案,在 0.92 版本之前只有这种 BlockCache 的实现方案,LRU 就是Least Recently Used,即近期最少使用算法。读出来的 Block 会被放到 BlockCache 中待下次查询使用。当缓存满了的时候,会根据 LRU 的算法来淘汰 Block。每次访问数据都会将其放在我们的队首,如果需要淘汰数据,就只需要淘汰队尾即可。这里有个问题,如果有个数据在 1 分钟内被访问了 1000 次,接下来 1 分钟没有访问该数据,但是有其他的数据被大量访问,就会导致这个热点数据被淘汰。
LRUBlockCache 被分为三个区域,如下表所示。
区域名称 | 占用比例 | 说明 |
---|---|---|
single-access | 25% | 单次读取区域。 Block被读出后先放到这个区域,当被读到多次后会升级到下一个区域 |
multi-access | 50% | 多次读取区域。当一个Block被缓存到单次读取区域后又被多次访问,会升级到这个区域 |
in-memory | 25% | 这个区域跟Block被访问了几次没有关系, 它只存放那些列族属性中被设置了IN_MEMORY => 'true' 的Block |
# Java API 设置 in-memory
HColumnDescriptor.setInMemory(true);
# Shell API 设置 in-memory
create 't', {NAME => 'f', IN_MEMORY => 'true'}
看起来是不是很像 JVM 的新生代、老年代、永久代(JDK1.8 已彻底移除,替换为了元空间)?没错,这个方案就是模拟 JVM 的分代设计而做的。
列族被设置为 IN-MEMORY 并不是意味着这个列族是存储在内存中的,这个列族依然是跟别的列族一样存储在硬盘上。一般的 Block 被第一次读出后是放到 single-access
,只有当被访问多次后才会放到 multi-access
,而带有IN_MEMORY 属性的列族中的 Block 一开始就会被放到 in-memory 区域。这个区域的缓存有最高的存活时间,在需要淘汰Block 的时候,这个区域的 Block 是最后被考虑到的,所以这个属性仅仅是为了 BlockCache 而创造的。
目前 BlockCache 的堆内内存方案就只有 LRUBlockCache,可以通过将 hfile.block.cache.size
设置为 0 来关闭该方案,但是不推荐。相关配置如下,默认为 0.4,表示分配给 StoreFile 使用的块缓存的最大堆(-Xmx setting)。
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value>
</property>
计算 HBase 中有多少内存可用于缓存的方法如下:
number of region servers * heap size(-Xms1024m初始堆和-Xmx1024m最大堆) * hfile.block.cache.size * 0.99
块缓存的默认值为 0.4,表示可用堆的 40%。最后一个值(99%)是在驱逐开始之后 LRU 缓存中的默认可接受加载因子。它被包含在这个等式中的原因是,说可以使用 100% 的可用内存是不现实的,因为这会使得该过程从加载新块的位置阻塞。下面是一些示例:
- 1 个 RegionServer,堆大小设置为 1GB,BlockCache 大小约为 405MB。
- 20 个 RegionServer,堆大小设置为 8GB,BlockCache 大小约为 63.3GB 。
- 100 个 RegionServer,堆大小设置为 24GB,并且 hfile.block.cache.size 为 0.5,BlockCache 大小约为 1.16TB。
设置 hfile.block.cache.size
的时候要注意在 HBase 的内存使用上有一个规则那就是 MemStore + BlockCache的内存占用比例不能超过 0.8 (即 80%),否则就会报错。因为必须要留 20%作为机动空间。用配置项来说明就是:hbase.regionserver.global.memstore.size
(默认为 0.4) + hfile.block.cache.size
(默认为 0.4) <= 0.8。
值得一提的是,这两个配置项的默认值都是 0.4,也就是说默认项的总和就己经达到了他们俩可以占用的内存比例上限了,所以基本没事就不用去加大这两个配置项,你调大哪一个,都必须相应地调小另外一个。
BlockCache 可以带来很多好处,就是一个菜鸟都可以想到用内存来做缓存提高读取性能,但是 LRUBlockCache 有什么坏处呢?完全基于 JVM Heap 的缓存,势必带来一个后果:随着内存中对象越来越多,每隔一段时间都会引发一次 FullGC。在 Full GC 的过程中,整个 JVM 完全处于停滞状态(Stop the Word),有的时候长达几分钟。
SlabCache
为了解决 LRUBlockCache 方案中因 JVM 垃圾回收导致的服务中断问题,SlabCache 方案提出使用 Java NIO DirectByteBuffer 技术实现堆外内存存储,不再由 JVM 管理数据内存。
默认情况下,系统在初始化的时候会分配两个缓存区,分别占整个 BlockCache 大小的 80% 和 20%,每个缓存区分别存储固定大小的 Block,其中前者主要存储小于等于 64K 的 Block,后者存储小于等于 128K 的 Block,如果一个 Block太大就会导致两个区都无法缓存。
和 LRUBlockCache 相同,SlabCache 也使用 Least-Recently-Used 算法淘汰过期的 Block。和 LRUBlockCache 不同的是,SlabCache 淘汰 Block 时只需要将对应的 BufferByte 标记为空闲,后续 Cache 对其上的内存直接进行覆盖即可。
线上集群环境中,不同表不同列簇设置的 BlockSize 都可能不同,很显然,默认只能存储小于等于 128KB Block 的SlabCache 方案不能满足部分用户场景。比如,用户设置 BlockSize = 256K,简单使用 SlabCache 方案就不能达到缓存这部分 Block 的目的。因此 HBase 在实际实现中将 SlabCache 和 LRUBlockCache 搭配使用,称为 DoubleBlockCache 。 具体查询流程:在一次随机读中,一个 Block 从 HDFS 中加载出来之后会在两个 Cache 中分别存储一份。缓存读时首先在 LRUBlockCache 中查找,如果 Cache Miss 再在 SlabCache 中查找,此时如果命中,则将该 Block 放入LRUBlockCache 中。
但是,经过实际测试,DoubleBlockCache 方案有很多弊端。比如,SlabCache 中固定大小内存设置会导致实际内存使用率比较低,而且使用 LRUBlockCache 缓存 Block 依然会因为 JVM GC 产生大量内存碎片。因此在 HBase 0.98 版本之后,已经不建议使用该方案(已废弃)。
BucketCache
基本介绍
BucketCache 借鉴了 SlabCache 的创意,也用上了堆外内存。不过它是这么用的:相比起只有 2 个区域的SlabeCache,BucketCache 一上来就分配了 14 种区域。
这 14 种区域分别放的是大小为 4KB、8KB、16KB、32KB、40KB、 48KB、56KB、64KB、96KB、128KB、192KB、256KB、384KB、 512KB 的 Block。而且这个种类列表还是可以手动通过设置 hbase.bucketcache.bucket.sizes
属性来定义,如下。这 14 种类型可以分配出很多个 Bucket。
<property>
<name>hbase.bucketcache.bucket.sizes</name>
<value>4096,8192</value>
</property>
**BucketCache 的存储不一定要使用堆外内存,是可以自由在 3 种存储介质直接选择:**堆外(offheap)、文件(file)、mmaped file(memory-mapped file 内存映射文件)。通过设置hbase.bucketcache.ioengine
为 offheap、file 或者mmaped file 来配置,如下。
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
每个 Bucket 的大小上限为最大尺寸的 Block * 4,比如可以容纳的最大的 Block 类型是 512KB,那么每个 Bucket 的大小就是 512KB * 4 = 2048KB。 系统一启动 BucketCache 就会把可用的存储空间按照每个 Bucket 的大小上限均分为多个 Bucket。如果划分完的数量比你的种类还少,比如比 14 (默认的种类数量)少,就会直接报错,因为每一种类型的 Bucket 至少要有一个 Bucket。 BucketCache 实现起来的样子就像如下图所示(每个区域的大小都是 512 * 4)。
BucketCache 还有一个特别的长处,那就是它自己来划分内存空间、自己来管理内存空间,Block 放进去的时候是考虑到 offset 偏移量的,所以内存碎片少,发生 GC 的时间很短。
还有一点是为什么存储介质会有 file?我们用缓存不就是为了使用内存,然后利用内存比硬盘快得优势来提高读写的性能吗?大家不要忘记了还有 SSD 硬盘,最开始设计这种策略的初衷就是想把 SSD 作为一层比传统机械硬盘更快的缓存层来使用,所以你可以把 file 这种类型等同于 SSD-file。
使用 BucketCache 有以下的好处:
- 这是第一个可以使用 SSD 硬盘的缓存策略,这是最大的亮点;
- 这种策略极大地改进了 SlabCache 使用率低的问题;
- 配置极其灵活,可以适用于多种场景。
在实际测试中也表现出了很高的性能,所以 HBase 就顺理成章地把 SlabCache 废弃了。off-heap 的延迟可以接近于on-heap 的延迟,因为 off-heap 并不会引起 GC 操作。
相关配置
BucketCache 相关配置项如下:
-
hbase.bucketcache.ioengine
:使用的存储介质,可选值为 offheap、file、mmaped file。默认为 offheap。 -
hbase.bucketcache.combinedcache.enabled
:是否打开组合模式(CombinedBlockCache)。默认为 true。 -
hbase.bucketcache.size
:BucketCache 所占的大小,如果设置为 0.0~1.0,则代表了占堆内存的百分比。如果是大于 1 的值,则代表实际的 BucketCache 的大小,单位为 MB。为什么要一个参数两用这么别扭呢?因为 BucketCache既可以用于堆内存,还可以用于堆外内存和硬盘。默认值为 0.0 ,即关闭 BucketCache。 -
hbase.bucketcache.bucket.sizes
:定义所有 Block 种类,默认为 14 种,种类之间用逗号分隔。单位为 B,每一种类型必须是 1024 的整数倍,否则会报异常:java.io.I0Exception: Invalid HFile block magic
。 -
-XX:MaxDirectMemorySize
:这个参数不是在 hbase-site.xml 文件中配置的,而是 JVM 启动的参数。如果你不配置这个参数,JVM 会按需索取堆外内存;如果你配置了这个参数,你可以定义 JVM 可以获得的堆外内存上限。显而易见的,这个参数值必须比 hbase.bucketcache.size 大。
以前还有一个 hbase.bucketcache.combinedcache.percentage
配置项,用于配置 BucketCache 在组合模式中的百分比,后来被改名为hbase.bucketcache.percentage.in.combinedcache
,最后被废弃了。原因是这个参数太难让人理解了,直接用 hbase.bucketcache.size
就行了。
hbase-site.xml 演示案例如下。
<!-- LRUBlockCache 块缓存的最大堆 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.2</value>
</property>
<!-- BucketCache 使用的存储介质 -->
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
<!-- 是否打开组合模式 -->
<property>
<name>hbase.bucketcache.combinedcache.enabled</name>
<value>true</value>
</property>
<!-- BucketCache 所占的内存大小 -->
<property>
<name>hbase.bucketcache.size</name>
<value>4196</value>
</property>
<!-- 定义 BucketCache 的 Block 种类 -->
<property>
<name>hbase.bucketcache.bucket.sizes</name>
<value>4096,8192</value>
</property>
在 HBase 2.0.0 版本前,当配置了 BucketCache 且 hbase.bucketcache.combinedcache.enabled 设置为 false 时,所有数据以及 Index 块会首先被缓存在 L1(LRUCache)。当 L1 中发生缓存清除(置换)时,被置换出的数据块会被移动到 L2(BucketCache)。如果你想让某个列族不使用 BucketCache,你可以使用以下命令:
# Java API 设置
HColumnDescriptor.setCacheDataInL1(true);
# Shell API 设置
create 't', {NAME => 't', CONFIGURATION => {CACHE_DATA_IN_L1 => 'false'}}
alter 'my-table', CONFIGURATION => {NAME => 'my-cf', CACHE_DATA_IN_L1 => 'false'}
意思是数据只缓存在一级缓存(LRUCache)中,不使用二级缓存(BucketCache)。
这种部署模式被称为 Raw L1+L2。注意:此 L1+L2 模式已从 2.0.0 版本中移除。
从 HBase 2.0.0 开始,L1 与 L2 的概念便被弃用。当 BucketCache 启用时,数据块(DATA Blocks)会一直保存于BucketCache;INDEX/BLOOM 块会保存于 LRUBlockCache 的堆内存,所以 CACHE_DATA_IN_L1 的配置也被移除了。
Combined BlockCahce (组合模式)
前面说了 BucketCache 那么多好处,那么是不是 BucketCache 就完爆 LRUCache 了?答案是没有,在很多情况下倒是LRUCache 完爆 BucketCache。虽然后面有了 SlabCache 和 BucketCache,但是这些 Cache 从速度和可管理性上始终无法跟完全基于内存的 LRUCache 相媲美。虽然 LRUCache 有严重的 Full GC 问题,但是 HBase 却一直都没有放弃 LRUCache。所以,不要不经过测试比较就直接换策略。
在 SlabCache 的时代,SlabCache 是跟 LRUCache 一起使用的,每一个 Block 被加载出来都是缓存两份,一份在SlabCache 一份在 LRUCache,这种模式称之为 DoubleBlockCache。读取的时候 LRUCache 作为 L1 层缓存(一级缓存),把 SlabCache 作为 L2 层缓存(二级缓存)。
在 BucketCache 的时代,也不是单纯地使用 BucketCache,但是这不是一二级缓存的结合,而是另一种模式,叫组合模式(Combined BlockCahce)。具体地说就是把不同类型的 Block 分别放到 LRUCache 和 BucketCache 中。
- IndexBlock 和 BloomBlock 会被放到 LRUCache 中;
- DataBlock 被直接放到 BucketCache 中,所以会先去 LRUCache 查询以下,然后再去 BucketCache 查询真正的数据。
其实这种实现是一种更合理的二级缓存,数据从一级缓存到二级缓存最后到硬盘,数据是从小到大,存储介质也是由快到慢。考虑到成本和性能的组合,比较合理的介质是:LRUCache 使用内存 => BuckectCache 使用 SSD => HFile 使用机械硬盘。
提示:设置 hbase.bucketcache.ioengine,开启 hbase.bucketcache.combinedcache.enabled 且 hbase.bucketcache.size>0即可启用 Combined BlockCache。
关于 LRUBlockCache 和 BucketCache 单独使用谁比较强,曾经有人做过一个测试,并写了一篇报告出来,标题为Comparing BlockCache Deploys,结论是:
-
因为 BucketCache 自己控制内存空间,碎片比较少,所以 GC 时间大部分都比 LRUCache 短。
-
在缓存全部命中的情况下,LRUCache 的吞土量是 BucketCache 的两倍;在缓存基本命中的情况下,LRUCache 的吞吐量跟 BucketCache 基本相等。
-
读写延迟,IO 方面两者基本相等。
-
缓存全部命中的情况下,LRUCache 比使用 file 模式的 BucketCacheCPU 占用率低一倍,但是跟其他情况下差不多。
从整体上说 LRUCache 的性能好于 BucketCache,但由于 Full GC 的存在,在某些时刻 JVM 会停止响应,造成服务不可用。所以适当的搭配 BucketCache 可以缓解这个问题。
如何看缓存命中率?因为 HBase 搭建在 Hadoop 之上,所以只需要打开 Hadoop Metrics,查看hbase.regionserver.blockCacheHitRatio
即可。该值的取值范围为 0.0~1.0。
BucketCache 中 Block 的缓存写入/读取流程
下图所示是 Block 写入缓存以及从缓存中读取 Block 的流程,图中主要包括 5 个模块:
-
RAMCache 是一个存储 blockKey 和 Block 对应关系的 HashMap。
-
WriteThead 是整个 Block 写入的中心枢纽,主要负责异步地将 Block 写入到内存空间。
-
BucketAllocator 主要实现对 Bucket 的组织管理,为 Block 分配内存空间。
-
IOEngine 是具体的内存管理模块,将 Block 数据写入对应地址的内存空间。
-
BackingMap 也是一个 HashMap,用来存储 blockKey 与对应物理内存偏移量的映射关系,并且根据 blockKey 定位具体的 Block。图中实线表示 Block 写入流程,虚线表示 Block 缓存读取流程。
Block 缓存写入流程
- 将 Block 写入 RAMCache。实际实现中,HBase 设置了多个 RAMCache,系统首先会根据 blockKey 进行 hash,根据hash 结果将 Block 分配到对应的 RAMCache 中。
- WriteThead 从 RAMCache 中取出所有的 Block。和 RAMCache 相同,HBase 会同时启动多个 WriteThead 并发地执行异步写入,每个 WriteThead 对应一个 RAMCache。
- 每个 WriteThead 会遍历 RAMCache 中所有 Block,分别调用 bucketAllocator 为这些 Block 分配内存空间。
- BucketAllocator 会选择与 Block 大小对应的 Bucket 进行存放,并且返回对应的物理地址偏移量 offset。
- WriteThead 将 Block 以及分配好的物理地址偏移量传给 IOEngine 模块,执行具体的内存写入操作。
- 写入成功后,将 blockKey 与对应物理内存偏移量的映射关系写人 BackingMap 中,方便后续查找时根据 blockKey 直接定位。
Block 缓存读取流程
- 首先从 RAMCache 中查找。对于还没有来得及写入 Bucket 的缓存 Block,一定存储在 RAMCache 中。
- 如果在 RAMCache 中没有找到,再根据 blockKey 在 BackingMap 中找到对应的物理偏移地址量 offset。
- 根据物理偏移地址 offset 直接从内存中查找对应的 Block 数据。
压缩 BlockCache
HBASE-11331引入了惰性 BlockCache 解压缩,通常称为压缩 BlockCache。启用压缩 BlockCache 时,数据和编码数据块会以它们在磁盘上的格式的形式缓存在 BlockCache 中,而不是在缓存之前解压缩和解密。与默认的模式不同点在于:默认情况下,在缓存一个数据块时,会先解压缩、解密,然后存入缓存(因为数据块是从 HDFS 取)。而惰性 BlockCache解压缩会直接将数据块存入缓存。
对于托管的数据量超过缓存容量的 HRegionServer,启用此功能并使用 SNAPPY 压缩,实验证明:吞吐量增加 $50%$ ,平均延迟改善 $30%$ ,同时垃圾回收增加 $80%$ ,但是会增加总体 CPU 负载 $2%$ 。有关性能如何衡量和实现的更多详细信息,请参阅 HBASE-11331。对于托管的数据可以轻松适应缓存的 HRegionServer,或者如果您的工作负载对额外的 CPU 或垃圾回收负载比较敏感,可能只会获得较少的好处。
BlockCache 的压缩功能默认是关闭的,要启用该功能,请在所有 HRegionServer 的 hbase-site.xml 中将hbase.block.data.cachcompressed
设置为 true。
集群搭建
目标环境
前提:Hadoop 正常运行,ZooKeeper 正常运行,Hive 正常运行,服务器时间同步。
安装
解压
将准备好的安装包上传至 node01,然后解压:
[root@node01 ~]# tar -zxvf hbase-2.5.3-bin.tar.gz -C /opt/yjx/
[root@node01 ~]# rm hbase-2.5.3-bin.tar.gz -rf
修改配置文件
修改环境配置文件 hbase-env.sh
:
[root@node01 ~]# cd /opt/yjx/hbase-2.5.3/conf/
[root@node01 conf]# vim hbase-env.sh
在文件末尾添加以下内容:
export JAVA_HOME=/usr/java/jdk1.8.0_351-amd64
export HADOOP_HOME=/opt/yjx/hadoop-3.3.4/
export HBASE_MANAGES_ZK=false
export HBASE_LOG_DIR=${HBASE_HOME}/logs
# HBase 的 Jar 包和 Hadoop 的 Jar 包会有冲突,会导致服务启动失败。设置以下参数禁止启动时扫描 Hadoop 的 Jar包
export HBASE_DISABLE_HADOOP_CLASSPATH_LOOKUP="true"
修改配置文件 hbase-site.xml
:
[root@node01 conf]# vim hbase-site.xml
首先删除 `configuration` 节点中的所有内容,然后再在 `configuration `节点中添加以下内容:
<!-- 设置 HBase 数据存储的位置(存储在 HDFS 上的位置) -->
<!-- 使用本地文件系统例如:file:///root/hbase/data -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hdfs-yjx/hbase</value>
</property>
<!-- 是否为分布式模式部署,true 表示分布式部署 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 设置 HBase 的 ZK 集群地址,以逗号分隔 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>node01,node02,node03:2181</value>
</property>
<!-- 设置 HBase 在 ZK 上的数据根目录 znode 节点名称 -->
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
<!-- 本地文件系统的临时目录,默认在 /tmp,/tmp 会在服务器重启时被清除,一般配置成本地文件模式时才需要设置 -->
<property>
<name>hbase.tmp.dir</name>
<value>/var/yjx/hbase</value>
</property>
<!-- 控制 HBase 是否检查流功能(hflush/hsync),如果要在 HDFS 系统上运行,请禁用此选项 -->
<!-- 简单的理解就是:使用 HDFS 存储将其设置为 false,使用本地文件系统将其设置为 true -->
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
修改 regionservers
:
[root@node01 conf]# vim regionservers
用以下内容替换文件内容:
node01
node02
node03
添加备用主机 backup-masters
:
[root@node01 conf]# vim backup-masters
用以下内容替换文件内容:
node02
拷贝 Hadoop 配置文件至 HBase。
[root@node01 conf]# cp /opt/yjx/hadoop-3.3.4/etc/hadoop/core-site.xml /opt/yjx/hbase-2.5.3/conf/
[root@node01 conf]# cp /opt/yjx/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /opt/yjx/hbase-2.5.3/conf/
拷贝至其他节点
将 node01 已配置好的 HBase 拷贝至 node02 和 node03。
[root@node02 ~]# scp -r root@node01:/opt/yjx/hbase-2.5.3 /opt/yjx/
[root@node03 ~]# scp -r root@node01:/opt/yjx/hbase-2.5.3 /opt/yjx/
# 或者使用分发脚本
[root@node01 ~]# yjxrsync /opt/yjx/hbase-2.5.3
修改环境变量
三个节点修改环境变量 vim /etc/profile
,在文件末尾添加以下内容:
export HBASE_HOME=/opt/yjx/hbase-2.5.3
export PATH=$HBASE_HOME/bin:$PATH
修改完成后 source /etc/profile
重新加载环境变量。
启动
启动 ZooKeeper(三台机器都需要执行)。
zkServer.sh start
zkServer.sh status
启动 HDFS。
[root@node01 \~]# start-all.sh
启动 HBase。
[root@node01 \~]# start-hbase.sh
访问
先使用jps
命令查看各节点服务运行情况。
Web 访问:http://node01:16010 结果如下。
Web 访问:http://node02:16010 结果如下。
关闭
先关闭 HBase。
[root@node01 ~]# stop-hbase.sh
再关闭 HDFS。
[root@node01 ~]# stop-all.sh
再关闭 ZooKeeper(三台机器都需要执行)。
zkServer.sh stop
环境搭建成功后删除安装包, shutdown -h now
关机拍摄快照。
交互方式
HBase Shell
命令分类
-
general
:processlist, status, table_help, version, whoami -
ddl
:alter, alter_async, alter_status, clone_table_schema, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters -
namespace
:alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables -
dml
:append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve -
tools
:assign, balance_switch, balancer, balancer_enabled, catalogjanitor_enabled, catalogjanitor_run, catalogjanitor_switch, cleaner_chore_enabled, cleaner_chore_run, cleaner_chore_switch, clear_block_cache, clear_compaction_queues, clear_deadservers, clear_slowlog_responses, close_region, compact, compact_rs, compaction_state, compaction_switch, decommission_regionservers, flush, flush_master_store, get_balancer_decisions, get_balancer_rejections, get_largelog_responses, get_slowlog_responses, hbck_chore_run, is_in_maintenance_mode, list_deadservers, list_decommissioned_regionservers, list_liveservers, list_unknownservers, major_compact, merge_region, move, normalize, normalizer_enabled, normalizer_switch, recommission_regionserver, regioninfo, rit, snapshot_cleanup_enabled, snapshot_cleanup_switch, split, splitormerge_enabled, splitormerge_switch, stop_master, stop_regionserver, trace, unassign, wal_roll, zk_dump -
replication
:add_peer, append_peer_exclude_namespaces, append_peer_exclude_tableCFs, append_peer_namespaces, append_peer_tableCFs, disable_peer, disable_table_replication, enable_peer, enable_table_replication, get_peer_config, list_peer_configs, list_peers, list_replicated_tables, remove_peer, remove_peer_exclude_namespaces, remove_peer_exclude_tableCFs, remove_peer_namespaces, remove_peer_tableCFs, set_peer_bandwidth, set_peer_exclude_namespaces, set_peer_exclude_tableCFs, set_peer_namespaces, set_peer_replicate_all, set_peer_serial, set_peer_tableCFs, show_peer_tableCFs, update_peer_config -
snapshots
:clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot -
configuration
:update_all_config, update_config, update_rsgroup_config -
quotas
:disable_exceed_throttle_quota, disable_rpc_throttle, enable_exceed_throttle_quota, enable_rpc_throttle, list_quota_snapshots, list_quota_table_sizes, list_quotas, list_snapshot_sizes, set_quota -
security
:grant, list_security_capabilities, revoke, user_permission -
procedures
:list_locks, list_procedures -
visibility labels
:add_labels, clear_auths, get_auths, list_labels, set_auths, set_visibility -
rsgroup
:add_rsgroup, alter_rsgroup_config, balance_rsgroup, get_namespace_rsgroup, get_rsgroup, get_server_rsgroup, get_table_rsgroup, list_rsgroups, move_namespaces_rsgroup, move_servers_namespaces_rsgroup, move_servers_rsgroup, move_servers_tables_rsgroup, move_tables_rsgroup, remove_rsgroup, remove_servers_rsgroup, rename_rsgroup, show_rsgroup_config -
storefiletracker
:change_sft, change_sft_all
基本命令
先用 HBase 提供的命令行工具进行交互,位于 HBase 的 /bin
目录下。
登入退出。
[root@node01 ~]# hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html#shell
Version 2.5.3, rd385524561f771dcb402905c2bdcaeb4a8fecbdb, Wed Feb 1 09:01:02 UTC 2023
Took 0.0028 seconds
hbase:001:0> exit
帮助。
hbase> help
hbase(main):002:0> help 'status'
Show cluster status. Can be 'summary', 'simple', 'detailed', 'tasks', or 'replication'. The default is 'summary'. Examples:
hbase> status
# 查看简单信息
hbase> status 'simple'
# 查看概要信息
hbase> status 'summary'
# 查看详细信息
hbase> status 'detailed'
# 查看任务信息
hbase> status 'tasks'
# 查看副本信息
hbase> status 'replication'
hbase> status 'replication', 'source'
hbase> status 'replication', 'sink'
服务器状态。
hbase> status
1 active master, 1 backup masters, 3 servers, 0 dead, 0.6667 average load
Took 0.0092 seconds
版本。
hbase> version
2.5.3, rd385524561f771dcb402905c2bdcaeb4a8fecbdb, Wed Feb 1 09:01:02 UTC 2023
Took 0.0003 seconds
查看当前运行的任务列表。
hbase> processlist
当前登录用户是谁。
hbase> whoami
root (auth:SIMPLE)
groups: root
Took 0.0003 seconds
表操作帮助命令。
hbase> table_help
NameSpace 命令
命名空间类似于关系型数据库中的数据库的概念,他其实是表的逻辑分组。这种抽象为多租户相关功能奠定了基础。命名空间是可以管理维护的,可以创建,删除或更改命名空间。HBase 有两个特殊预定义的命名空间:
-
default:没有明确指定命名空间的表将自动落入此命名空间
-
hbase:系统命名空间,用于包含 HBase 的内部表和元数据表
HBase NameSpace 常用命令如下:
-
list_namespace
:查看命名空间,支持所有、正则、指定查询。 -
list_namespace_tables
:查看指定命令空间下的表。 -
create_namespace
:创建命名空间。 -
describe_namespace
:查看命名空间详情。 -
alter_namespace
:修改命名空间。 -
drop_namespace
:删除命名空间,命名空间必须为空。
list_namespace
:查看命名空间,支持所有、正则、指定查询。
# 查看所有命令空间
hbase> list_namespace
# 使用正则查看表
hbase> list_namespace 'h.*'
# 指定查询
hbase> list_namespace 'hbase'
list_namespace_tables
:查看指定命令空间下的表。
# 查看指定命令空间下的表
hbase> list_namespace_tables 'hbase'
create_namespace
:创建命名空间。
# 创建 ns1 命名空间
hbase> create_namespace 'ns1'
# 创建 ns1 命名空间并添加属性
hbase> create_namespace 'ns1', {'PROPERTY_NAME'=>'PROPERTY_VALUE'}
describe_namespace
:查看命名空间详情。
hbase> describe_namespace 'ns1'
alter_namespace
:修改命名空间。
# 修改命名空间,添加属性
hbase> alter_namespace 'ns1', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'}
# 修改命名空间,移除属性
hbase> alter_namespace 'ns1', {METHOD => 'unset', NAME=>'PROPERTY_NAME'}
drop_namespace
:删除命名空间,命名空间必须为空。
# 删除命名空间,命名空间必须为空
hbase> drop_namespace 'ns1'
DDL 命令
HBase DDL 常用命令如下:
-
create
:创建表。 -
list
:查看所有表(所有命名空间)。 -
describe
:查看表详情。 -
exists
:验证表是否存在。 -
alter
:修改表。 -
disable
:禁用表。 -
is_disabled
:验证表是否已禁用。 -
enable
:启用表。 -
is_enabled
:验证表是否已启用。 -
drop 和 drop_all
:删除表,表必须处于禁用状态才可以被删除。
create
:创建表,CREATE 语法如下。
# 在命名空间 ns1 下创建表名为 t1 的表,列族 f1 版本数为 5
hbase> create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}
# 在命名空间 default 下创建表名为 t2 的表,列族 f1、列族 f2、列族 f3 版本数均为 1
hbase> create 't2', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
# 上条命令简写方式如下
hbase> create 't3', 'f1', 'f2', 'f3'
# 是否使用 BlockCache,默认开启
hbase> create 't4', {NAME => 'f1', VERSIONS => 2, BLOCKCACHE => true}
# 在 colfam1 列族上启用 ROWCOL Bloom 过滤器,此时的布隆过滤器为 Rowkey + 列
# 默认情况下启用基于行的 Bloom 过滤器(BLOOMFILTER => 'ROW'),可以通过 BLOOMFILTER => 'NONE' 禁用
# 如果经常扫描整行,那么行 + 列组合将不会提供任何好处
hbase> create 'mytable',{NAME => 'colfam1', BLOOMFILTER => 'ROWCOL'}
list
:查看所有表(所有命名空间)。
hbase> list
TABLE
t2
t3
t4
ns1:t1
4 row(s)
Took 0.0105 seconds
=> ["t2", "t3", "t4", "ns1:t1"]
describe
:查看表详情。
hbase> describe 't2'
hbase> describe 'ns1:t1'
exists
:验证表是否存在。
hbase> exists 't1'
查看表在 HDFS 上的数据。
[root@node01 ~]# hdfs dfs -ls /hbase/data/default/
[root@node01 ~]# hdfs dfs -ls /hbase/data/ns1/
alter
:修改表。
# 修改 ns1:t1 表的 f1 列族版本数为 3
hbase> alter 'ns1:t1', {NAME => 'f1', VERSIONS => 3}
# 删除 t3 表的 f2 列族
hbase> alter 't3', {NAME => 'f2', METHOD => 'delete'}
# 给 t3 表添加 f4 列族,版本数为 5
hbase> alter 't3', {NAME => 'f4', VERSIONS => 5}
disable
:禁用表与 is_disabled :验证表是否已禁用。
hbase> disable 't3'
hbase> is_disabled 't3'
enable
:启用表与 is_enabled :验证表是否已启用。
hbase> enable 't3'
hbase> is_enabled 't3'
drop 和 drop_all
:删除表,表必须处于禁用状态才可以被删除。
# 表必须处于禁用状态才可以被删除
hbase> disable 't3'
hbase> drop 't3'
# 使用正则删除多张表
hbase> drop_all 't.*'
hbase> drop_all 'ns:t.*'
hbase> drop_all 'ns:.*'
创建表时预分区。
# 自定义切分点
# create 'tableName', 'columnFamily', {SPLITS => ['rowKey1', 'rowKey2', 'rowKey3']}
create 't5', 'f1', {SPLITS => ['10', '20', '30', '40', '50']}
# 基于随机字节键创建具有 8 个区域的表
hbase> create 't6', 'f1', {NUMREGIONS => 8, SPLITALGO => 'UniformSplit'}
# 基于十六进制创建具有 10 个区域的表
hbase> create 't7', 'f1', {NUMREGIONS => 10, SPLITALGO => 'HexStringSplit'}
DML 命令
HBase DML 常用命令如下:
-
put
:插入记录。 -
count
:统计表的行数。 -
get
:获取记录。 -
get_splits
:获取表 HRegion 信息。 -
incr
:让某个单元格的数据自增,可以指定自增数。 -
get_counter
:获取当前计数器的值。 -
append
:追加记录。 -
scan
:扫描表。全表扫描的时候一般和过滤器一起使用。 -
delete
:删除某个 Cell 数据(删除 Qualifier 的数据)。 -
deleteall
:删除某行的所有 Cell 数据,可以使用 RowKey 前缀。 -
truncate
:清空表,会把表分区也清除掉。truncate 是 disable、drop、create 三个动作的自动化集成。 -
truncate_preserve
:清空表,只清除数据。
首先创建 t_user 表用作练习。
# 创建 t_user 表,列族 personal 和列族 office 版本数均为 3
hbase> create 't_user', {NAME => 'personal', VERSIONS => 3}, {NAME => 'office', VERSIONS => 3},
{SPLITS => ['user-1', 'user-2', 'user-4']}
put
:插入记录。
# put 'tableName', 'rowKey', '列', '值'
# 通过 Shell 插入的 int 类型值会被自动转为 String
put 't_user', 'user-1', 'personal:name', 'zhangsan'
put 't_user', 'user-1', 'personal:gender', 1
put 't_user', 'user-1', 'office:phone', '13800000000'
put 't_user', 'user-1', 'office:address', 'beijing'
put 't_user', 'user-1', 'office:phone', '13000000000'
put 't_user', 'user-1', 'office:address', 'tianjin'
put 't_user', 'user-1', 'office:phone', '13800000000'
put 't_user', 'user-1', 'office:address', 'shanghai'
put 't_user', 'user-10', 'personal:name', 'lisi'
put 't_user', 'user-10', 'personal:gender', 0
put 't_user', 'user-10', 'office:address', 'shanghai'
put 't_user', 'user-11', 'personal:name', 'wangwu'
put 't_user', 'user-11', 'personal:gender', 1
put 't_user', 'user-11', 'personal:age', 20
put 't_user', 'user-11', 'office:phone', '13900000000'
put 't_user', 'user-11', 'office:address', 'guangzhou'
put 't_user', 'user-2', 'personal:name', 'zhaoliu'
put 't_user', 'user-2', 'personal:gender', 1
put 't_user', 'user-2', 'personal:age', 19
put 't_user', 'user-2', 'office:phone', '13100000000'
put 't_user', 'user-3', 'personal:name', 'tianqi'
put 't_user', 'user-3', 'personal:gender', 0
put 't_user', 'user-3', 'office:address', 'shaanxi'
注意:现在表的数据都在内存中,并没有落地到磁盘。如果这时想要落地到磁盘只能手动落地。
命令:flush 'tableName'
,例如:flush 't_user'
。然后在 HDFS 中就可以查看到对应的数据了。
count
:统计表的行数。
hbase> count 't_user'
get
:获取记录。
# 获取一个 RowKey 的所有数据
hbase> get 't_user', 'user-1'
# 获取一个 RowKey 的某个列族的所有数据
hbase> get 't_user', 'user-1', 'personal'
# 获取一个 RowKey 的某个列族中某个列的所有数据
hbase> get 't_user', 'user-11', 'personal:age'
get_splits
:获取表分割信息。
hbase> get_splits 't_user'
Total number of splits = 4
user-1
user-2
user-4
Took 0.1432 seconds
=> ["user-1", "user-2", "user-4"]
incr
:让某个单元格的数据自增,可以指定自增数。
给 user-1 增加 personal:age
字段,并使用 Counter 实现递增。
# 自增1
hbase> incr 't_user', 'user-1', 'personal:age', 1
hbase> get 't_user', 'user-1', 'personal:age'
# 自增2
hbase> incr 't_user', 'user-1', 'personal:age', 2
hbase> get 't_user', 'user-1', 'personal:age'
# 注意:incr 可以对不存在的列进行操作,如果列值已经存在会报错;如果使用 put 修改了 incr 的值再使用 incr 也会
报错
# ERROR: org.apache.hadoop.hbase.DoNotRetryIOException: Field is not a long, it's 4 bytes wide
get_counter
:获取当前计数器的值。
hbase> get_counter 't_user', 'user-1', 'personal:age'
append
:追加记录。
hbase> put 't_user', 'user-33', 'personal:gender', 0
hbase> append 't_user', 'user-33', 'office:phone', '18100000000'
hbase> append 't_user', 'user-33', 'office:address', 'sichuan'
hbase> get 't_user', 'user-33'
scan
:扫描表。全表扫描的时候一般和过滤器一起使用。
# 全表扫描
hbase> scan 't_user'
# 扫描时指定列族
hbase> scan 't_user', {COLUMNS => 'personal'}
`delete` :删除某个 Cell 数据(删除 Qualifier 的数据)。
hbase> delete 't_user', 'user-33', 'office:phone'
hbase> get 't_user', 'user-33'
deleteall
:删除某行的所有数据,使用 RowKey 过滤。
hbase> deleteall 't_user', 'user-33'
hbase> get 't_user', 'user-33'
truncate
:清空表,会把表分区也清除掉。 truncate_preserve
:清空表,只清除数据。
hbase> truncate 'namespace:tableName'
hbase> truncate_preserve 'namespace:tableName'
DQL 命令
其实本来就没有 DQL(DQL 属于 DML),日常使用中查询操作最多,所以经常被人单独拎出来讲。
# 获取一个 RowKey 的所有数据
hbase> get 't_user', 'user-1'
# 获取一个 RowKey 的某个列族的所有数据
hbase> get 't_user', 'user-1', 'personal'
# 获取一个 RowKey 的某个列族中某个列的所有数据
hbase> get 't_user', 'user-11', 'personal:age'
# 获取一个 RowKey 的某个列族中某个列的多版本数据
hbase> get 't_user', 'user-1', {COLUMN => 'office:phone', VERSIONS => 3}
# 通过 Timestamp 获取指定版本的数据
hbase> get 't_user', 'user-1', {COLUMN => 'office:phone', TIMESTAMP =>1661776792305}
# 全表扫描
hbase> scan 't_user'
注意:全表扫描的时候一般都和过滤器一起使用。
# 扫描时指定列族
hbase> scan 't_user', {COLUMNS => 'personal'}
# 扫描时指定列族,并显示最新的 3 个版本的内容
hbase> scan 't_user', {COLUMNS => 'office', VERSIONS => 3}
# 扫描时指定列族和列名,并显示最新的 3 个版本的内容
hbase> scan 't_user', {COLUMNS => 'office:phone', VERSIONS => 3}
# 设置开启 Raw 模式,开启 Raw 模式会把那些已添加删除标记但是未实际删除的数据也显示出来
hbase> scan 't_user', {COLUMNS => 'personal', RAW => true}
# 查询 t_user 表中列族为 personal 和 office 的数据
hbase> scan 't_user', {COLUMNS => ['personal', 'office']}
hbase> scan 't_user', {COLUMNS => ['personal', 'office'], RAW => true}
# 查询 t_user 表中列族为 personal,列名为 name 和列族为 office,列名为 phone 的数据
hbase> scan 't_user', {COLUMNS => ['personal:name', 'office:phone']}
# 查询 t_user 表中列族为 personal 和 office 且列名含有 a 字符的数据
hbase> scan 't_user', {COLUMNS => ['personal', 'office'], FILTER => "(QualifierFilter(=,
'substring:a'))"}
# 查询 t_user 表中列族为 personal,RowKey 范围是 [user-1, user-2) 的数据
hbase> scan 't_user', {COLUMNS => 'personal', STARTROW => 'user-1', ENDROW => 'user-2'}
# 查询 t_user 表中 RowKey 以 user-1 字符开头的
hbase> scan 't_user', {FILTER=>"PrefixFilter('u')"}
hbase> scan 't_user', {FILTER=>"PrefixFilter('user-1')"}
# 查询 t_user 表中指定时间范围的数据
hbase> scan 't_user', {TIMERANGE => [1661777937728, 1661777975593]}
TOOLS 命令
zk_dump
:查看 ZK 中的 HBase 节点信息。
hbase> zk_dump
Java API
HBase 有多种不同的客户端,如 REST 客户端,Thrift 客户端,ORM 框架 Kundera 等等。HBase 也提供了 Java 的 API来操作表与列族等信息,它的 Shell 就是对 Java API 的封装。
HBase 的 Java API 提供了许多高级的特性:
-
元数据管理,列族的数据压缩,HRegion 分隔;
-
创建,删除,更新,读取 RowKey 等。
创建项目
完整 pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yjxxt</groupId>
<artifactId>hbase-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- Hadoop 版本控制 -->
<hadoop.version>3.3.4</hadoop.version>
<!-- HBase 版本控制 -->
<hbase.version>2.5.3</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- JUnit 单元测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
添加配置
导出 Hadoop 和 HBase 的配置文件。主要拷贝以下配置文件:
-
core-site.xml
-
hdfs-site.xml
-
mapred-site.xml
-
yarn-site.xml
-
log4j.properties(Hadoop 的)
-
hbase-site.xml
NameSpace API
package com.yjxxt.namespace;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@DisplayName("命名空间测试类")
public class HBaseNameSpaceTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
@DisplayName("创建数据库连接并初始化管理类")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("查看命名空间")
@Test
public void testListNameSpace() throws IOException {
NamespaceDescriptor[] namespaces = admin.listNamespaceDescriptors();
for (NamespaceDescriptor namespace : namespaces) {
System.out.println(namespace);
}
}
@DisplayName("查看指定命名空间下的表")
@Test
public void testListNameSpaceTables() throws IOException {
TableName[] tableNames = admin.listTableNamesByNamespace("default");
for (TableName tableName : tableNames) {
System.out.println(tableName);
}
}
@DisplayName("创建命名空间")
@Test
public void testCreateNameSpace() throws IOException {
NamespaceDescriptor bd = NamespaceDescriptor.create("bd").build();
admin.createNamespace(bd);
System.out.println("HBaseDDL.createNameSpace[命名空间创建完成]");
}
@DisplayName("查看命名空间详情")
@Test
public void testDescribeNamespace() throws IOException {
NamespaceDescriptor bd = admin.getNamespaceDescriptor("bd");
System.out.println(bd);
}
@DisplayName("修改命名空间")
@Test
public void testAlterNamespace() throws IOException {
// 获取命名空间
NamespaceDescriptor bd = admin.getNamespaceDescriptor("bd");
// 设置属性
bd.setConfiguration("PROPERTY_NAME", "PROPERTY_VALUE");
admin.modifyNamespace(bd);
bd = admin.getNamespaceDescriptor("bd");
// 获取属性
System.out.println(bd.getConfiguration());
// 获取指定属性
System.out.println(bd.getConfigurationValue("PROPERTY_NAME"));
// 移除属性
bd.removeConfiguration("PROPERTY_NAME");
}
@DisplayName("删除命名空间")
@Test
public void testDropNamespace() throws IOException {
// 删除命名空间,命名空间必须为空
admin.deleteNamespace("bd");
}
}
DDL API
package com.yjxxt.ddl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
@DisplayName("DDL测试类")
public class HBaseDDLTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
@DisplayName("创建数据库连接并初始化管理类")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("创建表")
@Test
public void testCreateTable() throws IOException {
// 判断表是否存在
if (admin.tableExists(TableName.valueOf("bd:t_user"))) {
System.out.println("bd:t_user 表已存在");
return;
}
// 在命名空间 bd 下创建表名为 t_user 的表
TableDescriptorBuilder table = TableDescriptorBuilder
.newBuilder(TableName.valueOf("bd:t_user"));
// 构建列族并设置列族版本数
ColumnFamilyDescriptor personal = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("personal"))
.setMaxVersions(3)
.build();
ColumnFamilyDescriptor office = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("office"))
.setMaxVersions(3)
.build();
// 设置列族
// table.setColumnFamily(personal);
// table.setColumnFamily(office);
// 或者以下代码置列族
table.setColumnFamilies(Arrays.asList(personal, office));
// 创建预分区
byte[][] splitKeys = new byte[][]{Bytes.toBytes("user-1"),
Bytes.toBytes("user-2"), Bytes.toBytes("user-4")};
// 创建表并设置预分区
admin.createTable(table.build(), splitKeys);
}
@DisplayName("查看所有表(所有命名空间)")
@Test
public void testListTable() throws IOException {
TableName[] tableNames = admin.listTableNames();
for (TableName tableName : tableNames) {
System.out.println(tableName);
}
}
@DisplayName("查看指定命名空间下的表")
@Test
public void testListNameSpaceTables() throws IOException {
TableName[] tableNames = admin.listTableNamesByNamespace("default");
for (TableName tableName : tableNames) {
System.out.println(tableName);
}
}
@DisplayName("查看表详情")
@Test
public void testDescribe() throws IOException {
// 获取表
TableDescriptor userTable = admin.getDescriptor(TableName.valueOf("bd:t_user"));
// 获取表名
System.out.println("========== 获取表名 ==========");
System.out.println(userTable.getTableName());
// 获取所有列族
System.out.println("========== 获取所有列族 ==========");
ColumnFamilyDescriptor[] columnFamilies = userTable.getColumnFamilies();
for (ColumnFamilyDescriptor columnFamily : columnFamilies) {
// 获取列族名
System.out.println(new String(columnFamily.getName(), StandardCharsets.UTF_8));
// 获取列族版本数
System.out.println(columnFamily.getMaxVersions());
// 获取列族属性
System.out.println(columnFamily.getConfiguration());
}
// 获取指定列族
System.out.println("========== 获取指定列族 ==========");
ColumnFamilyDescriptor office = userTable.getColumnFamily(Bytes.toBytes("office"));
// 获取列族名
System.out.println(new String(office.getName(), StandardCharsets.UTF_8));
// 获取列族版本数
System.out.println(office.getMaxVersions());
// 获取列族属性
System.out.println(office.getConfiguration());
// 获取所有列族名
System.out.println("========== 获取所有列族名 ==========");
Set<byte[]> columnFamilyNames = userTable.getColumnFamilyNames();
for (byte[] columnFamilyName : columnFamilyNames) {
System.out.println(new String(columnFamilyName, StandardCharsets.UTF_8));
}
// 获取列族数量
System.out.println("========== 获取列族数量 ==========");
System.out.println(userTable.getColumnFamilyCount());
// 表的所有信息
System.out.println("========== 表的所有信息 ==========");
System.out.println(userTable);
}
@DisplayName("修改表")
@Test
public void testAlterTable() throws IOException {
// 修改 office 列族的版本数为 5
ColumnFamilyDescriptor office = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("office"))
.setMaxVersions(5)
.build();
admin.modifyColumnFamily(TableName.valueOf("bd:t_user"), office);
System.out.println(admin.getDescriptor(TableName.valueOf("bd:t_user")));
// 删除 bd:t_user 表的 office 列族
admin.deleteColumnFamily(TableName.valueOf("bd:t_user"), Bytes.toBytes("office"));
System.out.println(admin.getDescriptor(TableName.valueOf("bd:t_user")));
// 添加 office 列族,版本数为 3
office = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes("office"))
.setMaxVersions(3)
.build();
admin.addColumnFamily(TableName.valueOf("bd:t_user"), office);
System.out.println(admin.getDescriptor(TableName.valueOf("bd:t_user")));
}
@DisplayName("禁用表")
@Test
public void testDisableTable() throws IOException {
admin.disableTable(TableName.valueOf("bd:t_user"));
}
@DisplayName("验证表是否已禁用")
@Test
public void testIsDisableTable() throws IOException {
boolean result = admin.isTableDisabled(TableName.valueOf("bd:t_user"));
System.out.println(result);
}
@DisplayName("启用表")
@Test
public void testEnableTable() throws IOException {
admin.enableTable(TableName.valueOf("bd:t_user"));
}
@DisplayName("验证表是否已启用")
@Test
public void testIsEnabledTable() throws IOException {
boolean result = admin.isTableEnabled(TableName.valueOf("bd:t_user"));
System.out.println(result);
}
@DisplayName("删除表")
@Test
public void testDropTable() throws IOException {
// 删除表,表必须处于禁用状态才可以被删除
admin.deleteTable(TableName.valueOf("bd:t_user"));
}
}
DML API
package com.yjxxt.dml;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@DisplayName("DML测试类")
public class HBaseDMLTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
/**
* 表对象
*/
private Table table;
@DisplayName("创建数据库连接并初始化管理类和数据库表")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
// 获取数据库表
table = connection.getTable(TableName.valueOf("bd:t_user"));
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (table != null) {
table.close();
}
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("插入单行记录")
@Test
public void testOntPut() throws IOException {
// 初始化 RowKey
Put put = new Put(Bytes.toBytes("user-1"));
// 列族和列
put.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
put.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
// 列族和列
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("13800000000"));
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("beijing"));
// 插入一行数据(一个版本需要插入一次)
table.put(put);
// 初始化 RowKey
put = new Put(Bytes.toBytes("user-1"));
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("13000000000"));
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("tianjin"));
table.put(put);
// 初始化 RowKey
put = new Put(Bytes.toBytes("user-1"));
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("shanghai"));
put.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("13800000000"));
table.put(put);
}
@DisplayName("插入多行记录")
@Test
public void testBatchPut() throws IOException {
// 初始化 RowKey
Put put1 = new Put(Bytes.toBytes("user-10"));
// 列族和列
put1.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("lisi"));
put1.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("0"));
// 列族和列
put1.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("shanghai"));
// 初始化 RowKey
Put put2 = new Put(Bytes.toBytes("user-11"));
// 列族和列
put2.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
put2.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
put2.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("age"), Bytes.toBytes("20"));
// 列族和列
put2.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("13900000000"));
put2.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("guangzhou"));
// 初始化 RowKey
Put put3 = new Put(Bytes.toBytes("user-2"));
// 列族和列
put3.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("zhaoliu"));
put3.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
put3.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("age"), Bytes.toBytes("19"));
// 列族和列
put3.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("13100000000"));
// 初始化 RowKey
Put put4 = new Put(Bytes.toBytes("user-3"));
// 列族和列
put4.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("tianqi"));
put4.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("0"));
// 列族和列
put4.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("shaanxi"));
// 创建 Put 容器
List<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
puts.add(put3);
puts.add(put4);
// 批量插入。注意:HBase 2.2.5 不支持 List 多版本插入,会以 RowKey 进行自动覆盖
table.put(puts);
}
@DisplayName("统计")
@Test
public void testCount() throws IOException {
Get get = new Get(Bytes.toBytes("user-1"));
Result result = table.get(get);
Cell[] cells = result.rawCells();
System.out.println(cells.length);
}
@DisplayName("获取单行记录")
@Test
public void testOneGet() throws IOException {
// 设置 RowKey
Get get = new Get(Bytes.toBytes("user-1"));
// 设置列族
// get.addFamily(Bytes.toBytes("personal"));
// 设置列
// get.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
// 设置一次性获取多少个版本的数据
// get.readVersions(3);
// 获取所有版本数据
get.readAllVersions();
Result result = table.get(get);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
}
@DisplayName("获取多行记录")
@Test
public void testBatchGet() throws IOException {
List<Get> gets = new ArrayList<>();
gets.add(new Get(Bytes.toBytes("user-1")));
gets.add(new Get(Bytes.toBytes("user-10")));
gets.add(new Get(Bytes.toBytes("user-11")));
Result[] results = table.get(gets);
for (Result result : results) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
System.out.println("========== 华丽的分割线 ==========");
}
}
@DisplayName("获取表 HRegion 信息")
@Test
public void testGetSplits() throws IOException {
List<RegionInfo> regions = admin.getRegions(table.getName());
for (RegionInfo region : regions) {
System.out.println(region.getRegionId());
System.out.println(region.getRegionNameAsString());
System.out.println(region.isFirst());
System.out.println(region.isLast());
System.out.println(new String(region.getStartKey(), StandardCharsets.UTF_8));
System.out.println(new String(region.getEndKey(), StandardCharsets.UTF_8));
System.out.println("========== 华丽的分割线 ==========");
}
}
@DisplayName("追加记录")
@Test
public void testAppend() throws IOException {
// 插入记录
Put put = new Put(Bytes.toBytes("user-33"));
put.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("gender"), Bytes.toBytes("0"));
table.put(put);
// 追加记录
Append append = new Append(Bytes.toBytes("user-33"));
append.addColumn(Bytes.toBytes("office"), Bytes.toBytes("phone"), Bytes.toBytes("18100000000"));
append.addColumn(Bytes.toBytes("office"), Bytes.toBytes("address"), Bytes.toBytes("sichuan"));
table.append(append);
// 获取记录
Get get = new Get(Bytes.toBytes("user-33"));
Result result = table.get(get);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
}
@DisplayName("扫描表")
@Test
public void testScan() throws IOException {
Scan scan = new Scan();
// 不设置 RowKey 的范围则为全表扫描,RowKey 的范围为左闭右开的关系
// scan.withStartRow(Bytes.toBytes("user-1"));
// scan.withStopRow(Bytes.toBytes("user-33"));
// 开始扫描
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
System.out.println("========== 华丽的分割线 ==========");
}
scanner.close();
}
@DisplayName("删除某个 Cell 数据(删除 Qualifier 的数据)")
@Test
public void testDeleteCell() throws IOException {
// 判断表是否存在
if (!admin.tableExists(TableName.valueOf("bd:t_user"))) {
System.out.println("bd:t_user 表不存在");
return;
}
// 指定 RowKey
Delete delete = new Delete(Bytes.toBytes("user-33"));
// 指定 Column Family 和 Column Qualifier
delete.addColumns(Bytes.toBytes("office"), Bytes.toBytes("phone"));
// 删除
table.delete(delete);
// 获取记录
Get get = new Get(Bytes.toBytes("user-33"));
Result result = table.get(get);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
}
@DisplayName("删除某行所有数据")
@Test
public void testDeleteRow() throws IOException {
// 判断表是否存在
if (!admin.tableExists(TableName.valueOf("bd:t_user"))) {
System.out.println("bd:t_user 表不存在");
return;
}
// 指定 RowKey
Delete delete = new Delete(Bytes.toBytes("user-33"));
// 删除
table.delete(delete);
// 获取记录
Get get = new Get(Bytes.toBytes("user-33"));
Result result = table.get(get);
Cell[] cells = result.rawCells();
System.out.println(cells.length);
}
@DisplayName("删除多行数据")
@Test
public void testBatchDeleteRow() throws IOException {
// 判断表是否存在
if (!admin.tableExists(TableName.valueOf("bd:t_user"))) {
System.out.println("bd:t_user 表不存在");
return;
}
// 指定 RowKey
Delete delete1 = new Delete(Bytes.toBytes("user-33"));
Delete delete2 = new Delete(Bytes.toBytes("user-4"));
Delete delete3 = new Delete(Bytes.toBytes("user-5"));
List<Delete> deletes = new ArrayList<>();
deletes.add(delete1);
deletes.add(delete2);
deletes.add(delete3);
// 删除
table.delete(deletes);
}
/**
* truncate 是 disable、drop、create 三个动作的自动化集成。
*/
@DisplayName("清空表")
@Test
public void testTruncatePreserve() throws IOException {
// 获取表
TableDescriptor oldTable = admin.getDescriptor(table.getName());
// 构建表
TableDescriptorBuilder newTable = TableDescriptorBuilder.newBuilder(oldTable.getTableName());
// 获取列族
ColumnFamilyDescriptor[] columnFamilies = oldTable.getColumnFamilies();
for (ColumnFamilyDescriptor columnFamily : columnFamilies) {
// 获取列族名
byte[] columnFamilyName = columnFamily.getName();
// 获取列族版本数
int columnFamilyMaxVersions = columnFamily.getMaxVersions();
// 获取列族属性
Map<String, String> columnFamilyConfiguration = columnFamily.getConfiguration();
// 构建列族并设置列族版本数
ColumnFamilyDescriptorBuilder newColumnFamilyBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(columnFamilyName)
.setMaxVersions(columnFamilyMaxVersions);
// 设置列族属性
for (Map.Entry<String, String> entry : columnFamilyConfiguration.entrySet()) {
newColumnFamilyBuilder.setConfiguration(entry.getKey(), entry.getValue());
}
// 设置列族
newTable.setColumnFamily(newColumnFamilyBuilder.build());
}
// 获取 HRegison
List<RegionInfo> regions = admin.getRegions(oldTable.getTableName());
// 创建预分区
byte[][] splitKeys = null;
if (!regions.isEmpty()) {
List<String> rowKeys = new ArrayList<>();
for (RegionInfo region : regions) {
String key = new String(region.getStartKey(), StandardCharsets.UTF_8);
if (key.length() != 0) {
rowKeys.add(key);
}
}
splitKeys = new byte[rowKeys.size()][];
for (int i = 0; i < rowKeys.size(); i++) {
splitKeys[i] = Bytes.toBytes(rowKeys.get(i));
}
}
// 禁用表
admin.disableTable(table.getName());
// 删除表
admin.deleteTable(table.getName());
// 创建表并设置预分区
admin.createTable(newTable.build(), splitKeys);
}
}
DQL API
package com.yjxxt.dql;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@DisplayName("DQL测试类")
public class HBaseDQLTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
/**
* 表对象
*/
private Table table;
@DisplayName("创建数据库连接并初始化管理类和数据库表")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
// 获取数据库表
table = connection.getTable(TableName.valueOf("default:t_user"));
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (table != null) {
table.close();
}
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("统计")
@Test
public void testCount() throws IOException {
Get get = new Get(Bytes.toBytes("user-1"));
Result result = table.get(get);
Cell[] cells = result.rawCells();
System.out.println(cells.length);
}
@DisplayName("获取单行记录")
@Test
public void testOneGet() throws IOException {
// 设置 RowKey
Get get = new Get(Bytes.toBytes("user-1"));
// 设置列族
// get.addFamily(Bytes.toBytes("personal"));
// 设置列
// get.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
// 设置一次性获取多少个版本的数据
// get.readVersions(3);
// 获取所有版本数据
get.readAllVersions();
Result result = table.get(get);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
}
@DisplayName("获取多行记录")
@Test
public void testBatchGet() throws IOException {
List<Get> gets = new ArrayList<>();
gets.add(new Get(Bytes.toBytes("user-1")).readAllVersions());
gets.add(new Get(Bytes.toBytes("user-10")).readAllVersions());
gets.add(new Get(Bytes.toBytes("user-11")).readAllVersions());
Result[] results = table.get(gets);
for (Result result : results) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
String columnName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
System.out.println("========== 华丽的分割线 ==========");
}
}
@DisplayName("扫描表")
@Test
public void scanWholeTable() throws IOException {
Scan scan = new Scan();
// 不设置 RowKey 的范围则为全表扫描,RowKey 的范围为左闭右开的关系
// scan.withStartRow(Bytes.toBytes("user-1"));
// scan.withStopRow(Bytes.toBytes("user-33"));
// 设置列族
// scan.addFamily(Bytes.toBytes("personal"));
// 设置列
// scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
// 设置一次性获取多少个版本的数据
// scan.readVersions(3);
// 获取所有版本数据
scan.readAllVersions();
// 提示:最好使用 RowKey,不建议使用过滤器,过滤器效率过低
/*
CompareOperator 比较运算符:
LESS, // 小于
LESS_OR_EQUAL, // 小于等于
EQUAL, // 等于
NOT_EQUAL, // 不等于
GREATER_OR_EQUAL, // 大于等于
GREATER, // 大于
NO_OP; // 无操作
*/
// RowKey 过滤器
// 查询 RowKey 以 u 开头
Filter rowKeyFilter1 = new PrefixFilter(Bytes.toBytes("u"));
// 查询 RowKey 为 user-1
Filter rowKeyFilter2 = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("user-1")));
// 列族过滤器
// 查询列族小于 user-2
Filter familyFilter = new FamilyFilter(CompareOperator.LESS, new BinaryComparator(Bytes.toBytes("user-2")));
// 列过滤器
// 查询列为 office
Filter qualifierFilter = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("address")));
// 值过滤器
// 查询含有 a 字符的
Filter valueFilter = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("a"));
// 设置列族和列并与某值相等
SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("personal"),
Bytes.toBytes("office"),
CompareOperator.EQUAL, new SubstringComparator("a"));
// 如果为 true,当这一列不存在时,不会返回,如果为 false,当这一列不存在时,会返回所有列信息
scvf.setFilterIfMissing(true);
// 时间戳过滤器
List<Long> ts = new ArrayList<>();
ts.add(5L);
ts.add(10L);
ts.add(1686910219798L);
Filter timestampsFilter = new TimestampsFilter(ts);
// 添加一组过滤器
List<Filter> filters = new ArrayList<>();
filters.add(rowKeyFilter1);
filters.add(qualifierFilter);
filters.add(timestampsFilter);
FilterList filterAll = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
// 添加过滤器
// scan.setFilter(filterAll);
scan.setFilter(valueFilter);
// 开始扫描
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("rowKey:" + Bytes.toString(result.getRow()));
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
System.out.println(columnName + ":" + columnValue);
}
System.out.println("========== 华丽的分割线 ==========");
}
scanner.close();
}
@DisplayName("分页查询测试")
@Test
public void testPage2() throws IOException {
Scan scan = new Scan();
// 模拟页码数与每页条数
int pageNum = 1, pageSize = 3;
// 模拟总页码数
int totalPages = 2;
// 分页过滤器(每个 HRegion 的前 N 行)
Filter pageFilter = new PageFilter(pageSize);
// 添加过滤器
scan.setFilter(pageFilter);
// 开始扫描
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
}
}
scanner.close();
}
@DisplayName("分页查询")
@Test
public void testPage() throws IOException {
Scan scan = new Scan();
// 模拟页码数与每页条数
int pageNum = 1, pageSize = 2;
// 模拟总页码数
int totalPages = 2;
// 分页过滤器(每个 HRegion 的前 N 行)
Filter pageFilter = new PageFilter(pageSize);
// 添加过滤器
scan.setFilter(pageFilter);
// 页码计数器
int pageCount = 0;
// 循环处理
while (true) {
pageCount++; // 第一页,第二页,第三页……
// 当页码计时器大于页码数时结束循环
if (pageCount > totalPages) {
break;
}
// 行计数器
int rowCount = 0;
// 开始扫描
ResultScanner scanner = table.getScanner(scan);
System.out.printf("========== 第 %s 页 ==========\n", pageCount);
for (Result result : scanner) {
rowCount++; // 第一行,第二行,第三行……
// 当记录计数器大于每页条数相等时进入下一页
if (rowCount > pageSize) {
// 重新设置 RowKey
scan.withStartRow(result.getRow());
break;
}
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
}
}
scanner.close();
}
}
}
手机通话记录生成练习
package com.yjxxt.phone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@DisplayName("手机通话记录生成测试类")
public class PhoneRecordTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
/**
* 表对象
*/
private Table table;
/**
* 表名
*/
private final static String TABLE_NAME = "t_phone";
/**
* 列族
*/
private final static String COLUMN_FAMILY_NAME = "info";
@DisplayName("创建数据库连接并初始化管理类和数据库表")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
// 获取数据库表
table = connection.getTable(TableName.valueOf(TABLE_NAME));
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (table != null) {
table.close();
}
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("创建含有预分区的表")
@Test
public void createTable() throws IOException {
// 获取表
TableDescriptorBuilder table = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME));
// 创建列族
ColumnFamilyDescriptorBuilder columnBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY_NAME));
ColumnFamilyDescriptor familyDescriptor = columnBuilder.build();
// 将列族添加到表对象
table.setColumnFamily(familyDescriptor);
// 获取预分区
byte[][] phoneRange = createPhoneSplits();
// 创建表对象
admin.createTable(table.build(), phoneRange);
}
@DisplayName("批量插入手机通话记录")
@Test
public void putPhoneRecords() throws IOException {
// 创建一个容器
List<Put> puts = new ArrayList<>();
// 批量插入数据
for (int i = 0; i < 100; i++) {
// 随机生成一个手机号码
String phone = getRandomPhone();
// 开始随机生成通话记录
for (int j = 0; j < 100; j++) {
// 随机生成数据
String otherphone = getRandomPhone();
Date date = getRandomDate();
// 创建插入对象
Put put = new Put(Bytes.toBytes(phone + "_" + date.getTime()));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("otherphone"), Bytes.toBytes(otherphone));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("time"), Bytes.toBytes(getRandomNum(1, 3600 * 3)));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("date"), Bytes.toBytes(date.getTime()));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("type"), Bytes.toBytes(String.valueOf(getRandomNum(0, 1))));
// 添加至容器
puts.add(put);
}
}
// 将数据插入到数据库
table.put(puts);
}
@DisplayName("扫描数据")
@Test
public void scanPhoneRecords() throws Exception {
// 创建扫描对象
Scan scan = new Scan();
// 本次查询开始的 RowKey 和结束的 RowKey
String startRow = "138";
String endRow = "139";
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(endRow));
// 创建结果扫描器
ResultScanner resultScanner = table.getScanner(scan);
// 获取所有的结果
for (Result result : resultScanner) {
// 获取 Cell
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
if (qualifier.equals("date")) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(Bytes.toLong(CellUtil.cloneValue(cell)))));
} else if (qualifier.equals("time")) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toInt(CellUtil.cloneValue(cell)));
} else {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
resultScanner.close();
}
/**
* 创建手机号码通讯记录表的预分区
*
* @return
*/
private byte[][] createPhoneSplits() {
String[] telArrays = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
// 对数组进行排序
Arrays.sort(telArrays);
// 创建分区
byte[][] phoneSplits = new byte[telArrays.length][];
for (int i = 0; i < telArrays.length; i++) {
phoneSplits[i] = Bytes.toBytes(telArrays[i]);
}
return phoneSplits;
}
/**
* 随机生成手机号码
*
* @return
*/
private static String getRandomPhone() {
String[] telArrays = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
int index = getRandomNum(0, telArrays.length - 1);
String first = telArrays[index];
String second = String.valueOf(getRandomNum(1, 888) + 10000).substring(1);
String third = String.valueOf(getRandomNum(1, 9100) + 10000).substring(1);
return first + second + third;
}
/**
* 生成随机数字
*
* @param start
* @param end
* @return
*/
public static int getRandomNum(int start, int end) {
return (int) (Math.random() * (end - start + 1) + start);
}
/**
* 随机获取日期
*
* @return
*/
private static Date getRandomDate() {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
Date start = format.parse("2021-01-01");
Date end = format.parse("2022-01-01");
if (start.getTime() >= end.getTime()) {
return null;
}
long date = random(start.getTime(), end.getTime());
return new Date(date);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 随机生成数据
*
* @param begin
* @param end
* @return
*/
private static long random(long begin, long end) {
long rtn = begin + (long) (Math.random() * (end - begin));
if (rtn == begin || rtn == end) {
return random(begin, end);
}
return rtn;
}
}
读写流程
HBase 中单表的数据量通常可以达到 TB 级或 PB 级,但大多数情况下数据读取可以做到毫秒级。HBase 是如何做到的呢?要想实现表中数据的快速访问,通用的做法是数据保持有序并尽可能的将数据保存在内存里。HBase 也是这样实现的(HRegionServer 的 BlockCache)。
对于海量级的数据,首先要解决的是存储的问题。数据存储上,HBase 将表切分成一个个的 HRegion 托管到HRegionServer 上,类似关系型数据库的分区表,但比关系型数据库分区、分库易用。
解决完存储接下来聊访问。数据表被切分成多个 HRegion,用户在访问数据时,如何找到某条数据对应的 HRegion呢?这里我们分别讲解 0.96 以前和 0.96 以后两种情况。
三层索引
HBase 0.96 以前
HBase 0.96 以前内部维护了两张特殊的表: -ROOT-
表和.META.
表,用来查找各种表的 HRegion 位置。这两张特殊的表也像 HBase 中的其他表一样会切分成多个HRegion。 -ROOT-
表比 .META.
更特殊一些,永远不会切分超过一个HRegion,这样保证了只需要三次跳转,就能定位到任意 HRegion。
-ROOT-
:记录 .META. 表的 HRegion 信息。.META.
:记录用户的表的 HRegion 信息。
而 -ROOT-
表的 HRegion 位置信息存放在 ZooKeeper
中,通过ZooKeeper
可以找到 -ROOT-
的 HRegion 托管的HRegionServer。再通过-ROOT-
表找到 .META.
表的 HRegion 位置。.META.
表中存放着用户的表的 HRegion 切分信息。
整个流程为: Client
→ZooKeeper
→ -ROOT-
→ .META.
→ 用户的表的 HRegion
。
当用户的表特别大时,用户的表的 HRegion 也会非常多。.META. 表存储了这些 HRegion 信息,也会变得非常大,这时.META. 自己也需要划分成多个 HRegion,托管到多个 HRegionServer 上。这时就出现了一个问题:当 .META. 被托管在多个RegionServer 上,如何去定位 .META. 呢? HBase 的做法是用另外一个表来记录 .META. 的 HRegion 信息,就和 .META. 记录用户的表的 HRegion 一样,这个表就是 -ROOT- 表。
-ROOT- 表永远只有一个 HRegion,也就是说只会存放在一台 HRegionServer 上,这个信息至关重要,是所有客户端定位 HRegion 的入口,所以这个映射信息存储在 ZooKeeper 上面。
HBase 0.96 以后
HBase 0.96 以后,-ROOT- 表被移除,直接将 .META. 表 HRegion 位置信息存放在 ZooKeeper 中,并将 .META. 表更名为hbase:meta
。
此时整个流程为:Client
→ ZooKeeper
→ hbase:meta
→ 用户的表的 HRegion
。
hbase:meta
表结构如下:
- rowkey:表名,格式为
表名,起始键,HRegion的时间戳.Encode编码.
; - table:state:表的状态,启用还是禁用状态;
- info:state:HRegion 的状态,正常情况下为 OPEN;
- info:server:HRegionServer 的地址和端口,如 node03:16020;
- info:serverstartcode:HRegionServer 启动的 13 位时间戳;
- info:sn:server 和 serverstartcode 的组合,如 node03:16020,1662183040273;
- info:seqnumDuringOpen:HRegion 在线时长的二进制串;
- info:regioninfo:HRegion 的详细信息,如:ENCODED、NAME、STARTKEY、ENDKEY 等。
部分内容如下:
读取数据流程
数据组织
让我们回顾一下 HBase 数据的组织架构,首先 Table 横向切割为多个 HRegion ,按照一个列族的情况,每一个HRegion 中包含一个 MemStore 和多个 HFile 文件。结合这些背景信息,我们可以把一个读取请求的处理转化为下面的问题:如何从一个 MemStore,多个 HFile 中获取到用户需要的正确的数据(默认情况下是最新版本,非删除,没有过期的数据。同时用户可能会设定 Filter,指定返回条数等过滤条件)。
首先,在 HRegionServer 内部,会把读取可能涉及到的所有组件都初始化为对应的 Scanner 对象,针对 HRegion 的读取,封装为一个 RegionScanner 对象,而一个列族对应一个 Store,对应封装为 StoreScanner,在 Store 内部,MemStore则封装为 MemStoreScanner,每一个 HFile 都会封装为 StoreFileScanner。最后数据的查询就会落在 MemStoreScanner 和StoreFileScanner 上。
这些 Scanner 首先根据 Scan 的 TimeRange 和 RowKey Range 会过滤掉一些,剩下的 Scanner 在 HRegionServer 内部组成一个最小堆 KeyValueHeap,该数据结构核心为一个 PriorityQueue 优先级队列,队列里按照 Scanner 指向的 KeyValue 排序。
// 用来组织所有的Scanner
protected PriorityQueue<KeyValueScanner> heap = null;
// PriorityQueue当前排在最前面的Scanner
protected KeyValueScanner current = null;
最终完成读取流程如下(以下为 Scan 流程,理解了 Scan,Get 自然就懂了):
- Client 访问 ZooKeeper,获取 hbase:meta 所在 HRegionServer 的节点信息;
- Client 访问 hbase:meta 所在的 HRegionServer,获取 hbase:meta 记录的元数据后先加载到内存中,然后再从内存中查询出 RowKey 所在的 HRegion (HRegion 所在的 HRegionServer);
- Client 对 RowKey 所在的 HRegion 对应的 HRegionServer 发起读取数据请求;
- HRegionServer 构建 RegionScanner(需要查询的 RowKey 分布在多少个 HRegion 中就需要构建多少个RegionScanner),用于对该 HRegion 的数据检索;
- RegionScanner 构建 StoreScanner(HRegion 中有多少个 Store 就需要构建多少个 StoreScanner,Store 的数量取决于Table 的 ColumnFamily 的数量),用于对该列族的数据检索;
- 所有的 StoreScanner 合并构建最小堆(已排序的完全二叉树)StoreHeap:PriorityQueue;
- StoreScanner 构建一个 MemStoreScanner 和一个或多个 StoreFileScanner(数量取决于 StoreFile 数量);
- 过滤掉能够确定所要查询的 RowKey 一定不在的 StoreFileScanner 或 MemStoreScanner(布隆过滤器);
- 经过筛选后留下的 Scanner 开始做读取数据的准备,将对应的 StoreFile 定位到满足的 RowKey 的起始位置;
- 将所有的 StoreFileScanner 和 MemStoreScanner 合并构建最小堆 KeyValueHeap:PriorityQueue,排序的规则按照KeyValue 从小到大排序;
- 从 KeyValueHeap:PriorityQueue 中经过一系列筛选后一行行的得到需要查询的 KeyValue。
默认情况下:先从 BlockCache 查找数据,如果没有,再从 MemStore 上查找,如果 MemStore 中也没有,再到StoreFile 上进行查找。其中 StoreFile 的扫瞄先会使用 Bloom Filter 过滤那些不可能符合条件的 HFile,然后使用 Data BlockIndex 快速定位 Cell,并将其加载到 BlockCache 中,然后从 BlockCache 中读取,目的是为了加快后续的查询,然后在返回结果给客户端。
面试题
面试题:为什么 HBase 可以做到百亿数据秒级查询?
HBase 适合存储 PB 级别的海量数据(百亿千亿量级条记录),如果根据记录主键 RowKey 来查询,能在几十到百毫秒内返回数据。HBase 是如何做到的呢?接下来,简单阐述一下数据的查询思路和过程。
查询过程
第一步
项目有 100 亿业务数据,存储在一个 HBase 集群上(由多个服务器数据节点构成),每个数据节点上有若干个HRegion(区域),每个 HRegion 实际上就是 HBase 中一批数据的集合(一段连续范围 RowKey 的数据)。
现在我们根据主键 RowKey 来查询对应的记录,通过 hbase:meta 表可以帮我们迅速定位到该记录所在的数据节点,以及数据节点中的 HRegion,目前我们有 100 亿条记录,占空间 10TB。所有记录被切分成 5000 个 HRegion,每个 HRegion大约 2G。
由于记录在 1 个 HRegion 中,所以我们只需要查询这 2G 的 HRegion,就可以找到对应的记录。
第二步
由于 HBase 存储数据是按照列族存储的。比如一条记录有 400 个字段,前 100 个字段是人员信息相关,这是一个列族(列的集合);中间 100 个字段是公司信息相关,是一个列族。另外 100 个字段是人员交易信息相关,也是一个列族;最后还有 100 个字段是其他信息,也是一个列族。
这四个列族是分开存储的,假设 2G 的 HRegion 文件中,分为 4 个列族,那么每个列族就是 500M。我们只需要遍历这500M 的列族,就可以找到对应的记录。
第三步
1 个列族在 HDFS 中会包含 1 个或者多个 HFile(StoreFile)。如果一个 HFile 的大小为 100 M,那么该列族包含 5 个HFile 在磁盘上或内存中。由于 HBase 内存 Flush 到磁盘中的数据是排好序的,要查询的记录有可能在最前面,也有可能在最后面,按平均来算,我们只需遍历 2.5 个 HFile 共 250M,就可以找到对应的记录。
第四步
每个 HFile 中,是以键值对(Key/Value)方式存储,只要遍历文件中的 Key 位置并判断符合条件即可。一般 Key 是有限的长度,假设 Key/Value 比是 1:24,最终只需要 10M 的数据量,就可以找到对应的记录。
如果数据在机械磁盘上,按其访问速度 100M/S,只需 0.1 秒即可查到。如果是 SSD 的话,0.01 秒即可查到。当然,扫描 HFile 时还可以通过布隆过滤器快速定位到对应的 HFile,以及 HBase 是有内存缓存机制的,如果数据在内存中(BlockCache),效率会更高。
第 N 步
HFile 继续划分,有 Data Block,Data Block Index,Trailler 等组成,已经定位到 RowKey 所在的 HFile 时,会先读取HFile 的 Trailer 的信息以获取 Data Block Index 的位置。Data Block Index 的 Key 就是 Data Block 的 RowKey,所以通过 DataBlock Index 的 Key 就能精确的定位到要检索的 RowKey 在哪个 Data Block 上,然后直接将该 Data Block 读取到内存,需要注意的是这里的 Data Block 已经很小了(默认是 64K,不同于 HDFS 上的 Block 默认为 128M,HBase 的 HFile 中的 Block 要小的多)这样子足以读取该 Block 到内存中,将该 Block 进行遍历就能获取到需要的 RowKey 并取出数据。因为这里的Block 只有 64K,所以遍历会非常迅速。这就是为什么 HFile 的 Data Block 要设置的如此之小的原因。
总结
正因为以上流程,即使数据量剧增,也不会导致 HBase 的查询性能下降。同时,HBase 是一个面向列存储的数据库(列族机制),当表字段非常多时,可以把其中一些字段独立出来放在一部分机器上,而另外一些字段放到另一部分机器上,分散存储,分散列查询。
正是由于这样复杂的存储结构和分布式的存储方式,保证了 HBase 海量数据下的查询效率。
写入数据流程
-
Client 访问 ZooKeeper,获取 hbase:meta 所在 HRegionServer 的节点信息;
-
Client 访问 hbase:meta 所在的 HRegionServer,获取 hbase:meta 记录的元数据后先加载到内存中,然后再从内存中查询出 RowKey 所在的 HRegion (HRegion 所在的 HRegionServer);
-
Client 对 RowKey 所在的 HRegion 对应的 HRegionServer 发起写入数据请求;
-
建立连接后,首先将 DML 要做的操作写入到日志 HLog;
-
然后将数据的修改更新到 MemStore 中,本次操作结束。一个 HRegion 由多个 Store 组成,一个 Store 对应一个列族,Store 包括位于内存中的 Memstore 和位于磁盘的 StoreFile,写操作先写入 MemStore;
-
当 MemStore 数据达到阈值后(默认 128M),创建一个新的 MemStore;
-
旧的 MemStore 将刷写为一个独立的 StoreFile(HRegionServer 会启动 FlushCache 进程写入 StoreFile)并存放到HDFS,最后删除 HLog 中的历史数据。
-
当 StoreFile 文件的数量增长到一定阈值后,系统会进行合并(次/小 Minor Compaction、主/大 Major Compaction);
-
在合并过程中会进行版本合并和删除工作,形成更大的 StoreFile;
-
当一个 HRegion 所有 StoreFile 的大小和数量超过一定阈值后,会把当前的 HRegion 分割为两个,并由 HMaster 分配到相应的 HRegionServer 服务器,实现负载均衡。
数据刷写
触发时机
内存阈值
HRegion 中的每个 MemStore 占用的内存超过相关阈值 hbase.hregion.memstore.flush.size
时会触发刷写,默认为 128MB。
如果我们的数据增加得很快,达到了 hbase.hregion.memstore.flush.size * hbase.hregion.memstore.block.multiplier(默认为 4) 的大小,也就是 128 * 4 = 512MB 的时候,除了触发 MemStore 刷写之外,HBase 还会在刷写的时候阻塞所有写入该 Store 的请求。
内存总和
整个 HRegionServer 的 MemStore 占用内存总和大于相关阈值时会触发刷写。如果达到了 HRegionServer 级别的刷写,当前 HRegionServer 的所有写操作将会被阻塞,这个阻塞可能会持续到分钟级别。
HBase 为 HRegionServer 所有的 MemStore 分配了一定的写缓存,大小等于 hbase_heapsize(HRegionServer 占用的堆内存大小) * hbase.regionserver.global.memstore.size(默认为 0.4)。
相关阈值计算公式为:hbase_heapsize * hbase.regionserver.global.memstore.size*hbase.regionserver.global.memstore.size.lower.limit(默认为 0.95) = MAX_SIZE
。例如:HBase 堆内存总共是 32G,MemStore 占用内存为:32 * 0.4 * 0.95 = 12.16G 将触发刷写。
日志阈值
HBase 使用了 WAL 机制(日志先行),当数据到达 HRegion 时是先写入日志的,然后再被写入到 MemStore。如果日志的数量越来越大,这就意味着 MemStore 中未持久化到磁盘的数据越来越多。当 HRegionServer 挂掉的时候,恢复时间将会变得很长,所以有必要在日志到达一定的数量时进行一次刷写操作。相关公式为:Math.max(32, hbase_heapsize *hbase.regionserver.global.memstore.size * 2 / logRollSize)。
定期刷写
当定时器到达 hbase.regionserver.optionalcacheflushinterval
(默认值 3600000 毫秒,即 1 小时)时,HBase 会自动触发刷写。一般建议调大,比如 10 小时,因为很多场景下 1 小时 Flush 一次会产生很多小文件,一方面导致Flush 比较频繁,另一方面导致小文件很多,影响随机读性能。
更新频率
如果 HBase 的某个 HRegion 更新的很频繁,而且既没有达到自动刷写阀值,也没有达到内存的使用限制,但是内存中的更新数量已经足够多,比如超过 hbase.regionserver.flush.per.changes
参数配置,默认为 30000000 次,也会触发刷写。
手动刷写
Shell 中通过 flush
命令。
hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
hbase> flush 'ENCODED_REGIONNAME'
hbase> flush 'REGION_SERVER_NAME'
注意
以上所有条件触发的刷写操作最后都会检查对应的 Store 包含的 StoreFiles 文件数是否超过hbase.hstore.blockingStoreFiles
参数配置的个数,默认为 16。如果满足这个条件,那么当前刷写会被推迟到hbase.hstore.blockingWaitTime
参数设置的时间后再刷写。
如果是阻塞刷写,HBase 还会请求 Compaction 压实处理或者 Split 分割操作。
8.2. 刷写策略
HBASE 1.1 之前:MemStore 刷写是 HRegion 级别的。就是说,如果要刷写某个 MemStore ,MemStore 所在的 HRegion中其他的 MemStore 也是会被一起刷写的(简单的理解:Flush 一个列族时其它列族也会一起 Flush)。
HBASE 2.x 之后:
- FlushAllStoresPolicy:HBASE 1.1 的策略。
- FlushAllLargeStoresPolicy:判断 HRegion 中每个 MemStore 的使用内存是否大于指定阀值,大于阀值的 MemStore 将会被刷写。阈值计算公式:
flushSizeLowerBound $=$ max((long)128 / 3, 16) = 42
。 - FlushNonSloppyStoresFirstPolicy:将 Region 中的 MemStore 按照
isSloppyMemStore
分到两个 HashSet 里面(sloppyStores
和regularStores
)然后:- 判断
regularStores
里面是否有 MemStore 内存占用大于相关阀值的 MemStore,有的话就会对这些 MemStore进行刷写,其他的不做处理,这个阀值计算和FlushAllLargeStoresPolicy
的阀值计算逻辑一致。 - 如果
regularStores
里面没有 MemStore 内存占用大于相关阀值的 MemStore,这时候就开始在sloppyStores
里面寻找是否有 MemStore 内存占用大于相关阀值的 MemStore,有的话就会对这些 MemStore 进行刷写,其他的不做处理。 - 如果上面
sloppyStores
和regularStores
都没有满足条件的 MemStore 需要刷写,这时候就将FlushNonSloppyStoresFirstPolicy
策略久退化成FlushAllStoresPolicy
策略了。
- 判断
FlushAllLargeStoresPolicy 在 HBase 2.0 之前版本是 FlushLargeStoresPolicy,后面被拆分成分 FlushAllLargeStoresPolicy和 FlushNonSloppyStoresFirstPolicy,参见 HBASE-14920。这种策略会先判断 HRegion 中每个 MemStore 的使用内存(OnHeap + OffHeap)是否大于某个阀值,大于这个阀值的 MemStore 将会被刷写。
HBase 2.0 引入了 in-memory compaction,参见 BASE-13408。如果我们对相关列族
hbase.hregion.compacting.memstore.type
参数的值不是 NONE ,那么这个 MemStore 的isSloppyMemStore
值就是 true,否则就是 false。
刷写流程
prepareFlush 阶段
刷写的第一步是对 MemStore 做 Snapshot,为了防止刷写过程中更新的数据同时在 Snapshot 和 MemStore 中而造成后续处理的困难,所以在刷写期间需要持有 updateLock。持有了 updateLock 之后,将阻塞客户端的写操作。所以只在创建 Snapshot 期间持有 updateLock,而且 Snapshot 的创建非常快,所以此锁期间对客户的影响一般非常小。对 MemStore做 Snapshot 是在 internalPrepareFlushCache 里面进行的。
flushCache 阶段
如果创建快照没问题,那么返回的 result 将为 null。这时候我们就可以进行下一步 internalFlushCacheAndCommit。其实 internalFlushCacheAndCommit 里面包含两个步骤:flushCache 阶段和 commit 阶段。
-
flushCache 阶段:将 prepareFlush 阶段创建好的快照写到临时文件里面,临时文件是存放在对应 HRegion 文件夹下面的 .tmp 目录里面。
-
commit 阶段:将 flushCache 阶段生产的临时文件移到(rename)对应的列族目录下面,并做一些清理工作,比如删除第一步生成的 Snapshot。
数据合并
承载了大量 IO 请求但是文件很小的 HFile,Compaction 本身不会消耗太多 IO,而且合并完成之后对读的性能会有显著提升。
合并分类
HBase 根据合并规模将压实 Compaction 分为了两类:Minor Compaction 和 Major Compaction。
Minor Compaction (次要/小)
选取一些小的、相邻的 StoreFile 将他们合并成一个更大的 StoreFile,在这个过程中不做任何删除数据、多版本数据的清理工作,但是会对 minVersion $_{1=0}$ 并且设置 TTL 的过期版本数据进行清理。一次 Minor Compaction 的结果是让小的StoreFile 变的更少并且产生更大的 StoreFile。
Major Compaction(主要/大)
将所有的 StoreFile 合并成一个 StoreFile 清理三类无意义数据:被删除的数据、TTL 过期数据、版本号超过设定版本号的数据。一般情况下,Major Compaction 时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会关闭自动触发 Major Compaction 功能,改为手动在业务低峰期触发。
总结:
- Minor Compaction:快速让小文件合并成大文件
- Major Compaction:清理大文件不必要的数据,释放空间
合并时机
触发 Compaction 的方式有三种:MemStore 刷盘、后台线程周期性检查、手动触发。
MemStore 刷盘
MemStore Flush 会产生 HFile 文件,文件越来越多就需要 Compact。每次执行完 Flush 操作之后,都会对当前 Store 中的文件数进行判断,一旦文件数大于配置,就会触发 Compaction。Compaction 都是以 Store 为单位进行的,整个 HRegion的所有 Store 都会执行 Compact。
周期性检查
后台线程定期触发检查是否需要执行 Compaction,检查周期可配置。线程先检查文件数是否大于配置,一旦大于就会触发 Compaction。如果不满足,它会接着检查是否满足 Major Compaction 条件(默认 7 天触发一次,可配置手动触发)。
周期性检查线程 CompactionChecker 大概 2hrs 46mins 40sec 执行一次。计算公式为:hbase.server.thread.wakefrequency(默认为 10000 毫秒) * hbase.server.compactchecker.interval.multiplier(默认1000)。
手动执行
一般来讲,手动触发 Compaction 通常是为了执行 Major Compaction,一般有这些情况需要手动触发合并:
- 因为很多业务担心自动 Major Compaction 影响读写性能(可以选择直接关闭),因此会选择低峰期手动触发;
- 用户在执行完 alter 操作之后希望立刻生效,手动执行触发 Major Compaction;
- HBase 管理员发现硬盘容量不够的情况下手动触发 Major Compaction 删除大量过期数据。
合并策略
线程池
HBase CompacSplitThread 类内部对于 Split、Compaction 等操作专门维护了各自所使用的线程池。和 Compaction 相关的是 longCompactions 和 shortCompactions。前者用来处理大规模 Compaction,后者处理小规模 Compaction。默认值为 2 * maxFlilesToCompact(默认为 10) * hbase.hregion.memstore.flush.size,如果 flush size 大小是 128M,该参数默认值就是 2 * 10 * 128M = 2.5G。
合并策略
HBase 主要有两种 Minor Compaction 策略:RatioBasedCompactionPolicy(0.96.x 之前) 和 ExploringCompactionPolicy(当前默认)。
- RatioBasedCompactionPolicy(基于比列的合并策略):从老到新逐一扫描 HFile 文件,满足以下条件之一停止扫描:当前文件大小 $<$ 比当前文件新的所有文件大小总和 * Ratio(高峰期1.2,非高峰期5),当前所剩候选文件数 $<=$ 阈值(默认为3)。例如:当前文件 $2\mathsf{G}<$ 所有文件大小总和 $16^{\star}1.2$ ,高峰期不合并,非高峰期合并;
- ExploringCompactionPolicy 策略(默认策略):基于 Ratio 策略,不同之处在于 Ratio 策略找到一个合适文件集合就停止扫描,而 Exploring 策略会记录所有合适的文件集合,然后寻找最优解,待合并文件数最多或者待合并文件数相同的情况下文件较小的进行合并;
-
FIFO Compaction 策略:收集过期文件并删除,对应业务的列族必须设置有 TTL;
-
Tier-Based Compaction 策略(分层策略):针对数据热点情况设计的策略,根据候选文件的新老程度将其划分为不同的等级,每个等级都有对应的 Ratio,表示该等级文件比选择为参与 Compation 的概率(基于 Ratio 策略根据热点情况设置不同的 Ratio);
-
Stripe Compation 策略(条纹策略):将整个 Store 中的文件按照 Key 划分为多个 Range,此处称为 Stripe,一个 Stripe 内部就类似于一个小 HRegion,可以执行 Minon Compation 和 Major Compation。
文件合并
分别读出待合并 HFile 文件的 KV,并顺序写到位于 ./tmp 目录下的临时文件中,再将临时文件移动到对应 HRegion 的数据目录。将 Compaction 的输入文件路径和输出文件路径封装为 KV 写入 WAL 日志,并打上 Compaction 标记,最后强制执行 Sync。将对应 HRegion 数据目录下的 Compaction 输入文件全部删除。
数据切分
通过切分,一个 HRegion 变为两个近似相同大小的子 HRegion,再通过 balance 机制均衡到不同 HRegionServer上,使系统资源使用更加均衡。
切分原因
数据分布不均匀
同一 HRegionServer 上数据文件越来越大,读请求也会越来越多。一旦所有的请求都落在同一个 HRegionServer 上,尤其是很多热点数据,必然会导致很严重的性能问题。
Compaction 性能损耗严重
Compaction 本质上是一个排序合并的操作,合并操作需要占用大量内存,因此文件越大,占用内存越多。Compaction 有可能需要迁移远程数据到本地进行处理(balance 之后的 Compaction 就会存在这样的场景),如果需要迁移的数据是大文件的话,带宽资源就会损耗严重。
资源耗费严重
HBase 的数据写入量也是很惊人的,每天都可能有上亿条的数据写入不做切分的话一个热点 HRegion 的新增数据量就有可能几十G,用不了多长时间大量读请求就会把单台 HRegionServer 的资源耗光。
触发时机
在 HBase 中 Split 是一个很重要的功能,HBase 是通过把数据分配到一定数量的 HRegion 来达到负载均衡的。一个Table 会被分配到一个或多个 HRegion 中,这些 HRegion 会被分配到一个或者多个 HRegionServer 中。在自动 Split 策略中,当一个 HRegion 达到一定的大小就会自动 Split 成两个 Region。Table 在 HRegion 中是按照 RowKey 来排序的,并且一个 RowKey 所对应的行只会存储在一个 HRegion 中,这一点保证了 HBase 的强一致性。
当一个 Table 刚被创建的时候,HBase 默认的分配一个 HRegion 给 Table。也就是说这个时候,所有的读写请求都会访问到同一个 HRegionServer 的同一个 HRegion 中,这个时候就达不到负载均衡的效果了,集群中的其他 HRegionServer 可能处于比较空闲的状态。解决这个问题可以用 pre-splitting 在创建 Table 时提前生成多个 HRegion。
在 Table 初始化的时候如果不配置的话,HBase 是不知道如何去 Split HRegion 的,因为 HBase 不知道应该把哪个RowKey 作为 Split 的开始点。如果我们可以大概预测到 RowKey 的分布,我们可以使用 pre-spliting 来帮助我们提前 SplitHRegion。
如果我们的预测不是特别准确,还是会导致某个 HRegion 过热被集中访问,不过还好我们还有 auto-split,默认按 10G自动切分。但是如果文件到达 9G 后迟迟未到 10G 此时对于 HBase 来说是比较难受的。最好的办法就是首先预测 Split 的切分点,做 pre-splitting,后面再交给 auto-split 来处理。
HBase 在每次数据合并之后都会针对相应 HRegion 生成一个 requestSplit 请求,requestSplit 首先会执行 checkSplit,检测 FileSize 是否达到阈值,如果超过阈值,就进行切分。
HBase 自带了两种 pre-split 的算法,分别是 HexStringSplit
和 UniformSplit
。如果我们的 RowKey
是十六进制的字符串作为前缀的,就比较适合用 HexStringSplit 作为 pre-split 的算法。例如,我们使用 HexHash(prefix) 作为 RowKey的前缀,其中 HexHash 为得到十六进制字符串的 hash 算法。我们也可以用我们自己的 Split 算法。
当一个 HRegion 达到一定的大小,他会自动 Split 成两个 HRegion。如果我们的 HBase 版本是 0.94 之后,那么默认的有三种自动 Split 的策略,ConstantSizeRegionSplitPolicy,IncreasingToUpperBoundRegionSplitPolicy 还有KeyPrefixRegionSplitPolicy。
在 0.94 版本之前 ConstantSizeRegionSplitPolicy 是默认和唯一的 Split 策略。当某个 Store(对应一个 Column Family)的大小大于配置值 hbase.hregion.max.filesize 的时候(默认 10G)HRegion 就会自动分裂。
而 0.94 版本之后 IncreasingToUpperBoundRegionSplitPolicy 是默认的 Split 策略。这个策略中,最小的分裂大小和Table 的某个 HRegionServer 的 HRegion 个数有关,当 StoreFile 的大小大于以下公式得出的值的时候就会 Split。公式如下:
# R 为同一个 Table 中在同一个 HRegionServer 中的 HRegion 的个数
Min(R^2 * "hbase.hregion.memstore.flush.size", "hbase.hregion.max.filesize")
例如:
- hbase.hregion.memstore.flush.size 默认值 128MB。
- hbase.hregion.max.filesize 默认值为 10GB。
- 如果初始时
R=1
,那么Min(128MB, 10GB)=128MB
,也就是说在第一个 Flush 的时候就会触发分裂操作。 - 当
R=2
的时候Min(2*2*128MB, 10GB)=512MB
,当某个 StoreFile 大小达到 512MB 的时候,就会触发分裂。 - 如此类推,当
R=9
的时候,StoreFile 达到 10GB 的时候就会分裂,也就是说当R>=9
的时候,StoreFile 达到 10GB 的时候就会分裂。
KeyPrefixRegionSplitPolicy 可以保证相同的前缀的 RowKey 保存在同一个 HRegion 中。指定 RowKey 前缀位数划分HRegion,通过读取 KeyPrefixRegionSplitPolicy.prefix_length 属性,该属性为数字类型,表示前缀长度,在进行 Split 时,按此长度对 SplitPoint 进行截取。此种策略比较适合固定前缀的 RowKey。当 Table 中没有设置该属性,指定此策略效果等同与使用IncreasingToUpperBoundRegionSplitPolicy。
我们可以通过配置 hbase.regionserver.region.split.policy 来指定 Split 策略,也可以写我们自己的 Split 策略。
切分流程
寻找切分点
将一个 HRegion 切分为两个近似大小的子 HRegion,首先要确定切分点。切分操作是基于 HRegion 执行的,每个HRegion 有多个 Store(对应多个 Column Famliy)。系统首先会遍历所有 Store,找到其中最大的一个,再在这个 Store 中找出最大的 HFile,定位这个文件中心位置对应的 RowKey,作为 HRegion 的切分点。
开启切分事务
切分线程会初始化一个 SplitTransaction 对象,从字面上就可以看出来 Split 流程是一个类似“事务”的过程,整个过程分为三个阶段:prepare - execute - rollback。
prepare 阶段
在内存中初始化两个子 HRegion,具体是生成两个 HRegionInfo 对象,包含 tableName、regionName、startkey、endkey 等。同时会生成一个 Transaction Journal,这个对象用来记录切分的进展。
execute 阶段
- HRegionServer 更改 ZK 节点 /region-in-transition 中该 HRegion 的状态为 SPLITING。
- HMaster 检测到 HRegion 状态改变。
- HRegion 在存储目录下新建临时文件夹 .split 保存 Split 后的 Daughter Region 信息。
- Parent HRegion 关闭数据写入并触发 Flush 操作,将写入 HRegion 的数据全部持久化到磁盘。
- 在 .split 文件夹下新建两个子文件夹,称之为 Daughter A、Daughter B,并在文件夹中生成引用文件,分别指向父HRegion 中对应文件。
- 将 Daughter A、Daughter B 拷贝到 HBase 根目录下,形成两个新的 HRegion。
- Parent HRegion 通知修改 hbase.meta 表后下线,不再提供服务。
- 开启 Daughter A、Daughter B 两个子 HRegion。
- 通知修改 hbase.meta 表,正式对外提供服务。
rollback 阶段
如果 execute 阶段出现异常,则执行 rollback 操作。为了实现回滚,整个切分过程被分为很多子阶段,回滚程序会根据当前进展到哪个子阶段清理对应的垃圾数据。
切分优化
对于预估数据量较大的表,需要在创建表的时候根据 RowKey 执行 HRegion 的预分区。通过 HRegion 预分区,数据会被均衡到多台机器上,这样可以一定程度解决热点应用数据量剧增导致的性能问题。
表设计
HBase 表设计是和需求相关的,但是遵守表设计的一些硬性指标对性能的提升还是很有帮助的,这里整理了一些设计时要用到的要点。
行健设计
重要性
原因
HBase 中的行默认按行键的字典序进行排序,这种设计优化了扫描(Scan),允许将相关的行或彼此靠近的行一起读取。但是,设计不佳的行键是 Hot-Spotting 的常见来源。
热点发生在大量 Client 直接访问集群的一个或极少数个节点,大量访问会使热点 HRegion 所在的单个机器超出自身承受能力,性能下降甚至 HRegion 不可用。这也会对同一台区域服务器(HRegionServer)托管的其他区域(HRegion)产生不利影响(主机资源全部被这个热点 HRegion 占用,已无法服务其他 HRegion 的请求),所以行健的设计以使群集得到充分和均匀的利用就变得非常重要。
策略
防止数据热点的有效措施:
-
反转策略:反转固定长度或者数字格式的 RowKey。这样可以使得 RowKey 中经常改变的部分放在前面。这样可以有效的随机 RowKey,但是牺牲了 RowKey 的有序性。例如:以手机号为 RowKey,可以将手机号反转后的字符串作为RowKey,这样就避免了以手机号那样比较固定的开头而导致的热点问题。
-
加盐策略:这里所说的加盐不是密码学中的加盐,而是在 RowKey 的前面增加随机数,具体就是给 RowKey 分配一个随机前缀以使得它和之前的 RowKey 的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的 HRegion 的数量一致。加盐之后的 Rowkey 就会根据不同的前缀分散到各个 HRegion 上以避免热点。例如:
- A-user-1、A-user-10、A-user-11B-user-2
- B-user-3C-user-33
- C-user-4、C-user-5
-
哈希策略:哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的RowKey,并且可以使用 Get 操作准确获取某一个行数据。例如:
- 原本 RowKey 为 1001 的,MD5 后变成:b8c37e33defde51cf91e1e03e51657da
- 原本 RowKey 为 3001 的,MD5 后变成:908c9a564a86426585b29f5335b619bc
- 原本 RowKey 为 5001 的,MD5 后变成:03b264c595403666634ac75d828439bc
反转、加盐、Hash 都属于散列思想,目的就是把 RowKey 打散,但是又有迹可循。
案例
加盐案例:
假设有以下的行键,字母表中的每个字母都表示一个区域。前缀"a"是一个区域,前缀"b"是另一个区域,前缀"o"是另一个区域。
apple-1
apple-10
apple-11
apple-2
apple-3
apple-33
apple-4
apple-5
banana-1
banana-10
banana-11
banana-2
banana-3
banana-33
banana-4
banana-5
orange-1
orange-10
orange-11
orange-2
orange-3
orange-33
orange-4
orange-5
现在,想象你想要在三个不同的地区传播这些信息。你决定使用三个不同的 Salt:B(北京),S(上海),G(广州)。在这种情况下,每个字母前缀将位于不同的区域。应用 Salt 后,你可以使用以下 RowKey。由于你现在可以写入三个不同的区域,因此理论上写入时的吞吐量是如果所有写入操作都在同一个区域的三倍,但在读取期间会产生成本。
B-apple-1
B-apple-10
B-apple-11
S-apple-2
S-apple-3
G-apple-33
G-apple-4
G-apple-5
预分区
HBase 中,表会被划分为 1…n 个 HRegion,被托管在 HRegionServer中。HRegion 有两个非常重要的属性:StartKey 与EndKey,表示这个 HRegion 维护的 RowKey 范围,当我们要读/写数据时,如果 RowKey 落在某个 StartKey - EndKey 范围内,那么就会定位到目标 HRegion 并且读/写到相关的数据。
Pre-Creating Regions。默认情况下,在创建 HBase 表的时候会自动创建一个 HRegion 分区,当导入数据的时候,所有的 HBase 客户端都会向这一个 HRegion 写数据,直到这个 HRegion 足够大了才进行切分,所以建表时一般会提前预分区,这样当数据写入 HBase 时,会按照 HRegion 分区的情况,在集群内做数据的负载均衡。好的行键设计一般配合预分区一起使用。
注意:行健不能改变,唯一可以改变的方式是先删除再插入。
三原则
唯一原则
必须在设计上保证其唯一性。其次,RowKey 是按字典序排序存储的,设计 RowKey 时,要充分利用这个排序的特点,将经常一起读取的行存放到一起。
如果无法确定唯一性,可以使用组合键。组合键是指拼接多个业务字段,例如: 手机号时间戳
, 用户ID时间戳
, 用户ID订单ID
等。使用组合键时需要注意字段组合的顺序。比如一对多的关系,那么一应该放在前面,以便能够Scan 得到结果。例如, 用户ID订单ID
,如果反过来则无法得到用户下的所有订单。
长度原则
RowKey 是一个二进制码流,最大长度是 64KB,建议越短越好。10-100 长度即可,不要超过 16 个字节。数据的持久化文件 HFile 中是按照 KeyValue 存储的,如果 RowKey 过长比如 100 个字节,1000 万列数据光 RowKey 就要占用 100 *$1000\sqrt{5}=10$ 亿个字节,将近 1G 数据,这会极大影响 HFile 的存储效率。
MemStore 将缓存部分数据到内存,如果 RowKey 字段过长,内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此 RowKey 的字节长度越短越好。目前操作系统是都是 64 位系统,内存 8 字节对齐。控制在16 个字节,8 字节的整数倍利用操作系统的最佳特性。
根据不同的场景,在有些情况下,还需要对齐 RowKey 的长度:因为 RowKey 是按字典序排列的,所以需要对齐长度,比如 ID 取 12 位,9 位 ID 前就需要补齐 3 个 0,否则就会出现 123456789 比 654321 排在前面的问题。对齐长度后,000000654321 就会排在 000123456789 之前,符合预期。
散列原则
如果 RowKey 是按时间戳的方式递增,不要将时间放在二进制码的前面。建议将 RowKey 的高位作为散列字段,低位放时间字段,这样将提高数据均衡分布在每个 HRegionServer 实现负载均衡的几率。例如:1000020220901100250135、2000020220901122335367、3000020220901155117621。
如果没有散列字段,首字段直接是时间信息,那么所有新产生的数据都会写入一个 HRegionServer,从而造成热点现象。这样在做数据检索的时候负载将会集中在个别 HRegionServer,降低查询效率。
反转、加盐、Hash 都属于散列思想,目的就是把 RowKey 打散,但是又有迹可循。
总结
HBase 的 RowKey 设计需要遵循以下原则:
-
唯一原则:
- 单主键
- 组合主键(注意顺序)
-
长度原则:
- 不要超过 16 个字节
- 对齐 RowKey 长度
-
散列原则:
- 反转
- 加盐
- Hash
列族设计
最优设计
追求原则:在合理范围内,尽可能的减少列族。
最优设计:将所有相关性很强的 key-value 都放在同一个列族。这样既能做到查询效率最高,也能保证尽可能少的访问不同的磁盘文件。
控制长度:列族名的长度要尽量小,一个为了节省空间,一个为了加快效率,最好是一个字符,比如 d 表示 data 或default,v 表示 value。
列族属性
HFile 会被切分为多个大小相等的 Block,每一个 Block 大小可以在创建表列族的时候通过 BlockSize 参数指定,默认是 64K。Block 大的话一次加载进内存的数据就多,扫描查询 Scan 效果好,但是 Block 小的话,随机查询 Get 效果好。
hbase> create 'tableName', {NAME => 'cf1', BLOCKSIZE => '65536'}
BlockCache:数据块缓存默认是打开的,如果一些比较少访问的数据可以选择关闭缓存。
hbase> create 'tableName', {NAME => 'cf1', BLOCKCACHE => 'FALSE'}
Compression:压缩会提高磁盘利用率,但是会增加CPU的负载,看情况进行控制。
hbase> create 'tableName', {NAME => 'cf1', COMPRESSION => 'SNAPPY'}
案例设计
单表设计
中国移动通话记录
-
数据量统计:1E手机号码 * 100通话记录 * 365 = 36500E 通话记录/年。
-
查询方式:从开始日期到结束日期。
-
实现思想:尽量将要查询的数据体现在主键上,借助于 RowKey 实现快速定位。
方案一:
-
身份证号_手机号码
,所有的数据都存放到一行,通过版本进行控制 -
缺点:
- 只能保存一条记录,如果想扩充数据只能修改 Version 迭代
- 每个人通话记录数是不同的,Version 数目受限
方案二:
-
时间戳_Random
,20220901133852123_UUID -
缺点:
- 这种设计如果要从 100E 数据中获取到 100 条,要整体遍历数据,效率低
方案三:
-
手机号码_时间
,首先按照手机号码排序,手机号相同的会排在一起,然后按照时间排序,会按照时间先后顺序排序。 -
例如:
- 180-1661776792305
- 180-1661776803162
- 180-16617769XXXXX
- 180-16617770XXXXX
- 180-16617771XXXXX
-
手机号都是 1 开头的,如果没有提前预分区也会产生热点情况,怎么解决?
-
在设计 RowKey 的时候,可以这样设计:
手机号反转timestamp
,在查询用户的所有通话记录的时候,直接指定反转后的手机号,StartKey 是手机号反转0000000000000
,EndKey 是手机号反转当前时间的时间戳
。如果需要查询某个时间段的通话记录,StartKey 是手机号反转起始时间戳
,EndKey 是手机号反转_结束时间戳+1
。
多表设计
RBAC
角色:语文课代表,数学课代表,体育课代表,音乐课代表,学生会主席。
id | name |
---|---|
role1 | 语文课代表 |
role2 | 数学课代表 |
role3 | 体育课代表 |
role4 | 音乐课代表 |
role5 | 学生会主席 |
用户:张三,李四,王五,赵六。
id | name |
---|---|
stu1 | 张三 |
stu2 | 李四< |
stu3 | 王五 |
stu4 | 赵六 |
规则:
- 一个人可以有多个角色;
- 一个角色可以被多个人拥有;
- 一个人可以指定他的主要角色。
要求:
-
通过用户的 ID 可以获取角色;
-
通过角色可以获取用户;
-
可以修改用户对应的角色;
-
可以修改角色对应的用户;
-
用户可以设置主角色。
方案一:MySQL 中间表(中间表数据量过大)。
方案二:HBase 将用户的角色直接放在用户表,将角色的用户直接放在角色表。构建一个列族,列族中维护一个集合列,专门存放数据。当用户新增角色数据的时候直接添加到集合,当用户取消角色的时候,需要从集合中找到对应的记录进行删除。
缺点:每个用户和角色都需要维护一个集合,成本太高。
角色表。
人员表。
**方案三:**HBase 冗余存储。此方案基于方案二。角色表和用户表各自构建一个列族,将和自己关联到一起的数据全部以列名的方式存放。
角色表。
人员表。
京东订单
- RowKey: uname_timestramp 将相同用户的订单全部排列到一起,查询的时候根据时间戳取一小部分。
- Column Family:orderno 订单编号。
- Column Qualifier:orderid、datetime、goodsimg、title、number、money。
- Version:1。
思考:京东订单不管你搜什么都会很快返回,难道 HBase 把所有的模糊查询都设置为了 RowKey?肯定不是,所以最优解就是 HBase + Elasticsearch。
微博好友
好友功能是目前社交软件的必备功能之一,一般好友相关的功能包含有:关注/取关、我(他)的关注、我(他)的粉丝、共同关注、我关注的人也关注他等这样一些功能。
设计思路
类似这样的功能,我们如果采用关系型数据库做的话,只是单纯得到用户的一些粉丝或者关注列表的是很简单,也很容易实现。但是如果我想要查出两个甚至多个用户共同关注了哪些人或者想要查询两个或者多个用户的共同粉丝的话就会很麻烦,效率也不会很高。但是如果你用 Redis 去做的话就会相当的简单而且效率很高。原因是 Redis 自己本身带有专门针对于这种集合的交集、并集、差集的一些操作。
总体思路我们采用 HBase + Redis 的方式结合完成。HBase 主要是保存落地数据,而利用 Redis 的 Sets 进行集合操作。Sets 拥有去重(我们不能多次关注同一用户)功能。一个用户我们存贮两个集合:一个是用户关注的人,另一个是关注用户的人。
Redis 相关命令如下:
- SADD:添加成员;命令格式:SADD key member [member …] —– 关注
- SREM:移除某个成员;命令格式:SREM key member [member …] —– 取关
- SCARD:统计集合内的成员数;命令格式:SCARD key —– 关注/粉丝个数
- SISMEMBER:判断是否是集合成员;命令格式:SISMEMBER key member —– 判断是否关注(如果关注那么只可以点击取关)
- SMEMBERS:查询集合内的成员;命令格式:SMEMBERS key —– 列表使用(关注列表和粉丝列表)
- SINTER:查询集合的交集;命令格式:SINTER key [key …] —– 共同关注(我关注的人关注了他) HBase 设计如下:
常用优化
一个系统上线之后,开发和调优将一直贯穿系统的生命周期中,HBase 也不列外。这里主要说一些 HBase 的常用调优。
表优化
预分区
Pre-Creating Regions。默认情况下,在创建 HBase 表的时候会自动创建一个 HRegion 分区,当导入数据的时候,所有的 HBase 客户端都会向这一个 HRegion 写数据,直到这个 HRegion 足够大了才进行切分,所以建表时一般会提前预分区,这样当数据写入 HBase 时,会按照 HRegion 分区的情况,在集群内做数据的负载均衡。
RowKey
HBase 中 RowKey 用来检索表中的记录,支持以下三种方式:
- 通过单个 RowKey 访问:即按照某个 RowKey 键值进行 Get 操作;
- 通过 RowKey 的 Range 进行 Scan:即通过设置 startRowKey 和 endRowKey,在这个范围内进行扫描;
- 全表扫描:即直接扫描整张表中所有行记录。
RowKey 可以是任意字符串,最大长度 64KB,实际应用中一般为 10~100bytes,存为 byte[] 字节数组,一般设计成定长的。RowKey 是按照字典序存储,因此,设计 RowKey 时,将经常一起读取的数据存储到一块,将经常一起读取的行存放到一起。
RowKey 规则:
-
越小越好
-
Rowkey 的设计是要根据实际业务来
-
定长
-
散列性
-
反转
-
加盐
-
Hash
-
Column Family
不要在一张表里定义太多的 Column Family。目前 HBase 并不能很好的处理超过 2~3 个 Column Family 的表。因为某个 Column Family 在 Flush 的时候,它邻近的 Column Family 也会因关联效应被触发 Flush,最终导致系统产生更多的I/O。
官方原文:HBase currently does not do well with anything above two or three column families so keep the number of column families in your schema low. Currently, flushing and compactions are done on a per Region basis so if one column family is carrying the bulk of the data bringing on flushes, the adjacent families will also be flushed even though the amount of data they carry is small. When many column families exist the flushing and compaction interaction can make for a bunch of needless i/o (To be addressed by changing flushing and compaction to work on a per column family basis).
Version
创建表的时候,可以通过 setMaxVersions(int maxVersions) 设置表中数据的最大版本。如果只需要保存最新版本的数据,那么可以设置 setMaxVersions(1)。
创建表的时候,可以通过 setTimeToLive(int timeToLive) 设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置 setTimeToLive(2 * 24 * 60 * 60)。
Compact & Split
实际应用中,可以考虑必要时手动进行 Major Compact,将同一个 RowKey 的修改进行合并形成一个大的 StoreFile。同时,可以将 StoreFile 设置大些,减少 Split 的发生。
写入优化
多 Table 并发写
创建多个 Table 客户端用于写操作,提高写数据的吞吐量,一个例子:
public void dbTables() throws IOException {
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
String table_log_name = "test";
Table[] tableLogs = new Table[2];
Put p = new Put(Bytes.toBytes("row1"));
p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("12"));
Put p2 = new Put(Bytes.toBytes("row2"));
p2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("12"));
for (int i = 0; i < 2; i++) {
tableLogs[i] = conn.getTable(TableName.valueOf(table_log_name));
}
tableLogs[0].put(p);
tableLogs[1].put(p2);
tableLogs[1].close();
tableLogs[0].close();
}
WAL Flag
在 HBae 中,客户端向集群中的 RegionServer 提交数据时(Put/Delete操作),首先会先写 WAL(Write Ahead Log)日志(即 HLog,一个 HRegionServer 上的所有 HRegion 共享一个 HLog),当 WAL 日志写成功后,再接着写 MemStore,然后客户端被通知提交数据成功;如果写 WAL 日志失败,客户端则被通知提交失败。这样做的好处是可以做到HRegionServer 宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在 Put/Delete 操作时,通过调用 Put.setWriteToWAL(false) 或Delete.setWriteToWAL(false) 函数,放弃写 WAL 日志,从而提高数据写入的性能。
值得注意的是:谨慎选择关闭 WAL 日志,因为这样的话,一旦 HRegionServer 宕机,Put/Delete 的数据将会无法根据WAL 日志进行恢复。
批量写
通过调用 Table.put(Put) 方法可以将一个指定的 RowKey 记录写入 HBase,同样 HBase 提供了另一个方法:通过调用Table.put(List) 方法可以将指定的 RowKey 列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络 I/O 开销,这对于对数据实时性要求高,网络传输 RTT(Round Trip Time) 高的情景下可能带来明显的性能提升。
HTable参数设置
- Auto Flush:通过调用 HTable.setAutoFlush(false) 方法可以将 HTable 写客户端的自动 Flush 关闭,这样可以批量写入数据到 HBase,而不是有一条 put 就执行一次更新,只有当 put 填满客户端写缓存时,才实际向 HBase 服务端发起写请求。默认情况下 Auto Flush 是开启的。
- Write Buffer:通过调用 HTable.setWriteBufferSize(writeBufferSize) 方法可以设置 HTable 客户端的写 buffer 大小,如果新设置的 buffer 小于当前写 buffer 中的数据时,buffer 将会被 Flush 到服务端。其中,writeBufferSize 的单位是 byte 字节数,可以根据实际写入数据量的多少来设置该值。
- 多线程并发写:在客户端开启多个 HTable 写线程,每个写线程负责一个 HTable 对象的 Flush 操作,这样结合定时Flush 和写 buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被 Flush(如 1 秒内),同时又保证在数据量大的时候,写 buffer 一满就及时进行 Flush。下面给个具体的例子:
for (int i = 0; i < threadN; i++) {
Thread th = new Thread() {
public void run() {
while (true) {
try {
sleep(1000); // 1 second
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (tableLogs[i]) {
try {
tableLogs[i].flushCommits();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
th.setDaemon(true);
th.start();
}
读取优化
作为 NoSQL 数据库,增删改查是其最基本的功能,其中查询是最常用的一项,接下来聊聊查询相关的优化。
显示的指定列
当使用 Scan 或者 GET 获取大量的行时,最好指定所需要的列,因为服务端通过网络传输到客户端,数据量太大可能是瓶颈。如果能有效过滤部分数据,能很大程度的减少网络 I/O 的花费。
13.3.2. 关闭 ResultScanner
如果在使用 table.getScanner 之后,忘记关闭该类,它会一直和服务端保持连接,资源无法释放,从而导致服务端的某些资源不可用。所以在用完之后,需要执行关闭操作,这点与 JDBS 操作 MySQL 类似。
查询结果
对于频繁查询 HBase 的应用场景,可以考虑在应用程序和 HBase 之间做一层缓存系统,新的查询先去缓存查,缓存没有再去查 HBase,例如 Redis。除此之外 HBase 的查询结果优化还有以下方案:
-
多 Table 并发读、批量读;
-
Scanner Cache:hbase.client.scanner.caching 配置项可以设置 HBase Scanner 一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少 Scan 过程中 next() 的时间开销,代价是 Scanner 需要通过客户端的内存来维持这些被 Cache 的行记录;
- 在 HBase 的 conf 配置文件中进行配置 —–> 整个集群生效
- 在 Table.setScannerCaching(int scannerCaching) 中进行配置 —–> 本次表连接生效
- Scan.setCaching(int caching) —–> 本次查询生效
- 优先级:本次查询 > 本次连接 > 整个集群
-
Scan 指定列族或者列:Scan 时指定需要的 Column Family 可以减少网络传输数据量
-
Close ResultScanner:通过 Scan 取完数据后,记得要关闭 ResultScanner,否则 HRegionServer 可能会出现问题(对应的 Server 资源无法释放)。
缓存优化
设置 Scan 缓存
HBase 中 Scan 查询可以设置缓存,这样可以有效的减少服务端与客户端的交互,更有效的提升扫描查询的性能。
hbase.client.scanner.caching 配置项可以设置 HBase Scanner 一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少 Scan 过程中 next() 的时间开销,代价是 Scanner 需要通过客户端的内存来维持这些被 Cache 的行记录;
-
在 HBase 的 conf 配置文件中进行配置 —>整个集群生效在
-
Table.setScannerCaching(int scannerCaching) 中进行配置 —–> 本次表连接生效
-
Scan.setCaching(int caching) —-> 本次查询生效
-
优先级:本次查询 > 本次连接 > 整个集群
开启块缓存
如果批量进行全表扫描,默认是有缓存的,如果此时没有缓存,会降低扫描的效率。scan.setCacheBlocks(true|false);对于经常读到的数据,建议使用默认值,开启块缓存。
缓存查询结果
对于频繁查询 HBase 的应用场景,可以考虑在应用程序和 HBase 之间做一层缓存系统,新的查询先去缓存查,缓存没有再去查 HBase。例如 Redis。
框架整合
HBase 和 MapReduce
添加依赖
在之前的 hbase-demo
项目中添加hbase-mapreduce
依赖。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
完整 pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yjxxt</groupId>
<artifactId>hbase-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- Hadoop 版本控制 -->
<hadoop.version>3.3.4</hadoop.version>
<!-- HBase 版本控制 -->
<hbase.version>2.5.3</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- JUnit 单元测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
创建数据库
将 harry potter.txt
文件上传至 HDFS 的 /yjx/harry potter.txt
。
在 HBase 中创建 t_wordcount
表并添加 word
列族。
hbase> create 't_wordcount', 'word'
Hdfs2HBase
编写 Job、Mapper、Reducer。
Hdfs2HBaseWordCountJob.java
package com.yjxxt.hbase.mapred.hdfs2hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class Hdfs2HBaseWordCountJob {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// 加载配置文件
Configuration configuration = HBaseConfiguration.create();
// 本地模式运行
configuration.set("mapreduce.framework.name", "local");
// 创建作业并设置作业名称
Job job = Job.getInstance(configuration, "yjx-hdfs2hbase-" + System.currentTimeMillis());
// 设置作业主类
job.setJarByClass(Hdfs2HBaseWordCountJob.class);
// 设置 Reduce 的数量
job.setNumReduceTasks(2);
// 设置数据的输入路径(需要计算的数据从哪里读)
FileInputFormat.setInputPaths(job, new Path("/yjx/harry potter.txt"));
// 设置 Map 的输出的 Key 和 Value 的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置 Map 的处理类
job.setMapperClass(Hdfs2HBaseWordCountMapper.class);
// 设置 Reduce 写出到 HBase 的数据库
TableMapReduceUtil.initTableReducerJob("t_wordcount",
Hdfs2HBaseWordCountReducer.class, job, null, null, null, null, false);
job.waitForCompletion(true);
}
}
Hdfs2HBaseWordCountMapper.java
package com.yjxxt.hbase.mapred.hdfs2hbase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Hdfs2HBaseWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
/**
* @param key 前行的偏移量
* @param value 当前行的数据
* @param context 可以理解为环形数据缓冲区
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 替换特殊字符
String line = value.toString().replaceAll("[^\\w'-]+", " ");
// 切分字符串
String[] words = line.split("\\s+");
// 写出数据
for (String word : words) {
context.write(new Text(word), one);
}
}
}
Hdfs2HBaseWordCountReducer.java
package com.yjxxt.hbase.mapred.hdfs2hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class Hdfs2HBaseWordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 声明计数器
int count = 0;
// 循环处理
for (IntWritable value : values) {
count += value.get();
}
// 创建插入数据对象 Put
// 初始化 RowKey
Put put = new Put(Bytes.toBytes("harry_potter_" + key.toString()));
// 列族和列
put.addColumn(Bytes.toBytes("word"), Bytes.toBytes(key.toString()), Bytes.toBytes(String.valueOf(count)));
// 写出数据
context.write(null, put);
}
}
HBase2Hdfs
Mock 数据
使用以下程序生成一批随机的手机通话记录。
package com.yjxxt.phone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@DisplayName("手机通话记录生成测试类")
public class PhoneRecordTest {
/**
* HBase 的管理对象
*/
private Admin admin;
/**
* 数据库连接
*/
private Connection connection;
/**
* 表对象
*/
private Table table;
/**
* 表名
*/
private final static String TABLE_NAME = "t_phone";
/**
* 列族
*/
private final static String COLUMN_FAMILY_NAME = "info";
@DisplayName("创建数据库连接并初始化管理类和数据库表")
@BeforeEach
public void init() throws IOException {
// 初始化 HBase 配置类
Configuration configuration = HBaseConfiguration.create();
// 创建数据库连接
connection = ConnectionFactory.createConnection(configuration);
// 初始化 HBase 管理类
admin = connection.getAdmin();
// 获取数据库表
table = connection.getTable(TableName.valueOf(TABLE_NAME));
}
@DisplayName("释放 HBase 资源")
@AfterEach
public void destory() throws IOException {
if (table != null) {
table.close();
}
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
@DisplayName("创建含有预分区的表")
@Test
public void createTable() throws IOException {
// 获取表
TableDescriptorBuilder table = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME));
// 创建列族
ColumnFamilyDescriptorBuilder columnBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY_NAME));
ColumnFamilyDescriptor familyDescriptor = columnBuilder.build();
// 将列族添加到表对象
table.setColumnFamily(familyDescriptor);
// 获取预分区
byte[][] phoneRange = createPhoneSplits();
// 创建表对象
admin.createTable(table.build(), phoneRange);
}
@DisplayName("批量插入手机通话记录")
@Test
public void putPhoneRecords() throws IOException {
// 创建一个容器
List<Put> puts = new ArrayList<>();
// 批量插入数据
for (int i = 0; i < 100; i++) {
// 随机生成一个手机号码
String phone = getRandomPhone();
// 开始随机生成通话记录
for (int j = 0; j < 100; j++) {
// 随机生成数据
String otherphone = getRandomPhone();
Date date = getRandomDate();
// 创建插入对象
Put put = new Put(Bytes.toBytes(phone + "_" + date.getTime()));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("otherphone"), Bytes.toBytes(otherphone));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("time"), Bytes.toBytes(getRandomNum(1, 3600 * 3)));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("date"), Bytes.toBytes(date.getTime()));
put.addColumn(Bytes.toBytes(COLUMN_FAMILY_NAME), Bytes.toBytes("type"), Bytes.toBytes(String.valueOf(getRandomNum(0, 1))));
// 添加至容器
puts.add(put);
}
}
// 将数据插入到数据库
table.put(puts);
}
@DisplayName("扫描数据")
@Test
public void scanPhoneRecords() throws Exception {
// 创建扫描对象
Scan scan = new Scan();
// 本次查询开始的 RowKey 和结束的 RowKey
String startRow = "138";
String endRow = "139";
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(endRow));
// 创建结果扫描器
ResultScanner resultScanner = table.getScanner(scan);
// 获取所有的结果
for (Result result : resultScanner) {
// 获取 Cell
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
if (qualifier.equals("date")) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(Bytes.toLong(CellUtil.cloneValue(cell)))));
} else if (qualifier.equals("time")) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toInt(CellUtil.cloneValue(cell)));
} else {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
resultScanner.close();
}
/**
* 创建手机号码通讯记录表的预分区
*
* @return
*/
private byte[][] createPhoneSplits() {
String[] telArrays = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
// 对数组进行排序
Arrays.sort(telArrays);
// 创建分区
byte[][] phoneSplits = new byte[telArrays.length][];
for (int i = 0; i < telArrays.length; i++) {
phoneSplits[i] = Bytes.toBytes(telArrays[i]);
}
return phoneSplits;
}
/**
* 随机生成手机号码
*
* @return
*/
private static String getRandomPhone() {
String[] telArrays = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
int index = getRandomNum(0, telArrays.length - 1);
String first = telArrays[index];
String second = String.valueOf(getRandomNum(1, 888) + 10000).substring(1);
String third = String.valueOf(getRandomNum(1, 9100) + 10000).substring(1);
return first + second + third;
}
/**
* 生成随机数字
*
* @param start
* @param end
* @return
*/
public static int getRandomNum(int start, int end) {
return (int) (Math.random() * (end - start + 1) + start);
}
/**
* 随机获取日期
*
* @return
*/
private static Date getRandomDate() {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
Date start = format.parse("2021-01-01");
Date end = format.parse("2022-01-01");
if (start.getTime() >= end.getTime()) {
return null;
}
long date = random(start.getTime(), end.getTime());
return new Date(date);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 随机生成数据
*
* @param begin
* @param end
* @return
*/
private static long random(long begin, long end) {
long rtn = begin + (long) (Math.random() * (end - begin));
if (rtn == begin || rtn == end) {
return random(begin, end);
}
return rtn;
}
}
编码
编写 Job、Mapper、Reducer。
HBase2HdfsPhoneJob.java
package com.yjxxt.hbase.mapred.hbase2hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class HBase2HdfsPhoneJob {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
// 加载配置文件
Configuration configuration = HBaseConfiguration.create();
// 本地模式运行
configuration.set("mapreduce.framework.name", "local");
// 创建作业并设置作业名称
Job job = Job.getInstance(configuration, "yjx-hbase2hdfs-" + System.currentTimeMillis());
// 设置当前任务的主类
job.setJarByClass(HBase2HdfsPhoneJob.class);
// 设置 Reduce 的数量
job.setNumReduceTasks(2);
// 设置数据的输出路径(计算后的数据输出到哪里)
FileOutputFormat.setOutputPath(job, new Path("/yjx/result/" + job.getJobName()));
// 设置扫描器
Scan scan = new Scan();
scan.withStartRow("138".getBytes());
scan.withStopRow("139".getBytes());
// 设置 Map 从 HBase 读取数据至 HDFS
TableMapReduceUtil.initTableMapperJob("t_phone", scan,
HBase2HdfsPhoneMapper.class, Text.class, IntWritable.class, job, false);
// 设置 Reduce 的处理类
job.setReducerClass(HBase2HdfsPhoneReducer.class);
// 将作业提交到集群并等待完成
job.waitForCompletion(true);
}
}
HBase2HdfsPhoneMapper.java
package com.yjxxt.hbase.mapred.hbase2hdfs;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class HBase2HdfsPhoneMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// 获取 RowKey 并分割处理得到手机号
String rowkey = new String(value.getRow());
String phoneNum = rowkey.split("_")[0];
context.write(new Text(phoneNum), new IntWritable(1));
}
}
HBase2HdfsPhoneReducer.java
package com.yjxxt.hbase.mapred.hbase2hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class HBase2HdfsPhoneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
HBase 和 Hive
Hive 提供了与 HBase 的集成,使得能够在 HBase 表上使用 HQL 语句进行查询、插入以及 JOIN、UNION 等复杂查询,同时也可以将 Hive 表中的数据映射到 HBase 中。
拷贝 Jar
三台机器分别执行以下命令,将 hive-hbase-handler-3.1.2.jar
拷贝至 HBase。
cp /opt/yjx/apache-hive-3.1.2-bin/lib/hive-hbase-handler-3.1.2.jar /opt/yjx/hbase-2.5.3/lib/
检查 Jar 包是否已经拷贝。
ls /opt/yjx/hbase-2.5.3/lib/hive-hbase-handler-3.1.2.jar
修改配置文件
修改 Hive(MetaStore 机器) 的配置文件vim /opt/yjx/apache-hive-3.1.2-bin/conf/hive-site.xml
,配置HBase 的 ZooKeeper 集群。
<property>
<name>hbase.zookeeper.quorum</name>
<value>node01:2181,node02:2181,node03:2181</value>
</property>
测试
HBase 创建 hbase_user 表并添加 info 列族。
# 创建 test 命令空间
create_namespace 'test'
# 创建 hbase_user 表并添加 info 列族
create 'test:hbase_user', 'info'
# 插入数据
put 'test:hbase_user', '1', 'info:name', 'zhangsan'
put 'test:hbase_user', '1', 'info:age', '18'
put 'test:hbase_user', '1', 'info:gender', 'male'
put 'test:hbase_user', '2', 'info:name', 'lisi'
put 'test:hbase_user', '2', 'info:age', '19'
put 'test:hbase_user', '2', 'info:gender', 'female'
put 'test:hbase_user', '3', 'info:name', 'wangwu'
put 'test:hbase_user', '3', 'info:age', '20'
put 'test:hbase_user', '3', 'info:gender', 'male'
Hive 创建 hive_user 外部表并使用 HBase 序列化器。
-- 创建 test 数据库
CREATE DATABASE IF NOT EXISTS test;
-- 创建 hive_user 外部表
CREATE EXTERNAL TABLE IF NOT EXISTS test.hive_user (
id string,
name string,
age string,
gender string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES('hbase.columns.mapping'=':key,info:name,info:age,info:gender')
TBLPROPERTIES('hbase.table.name'='test:hbase_user');
-- 通过查询测试是否整合成功
SELECT * FROM test.hive_user;
HBase 缺点
对于 HBase 而言,它将“节点探活”这一重要的任务交给 ZooKeeper 来做,是可以商榷的。因为如果运维不够细致的话,会使得 ZooKeeper 成为影响 HBase 稳定性的一个坑。 在 HBase 中,HRegionServer 对“ZooKeeper 会话超时”的处理方式是“自杀”。而 HRegionServer 上“多个 HRegion 合写一个 WAL 到 HDFS”的实现方式会使得“自杀”这一行为的成本较高,因为自杀之后 HRegionServer 重启时会拆分和重放 WAL。这就意味着假如整个 HBase 集群挂了,想要将 HBase 重新给拉起来,时间会比较长。 即使我们能保证 ZooKeeper 的稳定性,“节点探活”这一功能也不能非常稳定的运行。因为 HBase 是用 Java 实现的。GC 的存在,会使得 ZooKeeper 把正常运行的 HRegionServer 误判为死亡,进而又会引发 HRegionServer 的自杀;在其之上的 HRegion,需要其他的 HRegionServer 从 HDFS 上加载重放 WAL 才能提供服务。而这一过程,同样也是比较耗时的。在此期间内,HRegion 所服务的 Key 都是不可读写的。 对于这一问题,可以通过将“节点探活”的时间阈值拉长来解决,但这会使得真正的“HRegionServer 死亡”不能被及时发现,从而引发另一个方面的可用性问题。