【Spark七十六】Spark计算结果存到MySQL

Mysql  /  houtizong 发布于 3年前   86
package spark.examples.dbimport java.sql.{PreparedStatement, Connection, DriverManager}import com.mysql.jdbc.Driverimport org.apache.spark.{SparkContext, SparkConf}object SparkMySQLIntegration {  case class Person(name: String, age: Int)  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("SparkRDDCount").setMaster("local");    val sc = new SparkContext(conf);    val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25)))    def func(iter: Iterator[(String, Int)]): Unit = {//      Class.forName("com.mysql.jdbc.Driver ")      var conn:Connection = null      val d :Driver = null      var pstmt:PreparedStatement = null      try {        val url="jdbc:mysql://localhost:3306/person";        val user="root";        val password=""        //在forPartition函数内打开连接,这样连接将在worker上打开        conn = DriverManager.getConnection(url, user, password)        while (iter.hasNext) {          val item = iter.next()          println(item._1 + "," + item._2)          val sql = "insert into TBL_PERSON(name, age) values (?, ?)";          pstmt = conn.prepareStatement(sql);          pstmt.setString(1, item._1)          pstmt.setInt(2, item._2)          pstmt.executeUpdate();        }      } catch {        case e: Exception => e.printStackTrace()      } finally {        if (pstmt != null) {          pstmt.close()        }        if (conn != null) {          conn.close()        }      }    }    data.foreachPartition(func);  }}

 

这个代码遇到了两个坑,

1. 按照Java程序员使用JDBC的习惯,首先通过Class.forName("com.mysql.jdbc.Driver ")注册MySQL的JDBC驱动,但是在Scala中却不需要这么做,这么做还出错,包ClassNotFoundExeception(但是com.mysql.jdbc.Driver明明在classpath上)

所以代码中添加了注释

2. 在本地运行这个代码时,反反复复报错说sql语句的(?,?)附近有语法错误,反反复复的看也没看出来哪里有错,后来发现原来是pstmt.executeUpdate();写成了pstmt.executeUpdate(sql);如此严重的编译错,Intellij Idea竟然编译不报错!!!

 

 

Spark RDD存入MySQL等存储系统最佳实践

将Spark的RDD写入数据存储系统,不管是关系型数据库如MySQL,还是NoSQL,如MongoDB,HBase,都面临着比较大的存储压力,因为每个RDD的每个partition的数据量可能非常大,因为必须节省有限的存储服务器连接,如下是一些最佳实践:

 

  • You can write your own custom writer and call a transform on your RDD to write each element to a database of your choice, but there's a lot of ways to write something that looks like it would work, but does not work well in a distributed environment. Here are some things to watch out for:
  • A common naive mistake is to open a connection on the Spark driver program, and then try to use that connection on the Spark workers. The connection should be opened on the Spark worker, such as by calling forEachPartition and opening the connection inside that function.
  • Use partitioning to control the parallelism for writing to your data storage. Your data storage may not support too many concurrent connections.
  • Use batching for writing out multiple objects at a time if batching is optimal for your data storage.
  • Make sure your write mechanism is resilient to failures.
  • Writing out a very large dataset can take a long time, which increases the chance something can go wrong - a network failure, etc.
  • Consider utilizing a static pool of database connections on your Spark workers.
  • If you are writing to a sharded data storage, partition your RDD to match your sharding strategy. That way each of your Spark workers only connects to one database shard, rather than each Spark worker connecting to every database shard.
  • Be cautious when writing out so much data, and make sure you understand the distributed nature of Spark!

 

**上面提到了batch操作,batch应该是一个节省连接资源非常有效的手段,将多个更新或者插入操作组成一个batch,使用一个连接将数据传送到存储系统引擎,关注下MySQL和MongoDB的batch操作**

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客