Connecting to MySQL and Hive from Scala

Connecting to MySQL

Reference: https://www.cnblogs.com/Jaryer/p/13671449.html

Add the Maven dependency

<dependency>
  <groupId>com.mysql</groupId>
  <artifactId>mysql-connector-j</artifactId>
  <version>8.0.33</version>
</dependency>

Connect to the database

import java.sql.{Connection, DriverManager}

val host = "localhost"
val port = 3306
val database = "sparktest"
val jdbcUrl = s"jdbc:mysql://$host:$port/$database?useUnicode=true&characterEncoding=utf-8"
val mysqlConn: Connection = DriverManager.getConnection(jdbcUrl, "root", "123456")
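
The snippet above never releases the connection. A minimal sketch (an addition, assuming the same jdbcUrl and credentials) of the usual try/finally pattern so the connection is always closed:

val conn: Connection = DriverManager.getConnection(jdbcUrl, "root", "123456")
try {
  // use the connection; isValid(5) just pings it with a 5-second timeout
  println(conn.isValid(5))
} finally {
  conn.close() // runs even if the body throws
}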

Execute queries

A SQL statement can be executed in three ways: executeQuery (for SELECTs, returns a ResultSet), executeUpdate (for INSERT/UPDATE/DELETE, returns the number of affected rows), and execute (for statements whose result type is not known in advance). See the reference linked at the start of this section for details.

val statement: Statement = mysqlConn.createStatement()
// insert two rows; executeUpdate returns the number of affected rows
statement.executeUpdate("insert into employee values (3,'Mary','F',26)")
statement.executeUpdate("insert into employee values (4,'Tom','M',23)")

// executeQuery returns a ResultSet for the SELECT
val result: ResultSet = statement.executeQuery("select max(age) as max_age,avg(age) as avg_age from employee")
while (result.next()) {
  println(result.getString("max_age"), result.getString("avg_age"))
}
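
The code above covers executeQuery and executeUpdate. A short sketch of the third method, execute, which suits statements whose result type is not known in advance (a generic JDBC pattern reusing the statement from above, not from the referenced post):

// execute returns true when the first result is a ResultSet,
// false when it is an update count
val hasResultSet = statement.execute("select count(*) from employee")
if (hasResultSet) {
  val rs = statement.getResultSet
  while (rs.next()) println(rs.getInt(1))
  rs.close()
} else {
  println(s"rows affected: ${statement.getUpdateCount}")
}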

Complete code

package sparkjob5

import java.sql.{Connection, DriverManager, ResultSet, Statement}

object task3 {
  def main(args: Array[String]): Unit = {
    // connect to MySQL
    val host = "localhost"
    val port = 3306
    val database = "sparktest"
    val jdbcUrl = s"jdbc:mysql://$host:$port/$database?useUnicode=true&characterEncoding=utf-8"
    val mysqlConn: Connection = DriverManager.getConnection(jdbcUrl, "root", "123456")

    val statement: Statement = mysqlConn.createStatement()
    // insert rows
    statement.executeUpdate("insert into employee values (3,'Mary','F',26)")
    statement.executeUpdate("insert into employee values (4,'Tom','M',23)")

    val result: ResultSet = statement.executeQuery("select max(age) as max_age,avg(age) as avg_age from employee")
    while (result.next()) {
      println(result.getString("max_age"),result.getString("avg_age"))
    }

    result.close()
    statement.close()
    mysqlConn.close()
  }
}

Connecting to Hive

Reference: https://www.jianshu.com/p/27a798013990

Before connecting to Hive, start the Hive metastore and hiveserver2. The commands are as follows.

  1. Start the Hadoop cluster
start-all.sh
  2. Start Hive. The metastore and hiveserver2 commands below (the second and third lines) must each be launched in a separate terminal, since they run in the foreground; the logs are written to /usr/local/hive/logs.
cd /usr/local/hive
hive --service metastore >logs/metastore.log 2>&1
hive --service hiveserver2 >logs/hiveServer2.log 2>&1

Add the dependencies

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>3.3.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.3.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>3.1.3</version>
</dependency>

The full POM is shown below (it also includes the MySQL dependency from the previous section).

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Spark</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sparkCore</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.3</version>
        </dependency>

    </dependencies>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Configure hive-site.xml

Create a hive-site.xml under the project's resources directory and fill in the content below. Note: replace hadoop1 with the hostname or IP of your own Hadoop master node.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- HDFS locations used by Hive -->
    <property>
        <name>hive.exec.scratchdir</name>
        <value>hdfs://hadoop1:8020/user/hive/tmp</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>hdfs://hadoop1:8020/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.querylog.location</name>
        <value>hdfs://hadoop1:8020/user/hive/log</value>
    </property>
    <!-- Address of the metastore that stores the metadata -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop1:9083</value>
    </property>

    <!-- JDBC connection URL for the metastore database -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop1:3306/metastore?useUnicode=true&amp;characterEncoding=UTF-8&amp;allowPublicKeyRetrieval=true&amp;useSSL=false&amp;serverTimezone=GMT</value>
    </property>

    <!-- JDBC driver class -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>

    <!-- JDBC username -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>

    <!-- JDBC password -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>


    <!-- Host for hiveserver2 -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop1</value>
    </property>

    <!-- Port for hiveserver2 -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>

    <!-- hiveserver2 high-availability parameter; enabling it speeds up hiveserver2 startup -->
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>
</configuration>

Scala code

Writing ordinary SQL inside spark.sql() is all it takes to run queries.

package sparkjob5


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object task4 {
  // loading the Hive JDBC driver is only needed for direct JDBC access
  // (see the sketch at the end of this section); the SparkSession below
  // talks to the metastore instead
  val driverName = "org.apache.hive.jdbc.HiveDriver"
  try {
    Class.forName(driverName)
  } catch {
    case e: ClassNotFoundException =>
      println("Missing class: " + e)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hive")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    spark.sql("use spark_test")
    spark.sql("show tables").show()
    spark.close()
  }
}
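
For completeness, here is a sketch of what the HiveDriver loaded above is for: querying hiveserver2 directly over JDBC rather than through Spark. It assumes hiveserver2 is listening on hadoop1:10000 as configured in hive-site.xml; adjust the host, database, and credentials to your cluster.

import java.sql.DriverManager

// hive-jdbc URLs use the scheme jdbc:hive2://host:port/database
val hiveConn = DriverManager.getConnection("jdbc:hive2://hadoop1:10000/spark_test", "", "")
val stmt = hiveConn.createStatement()
val rs = stmt.executeQuery("show tables")
while (rs.next()) println(rs.getString(1))
rs.close()
stmt.close()
hiveConn.close()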

Addendum: the following saves the query results to HDFS; to save to the local filesystem instead, change the save path to a local one.

val dataFrame = spark.sql("select uid,keyword from sougou_records where keyword like '%仙剑奇侠传%'")

dataFrame.write
  .format("csv")
  .option("header", "false")
  .option("sep", "\t")
  .save("hdfs://hadoop1:8020/xianJianTest")
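
To verify the export, the file can be read straight back with the same separator (a sketch assuming the path above):

val readBack = spark.read
  .option("sep", "\t")
  .csv("hdfs://hadoop1:8020/xianJianTest")
readBack.show(5)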

To save the results as a table instead, use .jdbc() to write to MySQL, or .saveAsTable() to write to a Hive table.

// assumes import org.apache.spark.sql.SaveMode, and that df is the DataFrame to be saved
val host = "localhost"
val port = 3306
val database = "sparktest"
val jdbcUrl = s"jdbc:mysql://$host:$port/$database?useUnicode=true&characterEncoding=utf-8"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "123456")

// write df into the MySQL table `company`, replacing it if it already exists
df.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "company", connectionProperties)

Writing the same DataFrame into a Hive table instead:

df.write.mode(SaveMode.Overwrite).saveAsTable("spark_test.company")
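
To confirm the write, the table can be read back from MySQL (a sketch reusing the jdbcUrl and connectionProperties defined above; spark is the SparkSession from earlier):

val companyDf = spark.read.jdbc(jdbcUrl, "company", connectionProperties)
companyDf.show()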