用于本地测试,语言scala,spark版本2.2.0,mysql的表名为test1.a,只有一个字段age。
利用typesafe库,读取配置文件,pom文件中添加如下:
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.0</version>
</dependency>
resource目录下application.conf中添加如下配置内容:
#测试
test_mysql{
user="xxx"
pwd="xxx"
test_table_name="test1.a"
url="jdbc:mysql://xxx.xxx.xxx.xxx:3306"
sql="(select age from test1.a where age<30) t"
}
代码:
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkToMysql {
def main(args: Array[String]):Unit = {
val spark = SparkSession.builder().appName("SparkToMysql").master("local[2]").getOrCreate()
val config = ConfigFactory.load()
val props =Map(
"url" -> config.getString("test_mysql.url"),
"user" -> config.getString("test_mysql.user"),
"password" -> config.getString("test_mysql.pwd"),
"dbtable" -> config.getString("test_mysql.test_table_name")
)
//spark查询mysql,整表查询
val mysqlRDD = spark.read.format("jdbc").options(props).load()
mysqlRDD.show()
//spark查询mysql,条件查询(1)
val mysqlRDD2 = spark.read.format("jdbc").options(props).load().where("age < 30")
mysqlRDD2.show()
//spark查询mysql,条件查询(2)
val prop2=new Properties()
prop2.put("user",config.getString("test_mysql.user"))
prop2.put("password",config.getString("test_mysql.pwd"))
val mysqlRDD3=spark.read.jdbc(config.getString("test_mysql.url"),config.getString("test_mysql.sql"),prop2)
mysqlRDD3.show()
//spark查询mysql,条件查询(3)
val props3 =Map(
"url" -> config.getString("test_mysql.url"),
"user" -> config.getString("test_mysql.user"),
"password" -> config.getString("test_mysql.pwd"),
"dbtable" -> config.getString("test_mysql.sql")
)
val mysqlRDD4 = spark.read.format("jdbc").options(props3).load()
mysqlRDD4.show()
//spark写入mysql
// SaveMode.Overwrite: if data/table already exists,
//existing data is expected to be overwritten by the contents of the DataFrame.
//慎用 Overwrite,会把表结构也改掉
mysqlRDD.write.mode(SaveMode.Append).format("jdbc").options(props).save()
spark.stop()
}
}
另附一些注意事项:
数据存入Mysql注意事项
A. 尽量先设置好存储模式
默认为SaveMode.ErrorIfExists模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库.另外三种模式如下:
SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。
B. 设置存储模式的步骤为:
org.apache.spark.sql.SaveMode
......
df.write.mode(SaveMode.Append)
C. 若提前在数据库中手动创建表,需要注意列名称和数据类型,
下面的源码说明了,需要保证Spark SQL中schema中的field name与Mysql中的列名称一致!
若提前手动创建Mysql表,需要注意Spark SQL 中Schema中的数据类型与Mysql中的数据类型的对应关系
特别注意: Scala中的String类型,在MySQL中对应的是Text类型或者varchar(n)类型
网友评论