13-SparkSQL整合Hive
星期一, 4月 7, 2025 | 4分钟阅读

关于Linux系统大数据环境搭建——13-SparkSQL整合Hive。
13-SparkSQL整合Hive
SparkSQL版本:SparkSQL3.3.2
内部 Hive
SparkSQL 为了方便大家学习,内置了一套 Hive 环境。在内置 Hive 环境中,Hive 的元数据存储在 Derby 数据库中,默认仓库地址在 $SPARK_HOME/spark-warehouse
,如果是 Windows 环境,就在项目的根路径下的 spark-warehouse 中。
接下来我们将刚开始学习 Hive 时的创建表案例一使用内置 Hive 的方式构建一次。 user.txt
1,admin,123456,男,18
2,zhangsan,abc123,男,23
3,lisi,654321,女,16
InnerHiveDemo.scala
package com.yjxxt.hive
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object InnerHiveDemo {
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("InnerHiveDemo")
// 根据配置对象初始化 SparkSession 对象
// enableHiveSupport() 开启 Hive 支持
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// ==================== 业务处理 ====================
lazy val createSql =
"""
|CREATE TABLE IF NOT EXISTS t_user (
|id int,
|username string,
|password string,
|gender string,
|age int
|)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|LINES TERMINATED BY '\n';
|""".stripMargin
// 创建表并载入数据
spark.sql(createSql)
// 注意这里使用的是 LOCAL 载入本地数据,不加 LOCAL 则从 HDFS 载入数据
spark.sql("LOAD DATA LOCAL INPATH 'data/user/user.txt' INTO TABLE t_user")
spark.sql("SELECT * FROM t_user").show()
// ==================== 关闭连接 ====================
spark.stop
}
}
运行后效果如下。
外部 Hive
内置的 Hive 毕竟是学习使用的,真实工作中都是连接外部的 Hive,我们一起来学习一下。连接外部 Hive 我们分两种情况去讲,一种是直接在服务端连接操作,一种是在 Windows 端连接操作。
Linux
如果想连接外部已经部署好的 Hive,需要通过以下几个步骤:
- 将
Hive
的hive-site.xml
拷贝到Spark
的conf
目录; - 将
MySQL
的 驱动包 拷贝到Spark
的jars
目录; - 将
core-site.xml
和hdfs-site.xml
拷贝到Spark
的conf
目录(为了连接 HDFS); - 重启
spark-shell
。
将 Hive
的 hive-site.xml
拷贝到Spark
的 conf
目录。
[root@node01 ~]# cp /opt/yjx/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/yjx/spark-3.3.2/conf/
将 MySQL
的 驱动包 拷贝到 Spark
的 jars
目录。
[root@node01 ~]# cp /opt/yjx/apache-hive-3.1.2-bin/lib/mysql-connector-java-8.0.18.jar /opt/yjx/spark-
3.3.2/jars/
将 core-site.xml
和 hdfs-site.xml
拷贝到 Spark
的 conf
目录(为了连接 HDFS)。
[root@node01 ~]# cp /opt/yjx/hadoop-3.3.4/etc/hadoop/core-site.xml /opt/yjx/spark-3.3.2/conf/
[root@node01 ~]# cp /opt/yjx/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /opt/yjx/spark-3.3.2/conf/
最后重启 spark-shell
spark-shell
使用 bin/spark-shell
直接访问 Hive。
scala> spark.sql("SHOW DATABASES").show
+------------+
|databaseName|
+------------+
| default|
| scott|
+------------+
scala> spark.sql("USE scott")
scala> spark.sql("SHOW TABLES").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| scott| dept| false|
| scott| emp| false|
| scott| salgrade| false|
+--------+---------+-----------+
scala> spark.sql("SELECT * FROM emp").show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800|null| 20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698|1981-02-22|1250| 500| 30|
| 7566| JONES| MANAGER|7839|1981-04-02|2975|null| 20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839|1981-05-01|2850|null| 30|
| 7782| CLARK| MANAGER|7839|1981-06-09|2450|null| 10|
| 7788| SCOTT| ANALYST|7566|1987-07-13|3000|null| 20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000|null| 10|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500| 0| 30|
| 7876| ADAMS| CLERK|7788|1987-07-13|1100|null| 20|
| 7900| JAMES| CLERK|7698|1981-12-03| 950|null| 30|
| 7902| FORD| ANALYST|7566|1981-12-03|3000|null| 20|
| 7934|MILLER| CLERK|7782|1982-01-23|1300|null| 10|
+-----+------+---------+----+----------+----+----+-----+
这种访问方式是最麻烦的,每次写 SQL 都要写 spark.sql()
结构。
spark-sql
使用 bin/spark-sql
直接访问 Hive。
spark-sql (default)> SHOW DATABASES;
databaseName
default
scott
Time taken: 2.516 seconds, Fetched 2 row(s)
spark-sql (default)> USE scott;
Response code
Time taken: 0.052 seconds
spark-sql (default)> SHOW TABLES;
namespace tableName isTemporary
dept
emp
salgrade
Time taken: 0.238 seconds, Fetched 3 row(s)
这种方式就和之前 Hive 自己的 hive-shell 感觉是一样的,也不太好用。
beeline
使用 bin/beeline
访问 Hive。
首先启动 Hive 的 HiveServer2 服务,然后使用 beeline 访问 Hive。
[root@node01 spark-3.3.2]# bin/beeline -u jdbc:hive2://node01:10000 -n root
这种方式是最爽的,也是最安全的,因为我们之前学习 Hive 时也是用的 beeline 来访问的 Hive。
0: jdbc:hive2://node01:10000> SHOW DATABASES;
+------------+
| namespace |
+------------+
| default |
| scott |
+------------+
2 rows selected (0.079 seconds)
0: jdbc:hive2://node01:10000> USE scott;
0: jdbc:hive2://node01:10000> SHOW TABLES;
+------------+------------+--------------+
| namespace | tableName | isTemporary |
+------------+------------+--------------+
| scott | dept | false |
| scott | emp | false |
| scott | salgrade | false |
+------------+------------+--------------+
3 rows selected (0.119 seconds)
但是这种方式如果要执行复杂 SQL,底层执行的还是 MR 作业并非 Spark 作业。
Spark ThriftServer
更高级的用法是使用 Spark 的 ThriftServer 服务,这又是什么呢?我们先来区分一下它们之间的关系:
spark-sql
是一个Session
级别应用任务,而非是一个服务;HiveServer2
是一个 Java 进程,在服务器本地运行,通过 JDBC 的方式接受客户端的调度,通过语法解析,生成执行计划,最终发送给 YARN 进行调度执行,自己并不负责具体执行。Spark ThriftServer
进程本质上是 Spark 的一个 Application(是在 YARN 上的一个 Application 应用,前提是使用YARN 做资源管理)。通过 JDBC 的方式接受客户端的调度,通过语法解析,生成执行计划,最后在当前 Application 内去执行,而不是单独去启动一个 YARN 任务执行。这就要求当前的 Application 资源足够丰富且弹性。
启动 Spark 的 ThriftServer 服务。
[root@node01 ~]# cd /opt/yjx/spark-3.3.2/
[root@node01 spark-3.3.2]# sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 512M \
--num-executors 1 --executor-cores 1 --executor-memory 1G
提示:
--deploy-mode
只支持client
,使用 cluster 会报以下错误。 org.apache.spark.SparkException: Cluster deploy mode is not applicable to Spark Thrift server.
YARN 视角。
windows
首先 Windwos 系统配置好 Hadoop 环境变量,然后将 HDFS 的 core-site.xml
、 hdfs-site.xml
配置文件以及Hive 的 hive-site.xml
配置文件拷贝至 Maven 项目的 resources 目录,最后在项目的 pom 文件中添加以下依赖。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
具体使用时需要在代码中开启 Hive 支持 enableHiveSupport
。
package com.yjxxt.hive
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object OuterHiveDemo {
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("OuterHiveDemo")
// 根据配置对象初始化 SparkSession 对象
// enableHiveSupport() 开启 Hive 支持
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// ==================== 业务处理 ====================
spark.sql("USE scott")
spark.sql("SELECT deptno, AVG(sal) AS avg_sal FROM emp GROUP BY deptno").show()
// ==================== 关闭连接 ====================
spark.stop
}
}
可以通过 conf.set("spark.sql.warehouse.dir", path)
修改 Hive 仓库的地址。