Author: Paul Lin

This article link: 2021/06/23/Talking about the past, present and future of big data/

I believe that readers in the field of big data can feel that the application scenarios of big data technology are undergoing far-reaching changes: With the rise of real-time computing, Kubernetes, and the general trend of HTAP and stream-batch integration, the previously relatively independent big data technology is gradually integrating with traditional online business. On this topic, the author has long been like a fish in the throat, but due to procrastination, I have been slow to write, and finally took the opportunity to participate in a number of conferences recently to gain a lot of insights to overcome laziness and write this article.

This article aims to briefly review the history of big data, then summarize the current major development trends and the author’s thinking, and finally inevitably look subjectively into the future.

the past: advanced and backward coexistence


big data originated from the explosive growth of the Internet brought about by Web 2.0 [1] in the early 21st century, when the data volume of Google, Yahoo and other leading companies has far exceeded the size of stand-alone processing, and most of the data is unstructured, semi-structured data such as web page text, which can not be processed by traditional databases, so new data storage and computing technologies have begun to be explored. In 2003-2006, Google published papers on internal research and development, known as the Google Troika, GFS, MapReduce, and Bigtable papers. During this period, Yahoo established an open source Hadoop project based on the GFS/MapReduce paper, laying the foundation for the development of big data for more than a decade, and at the same time the term big data was widely used to describe such systems that were too large or too complex to be processed by traditional stand-alone technologies [2].

However, while the general-purpose data storage computing framework represented by MapReduce has been a huge success in the search engine scenario, MapReduce is a huge step backwards (“A major step backwards”) in the eyes of the competing database community [3]. The main reasons are roughly as follows:

  • a huge step backwards in the programming model, the lack of schema and high-level data access language
  • implementation is very primitive, and the basic brute force traversal instead of using indexing
  • concepts is backward, It was the technical implementation of 25 years ago
  • that lacked most of the features that were standard with DBMS at the time

  • , such as transactions and data updates,
  • which were incompatible with the tools that DBMS users relied on at that time

In the author’s opinion, this paper bluntly points out the shortcomings of big data systems, which are still very instructive today. In the following decade, it is also the process of big data systems gradually perfecting to make up for these shortcomings, such as Hive/Spark filling the gap in the advanced programming model, Parquet/ORC and other storage formats adding indexes to files, and today’s data lakes are implementing missing ACID transaction features. It’s worth mentioning, though, that these criticisms are for the general-purpose database scenario, which targets unstructured/unstructured data, and Google Search itself is a huge inverted index (and therefore no additional indexing is required).

Due to the shortcomings of big data system characteristics and the independence of the technology stack, although big data has developed rapidly in the past ten years and various projects have bloomed, the application scenarios are still largely limited to scenarios where data accuracy requirements such as data warehousing and machine learning are not so high. Many of these projects are also designed to be positioned in certain subdivided application scenarios rather than general scenarios, such as Hive as a data warehouse and Storm as a real-time incremental supplement to offline data warehouses [5]. Although this can be regarded as a trade-off to support the magnitude of big data, it objectively also causes a very complex big data ecosystem, and to make good use of big data, it is usually necessary to introduce at least ten components, both for big data teams and users have a high threshold.


> computingStream-batch integration refers to the simultaneous implementation of stream computing and batch computing with the same computing framework, with the goal of solving the problem of repeated data pipelines with two different programming models of offline batch processing and real-time stream processing in Lambda architecture.

The main reason for the formation of such an architecture is that real-time stream computing could not provide accurate once semantics (Exactly-Once Semantics) in the early stage of development, which can easily lead to data undercalculation or overcalculation in the case of abnormal retries or data delays, so it is necessary to rely on mature and reliable offline batch computing to regularly correct data. The difference in data accuracy between the two is mainly due to: the offline batch computing data is bounded (so there is no need to consider whether the data is complete) and allows high latency, so there is almost no need to trade-off between data accuracy and latency; Real-time stream computing relies heavily on the low latency of input data, and if the business data generated at a certain point in time is not processed in time, it is likely to be incorrectly counted into the next statistical calculation window, which may lead to inaccurate data in both windows.

