
Key implementation notes on the kafka-connect-hive sink plugin

Author: 九州暮云 · Published 2019-08-21 10:51:31

The kafka-connect-hive sink plugin writes data to Hive tables in two formats, ORC and Parquet. The connector periodically polls data from Kafka and writes it to HDFS. Data from each Kafka topic is partitioned by the configured partition fields and divided into chunks, and each chunk is written as an HDFS file whose name is made up of the topic name, the partition number, and the offset. If no partitioning is specified in the configuration, the default partitioning scheme is used. The size of each chunk is determined by the length of the data already written to HDFS, the time spent writing to HDFS, and the number of records not yet written to HDFS.

While reading through the plugin's source code I found quite a few things worth learning, so I am summarizing them here for future reference.

1. Partitioning policies

The plugin can be configured with one of two partitioning policies:

  • STRICT: requires all partitions to already exist
  • DYNAMIC: creates partitions on the fly, based on the fields specified with PARTITIONBY

STRICT policy

The implementation, with comments, is as follows:

package com.landoop.streamreactor.connect.hive.sink.partitioning

import com.landoop.streamreactor.connect.hive.{DatabaseName, Partition, TableName}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.metastore.IMetaStoreClient

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
  * A [[PartitionHandler]] that requires any partition
  * to already exist in the metastore.
  */
object StrictPartitionHandler extends PartitionHandler {

  override def path(partition: Partition,
                    db: DatabaseName,
                    tableName: TableName)
                   (client: IMetaStoreClient,
                    fs: FileSystem): Try[Path] = {
    try {
      // look up the partition in the Hive metastore and return its storage location on success
      val part = client.getPartition(db.value, tableName.value, partition.entries.map(_._2).toList.asJava)
      Success(new Path(part.getSd.getLocation))
    } catch { // the partition does not exist: fail, because the strict policy requires upfront creation
      case NonFatal(e) =>
        Failure(new RuntimeException(s"Partition '${partition.entries.map(_._2).toList.mkString(",")}' does not exist and strict policy requires upfront creation", e))
    }
  }
}
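
In practice the STRICT policy therefore expects the target partitions to have been created ahead of time, for example with Hive's ALTER TABLE ... ADD PARTITION statement or by some other process, before the connector writes any records to them.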

DYNAMIC policy

The implementation, with comments, is as follows:

package com.landoop.streamreactor.connect.hive.sink.partitioning

import com.landoop.streamreactor.connect.hive.{DatabaseName, Partition, TableName}
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.hadoop.hive.metastore.api.{StorageDescriptor, Table}

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

/**
  * A [[PartitionHandler]] that creates partitions
  * on the fly as required.
  *
  * The path of the partition is determined by the given
  * [[PartitionPathPolicy]] parameter. By default this will
  * be an implementation that uses the standard hive
  * paths of key1=value1/key2=value2.
  */
class DynamicPartitionHandler(pathPolicy: PartitionPathPolicy = DefaultMetastorePartitionPathPolicy)
  extends PartitionHandler with StrictLogging {

  override def path(partition: Partition,
                    db: DatabaseName,
                    tableName: TableName)
                   (client: IMetaStoreClient,
                    fs: FileSystem): Try[Path] = {

    def table: Table = client.getTable(db.value, tableName.value)

    def create(path: Path, table: Table): Unit = {
      logger.debug(s"New partition will be created at $path")

      // copy the table's storage descriptor and set its location to the new partition path
      val sd = new StorageDescriptor(table.getSd)
      sd.setLocation(path.toString)

      val params = new java.util.HashMap[String, String]
      // collect the partition key values and the partition creation timestamp (in seconds)
      val values = partition.entries.map(_._2).toList.asJava
      val ts = (System.currentTimeMillis / 1000).toInt

      // build the new partition object and register it in the metastore
      val p = new org.apache.hadoop.hive.metastore.api.Partition(values, db.value, tableName.value, ts, 0, sd, params)
      logger.debug(s"Updating hive metastore with partition $p")
      client.add_partition(p)

      logger.info(s"Partition has been created in metastore [$partition]")
    }

    // try to fetch the partition from the metastore
    Try(client.getPartition(db.value, tableName.value, partition.entries.toList.map(_._2).asJava)) match {
      case Success(p) => Try { // the partition already exists: return its location
        new Path(p.getSd.getLocation)
      }
      case Failure(_) => Try { // otherwise derive a path from the path policy, create the partition, and return the path
        val t = table
        val tableLocation = new Path(t.getSd.getLocation)
        val path = pathPolicy.path(tableLocation, partition)
        create(path, t)
        path
      }
    }
  }
}

This policy creates partitions using the standard Hive partition path layout, i.e. partitionField=partitionValue (key1=value1/key2=value2).
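
As a minimal sketch of that layout (the HiveStylePathSketch object below is hypothetical, not the plugin's actual DefaultMetastorePartitionPathPolicy, whose code is not shown here), the partition path can be derived from the table location like this:

import org.apache.hadoop.fs.Path

object HiveStylePathSketch {
  // entries: ordered (partitionKey, partitionValue) pairs
  def path(tableLocation: Path, entries: Seq[(String, String)]): Path =
    entries.foldLeft(tableLocation) { case (parent, (key, value)) =>
      new Path(parent, s"$key=$value")
    }
}

// e.g. HiveStylePathSketch.path(new Path("/user/hive/warehouse/mydb.db/mytable"),
//                               Seq("dt" -> "2019-08-21", "city" -> "beijing"))
// yields /user/hive/warehouse/mydb.db/mytable/dt=2019-08-21/city=beijing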

2. File naming and size control

The connector polls data from Kafka and writes it to HDFS. Data from each Kafka topic is partitioned by the configured partition fields and divided into chunks, and each chunk is written as an HDFS file. Two details are involved here:

  • how files are named
  • how files are chunked, and how file size and count are controlled

Let's go through the relevant code one piece at a time. The file naming logic is implemented as follows:

package com.landoop.streamreactor.connect.hive.sink.staging

import com.landoop.streamreactor.connect.hive.{Offset, Topic}

import scala.util.Try

trait FilenamePolicy {
  val prefix: String
}

object DefaultFilenamePolicy extends FilenamePolicy {
  val prefix = "streamreactor"
}

object CommittedFileName {

  private val Regex = s"(.+)_(.+)_(\\d+)_(\\d+)_(\\d+)".r

  def unapply(filename: String): Option[(String, Topic, Int, Offset, Offset)] = {
    filename match {
      case Regex(prefix, topic, partition, start, end) =>
        // returns the prefix, topic name, partition number, start offset and end offset
        Try((prefix, Topic(topic), partition.toInt, Offset(start.toLong), Offset(end.toLong))).toOption
      case _ => None
    }
  }
}

As the code above shows, the file name is built from the prefix, the topic name, the partition number, and the offset. Assuming the file prefix is streamreactor, the topic name is hive_sink_orc, the partition number is 0, and the current maximum offset is 1168, the generated file name is streamreactor_hive_sink_orc_0_1168.
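
As a small usage sketch, the CommittedFileName extractor shown above can be pattern-matched against a file name to recover its parts. The file name below is purely illustrative (a topic without underscores is used so the greedy regex groups split unambiguously), and it assumes the objects above are on the classpath:

"streamreactor_mytopic_0_1100_1168" match {
  case CommittedFileName(prefix, topic, partition, startOffset, endOffset) =>
    println(s"prefix=$prefix, topic=$topic, partition=$partition, offsets=$startOffset..$endOffset")
  case other =>
    println(s"$other is not a committed file name")
}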

Now let's look at how file size is controlled. An HDFS block is typically 64 MB, 128 MB, or 256 MB; lots of small files consume a large amount of the NameNode's metadata memory and increase block addressing time. File size is mainly governed by three configuration options of the sink plugin:

  • WITH_FLUSH_INTERVAL: long; the time interval, in milliseconds, after which a file is committed
  • WITH_FLUSH_SIZE: long; the number of bytes written to the HDFS file before a commit is performed
  • WITH_FLUSH_COUNT: long; the number of records written but not yet committed to HDFS before a commit is performed; one message counts as one record

These options are used by the CommitPolicy trait; the trait and its default implementation look like this:

package com.landoop.streamreactor.connect.hive.sink.staging

import com.landoop.streamreactor.connect.hive.TopicPartitionOffset
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.kafka.connect.data.Struct

import scala.concurrent.duration.FiniteDuration

/**
  * The [[CommitPolicy]] is responsible for determining when
  * a file should be flushed (closed on disk, and moved to be visible).
  *
  * Typical implementations will flush based on number of records,
  * file size, or time since the file was opened.
  */
trait CommitPolicy {

  /**
    * This method is invoked after a file has been written.
    *
    * If the output file should be committed at this time, then this
    * method should return true, otherwise false.
    *
    * Once a commit has taken place, a new file will be opened
    * for the next record.
    *
    * @param tpo   the [[TopicPartitionOffset]] of the last record written
    * @param path  the path of the file that the struct was written to
    * @param count the number of records written thus far to the file
    *
    */
  def shouldFlush(struct: Struct, tpo: TopicPartitionOffset, path: Path, count: Long)
                 (implicit fs: FileSystem): Boolean
}

/**
  * Default implementation of [[CommitPolicy]] that will flush the
  * output file under the following circumstances:
  * - file size reaches limit
  * - time since file was created
  * - number of files is reached
  *
  * @param interval in millis
  */
case class DefaultCommitPolicy(fileSize: Option[Long],
                               interval: Option[FiniteDuration],
                               fileCount: Option[Long]) extends CommitPolicy with StrictLogging {
  require(fileSize.isDefined || interval.isDefined || fileCount.isDefined)

  override def shouldFlush(struct: Struct, tpo: TopicPartitionOffset, path: Path, count: Long)
                          (implicit fs: FileSystem): Boolean = {
    // fetch the file's status from HDFS
    val stat = fs.getFileStatus(path)
    val open_time = System.currentTimeMillis() - stat.getModificationTime // how long the file has been open, approximated via its last modification time

    /**
      * stat.getLen: file length in bytes
      * stat.getModificationTime: file modification time in milliseconds
      */
    fileSize.exists(_ <= stat.getLen) || interval.exists(_.toMillis <= open_time) || fileCount.exists(_ <= count)
  }
}

Now let's walk through the logic of DefaultCommitPolicy:

It first fetches the status of the file on HDFS, then computes how long the file has been open, and finally uses exists to evaluate the following conditions:

  • fileSize.exists(_ <= stat.getLen): has the length of the file written to HDFS, stat.getLen, reached the configured size threshold fileSize?
  • interval.exists(_.toMillis <= open_time): has the file's open time open_time reached the configured interval threshold?
  • fileCount.exists(_ <= count): has the number of records written to the file but not yet committed, count, reached the configured record-count threshold fileCount?

If any one of these conditions holds, shouldFlush returns true and a flush is performed, moving the file into its target directory on HDFS. This keeps both file size and file count under control and avoids producing too many small files.
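
As a hedged sketch of how the three options conceptually map onto the fields of DefaultCommitPolicy (the threshold values below are made up, and the way the plugin actually wires WITH_FLUSH_SIZE / WITH_FLUSH_INTERVAL / WITH_FLUSH_COUNT into the policy is not shown in the excerpts above):

import scala.concurrent.duration._

// Illustrative values only: flush when the file reaches 128 MB, or has been open
// for 10 minutes, or holds 100,000 uncommitted records, whichever happens first.
val commitPolicy = DefaultCommitPolicy(
  fileSize  = Some(128L * 1024 * 1024),   // corresponds to WITH_FLUSH_SIZE (bytes)
  interval  = Some(10.minutes),           // corresponds to WITH_FLUSH_INTERVAL (milliseconds in the config)
  fileCount = Some(100000L)               // corresponds to WITH_FLUSH_COUNT (records)
)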

3. Error handling policies

Exceptions handled poorly will directly affect service availability and can cause unpredictable losses. By default, kafka-connect throws exceptions raised while reading or writing data straight away, and this kind of exception easily brings the task responsible for reading and writing to a halt. A sample exception is shown below:

[2019-02-25 11:03:56,170] ERROR WorkerSinkTask{id=hive-sink-example-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Operation timed out (Connection timed out)
	at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:477)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:285)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:210)
	at com.landoop.streamreactor.connect.hive.sink.HiveSinkTask.start(HiveSinkTask.scala:56)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
	... 13 more
)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:525)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:285)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:210)
	at com.landoop.streamreactor.connect.hive.sink.HiveSinkTask.start(HiveSinkTask.scala:56)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2019-02-25 11:03:56,172] ERROR WorkerSinkTask{id=hive-sink-example-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

As the exception above shows, the connection to the Hive metastore timed out, so the corresponding task was killed and has to be restarted manually. This is of course just one of the exceptions that can occur while kafka-connect is running; for exceptions like this, which easily stop a task, an error handling policy needs to be configured. The sink plugin defines three such policies:

  • NOOP: when an exception occurs, ignore it and keep working
  • THROW: when an exception occurs, rethrow it, which stops the service
  • RETRY: when an exception occurs, retry; correspondingly, a maximum number of retries has to be defined to avoid retrying forever

The implementation classes for these three error handling policies are shown below:

/*
 *  Copyright 2017 Datamountaineer.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.datamountaineer.streamreactor.connect.errors

import java.util.Date

import com.datamountaineer.streamreactor.connect.errors.ErrorPolicyEnum.ErrorPolicyEnum
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.errors.RetriableException

/**
  * Created by andrew@datamountaineer.com on 19/05/16. 
  * kafka-connect-common
  */
object ErrorPolicyEnum extends Enumeration {
  type ErrorPolicyEnum = Value
  val NOOP, THROW, RETRY = Value
}

case class ErrorTracker(retries: Int, maxRetries: Int, lastErrorMessage: String, lastErrorTimestamp: Date, policy: ErrorPolicy)

trait ErrorPolicy extends StrictLogging {
  def handle(error: Throwable, sink: Boolean = true, retryCount: Int = 0)
}

object ErrorPolicy extends StrictLogging {
  def apply(policy: ErrorPolicyEnum): ErrorPolicy = {
    policy match {
      case ErrorPolicyEnum.NOOP => NoopErrorPolicy()
      case ErrorPolicyEnum.THROW => ThrowErrorPolicy()
      case ErrorPolicyEnum.RETRY => RetryErrorPolicy()
    }
  }
}

/**
 * Error policy that ignores the error and keeps processing
 */
case class NoopErrorPolicy() extends ErrorPolicy {
  override def handle(error: Throwable, sink: Boolean = true, retryCount: Int = 0){
    logger.warn(s"Error policy NOOP: ${error.getMessage}. Processing continuing.")
  }
}

/**
 * Error policy that rethrows the error
 */
case class ThrowErrorPolicy() extends ErrorPolicy {
  override def handle(error: Throwable, sink: Boolean = true, retryCount: Int = 0){
    throw new RuntimeException(error)
  }
}

/**
 * Error policy that retries on error
 */
case class RetryErrorPolicy() extends ErrorPolicy {

  override def handle(error: Throwable, sink: Boolean = true, retryCount: Int) = {
    if (retryCount == 0) {
      throw new RuntimeException(error)
    }
    else {
      logger.warn(s"Error policy set to RETRY. Remaining attempts $retryCount")
      throw new RetriableException(error)
    }
  }
}
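
To make the interplay with the Connect framework concrete, here is a hedged usage sketch, not the plugin's actual wiring: the ErrorHandlerSketch class, its maxRetries bookkeeping, and the hiveWriter in the usage comment are all hypothetical. With RETRY, a RetriableException is thrown while the retry budget lasts, so the framework redelivers the records, and a fatal RuntimeException is thrown once the budget is exhausted.

import scala.util.{Failure, Success, Try}

class ErrorHandlerSketch(policy: ErrorPolicy, maxRetries: Int) {
  private var remainingRetries = maxRetries

  def handleTry[T](attempt: => T): Option[T] = Try(attempt) match {
    case Success(value) =>
      remainingRetries = maxRetries           // reset the budget after a successful write
      Some(value)
    case Failure(error) =>
      remainingRetries -= 1
      // NOOP only logs; THROW rethrows; RETRY throws RetriableException while the
      // budget lasts and a RuntimeException once retryCount reaches 0
      policy.handle(error, sink = true, retryCount = math.max(remainingRetries, 0))
      None                                    // only reached with the NOOP policy
  }
}

// e.g. val handler = new ErrorHandlerSketch(RetryErrorPolicy(), maxRetries = 10)
//      handler.handleTry(hiveWriter.write(records))   // hiveWriter is hypothetical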

4. Summary

When implementing a data-sync plugin on top of kafka-connect, we should make full use of the Kafka topic information and handle exceptions appropriately; only then can the plugin remain extensible and highly available.
