This paper, published in 2016, introduces the design and rationale of Facebook's internal stream computing platform and gives an in-depth comparison of how the key features of a stream computing system can be implemented and chosen.
Five metrics for stream computing systems
The paper names five important considerations:
- Ease of use. Which language do users develop in, such as SQL, C++, or Java? How quickly can they develop, test, and publish an application?
- Performance. What latency is required: milliseconds, seconds, or minutes? How much throughput is needed? Facebook's stream processing systems are designed for latencies on the order of seconds, which is an important premise for the design choices that follow.
- Fault tolerance. What kinds of failures are tolerated automatically? What data semantics hold when failures are handled, and how does the system store and restore in-memory state?
- Scalability. Can data be sharded so that it is processed in parallel? Can old data be reprocessed (backfilled)? How easily does the system adapt to changes in data partitioning?
- Correctness. Are ACID guarantees required?
Five design dimensions of a stream computing system
- Declarative. SQL is the typical example: it is simple, declarative, and fast to develop in, but its major shortcoming is limited expressiveness, so it cannot describe very complex logic.
- Functional. The framework ships a set of built-in functional operators, and the user only needs to arrange and combine them to describe the business logic.
- Procedural. For example, C++, Java, or Python. These languages offer the most flexibility as well as better performance, and application developers have full control over data structures and runtime logic.
The choice of language paradigm affects the system's ease of use and performance.
Since no single language fits every scenario, Facebook provides several different computing engines. For example, Puma applications are written in a SQL-like language, Swift applications are typically written in Python, and Stylus applications are written in C++. From the paper's description it is impossible to tell whether these engines share a common runtime. My understanding is that, in theory, the language paradigm is only a user-facing API and the underlying runtime should be unified.
A typical stream computing job consists of several compute nodes that must pass data to one another, and the data transfer method affects the system's fault tolerance, performance, scalability, and ease of use (for example, how easy it is to debug).
The usual implementation methods are as follows:
- Direct message transfer. Data is sent directly from one process to another through RPC or an in-memory message queue. The big advantage of this approach is speed: it can achieve end-to-end latency of 10-100 ms.
- Broker-based message transfer. Messages pass through an intermediate component: a set of broker processes is created specifically to forward messages. Adding a middleman adds overhead, but it helps scalability. A broker can multiplex one upstream to several downstreams, and it can apply backpressure to the upstream operator when a downstream has problems.
- Persistent-storage-based message transfer. Two processors are connected through a persistent message queue: the upstream operator writes its data to the queue, and the downstream operator reads from it. In addition to multiplexing, this mode lets upstream and downstream run at different speeds, and upstream data can be replayed from different points in time (for example, during failure recovery). Processing nodes are completely decoupled from one another: workers never interact directly, so the failure of a single node does not affect the others. This sounds nice, but it also adds latency, because every record is written to the message queue and then read back from it downstream. Unaligned checkpoints in Flink record in-flight data locally so that it can be replayed, which is somewhat similar to the upstream and downstream decoupling in this scheme.
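To make the third option concrete, here is a minimal Python sketch of transfer through a persistent log. It is only an illustration under stated assumptions: `PersistentLog` and `Consumer` are hypothetical names, the "log" is an in-memory list, and nothing here reflects Scribe's actual API. The point is that each reader keeps its own offset, so a slow reader does not hold back the writer and a new reader can replay from the beginning.

```python
from dataclasses import dataclass, field


@dataclass
class PersistentLog:
    """Toy stand-in for a persistent message queue (hypothetical, not Scribe's API)."""
    records: list = field(default_factory=list)

    def append(self, record):
        self.records.append(record)

    def read(self, offset, max_records=10):
        # Readers pull from any offset; the log never pushes to them.
        return self.records[offset:offset + max_records]


@dataclass
class Consumer:
    log: PersistentLog
    offset: int = 0                      # each consumer tracks its own position
    seen: list = field(default_factory=list)

    def poll(self, max_records=10):
        batch = self.log.read(self.offset, max_records)
        self.seen.extend(batch)
        self.offset += len(batch)
        return batch

    def lag(self):
        # Number of unread records: a simple per-node health signal.
        return len(self.log.records) - self.offset


log = PersistentLog()
for i in range(5):
    log.append(f"event-{i}")             # upstream writes without waiting for readers

fast = Consumer(log)
slow = Consumer(log)
fast.poll()                              # reads all five records
slow.poll(max_records=2)                 # a slow reader does not hold back the writer

replay = Consumer(log, offset=0)         # a fresh node replays from the start
replay.poll()
```

After this runs, `slow.lag()` is 3 while `fast.lag()` is 0, and `replay.seen` matches `fast.seen`, which is the replay property that makes debugging and failure recovery straightforward in this model.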
Facebook uses the third method, a persistent message queue based on Scribe. According to Facebook's data, this adds a delay of about 1 second per stream. My understanding is that the more nodes are chained in series, the more the overall delay is amplified. A typical pipeline is shown in the figure below, in which different nodes exchange data through Scribe.
The advantages of this approach:
- Fault tolerance. In large-scale streaming jobs, persistent message queues let operators fail independently of one another, which is a very useful property.
- Fault tolerance. Recovery from errors and failures can be very fast, because only the failed nodes need to be replaced.
- Fault tolerance. Multiplexing makes it possible to run backup links, which adds redundancy to the processing pipeline.
- Performance. A slow node does not affect its upstream nodes. With direct (embedded) data transfer, such a node causes backpressure upstream, and the peak throughput of a job is bounded by the slowest node in the whole pipeline.
- Ease of use. Debugging is more convenient: when a node is seen to produce incorrect data, you can reproduce the problem by starting a new compute node that replays the same upstream data.
- Ease of use. Monitoring and alerting are simpler: watching how far each node lags behind its upstream data is enough to reflect the health of the job.
- Ease of use. Job orchestration is more flexible: the various internal stream computing engines and storage systems can be connected through the message queue.
- Scalability. The number of partitions of the message queue can be adjusted easily through configuration.
The disadvantages:
- Latency increases, and the more nodes are chained in series, the more it is amplified.
- If the message queue guarantees only at-least-once delivery, failover produces more duplicate data (the paper does not mention this, but I believe it is a consequence).
- It brings additional network and storage overhead.
- Repeated serialization and deserialization cannot be avoided.
As mentioned in the paper, an operator has three main types of activity:
- Processing input data, which may include deserializing input data, querying external systems, and updating in-memory state. This step is considered free of side effects.
- Generating output. Based on the input data and the in-memory state, output data is delivered for downstream use.
- Saving checkpoints to a database.
The paper also mentions that an operator has two main kinds of processing semantics:
- Output semantics. Taking one output record as an example: does it appear downstream at least once, at most once, or exactly once?
- State semantics. Using counting as an example: is each input record counted at least once, at most once, or exactly once?
Stateless operators have only output semantics; stateful operators have both.
State semantics depend on the order in which the offset and the in-memory state are saved:
- At-least-once state semantics: save the in-memory state first, then save the offset.
- At-most-once state semantics: save the offset first, then save the in-memory state.
- Exactly-once state semantics: save the in-memory state and the offset atomically, for example in a single transaction.
Output semantics depend on the order in which the data is sent and the offset and in-memory state are saved:
- At-least-once output semantics: deliver the data first, then save the offset and state.
- At-most-once output semantics: save the offset and state first, then deliver the data.
- Exactly-once output semantics: save the offset and state and deliver the data in one transaction.
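The effect of the save order on state semantics can be checked with a small simulation. This is a sketch under simplifying assumptions, not the paper's implementation: the operator only counts events, the checkpoint store is a plain dict, and `run` and `crash_at_offset` are hypothetical names. The operator crashes exactly once, in the middle of checkpointing the event at `crash_at_offset`, and then restarts from whatever was durably saved.

```python
def run(events, policy, crash_at_offset):
    """Count events under a checkpoint-ordering policy, crashing once mid-checkpoint.

    policy: "at-least-once" saves state, then offset;
            "at-most-once"  saves offset, then state;
            "exactly-once"  saves both in one atomic step.
    """
    saved = {"count": 0, "offset": 0}    # durable checkpoint store (toy)
    crashed = False
    while saved["offset"] < len(events):
        # Start, or recover, from the last durable checkpoint.
        count, offset = saved["count"], saved["offset"]
        while offset < len(events):
            count += 1                   # process one event in memory
            offset += 1
            crash_now = (not crashed) and offset == crash_at_offset
            if policy == "at-least-once":
                saved["count"] = count
                if crash_now:
                    crashed = True
                    break                # state saved, offset not: event is replayed
                saved["offset"] = offset
            elif policy == "at-most-once":
                saved["offset"] = offset
                if crash_now:
                    crashed = True
                    break                # offset saved, state not: event is lost
                saved["count"] = count
            else:                        # exactly-once
                if crash_now:
                    crashed = True
                    break                # nothing saved: event is replayed cleanly
                saved = {"count": count, "offset": offset}
    return saved["count"]


events = list(range(5))
at_least = run(events, "at-least-once", crash_at_offset=3)   # 6: one event counted twice
at_most = run(events, "at-most-once", crash_at_offset=3)     # 4: one event dropped
exactly = run(events, "exactly-once", crash_at_offset=3)     # 5: matches the input
```

The same reasoning carries over to output semantics: replace "save state" with "deliver output" and the three orderings produce duplicated, lost, or exactly-once output respectively.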
Most stream computing and storage components at Facebook provide only at-least-once semantics and no transactions. I do not know whether the compute nodes attached to each Scribe node deduplicate; if not, data may be duplicated whenever a node fails.
Common mechanisms for saving state:
- Local database persistence. For example, Samza saves state to a local database and writes the changelog to Kafka; after a failure, the state is recovered from Kafka. Because Kafka had no transactions at the time, this supports only at-least-once semantics.
- Remote database persistence. Checkpoints and state are stored directly in a remote database, as in MillWheel. As long as the remote database supports transactions, exactly-once semantics can be implemented.
- Upstream backup. Data is buffered upstream, and when a failure occurs the upstream data is replayed.
- Global consistent snapshot. Flink uses a distributed snapshot algorithm to maintain a globally consistent snapshot. After a failure, all nodes roll back to a consistent state.
- Replication. Stateful nodes are run as multiple replicas to keep the state reliable.
Stylus implements both the local database and the remote database approaches. In the local database version, state is saved to RocksDB: as the paper describes, in-memory data is periodically saved to RocksDB, and the local data is asynchronously copied to HDFS at larger intervals. When the process crashes, it is first restarted on the original machine, where the local state can be used to resume; if the machine itself has failed, the data is restored from the remote HDFS copy.
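The two-level recovery path can be sketched as follows. This is only an illustration: plain dicts stand in for RocksDB and HDFS, and `StatefulWorker`, `flush_local`, `backup_remote`, and `recover` are hypothetical names rather than anything from Stylus. It shows why a restart on the same machine resumes from fresher state than a restart on a new machine.

```python
import copy


class StatefulWorker:
    """Toy model: in-memory state, a local store, and a remote backup (plain dicts)."""

    def __init__(self, local_store, remote_backup):
        self.local_store = local_store       # stands in for the local RocksDB
        self.remote_backup = remote_backup   # stands in for the HDFS copy
        self.memory = {}

    def update(self, key, delta):
        self.memory[key] = self.memory.get(key, 0) + delta

    def flush_local(self):
        # Frequent and cheap: persist in-memory state on the same machine.
        self.local_store.clear()
        self.local_store.update(copy.deepcopy(self.memory))

    def backup_remote(self):
        # Less frequent: copy the local snapshot to remote storage.
        self.remote_backup.clear()
        self.remote_backup.update(copy.deepcopy(self.local_store))

    @classmethod
    def recover(cls, local_store, remote_backup):
        # Prefer the local store (process restart on the same machine);
        # fall back to the remote copy when the machine itself is gone.
        source = local_store if local_store is not None else remote_backup
        worker = cls(local_store if local_store is not None else {}, remote_backup)
        worker.memory = copy.deepcopy(source)
        return worker


local, remote = {}, {}
worker = StatefulWorker(local, remote)
worker.update("clicks", 3)
worker.flush_local()
worker.backup_remote()
worker.update("clicks", 2)
worker.flush_local()                 # local now holds 5, remote still holds 3

same_machine = StatefulWorker.recover(local, remote)
new_machine = StatefulWorker.recover(None, remote)
```

Here `same_machine.memory` is `{"clicks": 5}` while `new_machine.memory` is `{"clicks": 3}`: the remote copy lags behind, so recovery on a new machine has to reprocess more input from the saved offset.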
Facebook also uses the remote database approach. One of its benefits is faster failover, because the full state does not need to be loaded onto the machine on restart. The paper also notes that most state operations are read-modify-write operations: normally the data must be read, modified, and then written back. If the remote database supports merge operations, performance can improve greatly (is this somewhat like pushing computation down to the storage layer?), because a read-modify-write operation becomes an append-only operation. But can every operation be converted this way?
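The difference between read-modify-write and merge can be sketched with a toy store. Everything here is an assumption made for illustration: `RemoteStore` is a hypothetical in-memory class, not a real database client, and the `reads` counter simply stands in for round trips that fetch a value.

```python
class RemoteStore:
    """Toy key-value store with a merge operation (hypothetical, not a real client API)."""

    def __init__(self, combine, identity):
        self.combine = combine
        self.identity = identity
        self.base = {}        # fully merged values
        self.pending = {}     # appended operands, not yet merged
        self.reads = 0        # counts calls that fetch a value

    def get(self, key):
        self.reads += 1
        value = self.base.get(key, self.identity)
        for operand in self.pending.get(key, []):
            value = self.combine(value, operand)   # fold operands lazily on read
        return value

    def put(self, key, value):
        self.base[key] = value
        self.pending.pop(key, None)

    def merge(self, key, operand):
        # Append-only: no read is needed before the write.
        self.pending.setdefault(key, []).append(operand)


def read_modify_write(store, key, delta):
    store.put(key, store.get(key) + delta)         # one read plus one write per update


rmw = RemoteStore(lambda a, b: a + b, 0)
for delta in [1, 2, 3]:
    read_modify_write(rmw, "clicks", delta)
rmw_reads = rmw.reads                               # 3 reads for 3 updates

merged = RemoteStore(lambda a, b: a + b, 0)
for delta in [1, 2, 3]:
    merged.merge("clicks", delta)
merge_reads = merged.reads                          # 0 reads for 3 updates
```

Both stores end up returning 6 for `"clicks"`. The conversion only works because the combine function is associative, so operands can be folded later in any grouping; as far as I understand, this is why the optimization is tied to monoid-style aggregations rather than to arbitrary updates.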
There are many scenarios in which old data needs to be processed, for example:
- When a user develops a new job, a good practice is to run regression tests against old data.
- When a new metric is added, the corresponding historical curve can be produced from old data.
- When a bug in past output is found, the affected data must be rerun and backfilled after the bug is fixed.
When such reprocessing is needed, there are usually the following options:
- Stream only. The upstream message queue must retain historical data for long enough.
- Maintain two separate systems, one for batch and one for streaming. To rerun historical data, launch batch jobs. The main challenge with this model is that consistency between the two systems is difficult to achieve.
- Use a stream processing system that can also run in a batch environment. Spark and Flink both follow this route, and it is also the approach Facebook uses.
Scribe keeps message queue data for the short term, while long-term data is stored in a data warehouse built on Hive. The paper points out that Flink and Spark use similar data transfer and fault tolerance approaches for batch and stream (this is less true now, as Flink's stream and batch runtimes are already very different, especially in shuffle and fault tolerance). Reprocessing at Facebook is based on the MapReduce model: data is read from Hive and the streaming application is run in a batch environment. Specifically, when a Puma application runs in the Hive environment, it runs as a Hive UDF, and the Puma application code does not need to change.
Stylus provides three types of operators: 1) stateless operators, 2) stateful operators, and 3) monoid operators. When a user develops a Stylus application, two binaries are generated, one for streaming and one for batch. The batch implementation of a stateless operator is a custom mapper, that of a stateful operator is a custom reducer, and that of a monoid operator performs partial aggregation in the map phase.
In this way, the user writes one set of code that serves both batch and stream.
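Why a monoid operator suits both modes can be shown in a few lines. This is a sketch with hypothetical names (`SumMonoid`, `stream_aggregate`, `batch_aggregate`), written in Python rather than Stylus's C++, and it models mappers as list partitions: because the combine function is associative and has an identity, the streaming fold and the batch partial aggregation give the same result from the same user-supplied logic.

```python
from functools import reduce


class SumMonoid:
    """A monoid: an identity element plus an associative combine."""
    identity = 0

    @staticmethod
    def combine(a, b):
        return a + b


def stream_aggregate(monoid, events):
    # Streaming: fold each event into the running state as it arrives.
    state = monoid.identity
    for value in events:
        state = monoid.combine(state, value)
    return state


def batch_aggregate(monoid, partitions):
    # Batch: each mapper pre-aggregates its own partition (partial aggregation),
    # then one reduce step combines the partial results.
    partials = [reduce(monoid.combine, part, monoid.identity) for part in partitions]
    return reduce(monoid.combine, partials, monoid.identity)


events = [4, 1, 7, 2, 6]
partitions = [[4, 1], [7, 2, 6]]     # the same events, split across two mappers

stream_total = stream_aggregate(SumMonoid, events)
batch_total = batch_aggregate(SumMonoid, partitions)
```

Both totals are 20. Partial aggregation in the map phase also reduces the amount of data shuffled to the reducers, which is the practical benefit of treating monoid operators separately from general stateful ones.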