流处理类型
Spark Streaming是Spark解决方案中实时处理的组件,本质是将数据源分割为很小的批次,以类似离线批处理的方式处理这部分数据。这种方式提升了数据吞吐能力,但是也增加了数据处理的延迟,延迟通常是秒级或者分钟级。
Spark Streaming底层依赖 Spark Core的 RDD,内部的调度方式也依赖于DAG调度器。Spark Streaming的离散数据流DStream本质上是RDD在流式数据上的抽象。
编写Spark Streaming应用
添加下述依赖到你的Maven项目中:
1 | <dependency> |
下面是WordCount程序的Streaming版本:
1 | import org.apache.spark._ |
这个HelloWorld程序主要有下面几个步骤组成:
- 初始化StreamingContext
- 创建DStream接收器 (会单独起一个线程运行接收器)
- 定义DStream的转换操作
- 定义DStream的输出
- 启动流处理程序
接收器
接收器有两类:基础的接收器(可以自己实现) 和高级数据源(例如 Kafka, Flume等)
下面模拟实现一个自定义的接收器,需要继承Receiver抽象类,然后实现 onStart()
方法和onStop()
方法
1 | class CustomReceiver(host: String, port: Int) |
Spark Streaming Custom Receivers
关于例如Kafka和Flume等高级数据源,下面会有一章介绍
DStream的转换操作
上面说过DStream本质上是RDD在流式数据上的抽象,所以RDD上很多的转换操作在这里都能通用,例如下面几个:
- map(func)
- flatMap(func)
- filter(func)
- repartition(numPartitions)
- union(otherStream)
- count()
- reduce(func)
- countByValue()
- reduceByKey(func, [numTasks])
- join(otherStream, [numTasks])
- cogroup(otherStream, [numTasks])
- transform(func)
- updateStateByKey(func)
下面是流处理需要处理的窗口函数(Window Operations)
Window Operation 窗口操作
在窗口操作中需要指定两个参数:
- window length 窗口长度 —— 窗口的时间长度(上图中为3)
- sliding interval 滑动间隔 —— 窗口操作的执行间隔(上图中为2)
这两个参数必须为流处理定义的批处理间隔的整数倍。
1 | // Reduce last 30 seconds of data, every 10 seconds |
DStreams上的输出操作
foreachRDD
dstream.foreachRDD
是一个功能强大的原语primitive,它允许将数据发送到外部系统。下面将结果保存到数据库中
1 | dstream.foreachRDD { rdd => |
Caching/Persistence 缓存/持久化
类似于RDD,DStream也允许开发者在内存中持久化stream流数据
Checkpoint 设置
一个Streaming程序是需要7X24运行的,所以故障恢复能力是很重要的,为此,Spark Streaming需要检查点以便从故障中恢复。
有两种类型的检查点:
- Metadata检查点
- 数据检查点,将生成RDD保存在可靠的存储中
note: 如果在应用程序中使用updateStateByKey
或者reduceByKeyAndWindow
(带有反转函数),那么必须提供checkpointing目录
集成Kafka、Flume等
下面是集成Kafka的方法,首先添加依赖包
1 | <dependency> |
这里有个坑: 版本不匹配的依赖可能会产生很多兼容性问题,由于Kafka 0.10.0之后引入了新的Kafka Consumer API,所以现在的这个版本的集成还是实验性的,以后可能还会有变化。Spark 2.3.0开始,kafka-0-8的依赖不再被支持。
连接方式的比较
Kafka consumer传统消费者(老方式)需要连接zookeeper,简称Receiver方式,是高级的消费API,自动更新偏移量,支持WAL,但是效率比较低。
新的方式(高效的方式)不需要连接Zookeeper,但是需要自己维护偏移量,简称直连方式,直接连在broker上,但是需要手动维护偏移量,以迭代器的方式边接收数据边处理,效率较高。
Kafka 0.10以后的只支持直连方式
创建DirectStream
1 | import org.apache.kafka.clients.consumer.ConsumerRecord |
KafkaUtils.creatDirectStream
需要传入3个参数:
- StreamingContext
- LocationStrategy,默认采用
LocationStrategies.PreferConsistent
- ConsumerStrategy
偏移量操作
上面说过,直连的方式需要手动维护偏移量。Kafka提供了一个提交offset的API,这个API把特定的kafka topic的offset进行存储。默认情况下,新的消费者会周期性的自动提交offset,这里把enable.auto.commit
设置为false。但是,使用commitAsync api,用户可以在确保计算结果被成功保存后自己来提交offset。和使用checkpoint方式相比,Kafka是一个不用关心应用代码的变化可靠存储系统。
获取偏移量
1 | stream.foreachRDD { rdd => |
可以使用commitAsync
来提交偏移量
1 | stream.foreachRDD { rdd => |