【Spark七十八】Spark Kyro序列化
编程技术  /  houtizong 发布于 3年前   147
当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。
When Spark is transferring data over the network or spilling data to disk, it needs toserialize objects into a binary format. This comes into play during shuffle operations,where potentially large amounts of data are transferred. By default Spark will useJava’s built-in serializer. Spark also supports the use of Kryo, a third-party serializationlibrary that improves on Java’s serialization by offering both faster serializationtimes and a more compact binary representation, but cannot serialize all types ofobjects “out of the box.” Almost all applications will benefit from shifting to Kryo forserialization.
代码示例:
package spark.examples.kryoimport com.esotericsoftware.kryo.Kryoimport org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.serializer.KryoRegistrator//两个成员变量name和age,同时必须实现java.io.Serializable接口class MyClass1(val name: String, val age: Int) extends java.io.Serializable {}//两个成员变量name和age,同时必须实现java.io.Serializable接口class MyClass2(val name: String, val age: Int) extends java.io.Serializable {}//注册使用Kryo序列化的类,要求MyClass1和MyClass2必须实现java.io.Serializableclass MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[MyClass1]); kryo.register(classOf[MyClass2]); }}object SparkKryo { def main(args: Array[String]) { //设置序列化器为KryoSerializer,也可以在配置文件中进行配置 System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator") val conf = new SparkConf() conf.setAppName("SparkKryo") conf.setMaster("local[3]") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(new MyClass1("Tom", 31), new MyClass1("Jack", 23), new MyClass1("Mary", 19))) val fileDir = "file:///d:/wordcount" + System.currentTimeMillis() //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中 rdd.saveAsObjectFile(fileDir) //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1 val rdd1 = sc.objectFile[MyClass1](fileDir + "/" + "part-00000") rdd1.foreachPartition(iter => { while (iter.hasNext) { val objOfMyClass1 = iter.next(); println(objOfMyClass1.name) } }) sc.stop }}
查看保存到文件中的内容,是个二进制数据:
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable 蓑_xi??蛔?z汲 i e ur [Lspark.examples.kryo.MyClass1;?独#v? xp sr spark.examples.kryo.MyClass1z 峌# xp
对于普通字符,数字,字符串写入到object文件,是否也是序列化的过程?明确指定使用kvro序列化Int之后,保存的文件确实是二进制的。去掉对Int的注册之后,结果还是一样,序列化的结果完全一样,结果都是:
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable F脗?庻籡陭姯&? ' # ur [IM篳&v瓴? xp
import com.esotericsoftware.kryo.Kryoimport org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.serializer.KryoRegistrator//注册使用Kryo序列化的类,对Int进行序列化class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[Int]); }}object SparkKryoPrimitiveType { def main(args: Array[String]) { //设置序列化器为KryoSerializer,也可以在配置文件中进行配置 System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator") val conf = new SparkConf() conf.setAppName("SparkKryoPrimitiveType") conf.setMaster("local[3]") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(1, 3, 7, 9, 11, 22)) val fileDir = "file:///d:/wordcount" + System.currentTimeMillis() //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中 rdd.saveAsObjectFile(fileDir) //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1 val rdd1 = sc.objectFile[Int](fileDir + "/" + "part-00000") rdd1.foreachPartition(iter => { while (iter.hasNext) { println(iter.next()) } }) sc.stop }}
其它:
指定使用Kyro序列化,以及注册Kyro序列化类,可可以使用如下方式
val conf = new SparkConf()//这句是多于的,调用conf的registerKryoClasses时,已经设置了序列化方法conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// Be strict about class registration///如果一个要序列化的类没有进行Kryo注册,则强制Spark报错conf.set("spark.kryo.registrationRequired", "true")conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
/** * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. */ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { val allClassNames = new LinkedHashSet[String]() allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) set("spark.kryo.classesToRegister", allClassNames.mkString(",")) set("spark.serializer", classOf[KryoSerializer].getName) this }
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接