上面在Spark-Streaming中介绍了foreach,dstream.foreachRDD
是一个功能强大的原语primitive,它允许将数据发送到外部系统。输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。所以要掌握它,对它要有深入了解。下面就是一些常见的错误用法。
错误使用
在Spark驱动中创建一个连接对象,在 Spark worker 中尝试调用这个连接对象将记录保存到RDD中,很容易下出下面的代码:
1 | dstream.foreachRDD { rdd => |
这是不正确的,因为这需要先序列化连接对象,然后将它从driver端发送到worker中。这样的连接对象在机器之间不能传送。通常会报不能序列化的错误:
正确做法是在worker中创建连接对象
1 | dstream.foreachRDD { rdd => |
但是这么做,很明显也有问题:为每一条记录都创建一个连接对象。下面开始改进
改进
创建一个连接对象会有资源和时间的开销,为每条记录创建和销毁连接对象会有非常高的开支,第一种优化的方案是:为RDD的每个分区创建一个连接对象:
1 | dstream.foreachRDD { rdd => |
进一步改进
创建连接对象对资源和时间要求很高,那么可以利用连接池来维护有限的连接对象资源。
创建静态连接对象池:
1 | public class ConnectionPool { |
这样,每使用一个连接对象将一批数据写入外部系统之后,就将该连接对象放回连接池。
1 | dstream.foreachRDD { rdd => |
再进一步改进
经过上面连接池的改进,基本上性能已经难满足要求了。如果RDD数据量比较大,也可以考虑分批次写入外部存储。
对于rdd中一个分区的数据,首先从连接池中获取一个连接对象,然后准备好SQL语句,超过500条记录后,向数据库提交一次数据(如果不足500条,就将这批数据放到一个批次),提交数据后,将连接对象放回连接池以便后面使用。
1 | dstream.foreachRDD { (rdd, time) => |