However, the release of the

Google Dataflow Model paper in 2015 [6] clarified the relationship between the opposites and batches of stream processing, that is, batch processing is a special case of stream processing, which laid the foundation for the general trend of stream-batch integration. This article is not intended to delve too much into the Dataflow Model, but in short, it introduces two concepts that are critical to stream processing: watermark and accumulation mode. A watermark is extracted from the business time of the data itself (this is called the Event Time property) and represents an estimate of the business time of the input data. Triggering calculations based on watermarks rather than data processing times can largely solve the latency dependency of stream computing. On the other hand, the Accumulation Mode defines the relationship between the results of different executions of stream computing, so that stream computing can output incomplete intermediate results first, and then gradually correct them, and finally converge to accurate results.

In the open source industry, Flink/Beam, the first computing framework to adopt the stream-batch integrated computing model, has gradually reached production availability after several years of iteration, and has successively landed in cutting-edge companies. Since the integration of flow and batch involves a large number of business transformations, the main power sources for promoting the transformation of existing services under the current situation that the Lambda architecture has been running stably for many years are:

reducing costs and increasing efficiency. Avoid the machine and labor costs of building two data pipelines at the same time.

Align the caliber. There may be inconsistencies between the schema of batch processing and the schema of stream processing, for example, the same metric may be at the daily granularity in batch processing, while the stream processing is minute granular. Such inconsistencies lead to error-prone use of both stream and batch results.

It is worth noting that Stream-batch integration does not change the offline pipeline in the Lambda architecture to the same engine as the real-time pipeline, and double-run as before, but allows the job to flexibly switch between the two modes. Generally speaking, latency-insensitive services can be executed in batch mode to improve resource utilization, and when services become latency-sensitive, they can seamlessly switch to real-time stream processing mode. When you need to correct real-time calculation results, you can directly use the Kappa architecture [7] to copy a job to rebrush part of the data in batch mode.

the stream batch integration of

storage is

well known, and the file system is often read and written in batch processing, and the file is used as a storage abstraction; In stream processing, message queues are often read and written, and queues are used as storage abstractions. In the Lambda architecture, we often write simultaneous data to file systems such as HDFS, S3 or object storage for batch processing, and to message queues such as Kafka for stream processing. Although Message Queuing reduces data storage costs by retaining only the most recent period of data, the redundancy of the two systems still causes significant machine resource overhead and human resource costs. Under the general trend of flow batch integration of computing, the promotion of storage flow batch integration is naturally also a smooth water.

However, unlike heavyweight papers such as Dataflow Model, which can make the industry reach a consensus that “batch processing is a special case of stream processing”, the stream-batch integration of storage is still in a situation where the two genres of file system-based and message queuing are not equal. File-based queuing is represented by data lakes such as Iceberg/Hudi/DeltaLake, while queue-based file features are represented by new message queuing systems such as Pulsar/Prevega.

In my opinion, file storage and queue storage can meet the requirements of stream batch integration after certain improvements, such as Pulsar supports archiving data to hierarchical storage and can choose the Segment API or Message API to read, while Iceberg supports batch reading of files or streaming listening to files. However, in combination with the stream-batch integration of compute, the two are fundamentally different in terms of writing to the update API, and this difference further leads to many different characteristics of the two

