[Spark 20] Running the Spark Streaming NetworkWordCount Example


Spark Streaming Overview

 

NetworkWordCount Code

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    // Create the SparkConf instance
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // Create the StreamingContext with a 1-second batch interval. Where is the data
    // collected during each second stored, and how is it handed over for processing --
    // does the producer push it, or does the consumer pull it once per second?
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // flatMap splits each line on spaces and flattens the results, so `words` is a
    // DStream whose elements are the individual words.
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Start the streaming computation
    ssc.start()
    // Wait for the computation to terminate -- what condition actually ends the job?
    ssc.awaitTermination()
  }
}

 

Running NetworkWordCount

1. Start Netcat

 

[hadoop@hadoop ~]$ nc -lk 9999
This is a book

Once Netcat is running, you can type text into its terminal, for example: This is a book

 

2. In another terminal, submit the NetworkWordCount job to Spark

 

./bin/run-example streaming.NetworkWordCount localhost 9999

 

After the job starts, a log line like the following is printed every second:

 

15/01/11 00:10:09 INFO scheduler.JobScheduler: Added jobs for time 1420953009000 ms
15/01/11 00:10:10 INFO scheduler.JobScheduler: Added jobs for time 1420953010000 ms
15/01/11 00:10:11 INFO scheduler.JobScheduler: Added jobs for time 1420953011000 ms
15/01/11 00:10:12 INFO scheduler.JobScheduler: Added jobs for time 1420953012000 ms
15/01/11 00:10:13 INFO scheduler.JobScheduler: Added jobs for time 1420953013000 ms
15/01/11 00:10:14 INFO scheduler.JobScheduler: Added jobs for time 1420953014000 ms
15/01/11 00:10:15 INFO scheduler.JobScheduler: Added jobs for time 1420953015000 ms
15/01/11 00:10:16 INFO scheduler.JobScheduler: Added jobs for time 1420953016000 ms
15/01/11 00:10:17 INFO scheduler.JobScheduler: Added jobs for time 1420953017000 ms
15/01/11 00:10:18 INFO scheduler.JobScheduler: Added jobs for time 1420953018000 ms
/// What does "Added jobs" (plural) mean here -- isn't each batch just one job? (Each batch produces a JobSet that may contain one job per output operation, hence the plural.)
15/01/11 00:10:19 INFO scheduler.JobScheduler: Added jobs for time 1420953019000 ms
15/01/11 00:10:20 INFO scheduler.JobScheduler: Added jobs for time 1420953020000 ms
15/01/11 00:10:21 INFO scheduler.JobScheduler: Added jobs for time 1420953021000 ms
15/01/11 00:10:22 INFO scheduler.JobScheduler: Added jobs for time 1420953022000 ms
15/01/11 00:10:23 INFO scheduler.JobScheduler: Added jobs for time 1420953023000 ms
15/01/11 00:10:24 INFO scheduler.JobScheduler: Added jobs for time 1420953024000 ms
15/01/11 00:10:25 INFO scheduler.JobScheduler: Added jobs for time 1420953025000 ms
15/01/11 00:10:26 INFO scheduler.JobScheduler: Added jobs for time 1420953026000 ms
15/01/11 00:10:27 INFO scheduler.JobScheduler: Added jobs for time 1420953027000 ms
15/01/11 00:10:28 INFO scheduler.JobScheduler: Added jobs for time 1420953028000 ms

 

3. Type text into the terminal running nc -lk; the Spark job does not print any output for it

However, after a line of text is entered via nc -lk, the console does show the following messages:

 

15/01/11 00:29:08 INFO storage.MemoryStore: ensureFreeSpace(11) called with curMem=91198, maxMem=280248975
15/01/11 00:29:08 INFO storage.MemoryStore: Block input-0-1420954147800 stored as bytes in memory (estimated size 11.0 B, free 267.2 MB)
15/01/11 00:29:08 INFO scheduler.JobScheduler: Added jobs for time 1420954148000 ms
15/01/11 00:29:08 INFO storage.BlockManagerInfo: Added input-0-1420954147800 in memory on localhost:57786 (size: 11.0 B, free: 267.2 MB)
15/01/11 00:29:08 INFO storage.BlockManagerMaster: Updated info of block input-0-1420954147800
15/01/11 00:29:08 INFO receiver.BlockGenerator: Pushed block input-0-1420954147800
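
For comparison, when a batch is actually computed and printed, wordCounts.print() writes a block of roughly the following form to the driver console for each batch. This is an assumed illustration: the timestamp is reused from the log above, and the ordering of the pairs is not guaranteed.

-------------------------------------------
Time: 1420954148000 ms
-------------------------------------------
(This,1)
(is,1)
(a,1)
(book,1)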

 

4. Stop the nc process. Spark then reports an error saying it cannot connect to port 9999:

 

/// Relative to the data source, Spark acts as the receiver, which is why Spark Streaming has a Receiver component
15/01/11 00:09:36 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
15/01/11 00:09:37 INFO scheduler.JobScheduler: Added jobs for time 1420952977000 ms
15/01/11 00:09:38 INFO scheduler.JobScheduler: Added jobs for time 1420952978000 ms
/// The receiver is restarted
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver again
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
15/01/11 00:09:38 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Receiver started again
15/01/11 00:09:38 INFO dstream.SocketReceiver: Connecting to localhost:9999
15/01/11 00:09:38 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)

 

On not receiving messages

With the following configuration, no messages are received:

 

val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local")

 

while with the following one, messages are received:

 

val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local[2]")

The reason comes from http://spark.apache.org/docs/latest/streaming-programming-guide.html (a minimal sketch applying this advice follows the quoted points below):

  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).

  • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.
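
Following that guidance, here is a minimal standalone driver sketch. The object name LocalNetworkWordCount and the host/port are illustrative assumptions, not part of the bundled example; the point is that setMaster("local[2]") leaves at least one thread free for batch processing alongside the single socket receiver.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical standalone driver for local testing; host and port are illustrative.
object LocalNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("LocalNetworkWordCount")
      .setMaster("local[2]") // "local" or "local[1]" would leave no thread for processing
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

It can be fed by the same `nc -lk 9999` server used in step 1 above.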

On the number of receiver threads

As mentioned above, one thread is used to run the receiver. In practice, multiple receivers, and therefore multiple receiver threads, may be needed:

Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers on two workers, thus allowing data to be received in parallel, and increasing overall throughput. These multiple DStreams can be unioned together to create a single DStream. Then the transformations that were being applied on the single input DStream can be applied on the unified stream. This is done as follows:

 

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
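
The same union pattern can be applied to the socket source used in this article. The sketch below is illustrative only: it assumes the `ssc` StreamingContext from the example above, the second port (9998) is made up, and running two socket receivers locally needs a master of at least local[3] so that one thread remains for processing.

// Assumes `ssc` and the imports from the NetworkWordCount example; ports are illustrative.
val ports = Seq(9999, 9998)
val socketStreams = ports.map { port =>
  ssc.socketTextStream("localhost", port, StorageLevel.MEMORY_AND_DISK_SER)
}
// Union the per-port streams into a single DStream and count words across all of them.
val unifiedLines = ssc.union(socketStreams)
val unifiedCounts = unifiedLines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
unifiedCounts.print()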

 

On the parallelism of data processing inside Spark
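
If the per-batch processing (rather than the receiving) becomes the bottleneck, the Spark Streaming programming guide suggests raising the parallelism of shuffle-style operations such as reduceByKey. A minimal sketch against the word-count pipeline above; the partition count of 8 is an arbitrary illustrative value, and spark.default.parallelism can be set instead to change the default for all shuffles.

// Use 8 reduce tasks per batch instead of the default; 8 is an arbitrary illustrative value.
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _, 8)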
