美文网首页
spark操作mysql的demo

spark操作mysql的demo

作者: DuLaGong | 来源:发表于2019-04-22 21:02 被阅读0次

用于本地测试,语言scala,spark版本2.2.0,mysql的表名为test1.a,只有一个字段age。

利用typesafe库,读取配置文件,pom文件中添加如下:

<dependency>

    <groupId>com.typesafe</groupId>

    <artifactId>config</artifactId>

    <version>1.3.0</version>

</dependency>

resource目录下application.conf中添加如下配置内容:

#测试

test_mysql{

user="xxx"

  pwd="xxx"

  test_table_name="test1.a"

  url="jdbc:mysql://xxx.xxx.xxx.xxx:3306"

  sql="(select age from test1.a where age<30) t"

}

代码:

import java.util.Properties

import com.typesafe.config.ConfigFactory

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkToMysql {

def main(args: Array[String]):Unit = {

val spark = SparkSession.builder().appName("SparkToMysql").master("local[2]").getOrCreate()

val config = ConfigFactory.load()

val props =Map(

"url" -> config.getString("test_mysql.url"),

      "user" -> config.getString("test_mysql.user"),

      "password" -> config.getString("test_mysql.pwd"),

      "dbtable" -> config.getString("test_mysql.test_table_name")

)

//spark查询mysql,整表查询

    val mysqlRDD = spark.read.format("jdbc").options(props).load()

mysqlRDD.show()

//spark查询mysql,条件查询(1)

    val mysqlRDD2 = spark.read.format("jdbc").options(props).load().where("age < 30")

mysqlRDD2.show()

//spark查询mysql,条件查询(2)

    val prop2=new Properties()

prop2.put("user",config.getString("test_mysql.user"))

prop2.put("password",config.getString("test_mysql.pwd"))

val mysqlRDD3=spark.read.jdbc(config.getString("test_mysql.url"),config.getString("test_mysql.sql"),prop2)

mysqlRDD3.show()

//spark查询mysql,条件查询(3)

    val props3 =Map(

"url" -> config.getString("test_mysql.url"),

      "user" -> config.getString("test_mysql.user"),

      "password" -> config.getString("test_mysql.pwd"),

      "dbtable" -> config.getString("test_mysql.sql")

)

val mysqlRDD4 = spark.read.format("jdbc").options(props3).load()

mysqlRDD4.show()

//spark写入mysql

// SaveMode.Overwrite: if data/table already exists, 

//existing data is expected to be overwritten by the contents of the DataFrame.

//慎用 Overwrite,会把表结构也改掉

    mysqlRDD.write.mode(SaveMode.Append).format("jdbc").options(props).save()

spark.stop()

}

}

另附一些注意事项:

数据存入Mysql注意事项

A. 尽量先设置好存储模式

默认为SaveMode.ErrorIfExists模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库.另外三种模式如下:

SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;

SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;

SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

B. 设置存储模式的步骤为:

org.apache.spark.sql.SaveMode

......

df.write.mode(SaveMode.Append)

C. 若提前在数据库中手动创建表,需要注意列名称和数据类型,

下面的源码说明了,需要保证Spark SQL中schema中的field name与Mysql中的列名称一致!

若提前手动创建Mysql表,需要注意Spark SQL 中Schema中的数据类型与Mysql中的数据类型的对应关系

特别注意: Scala中的String类型,在MySQL中对应的是Text类型或者varchar(n)类型

相关文章

网友评论

      本文标题:spark操作mysql的demo

      本文链接:https://www.haomeiwen.com/subject/yolugqtx.html