Update method. Although files and queues are usually written in Append mode in big data scenarios, files support updates to data that has already been written, while queues do not allow direct updates, but are updated indirectly by writing new data and compacting to delete old data. This means that there is something unnatural about reading and writing queues in batches or reading and writing files in stream processing (more on that below). In file-based storage such as data lakes, streaming reads are usually implemented by listening for changelogs; In queue-based storage, for a batch to recalculate the update results, it cannot directly delete or overwrite the results that have been previously written to the queue, either to Changelog or to rebuild a new queue. Versioning. The data in the file is mutable because of the way it is updated, while the data in the queue is immutable. Files represent state at a point in time, so data lakes need versioning to add backtracking; Queues, on the other hand, represent events that change state over a period of time and are inherently Event Sourcing capable and therefore do not require versioning. Write in parallel. Files have a unique write lock that only allows a single process to write. Data lakes typically expose the entire directory as a table to users, and if there are multiple parallel writes, a new file-based snapshot isolation (MVCC) is added to that directory for each parallel process. In contrast, queues inherently support parallel writes, so snapshot isolation is not required. In fact, this difference is also caused by the different update methods of the two, because the queue Append-Only method ensures that concurrent writes will not lead to data loss, while files are not.

Through the above analysis, I believe that many readers have vaguely felt that file-based storage is similar to the table in stream table duality, which is suitable for saving variable states (calculated final results or intermediate results) that can be queried, while queue-based storage is similar to the stream in stream table duality, suitable for saving event streams (Changelog data) read by stream computing engines.

Although the performance of the flow table can be used interchangeably, improper use will cause unnecessary data conversion between the two states of the flow table and cause additional trouble to downstream services. Specifically, if the file system stores Changelog data, then when streaming (listening) downstream, it reads the Changelog of Changelog, which is completely unreasonable. Conversely, if a message queue stores non-Changelog data, the queue loses the ability to update, and any update results in different versions of the message at the same time. Since the current Changelog type is generally generated by the aggregation and join of CDC or stream computing, and has not been generalized to general MQ usage scenarios, the latter problem occurs more often. However, the author believes that Changelog is a more stream-native format, which will probably be standardized and popularized in Queue storage in the future, and the current non-Changelog data can be regarded as a special case of the Append-Only business.

The above conclusions can be applied to the current hot real-time data warehouse construction. In addition to the Lambda architecture, the current real-time data warehouse architecture mainly has two types: Kappa architecture and real-time OLAP variants[9], either of which usually uses MQ such as Kafka/Pulsar as the storage of the middle layer such as ODS/DWD/DWS, and OLAP database or OLTP database as the storage of the ADS application layer. The main problem with this architecture is that it is not flexible enough, for example, if you want to do some ad-hoc analysis directly based on the DWD layer, you often have to export the data in the MQ of the DWD layer to the database and then do the query.

Some readers may ask, if you use Flink to read MQ data directly? In fact, it is possible, because Pulsar also provides indefinite storage, but the efficiency will be relatively low, mainly because MQ cannot provide indexes to achieve optimizations such as predicate pushdown [10], and the aggregated or Join data is in Changelog format, and the data stream will contain old versions of redundant data. Therefore, there is a new trend in the industry to replace MQ as the storage of the middle layer of data warehouses with data lakes such as Iceberg, which has the advantage of better docking offline data warehouses and their long-standing business models, but at the cost that data latency may become near real-time. From the point of view of “files suitable for storage state” in this article, the tables that need to be queried by business in the real-time data warehouse are indeed more suitable for file storage, because the business needs the state, not the change history.

in the offline mixed part

refers to the deployment of online services and

real-time and offline services in big data scenarios on the same physical cluster, in order to improve the utilization rate of machines. Due to historical reasons, the technology stacks of online services and big data services are relatively independent, so it is natural to deploy them separately: online services use the cluster manager represented by k8s/Mesos, while big data services usually use YARN native to the Hadoop ecosystem as the cluster manager. However, as the cluster grows, the problem of insufficient resource utilization becomes more prominent, for example, the average CPU consumption is less than 20%. The best way to solve the problem is to break the boundaries of different business independent clusters to achieve mixed parts, and use the tidal phenomenon and priority of business resources for dynamic resource allocation. In fact, many companies have been exploring the offline mixing department for many years, and the rapid development of K8S in the past one or two years has greatly accelerated the progress of business (including big data) to the cloud, so it has once again become a hot spot in the offline mixing department.

