登录 |  注册
首页 >  云计算&大数据 >  Spark >  SparkStreamingCheckpointing

SparkStreamingCheckpointing

Spark Streaming Checkpointing

一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中,以使系统从故障中恢复。

  • Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括

  • Configuration :创建Spark Streaming应用程序的配置信息
  • DStream operations :定义Streaming应用程序的操作集合
  • Incomplete batches:操作存在队列中的未完成的批

  • Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。

元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。

何时checkpoint

应用程序在下面两种情况下必须开启checkpoint

  • 使用有状态的transformation。如果在应用程序中用到了updateStateByKey或者reduceByKeyAndWindow,checkpoint目录必需提供用以定期checkpoint RDD。
  • 从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息。

注意,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。这一般是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。

怎样配置Checkpointing

在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。着可以通过streamingContext.checkpoint(checkpointDirectory)方法来做。这运行你用之前介绍的有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序。

  • 当应用程序是第一次启动,新建一个StreamingContext,启动所有Stream,然后调用start()方法
  • 当应用程序因为故障重新启动,它将会从checkpoint目录checkpoint数据重新创建StreamingContext
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用functionToCreateContext函数创建一个新的上下文,建立DStreams。请看RecoverableNetworkWordCount例子。

除了使用getOrCreate,开发者必须保证在故障发生时,driver处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。

注意,RDD的checkpointing有存储成本。这会导致批数据(包含的RDD被checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少操作的吞吐量。相反,checkpointing太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的transformation需要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint来设置。典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。

上一篇: DStreams缓存或持久化
下一篇: SparkStreaming部署应用程序
推荐文章
  • MD5(Message-DigestAlgorithm5)是一种广泛使用的散列函数(哈希函数),由美国密码学家罗纳德·李维斯特(RonaldL.Rivest)在1991年设计。MD5的作用是对任意长度的信息生成一个固定长度(128位,即32个十六进制字符)的“指纹”或“消息摘要”,并且几乎不可能找到
  • 循环冗余校验(CyclicRedundancyCheck,CRC)是一种用于检测数据传输和存储过程中发生错误的技术,属于一种基于数学原理的错误检测编码(ErrorDetectionCoding)方法。它通过在原始数据上附加一个固定长度的校验码,使得接收端可以通过同样的计算规则对收到的数据进行校验,确
  • AES(AdvancedEncryptionStandard)是一种广泛使用的对称密钥加密算法,它是美国国家标准与技术研究院(NIST)于2001年制定的加密标准,用于替代原有的DES(DataEncryptionStandard)。AES算法以其高效性、安全性和可靠性而著称,在众多应用领域中被广泛
  • RSA(Rivest-Shamir-Adleman)是一种广泛应用的非对称加密算法,由RonRivest、AdiShamir和LenAdleman在1977年提出。其安全性基于数学上的大数因子分解难题,即对于足够大的两个素数p和q而言,已知它们的乘积很容易,但想要从这个乘积中恢复原始的素数则异常困难
  • 最小生成树(MinimumSpanningTree,MST)是一种图论算法,用于在一个带权重的无向连通图中找到一棵包括所有顶点且总权重尽可能小的树。常见的最小生成树算法有两种:Prim算法和Kruskal算法。Prim算法原理:Prim算法是一种贪心算法,它从图中的一个顶点开始,逐步增加边,每次都添
  • 关于最短路径算法的Java实现,这里简述一下几种常用的算法及其基本原理,并给出一个Dijkstra算法的基本实现框架。Dijkstra算法(适用于无负权边的图)Dijkstra算法用于寻找图中一个顶点到其他所有顶点的最短路径。它维护了一个距离表,用来存储从源点到各个顶点的已知最短距离,并且每次都会选
学习大纲