
Spark Streaming Official Documentation Review Notes - 6

2017-03-02 19:51


Fault-tolerance Semantics

In this section, we will discuss the behavior of Spark Streaming applications in the event of failures.


Background

To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.

An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.

If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.

Assuming that all of the RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.

Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence, all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not the case for Spark Streaming, as the data in most cases is received over the network (except when fileStream is used). To achieve the same fault-tolerance properties for all of the generated RDDs, the received data is replicated among multiple Spark executors on worker nodes in the cluster (the default replication factor is 2). This leads to two kinds of data in the system that need to be recovered in the event of failures (a short storage-level sketch follows the list below):
Data received and replicated - This data survives failure of a single worker node as a copy of it exists on one of the other nodes.
Data received but buffered for replication - Since this is not replicated, the only way to recover this data is to get it again from the source.
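As a concrete illustration (not from the original document), a receiver-based input stream can be given an explicitly replicated storage level; the application name, host, and port below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A receiver-based input stream whose received blocks are stored on two executors.
// MEMORY_AND_DISK_SER_2 is the replicated ("_2") variant and matches the default
// replication factor of 2 mentioned above.
val conf = new SparkConf().setAppName("ReplicationSketch")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
lines.count().print()
ssc.start()
ssc.awaitTermination()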

Furthermore, there are two kinds of failures that we should be concerned about:
Failure of a Worker Node - Any of the worker nodes running executors can fail, and all in-memory data on those nodes will be lost. If any receivers were running on failed nodes, then their buffered data will be lost.
Failure of the Driver Node - If the driver node running the Spark Streaming application fails, then obviously the SparkContext is lost, and all executors with their in-memory data are lost.

With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.


Definitions

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.):
At most once: Each record will be either processed once or not processed at all.
At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensures that no data will be lost, but there may be duplicates.
Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.


Basic Semantics

In any stream processing system, broadly speaking, there are three steps in processing the data (a minimal skeleton follows the list below):

Receiving the data: The data is received from sources using Receivers or otherwise.

Transforming the data: The received data is transformed using DStream and RDD transformations.

Pushing out the data: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.
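For illustration (not part of the original text), here is a minimal Spark Streaming program that makes the three steps explicit; the host, port, and output path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ThreeStepsSketch")
val ssc = new StreamingContext(conf, Seconds(10))

// 1. Receiving the data (here via a socket receiver; host/port are placeholders)
val lines = ssc.socketTextStream("localhost", 9999)

// 2. Transforming the data with DStream/RDD transformations
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)

// 3. Pushing out the data to an external system (placeholder HDFS path)
counts.saveAsTextFiles("hdfs://namenode:8020/streaming/word-counts")

ssc.start()
ssc.awaitTermination()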

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.

Receiving the data: Different input sources provide different guarantees. This is discussed in detail in the next subsection.

Transforming the data: All data that has been received will be processed exactly once, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.

Pushing out the data: Output operations by default ensure at-least once semantics, because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve exactly-once semantics. This is discussed in more detail later in this section.


Semantics of Received Data

Different input sources provide different guarantees, ranging from at-least once to exactly once. Read on for more details.


With Files

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.
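A minimal sketch of such a file-based input stream (not from the original document); the monitored HDFS directory is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Spark Streaming picks up new files that appear in the monitored directory.
// Because the files already sit in fault-tolerant storage, lost batches can
// simply be recomputed from them after a failure.
val conf = new SparkConf().setAppName("FileInputSketch")
val ssc = new StreamingContext(conf, Seconds(30))
val lines = ssc.textFileStream("hdfs://namenode:8020/streaming/input")
lines.count().print()
ssc.start()
ssc.awaitTermination()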


With Receiver-based Sources

For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. As we discussed earlier, there are two types of receivers:
Reliable Receiver - These receivers acknowledge reliable sources only after ensuring that the received data has been replicated. If such a receiver fails, the source will not receive acknowledgment for the buffered (unreplicated) data. Therefore, if the receiver is restarted, the source will resend the data, and no data will be lost due to the failure. (Note to self: if the acknowledgment itself is lost in transit, the source simply resends, which can produce duplicates rather than loss, i.e. at-least once. A sketch of this pattern follows the list.)
Unreliable Receiver - Such receivers do not send acknowledgment and therefore can lose data when they fail due to worker or driver failures.
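To make the reliable pattern concrete, here is a minimal custom-receiver sketch (not from the original document): it pulls lines from a socket, hands each batch to Spark with the blocking store(...) call, and only then would acknowledge the source. The class name, host, port, and batch size of 100 are placeholders, and the acknowledgment itself is left as a comment because it depends entirely on the source's protocol.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SketchReliableReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns immediately.
    new Thread("Sketch Reliable Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive loop exits once isStopped() becomes true */ }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      while (!isStopped()) {
        // Pull a small batch of records from the source.
        val batch = new ArrayBuffer[String]()
        var line: String = null
        while (batch.size < 100 && { line = reader.readLine(); line != null }) {
          batch += line
        }
        if (batch.nonEmpty) {
          // Blocking call: returns only after Spark has stored (and replicated) the batch.
          store(batch)
          // Only now would a real reliable receiver acknowledge this batch to the source.
        }
        if (line == null) {
          stop("Source closed the connection")
        }
      }
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

Such a receiver would be attached with ssc.receiverStream(new SketchReliableReceiver(host, port)).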

Depending on what type of receivers are used, we achieve the following semantics. If a worker node fails, then there is no data loss with reliable receivers. With unreliable receivers, data received but not replicated can get lost. If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of stateful transformations.

To avoid this loss of past received data, Spark 1.2 introduced write ahead logs, which save the received data to fault-tolerant storage. With write ahead logs enabled and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
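As an illustration (not part of the original text), enabling the write ahead log amounts to one configuration flag plus a checkpoint directory on fault-tolerant storage; the application name, paths, host and port below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Turn on the receiver write ahead log (Spark 1.2+).
val conf = new SparkConf()
  .setAppName("WalSketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL is written under the checkpoint directory, so it must be on
// fault-tolerant storage such as HDFS (placeholder path).
ssc.checkpoint("hdfs://namenode:8020/streaming/checkpoints")

// With the WAL on, the data is already persisted to fault-tolerant storage,
// so in-memory replication can be dropped by using a non-replicated storage level.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.count().print()
ssc.start()
ssc.awaitTermination()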

The following table summarizes the semantics under failures:
Deployment scenario: Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs
  Worker failure: Buffered data lost with unreliable receivers. Zero data loss with reliable receivers. At-least once semantics.
  Driver failure: Buffered data lost with unreliable receivers. Past data lost with all receivers. Undefined semantics.

Deployment scenario: Spark 1.2 or later with write ahead logs
  Worker failure: Zero data loss with reliable receivers. At-least once semantics.
  Driver failure: Zero data loss with reliable receivers and files. At-least once semantics.


With Kafka Direct API

In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach is further discussed in the Kafka Integration Guide.
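A minimal sketch of the direct approach, assuming the Spark 1.3-era spark-streaming-kafka artifact (Kafka 0.8 API); the broker list and topic name are placeholders:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) Kafka stream: offsets are tracked by Spark Streaming itself,
// so each record is received exactly once.
val conf = new SparkConf().setAppName("KafkaDirectSketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// stream is a DStream of (key, value) pairs.
stream.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()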


Semantics of output operations

Output operations (like foreachRDD) have at-least once semantics; that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches:

Idempotent updates: Multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files. (An operation is idempotent when applying it repeatedly gives the same result as applying it once, e.g. abs(abs(-1)) = abs(-1).)

Transactional updates: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following:
Use the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application. (It plays a role similar to a database primary key.)

Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
// generateUniqueId is a user-defined helper; TaskContext comes from org.apache.spark.
import org.apache.spark.TaskContext

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    // Batch time + partition index uniquely identify this blob of data.
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}