Spark通过CLI写入Cassandra

编程技术  /  houtizong 发布于 3年前   67

上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。

 

与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下

 

 

val sc = new SparkContext("local[3]", "casDemo")val job = new Job()job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")//casDemo是keyspace,和sc中的casDemo没有任何关系ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

 

这样子,最基本的配置就已经配置好了。接下来就可以通过RDD来进行写入了。在这里我们假设已经存在一个RDD[(String, Int)],变量名为counts。其中String代表的是一个单词,比如“China”,Int代表的是出现的次数,如1,2.....。然后可以进行如下操作

       counts.map{            case (word, count) => {                //store the StringType                val colWord = new org.apache.cassandra.thrift.Column()//cli通过Thrift访问,所以我们也需要libthrift这个jar包                colWord.setName(ByteBufferUtil.bytes("word"))//column名字为word                colWord.setValue(ByteBufferUtil.bytes(word ))//此column值为word                colWord.setTimestamp(System.currentTimeMillis())                //store the LongType                val colCount = new org.apache.cassandra.thrift.Column()                colCount.setName(ByteBufferUtil.bytes("count"))                colCount.setValue(ByteBufferUtil.bytes(count.toLong))                colCount.setTimestamp(System.currentTimeMillis())                //store the BooleanType                val colmorethan = new org.apache.cassandra.thrift.Column()                colmorethan.setName(ByteBufferUtil.bytes("larger5"))                if(count>5) {                    colmorethan.setValue(TRUE)                } else {                    colmorethan.setValue(FALSE)                }                colmorethan.setTimestamp(System.currentTimeMillis())                                //store the FloatType                val colPercentage = new org.apache.cassandra.thrift.Column()                colPercentage.setName(ByteBufferUtil.bytes("percentage"))                colPercentage.setValue(ByteBufferUtil.bytes(1.22.toFloat))                colPercentage.setTimestamp(System.currentTimeMillis())                //store the DoubleType                val colRate = new org.apache.cassandra.thrift.Column()                colRate.setName(ByteBufferUtil.bytes("rate"))                colRate.setValue(ByteBufferUtil.bytes(1.888888))                colRate.setTimestamp(System.currentTimeMillis())                val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)                val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() ::  Nil                mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())                mutations.get(0).column_or_supercolumn.setColumn(colWord)                mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())                mutations.get(1).column_or_supercolumn.setColumn(colCount)                mutations.get(2).setColumn_or_supercolumn(new ColumnOrSuperColumn())                mutations.get(2).column_or_supercolumn.setColumn(colmorethan)                mutations.get(3).setColumn_or_supercolumn(new ColumnOrSuperColumn())                mutations.get(3).column_or_supercolumn.setColumn(colPercentage)                mutations.get(4).setColumn_or_supercolumn(new ColumnOrSuperColumn())                mutations.get(4).column_or_supercolumn.setColumn(colRate)                (outputkey, mutations)            }        }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],            classOf[ColumnFamilyOutputFormat], job.getConfiguration)

 

这里使用了RDD的map函数以及scala的case class,这样word就对应着单词,ercount就对应着出现次数。在Cassandra中column name以及timestamp都是必须的,column name是可选的,但是这里我们全部都填上了,主要是将每个基本类型都是用了下。其中比较tricky的是bool类型,因为RDD会分出很多partition在各个节点上,每个节点也需要和Cassandra交流,所以需要将所有的数据都转化成Byte类型。在这里我们ByteBufferUitl工具包完成了这个操作。但是false和true到底怎么表达呢?这个问题我也是尝试了很多次才搞清楚的。如下

    val TRUE = new Array[Byte](1)        TRUE(0) = 1.toByte    val FALSE = new Array[Byte](1)//default to 0

 

0就是false,非0就是ture,这个还是比较无可厚非的。以为需要的是Byte类型所以将1变成了Byte类型也是可以理解的,唯一让我比较困惑的是为什么需要搞出一个长度为1的数组。因为时间过得比较久了,我也不记得我当初是怎么搞出来的了。Okay,回到上一段代码。所有这些都设置好了之后,根据Cassandra提供的ColumnFamilyOutputFormat接口我们需要提供给一个二元组,这个二元组中第一个参数是ByteBuffer类型的,outputkey正好是,第二个参数应该是List[Mutation]类型的,正好我们设置的mutations满足要求,所以就返回(outputkey, mutations)。接下来就是使用hadoop的newapi完成任务了,不过这里需要注意的就是,为了使用这个函数,需要导入SparkContext._中的implicit函数,将使用PairFunctionRDD中的功能。

 

这就是基本的通过CLI接口来写入Cassandra。在下一篇中,会讲解使用Cassandra新的CQL接口进行读写。

Have a nice weekend!

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客