Spark通过CLI写入Cassandra
编程技术  /  houtizong 发布于 3年前   67
上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。
与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下
val sc = new SparkContext("local[3]", "casDemo")val job = new Job()job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")//casDemo是keyspace,和sc中的casDemo没有任何关系ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
这样子,最基本的配置就已经配置好了。接下来就可以通过RDD来进行写入了。在这里我们假设已经存在一个RDD[(String, Int)],变量名为counts。其中String代表的是一个单词,比如“China”,Int代表的是出现的次数,如1,2.....。然后可以进行如下操作
counts.map{ case (word, count) => { //store the StringType val colWord = new org.apache.cassandra.thrift.Column()//cli通过Thrift访问,所以我们也需要libthrift这个jar包 colWord.setName(ByteBufferUtil.bytes("word"))//column名字为word colWord.setValue(ByteBufferUtil.bytes(word ))//此column值为word colWord.setTimestamp(System.currentTimeMillis()) //store the LongType val colCount = new org.apache.cassandra.thrift.Column() colCount.setName(ByteBufferUtil.bytes("count")) colCount.setValue(ByteBufferUtil.bytes(count.toLong)) colCount.setTimestamp(System.currentTimeMillis()) //store the BooleanType val colmorethan = new org.apache.cassandra.thrift.Column() colmorethan.setName(ByteBufferUtil.bytes("larger5")) if(count>5) { colmorethan.setValue(TRUE) } else { colmorethan.setValue(FALSE) } colmorethan.setTimestamp(System.currentTimeMillis()) //store the FloatType val colPercentage = new org.apache.cassandra.thrift.Column() colPercentage.setName(ByteBufferUtil.bytes("percentage")) colPercentage.setValue(ByteBufferUtil.bytes(1.22.toFloat)) colPercentage.setTimestamp(System.currentTimeMillis()) //store the DoubleType val colRate = new org.apache.cassandra.thrift.Column() colRate.setName(ByteBufferUtil.bytes("rate")) colRate.setValue(ByteBufferUtil.bytes(1.888888)) colRate.setTimestamp(System.currentTimeMillis()) val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis) val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: Nil mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(0).column_or_supercolumn.setColumn(colWord) mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(1).column_or_supercolumn.setColumn(colCount) mutations.get(2).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(2).column_or_supercolumn.setColumn(colmorethan) mutations.get(3).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(3).column_or_supercolumn.setColumn(colPercentage) mutations.get(4).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(4).column_or_supercolumn.setColumn(colRate) (outputkey, mutations) } }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], classOf[ColumnFamilyOutputFormat], job.getConfiguration)
这里使用了RDD的map函数以及scala的case class,这样word就对应着单词,ercount就对应着出现次数。在Cassandra中column name以及timestamp都是必须的,column name是可选的,但是这里我们全部都填上了,主要是将每个基本类型都是用了下。其中比较tricky的是bool类型,因为RDD会分出很多partition在各个节点上,每个节点也需要和Cassandra交流,所以需要将所有的数据都转化成Byte类型。在这里我们ByteBufferUitl工具包完成了这个操作。但是false和true到底怎么表达呢?这个问题我也是尝试了很多次才搞清楚的。如下
val TRUE = new Array[Byte](1) TRUE(0) = 1.toByte val FALSE = new Array[Byte](1)//default to 0
0就是false,非0就是ture,这个还是比较无可厚非的。以为需要的是Byte类型所以将1变成了Byte类型也是可以理解的,唯一让我比较困惑的是为什么需要搞出一个长度为1的数组。因为时间过得比较久了,我也不记得我当初是怎么搞出来的了。Okay,回到上一段代码。所有这些都设置好了之后,根据Cassandra提供的ColumnFamilyOutputFormat接口我们需要提供给一个二元组,这个二元组中第一个参数是ByteBuffer类型的,outputkey正好是,第二个参数应该是List[Mutation]类型的,正好我们设置的mutations满足要求,所以就返回(outputkey, mutations)。接下来就是使用hadoop的newapi完成任务了,不过这里需要注意的就是,为了使用这个函数,需要导入SparkContext._中的implicit函数,将使用PairFunctionRDD中的功能。
这就是基本的通过CLI接口来写入Cassandra。在下一篇中,会讲解使用Cassandra新的CQL接口进行读写。
Have a nice weekend!
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接