The difficulties in offline mixing technology are mainly unified cluster management, resource isolation and resource scheduling, which are discussed point by point below.

First of all, a unified offline cluster manager is the basis of mixing. At present, most companies coexist with k8s and YARN, but under the general trend of cloud native, big data components are gradually providing first-class support for k8s, and it seems that k8s unifies cluster resources only a matter of time. First, the first-level scheduling design of k8s cannot well meet the complex scheduling of many batches of computing jobs, and second, the cluster size that k8s can currently control is generally about 5000 nodes, which is an order of magnitude worse than YARN [11]. Therefore, at this stage, the industry mostly chooses YARN on k8s to migrate gradually. A common practice is to start NM in the k8s pod and let some NM nodes of YARN run on k8s.

Then, resource isolation is at the heart of mixing. Although k8s provides resource management, it is limited to two dimensions: CPU and memory, while network and disk IO are not taken into account[12]. This is obviously not enough for mixed big data services, because big data services can easily fill up the network or disk of the machine, seriously affecting online services. Resource isolation for production usually requires Linux kernel-level support, which is beyond the scope of this article and the author’s knowledge reserve, and will not be detailed.

Finally, resource scheduling is a guarantee of service quality. The scheduler needs to consider the heterogeneity of resources of physical nodes, the full distribution of similar services, and the deployment preferences of services to optimize scheduling, optimize efficiency, and avoid mutual interference to the greatest extent. In addition, the cluster scheduler will overrun resources according to priority. During off-peak hours, idle resources can be used to run offline jobs with low priority and insensitive latency, but when there is a sudden traffic flow or when online jobs are found to be interfered with by offline jobs, the cluster scheduler needs to quickly exit the offline jobs and give up resources.


stands for Hybrid Transactional Analytical Processing, that is, it supports both online transactional queries and analytical queries. The stream-batch integration of computing and storage mentioned above is the integration of real-time and offline technology stacks, the integration of big data services and online business operation and maintenance management in the offline mixing part, and HTAP is the final integration of big data and online business technology stacks. Since Gartner proposed the concept in 2014, HTAP has become the hottest direction in the database space. In addition to simplifying the complex architecture of OLTP and OLAP technology stacks, HTAP has an important background of requirements: As data scenarios change from internal decision support to algorithm model input used as online value-added services (such as recommendations and advertisements), to directly as user-oriented data services (such as Taobao business staff, Didi driving tracks, etc.), the boundary between OLTP and OLAP is becoming more and more blurred.

HTAP is divided into two types from an architectural point of view: a single system serving both OLTP and OLAP, or two systems serving OLTP and OLAP respectively. TiDB, OceanBase and Google’s F1 Lightning, which are now popular in the industry, all belong to the latter. In this type of system, OLTP and OLAP have independent storage and computing engines, and rely on the built-in synchronization mechanism to synchronize the row data in the OLTP system to the OLAP system into column data suitable for analysis business. On top of this, the query optimizer provides a unified query entry to the outside world, routing different types of queries to the appropriate system.

Compared with traditional Hadoop-based data warehouses, the advantages of HTAP are:

built-in reliable data synchronization mechanism, avoiding the establishment of complex ETL pipelines from OLTP libraries to data warehouses, and improving data consistency (for example, TiDB and F1 Lightning both provide repeatable read consistency consistent with OLTP).

The user-friendly unified query interface shields the complexity of the underlying engine and greatly reduces the threshold of OLAP. This allows online business teams to use OLAP for lightweight data analysis when authorized, and data analytics teams to use OLTP for quick spot checks.

Data security is more guaranteed. Moving data between different components can easily lead to inconsistent permissions and security vulnerabilities, and HTAP can avoid these problems by reusing OLTP’s data permissions and avoiding cross-component access.

While the vision of HTAP is

great, building a business-proven HTAP system is not easy. There have been many attempts in the field of databases and big data, but there are only a few successful cases, and the main difficulty is the

