Maintaining Kafka offsets in external storage with Spark Streaming

This post takes maintaining Kafka offsets in Redis as an example.

Redis storage format

The data structure used is a Redis string, where the key is topic:partition and the value is the offset.

For example, the topic bobo has three partitions, so the keys and values look like this (a short Jedis sketch follows the list):

  • The offset of bobo:0 is x
  • The offset of bobo:1 is y
  • The offset of bobo:2 is z
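
As a concrete illustration, a minimal Jedis sketch that reads and writes one of these keys might look like this; the host, port, and offset value are assumptions for illustration only, not part of the original setup:

import redis.clients.jedis.Jedis

// Minimal sketch: write and read one topic:partition key directly with Jedis.
// Host, port and the offset value 42 are assumed for illustration.
val jedis = new Jedis("localhost", 6379)
jedis.set("bobo:0", "42")            // store the offset of partition 0
val lastOffset = jedis.get("bobo:0") // "42", or null if the key does not exist yet
jedis.close()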

Specifying offsets when consuming

Two methods do the main work:

  • createKafkaStream() creates the Kafka stream
  • getOffsets() reads the offsets from Redis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

import scala.collection.mutable

/**
  * Kafka parameters
  */
private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // Note: this is none, because starting offsets are supplied explicitly from Redis
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The topic bobo has 3 partitions
private val topicPartitions = Map[String, Int]("bobo" -> 3)

// Get offsets from redis
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // Set the offset at the beginning of each partition
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { case (topic, partitions) =>
    // Traverse the partition and set the offset of the corresponding partition under each topic
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)

      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

      // Record the starting offset for this partition
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)

  offsets.toMap
}

/**
  * Create a Kafka stream
  *
  * @param ssc StreamingContext
  * @return InputDStream
  */
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // Create kafka stream
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )
  stream
}

The core is the ConsumerStrategies.Assign method, which tells the direct stream the starting offset of each partition of the topic.
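
Both getOffsets and the consume method below borrow and return connections through InternalRedisClient, which is not shown in this post. A minimal sketch backed by a standard JedisPool could look like the following; the host, port, and pool size are assumptions:

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Minimal sketch of the pooled Redis client used above; host, port and pool size are assumed.
object InternalRedisClient {
  private val poolConfig = new JedisPoolConfig()
  poolConfig.setMaxTotal(8)

  private val pool = new JedisPool(poolConfig, "localhost", 6379)

  // Borrow a connection from the pool
  def getResource: Jedis = pool.getResource

  // Return a connection to the pool (close() on a pooled Jedis returns it rather than closing it)
  def returnResource(jedis: Jedis): Unit = if (jedis != null) jedis.close()
}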

Updating offsets to Redis

Finally, after each batch is processed, write the offset information back to Redis.

/**
  * Consume the stream and persist offsets
  *
  * @param stream InputDStream
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Get offset information
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Compute the relevant metrics; here we simply count the records
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // Start a Redis transaction so the metric update and the offset writes are applied together
    pipeline.multi()

    // Update relevant indicators
    pipeline.incrBy("totalRecords", total)

    // Update offset
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset.toString)
    }

    // Execute the transaction, then release the connection back to the pool
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
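
For completeness, a minimal driver that wires createKafkaStream and consume into a StreamingContext might look like the sketch below; the application name, master URL, and batch interval are assumptions, not part of the original code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal driver sketch, assumed to live in the same object as createKafkaStream and consume;
// app name, master and batch interval are assumed values.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("kafka-offset-in-redis").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(10))

  val stream = createKafkaStream(ssc) // start from the offsets stored in Redis
  consume(stream)                     // process each batch and write offsets back

  ssc.start()
  ssc.awaitTermination()
}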

Spark code

By the way, here is my own Spark-related code.

GitHub address: spark-programming

It mainly includes:

  • Basic use of RDDs
  • SQL
    • JDBC (read, write)
    • Hive (read, write, dynamic partitions)
  • Streaming
    • Consuming Kafka (manual commits, manually maintained offsets)
    • Writing to HBase
    • Writing to Hive
