
Spark Storage Action Operators: Source Code Analysis

Tim在路上 · Published 2022-03-23
  • saveAsHadoopFile

Writes the RDD to any Hadoop-supported file system.

def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  // 1. Configure hadoopConf: set the key, value, and output format classes
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  // Configure output compression if a codec is supplied
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }
  // Configure the output committer
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  // Delegate to saveAsHadoopDataset
  saveAsHadoopDataset(hadoopConf)
}

As the source shows, saveAsHadoopFile takes an output path, the key class, the value class, the OutputFormat class, a Hadoop JobConf, and an optional compression codec. These parameters are set on the JobConf, and the method then delegates to saveAsHadoopDataset.
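For reference, below is a minimal usage sketch of this full-parameter overload. It is a hypothetical example, not from the original article: the app name, sample data, and output path are made up, and it assumes the classic mapred TextOutputFormat with Hadoop Writable key/value types.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SaveAsHadoopFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SaveAsHadoopFileDemo").setMaster("local[2]"))
    // Build a pair RDD whose key/value types are Hadoop Writables
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }
    // Pass the path, key class, value class and OutputFormat class explicitly
    pairs.saveAsHadoopFile(
      "/tmp/save-as-hadoop-file-demo",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])
    sc.stop()
  }
}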

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}

saveAsHadoopDataset simply wraps the JobConf in a HadoopMapRedWriteConfigUtil and calls SparkHadoopWriter.write.

def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val commitJobId = rdd.id

  // Set up a job: create the job context and a committer for it.
  val jobTrackerId = createJobTrackerID(new Date())
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)

  // Assert the output format/key/value class is set in JobConf.
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  // Try to write all RDD partitions as a Hadoop OutputFormat.
  try {
    val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
      // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
      // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
      val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
      // Submit executeTask to run on each partition on the executors
      executeTask(
        context = context,
        config = config,
        jobTrackerId = jobTrackerId,
        commitJobId = commitJobId,
        sparkPartitionId = context.partitionId,
        sparkAttemptNumber = attemptId,
        committer = committer,
        iterator = iter)
    })
    // Commit the job on the driver once all partition tasks have committed
    committer.commitJob(jobContext, ret)
    logInfo(s"Job ${jobContext.getJobID} committed.")
  } catch {
    case cause: Throwable =>
      logError(s"Aborting job ${jobContext.getJobID}.", cause)
      committer.abortJob(jobContext)
      throw new SparkException("Job aborted.", cause)
  }
}

Its main work happens on the driver: it prepares the Hadoop configuration and output committer for the job, then submits a Spark job that runs executeTask on every partition of the RDD; each task writes out all rows of its partition. If every partition task writes and commits successfully (commitTask), the driver commits the whole job (commitJob); if any task fails, the job is aborted.
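The per-partition flow inside executeTask can be summarized roughly as follows. This is a simplified, hypothetical sketch of the task-level commit protocol, not the actual Spark implementation; writePartition, openWriter, commitTask and abortTask are illustrative names standing in for the real RecordWriter and OutputCommitter calls.

// Simplified sketch of what each task does with its partition's iterator.
def writePartition[K, V](
    iter: Iterator[(K, V)],
    openWriter: () => ((K, V)) => Unit,
    commitTask: () => Unit,
    abortTask: () => Unit): Unit = {
  val write = openWriter()   // open a record writer for this task attempt
  try {
    iter.foreach(write)      // write every row of this partition
    commitTask()             // task-level commit of this attempt's output
  } catch {
    case t: Throwable =>
      abortTask()            // discard this attempt's output
      throw t                // let Spark fail and possibly retry the task
  }
}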

saveAsHadoopFile also has several simplified overloads, in which most parameters are derived automatically rather than passed explicitly.

def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
  val runtimeClass = fm.runtimeClass
  saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}

As this overload shows, we only need to pass the output path and a compression codec; the key, value, and output format classes are obtained automatically.
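Continuing the earlier sketch, a hypothetical call to this simplified overload might look like the following; the output path is made up, and pairs is the Writable pair RDD from the example above.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat

// Only the output format type parameter, the path and the codec are supplied;
// keyClass and valueClass are derived from the RDD's ClassTags.
pairs.saveAsHadoopFile[SequenceFileOutputFormat[Text, IntWritable]](
  "/tmp/save-as-hadoop-file-simple", classOf[GzipCodec])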

private[spark] def keyClass: Class[_] = kt.runtimeClass

private[spark] def valueClass: Class[_] = vt.runtimeClass

keyClass and valueClass are obtained at runtime from the implicit ClassTags for K and V.
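These class tags are captured when the pair RDD is wrapped in PairRDDFunctions. Abridged from the class declaration (quoted from memory, so it may differ slightly across Spark versions):

class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {
  // keyClass and valueClass above are defined in terms of kt and vt
}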

  • saveAsTextFile

saveAsTextFile stores the RDD as text files on a Hadoop-supported file system, writing each element as its String representation; it is also a simplified wrapper around saveAsHadoopFile.

def saveAsTextFile(path: String): Unit = withScope {
  // Pass explicit ClassTags (and a null Ordering below) so the compiler generates the
  // same bytecodes for `saveAsTextFile` across Hadoop versions (see SPARK-2075).
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
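
A minimal usage sketch (the output path is hypothetical and an existing SparkContext sc is assumed): every element is converted to its String form and written as one line of a part file.

// Writes part-NNNNN text files under the given directory, one record per line.
sc.parallelize(1 to 100)
  .map(i => s"record-$i")
  .saveAsTextFile("/tmp/save-as-text-file-demo")
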
  • Other big-data systems

As shown above, saveAsHadoopDataset takes a JobConf, and the write is driven entirely by the classes configured on it. By configuring an appropriate JobConf, the same mechanism can therefore write to other big-data systems, for example HBase (HBaseUtils below is a small helper for building the HBase client configuration; a sketch of it follows the example):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat // old mapred API, as required by JobConf.setOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object HBaseWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "XXX"
    val quorum = "localhost"
    val port = "2181"

    // Configure the HBase connection (ZooKeeper quorum/port) and target table
    val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Base the JobConf on the HBase configuration so the connection settings carry over
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Write the data to HBase
    val indataRDD = sc.makeRDD(Array("20180723_02,10","20180723_03,10","20180818_03,50"))

    indataRDD.map(_.split(",")).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))  // row key
      put.add(Bytes.toBytes("info"), Bytes.toBytes("click_count"), Bytes.toBytes(arr(1)))
      (new ImmutableBytesWritable, put)
    }.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}
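
The HBaseUtils helper referenced above is not shown in the original article; below is a minimal sketch of what it might look like, assuming the standard HBase client configuration keys.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

object HBaseUtils {
  // Builds an HBase Configuration pointing at the given ZooKeeper quorum/port.
  // The output table itself is set on the JobConf by the caller, so tableName
  // is accepted only to mirror the call site above.
  def getHBaseConfiguration(quorum: String, port: String, tableName: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    conf
  }
}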