isolation of OLTP and OLAP resources. Because OLAP often contains some resource-intensive, complex queries, OLTP and OLAP common components can easily compete for resources, interfering with higher-priority OLTP queries. In earlier cases, HTAP that shared compute and storage did not work well, so recent HTAP databases have been load isolated at the hardware level, that is, independent storage and compute.

How data synchronization mechanisms ensure data consistency and freshness. Unlike Hadoop-based data warehouses, which typically allow hourly data latency and windows of inconsistency, HTAP typically promises strong consistency to ensure consistent results for a query regardless of whether it is routed to an OLTP or OLAP system, which places high demands on the performance and fault tolerance of the data synchronization mechanism. At present, among the two databases called State of the art in the field of HTAP, F1 Lightning uses the non-intrusion CDC method for synchronization, and TiDB is based on the Raft algorithm for data replication. The former is loosely coupled, but the implementation is more complicated; The latter is more concise and elegant, but subject to OLTP design constraints, such as the copied block size needs to be consistent with OLTP[16].

How to combine OLTP and OLAP workloads. The current HTAP is like two independent systems behind the same façade, a query is either handed over to OLTP or OLAP processing, and does not produce a chemical reaction of 1 + 1 > 2. IBM points out that true OLAP is the efficient handling of both OLTP and OLAP workloads in the same transaction[15]. To do this, the HTAP architecture that relies on data synchronization is probably difficult to do, and it needs to be solved from the level of distributed transaction algorithms.

Although HTAP is not yet widely used, it is foreseeable that it will significantly affect data warehouse architectures in the future. In scenarios where the data scale is small and the analysis requirements are simple, HTAP will become the most popular solution.

the future: returning to the essence

of “

integration” is the general trend of the current development of big data, which can be seen from the perspective of historical development law. For emerging technical challenges, solutions are always emerging during the initial exploration period, and solutions using the Greenfield approach may tear down the existing foundation and bring some degradation (Regression) compared to the original technology. Degradation limits the application scenarios of new technologies, resulting in a dual-track system of old and new technologies, but as long as the core functions do not change much, such a separation is often only temporary.

Looking back at the history of big data, the term “big data” was originally used to describe the challenges that data scale, diversity, and

processing performance pose to data management, and was subsequently used to describe data systems built to deal with such problems, that is, “big data systems”. Because such systems are built on a different foundation from traditional data and abandon the transaction characteristics of the latter, it is difficult to apply to online business, and is usually only used in scenarios with slightly lower requirements for data latency and data accuracy, such as data warehousing and machine learning, which are gradually called “big data business”.

However, the essence of big data technology is a data-intensive distributed system, and with the development and popularization of distributed systems, the

limitations of big data systems in functional characteristics and business scenarios will eventually be broken, and there is no obvious boundary with the emerging NewSQL distributed database represented by Spanner. At that time, the term “big data” may gradually disappear into history, like many buzzwords, and return to the essence of general-purpose distributed systems. Horizontal scaling, excellent fault tolerance, and highly available distributed features will become standard in a wide range of systems, whether in OLTP or OLAP scenarios.


    > Wikipedia – Web 2.0
  • Wikipedia – Big data
  • MapReduce: A major step backwards
  • 3D Data Management: Controlling Data Volume, Velocity, and Variety
  • How to beat the CAP theorem
  • Why does Alibaba Cloud do flow batch integration?
  • Questioning the Lambda Architecture
  • Stream is the new file
  • based on Flink’s typical ETL scenario implementation
  • Flink + Iceberg Full-scenario Real-time Data Warehouse Construction Practice
  • Talk about the problems and limitations of Kubernetes
  • Kubernetes #27000: limiting bandwidth and iops per container
  • TiDB: A Raft-based HTAP Database
  • F1 Lightning: HTAP as a Service
  • Hybrid Transactional/ Analytical Processing: A Survey
  • – F1 Lightning: HTAP as a Service