欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,488,256
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5844
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark RDD写入RMDB(Mysql)方法二

  在本博客的《Spark将计算结果写入到Mysql中》文章介绍了如果将Spark计算后的RDD最终 写入到Mysql等关系型数据库中,但是这些写操作都是自己实现的,弄起来有点麻烦。不过值得高兴的是,前几天发布的Spark 1.3.0已经内置了读写关系型数据库的方法,我们可以直接在代码里面调用。
  Spark 1.3.0中对数据库写操作是通过DataFrame类实现的,这个类也是新增的,是将之前的SchemaRDD重命名之后又定义了一些新方法的类。我们需要通过SQLContext来构造DataFrame对象,在SQLContext类中提供了大量可以构造DataFrame对象的方法,感兴趣的可以去看下。本文是通过SQLContext类中的createDataFrame方法来构造的。函数原型如下:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

  接收的RDD是Row类型的,他代表的是one row of output from a relational operator。第二个参数就是我们需要写入表的结构,包括了表的字段名和对应的类型,完整的代码如下:


import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * User: 过往记忆
 * Date: 15-03-17
 * Time: 上午07:24
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1290
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

object SparkToJDBC {
  def main(args: Array[String]): Unit = {

    val url = "jdbc:mysql://localhost:3306/spark?user=iteblog&password=iteblog"

    val sc = new SparkContext
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val schema = StructType(
      StructField("name", StringType) ::
        StructField("age", IntegerType)
        :: Nil)

    val data = sc.parallelize(List(("iteblog", 30), ("iteblog", 29), 
        ("com", 40), ("bt", 33), ("www", 23))).
      map(item => Row.apply(item._1, item._2))
    import sqlContext.implicits._

    val df = sqlContext.createDataFrame(data, schema)
    df.createJDBCTable(url, "sparktomysql", false)

    sc.stop
  }
}

  DataFrame类中提供了很多写数据库的操作,本例中的createJDBCTable就是可以创建表,它的函数原型如下:

def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit

table是表的名字,最后一个参数是如果表存在是否删除表的意思,false代表不删除。
  DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:

def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  前面两个参数和createJDBCTable一致,第三个参数如果设置为true,则在插入数据之前会调用mysql的TRUNCATE TABLE语句先清掉表中的数据。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark RDD写入RMDB(Mysql)方法二】(https://www.iteblog.com/archives/1290.html)
喜欢 (16)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(6)个小伙伴在吐槽
  1. 在向sqlserver 数据库插入数据的时候 url应该怎么写呢
    Young2015-12-13 09:02 回复
  2. pyspark是如何实现的呀 save吗 那应该怎么写呢
    好吧,我就叫于小布。。。2015-06-26 18:39 回复
    • pyspark是如何实现的我真不知道,可以去参考官方的文档说明。
      w3970907702015-06-26 23:42 回复
  3. 我看上面的代码也没加载mysql的包
    丿獨自メ侃 塰2015-06-09 18:26 回复
  4. spark会自动识别mysql的驱动吗,我是用java写的,即使指定了mysql包还是找不到合适的驱动iNo suitable driver found for jdbc
    丿獨自メ侃 塰2015-06-09 18:24 回复
    • 在分布式环境下,加载mysql驱动包存在一个Bug,1.3及以前的版本 --jars 分发的jar在executor端是通过Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。不过Spark 1.4应该已经fix了这个问题,即 --jars 分发的 jar 也会纳入 YARN 的 classloader 范畴。
      w3970907702015-06-09 18:36 回复