How Spark interacts with Kafka (spark.streaming.kafka.consumer.cache.enabled)

Tags: spark, kafka

How does Spark interact with Kafka?

This article uses the Direct connection approach: KafkaUtils.createDirectStream is used to connect to Kafka. There are two createDirectStream overloads with different parameters; both ultimately create the stream via new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig).

  @Experimental
  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]
    ): InputDStream[ConsumerRecord[K, V]] = {
    val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
  }

  @Experimental
  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V],
      perPartitionConfig: PerPartitionConfig
    ): InputDStream[ConsumerRecord[K, V]] = {
    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
  }
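
For orientation, this is roughly how an application wires the API above together — a minimal sketch in the style of the spark-streaming-kafka-0-10 documentation; ssc, topics and kafkaParams are assumed to be defined by the application, and the String key/value types are purely illustrative:

import org.apache.spark.streaming.kafka010._

// ssc: StreamingContext, topics: Iterable[String], kafkaParams: Map[String, Object]
// are assumed to exist elsewhere in the application.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,   // location strategy: spread partitions evenly over executors
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)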

What do the driver side and the executor side each do?

**Driver side**
private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
    

The DirectKafkaInputDStream class initializes the actual stream; it extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets.

/**
 * This is the abstract base class for all input streams. This class provides methods
 * start() and stop() which are called by Spark Streaming system to start and stop
 * receiving data, respectively.
 * Input streams that can generate RDDs from new data by running a service/thread only on
 * the driver node (that is, without running a receiver on worker nodes), can be
 * implemented by directly inheriting this InputDStream....
 */
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc) {
    

From the source comment we can see that InputDStream's start() and stop() methods are what Spark Streaming calls to start and stop consuming data, and that they run on the driver node. The entry point is start(), in which two things deserve a closer look: ① consumer, and ② the paranoidPoll() method.

  override def start(): Unit = {
    val c = consumer
    paranoidPoll(c)
    if (currentOffsets.isEmpty) {
      currentOffsets = c.assignment().asScala.map { tp =>
        tp -> c.position(tp)
      }.toMap
    }

    // don't actually want to consume any messages, so pause all partitions
    c.pause(currentOffsets.keySet.asJava)
  }

① consumer
The driver starts a consumer whose only job is to load offsets; it never consumes any records.

  @transient private var kc: Consumer[K, V] = null
  def consumer(): Consumer[K, V] = this.synchronized {
    if (null == kc) {
      kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
    }
    kc
  }

  /**
   * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
   * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
   * This consumer will be used on the driver to query for offsets only, not messages.
   * The consumer must be returned in a state that it is safe to call poll(0) on.
   * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
   * has successfully read.  Will be empty on initial start, possibly non-empty on restart from
   * checkpoint.
   */
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]

The onStart implementation is shown below; this is also where Kafka's auto.offset.reset setting comes into play.

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }

    consumer
  }
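
As a point of reference, the kafkaParams map supplied by the application usually looks something like the sketch below; the broker address and group id are placeholders, and AUTO_OFFSET_RESET_CONFIG is the auto.offset.reset setting mentioned above:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",            // placeholder broker list
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> "my-spark-group",                   // placeholder group id
  // where to start when there is no committed offset: "latest", "earliest" or "none"
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)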

② The paranoidPoll() method
Its main purpose is to obtain the current offset of every partition of the topic. If offsets were passed in at initial startup, the consumer's position is seeked to the corresponding offset (the one right after the last committed offset).

  private def paranoidPoll(c: Consumer[K, V]): Unit = {
    val msgs = c.poll(0)
    if (!msgs.isEmpty) {
      // position should be minimum offset per topicpartition
      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
        val tp = new TopicPartition(m.topic, m.partition)
        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
        acc + (tp -> off)
      }.foreach { case (tp, off) =>
        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
        c.seek(tp, off)
      }
    }
  }

The other core method of DirectKafkaInputDStream is compute, which builds the KafkaRDD for each batch. In it:
① val untilOffsets = clamp(latestOffsets()) — computes the end offset of the upcoming batch, based on spark.streaming.kafka.maxRatePerPartition and spark.streaming.backpressure.enabled (see the config sketch after the compute method below);
② offsetRanges — the start and end offset for each partition;
③ val rdd = new KafkaRDD[…] — the actual RDD construction.

  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }
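
A quick sketch of where those rate limits come from on the application side — they are plain SparkConf settings (the numbers and the app name are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-demo")                                // placeholder app name
  // upper bound on records read per Kafka partition per second
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // let Spark adapt the ingestion rate to the observed batch processing delay
  .set("spark.streaming.backpressure.enabled", "true")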

Within compute, note this line in particular:

val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)

When the KafkaRDD is created, the Kafka consumer cache (kafkaConsumerCache) is used by default. Its initial capacity is 16 and its maximum capacity is 64, both of which can be changed through configuration. Be aware that in certain environments the cached consumer is not thread-safe — "KafkaConsumer is not safe for multi-threaded access":

private void acquire() {
    
    ensureNotClosed();
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}

To avoid this, set spark.streaming.kafka.consumer.cache.enabled to false so that the ConsumerCache is not used and the uncached branch below is taken instead.

val consumer = if (useConsumerCache) {
  CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
  if (context.attemptNumber >= 1) {
    // just in case the prior attempt failures were cache related
    CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
  }
  CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
} else {
  CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
}

The drawback of the uncached path: a new consumer has to be created for every task and closed again once the task finishes.
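
Put together, disabling or tuning the cache is just configuration — a sketch, where the capacity keys are what this version of KafkaRDD reads; treat the exact names as an assumption to verify against your Spark version:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // turn the per-executor consumer cache off entirely (falls back to getUncached)
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  // or keep it enabled and tune its size instead (assumed keys, defaults 16 / 64)
  // .set("spark.streaming.kafka.consumer.cache.initialCapacity", "16")
  // .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")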

**Executor side**

The executor side comes into play once new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache) is constructed. To avoid clashing with the consumer created on the driver, fixKafkaParams tweaks a few of the application's original kafkaParams:

  /**
   * Tweak kafka params to prevent issues on executors
   */
  private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    // driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

    // possible workaround for KAFKA-3135
    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    }
  }

Next, getPartitions (the RDD contract's protected def getPartitions: Array[Partition]) maps the RDD's partitions one-to-one onto the topic's partitions, carrying each one's start and end offset:

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }

compute is then called to build a KafkaRDDIterator; driven by the partition's offset range, each call to next() fetches a message from Kafka. This is where the consumer cache discussed on the driver side is actually used!

  override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
  }

  private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {

    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

    context.addTaskCompletionListener { context => closeIfNeeded() }

    val consumer = if (useConsumerCache) {
      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
      if (context.attemptNumber >= 1) {
        // just in case the prior attempt failures were cache related
        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
      }
      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    } else {
      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    }

    var requestOffset = part.fromOffset

    def closeIfNeeded(): Unit = {
      if (!useConsumerCache && consumer != null) {
        consumer.close
      }
    }

    override def hasNext(): Boolean = requestOffset < part.untilOffset

    override def next(): ConsumerRecord[K, V] = {
      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
      val r = consumer.get(requestOffset, pollTimeout)
      requestOffset += 1
      r
    }
  }

How are offsets managed?

There are three ways of managing offsets:
① Checkpoint
If a checkpoint directory is configured in the Spark settings (checkpoint.dir), Spark saves the state of every batch's RDDs to HDFS or a similar store, and that state includes the offsets — "If you enable Spark checkpointing, offsets will be stored in the checkpoint".
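
For context, a minimal sketch of how checkpoint-based recovery is usually wired up; the checkpoint path, app name and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/spark/streaming-checkpoint"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-direct-demo")   // placeholder app name
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // define the Kafka direct stream and the processing graph here
  ssc
}

// Restore from the checkpoint if it exists, otherwise build a fresh context
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()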

The drawback: as soon as the streaming code changes, the configuration changes, or the checkpoint files get corrupted, the job can no longer restart. The only recourse is to delete the checkpoint files, but then the restarted job can only consume from Kafka's smallest or largest offset (latest by default). Starting from the latest offset rather than from where the previous run stopped means data is lost; starting from the earliest means data is duplicated. Either way, there is a problem.

② Automatic commit
Setting enable.auto.commit=true turns on Kafka's automatic offset commits, but it is best not to combine this with Spark state. Not that it throws an error, but using state forces a checkpoint directory to be set, which brings back the problems of approach ①.
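
For what it's worth, turning this on is just another consumer parameter — a sketch, with the commit interval purely illustrative:

val kafkaParams = Map[String, Object](
  // ... the usual bootstrap/deserializer/group settings ...
  "enable.auto.commit" -> (true: java.lang.Boolean),
  "auto.commit.interval.ms" -> (5000: java.lang.Integer)  // how often the consumer commits
)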

③ Manual commit
Since Kafka 0.10+ stores offsets in its internal __consumer_offsets topic, Spark Streaming also provides the commitAsync() API for committing offsets by hand. Its usage:

realStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
  // commit only after this batch's output has completed
  realStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

When using it, take care not to break the mapping between RDD partitions and topic partitions: operators such as map()/mapPartitions() are safe, while operators that trigger a shuffle or a repartition are not — the HasOffsetRanges cast must be made on the RDD that comes straight out of the stream, as the sketch below illustrates.
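
A minimal sketch (the aggregation is purely illustrative): the offsets are captured on the original Kafka RDD before any shuffle, so committing them at the end of the batch remains valid even though the later stages repartition the data.

realStream.foreachRDD { rdd =>
  // Safe: rdd is still the original KafkaRDD, so its partitions map 1:1 to topic partitions
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // After this point the partitioning changes, but offsetRanges has already been captured
  val counts = rdd.map(record => (record.value, 1L)).reduceByKey(_ + _)
  counts.foreach { case (value, n) => println(s"$value -> $n") }

  realStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}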

Copyright notice: this is an original article by the author, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/dddddssssa/article/details/107522424
