博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
DirectStream、Stream的区别-SparkStreaming源码分析02
阅读量:6258 次
发布时间:2019-06-22

本文共 1430 字,大约阅读时间需要 4 分钟。

转http://hadoop1989.com/2016/03/15/KafkaStreaming/

在Spark1.3之前,默认的Spark接收Kafka数据的方式是基于Receiver的,在这之后的版本里,推出了Direct Approach,现在整理一下两种方式的异同。

1. Receiver-based Approach

示例代码:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,

[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

2. Direct Approach (No Receivers)

示例代码:

import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])

源码实现

1、 KafkaUtils.createStream

首先从源码层面来看,其主要调用栈顺序:

KafkaUtils.createStream--->createStream--->new KafkaInputDStream--->new KafkaReceiver

KafkaReceiver类继承了Receiver,当Reciver被调用起来时,执行onStart()方法,MessageHandler负责将收到的数据进行存储。执行流程如下:

  1. 创建createStreamReceiver被调起执行
  2. 连接ZooKeeper,读取相应的ConsumerTopic配置信息等
  3. 通过consumerConnector连接到Kafka集群,收取指定topic的数据
  4. 创建KafkaMessageHandler线程池来对数据进行处理,通过ReceiverInputDStream中的方法,将数据转换成BlockRDD,供后续计算

2、 KafkaUtils.createDirectStream

主要调用栈顺序:

KafkaUtils.createDirectStream—> new DirectKafkaInputDStream

执行流程如下:

  1. 实例化KafkaCluster,根据用户配置的Kafka参数,连接Kafka集群
  2. 通过Kafka API读取Topic中每个Partition最后一次读的Offset
  3. 接收成功的数据,直接转换成KafkaRDD,供后续计算

架构

通过两张图,来看下他们架构。

1、 Receiver-based Approach

2、 Direct Approach (No Receivers)

优缺点

相关的优缺点,在官网上已经说得很清楚了。追求效率、数据准确可以使用Direct方式,但需要自己对Offset进行处理。

参考资料:

 

转载地址:http://wjxsa.baihongyu.com/

你可能感兴趣的文章
hdu1753()模拟大型实景数字相加
查看>>
Cocos2d-x之MenuItem
查看>>
Esper学习之六:EPL语法(二)
查看>>
流和文件
查看>>
iOS:UIMapView地图视图控件的简单使用
查看>>
关于Python的3张图
查看>>
作IFRAME于iOS您的设备上支持滚动
查看>>
后台数据库优化——板机
查看>>
C++ redirect input
查看>>
linux_sound_alsa_Android+alsa音频系统中的几个问题
查看>>
IOS Core Image之二
查看>>
python---__getattr__\__setattr_重载'.'操作
查看>>
VMware克隆虚拟机后网络不能正常使用的解决方法
查看>>
android平台TextView使用ImageSpan画廊GIF图像
查看>>
Android开发之ListView-SimpleAdapter的使用
查看>>
App.config提示错误“配置系统未能初始化”
查看>>
Angular - - ngChange、ngChecked、ngClick、ngDblclick
查看>>
JAVA学习第五十九课 — 网络编程概述
查看>>
远程共享文件夹
查看>>
convert2utf8withbom
查看>>