Big Data Architecture Course Review Notes


The requirements of big data systems include data requirements, functional requirements, performance requirements (high performance, high availability, high scalability, high fault tolerance, security, etc.), and computational scenario requirements.

The goal requirements of distributed systems/clusters for big data processing are high performance, high availability, fault tolerance, and scalability. High performance covers three metrics: response time (latency), throughput, and resource utilization. High availability is measured by MTTF (mean time to failure) and MTTR (mean time to repair), with availability = MTTF / (MTTF + MTTR).
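The availability formula is easy to sanity-check directly (a minimal sketch; the function name and sample numbers are my own):

```python
def availability(mttf_hours: float, mttr_hours: float) -> float:
    """Availability = MTTF / (MTTF + MTTR)."""
    return mttf_hours / (mttf_hours + mttr_hours)
```

With MTTF = 999 h and MTTR = 1 h, availability is 0.999 ("three nines"), which corresponds to roughly 8.8 hours of downtime per year.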

The relationship between big data and cloud computing:

  • Cloud computing can provide abundant computing resources for big data processing.
  • Big data is a typical application of cloud computing services.
  • Big data can be processed without using cloud computing.

Typical scenarios of big data computation are:

  • Batch processing
  • Stream computing
  • Interactive querying

Static data is bounded, persistently stored, and large in volume, making it suitable for batch processing.

Stream data is unbounded and continuously generated, with no definite end; it must be processed promptly, typically over data windows, which makes it suited to stream computing.

Overview of Cloud Computing

Definition of Cloud Computing

  • Cloud computing is a business computing model. It distributes computing tasks across a resource pool composed of a large number of computers, allowing various application systems to obtain computing power, storage space, and information services as needed.
  • It provides dynamically scalable, inexpensive computing services on demand through the network, and represents a universally applicable resource management mindset and model.
  • Cloud computing compares computing resources to omnipresent clouds and is the result of the development and evolution of technologies such as virtualization, distributed computing, utility computing, load balancing, parallel computing, network storage, hot backup redundancy, etc.

Characteristics of Cloud Computing

  1. Unified management of resource virtualization and pooling
  2. Massive scale, high availability, high scalability
  3. Elasticity, on-demand, self-service provision
  4. Ubiquitous access, accurate billing, low cost

Three Service Models

  1. Infrastructure as a Service (IaaS)

Provides computing resources services such as servers, storage, and networking.

Main functionalities:

  • Users pay for IaaS on-demand, without needing to purchase the entire hardware setup.
  • Infrastructure can be scaled according to processing and storage needs.
  • Saves companies the cost of buying and maintaining hardware.
  • Data is stored in the cloud, avoiding single points of failure.
  2. Platform as a Service (PaaS)

Provides environment software for development, management, and delivery, such as operating systems, databases, middleware, and development platforms.

Main functionalities:

  • Provides development platforms and tools for software vendors to rapidly develop, test, deploy, and run.
  • Software vendors can focus on development without worrying about the underlying infrastructure.
  • Cloud providers ensure the platform’s security, reliability, and stability.
  3. Software as a Service (SaaS)

Provides cloud-based software services through the internet.

Main functionalities:

  • Users subscribe to software and access application software directly via the internet, without needing to manage, install, or upgrade software.
  • Data is protected in the cloud, preventing loss due to device failures.
  • Resources can be scaled according to service needs.

Four Deployment Models

Public Cloud: IT infrastructure owned and operated by a third-party cloud service provider, offering services over the internet.

Advantages: Low cost, no maintenance needed, scalable on demand, high reliability, and availability.

Disadvantages: Uncontrollable security, resources cannot be freely customized.

Private Cloud consists of cloud computing resources dedicated to a single enterprise or organization.

Advantages: Compared to public clouds, resources are more flexibly customized, and security is higher.

Disadvantages: High construction and operation costs.

Community Cloud: cloud computing resources shared by multiple enterprises or organizations with common requirements.

Hybrid Cloud: combines public and private cloud environments through secure connections, allowing data and applications to be shared between the two environments.

– High controllability, sensitive assets in private clouds.

– Flexible availability, use public cloud on-demand.

– High cost-effectiveness, with the scalability of public clouds.

Cloud Computing Architecture

  1. SOA Build Layer

Encapsulating cloud computing capabilities into standard Web services, and integrating them into the SOA framework.

  2. Management Middleware Layer

Managing cloud computing resources and scheduling numerous application tasks, enabling resources to serve applications efficiently and securely.

Core Technologies of Cloud Computing

The core technologies of cloud computing mainly include virtualization and containerization. Among them, containerization technology is more lightweight, faster, and has lower overhead than virtualization because it utilizes a shared operating system kernel to package applications and their running environments, making it a technology that developers have pursued in recent years.

Virtualization Technology

Virtualization is the abstraction and mapping of computer resources into virtual logical entities, breaking the boundaries of physical resources to enable unified management; it is the foundational technology of the cloud computing environment.

◼ Server virtualization: Virtualizing a computer into multiple logical computers

◼ Storage virtualization: Abstracting and unifying the management of underlying storage devices, providing storage services independently

◼ Network virtualization: Virtualizing one physical network card into multiple virtual network cards, isolating different applications through virtual machines

◼ Desktop virtualization: Decoupling the user’s desktop environment from their terminal devices, hosting each person’s complete desktop environment at the service provider, and accessing their desktop through terminal devices over the network

1. Server Virtualization

Virtual Machine (VM)

Virtualizing one computer (physical machine, physical server) into multiple logical computers

Each virtual machine has independent “hardware”.

The “hardware” of the virtual machine is emulated using the hardware of the physical machine.

The work executed by the virtual machine is actually completed by the physical machine’s hardware.

Virtual Machine Monitor (VMM)

VMM is the operating system or software that virtualizes the physical machine into virtual machines. Its main function is to provide virtual hardware resources for the virtual machine, manage and allocate these resources, and ensure isolation between virtual machines.

VMM’s two operating modes

1 Hosted mode: VMM runs on the operating system of the physical machine, easy to install and use, but with lower performance.

In hosted virtualization, the virtualization layer is generally referred to as the Virtual Machine Monitor (VMM). The VMM achieves virtualization of the CPU, memory, and I/O devices by calling host OS resources, and the virtual machines it creates are scheduled as processes of the host OS. A hosted-mode VMM can fully utilize host OS functions to operate hardware devices; however, performance suffers because every operation passes through the host OS as an intermediary.

2 Hypervisor mode (Bare Metal mode): VMM runs directly on the hardware of the physical machine, providing performance close to that of the physical machine.

In the architecture, the VMM is an operating system, generally called Hypervisor. Hypervisor = OS + Virtualization—possessing traditional operating system functions, including virtualization functions, mapping virtual resources to physical resources, and isolating virtual machine systems. It provides performance close to that of the physical machine, but the supported I/O devices are limited.

Server Virtualization Technology Classification

Based on different handling methods for critical instructions

Full Virtualization

Para Virtualization

Hardware Assisted Virtualization

Full Virtualization

  1. The VMM simulates the complete underlying hardware for the Guest OS, including processors, physical memory, clocks, peripherals, etc. The guest operating system is completely unaware that it runs on a virtual machine.

  2. The guest operating system and its system software can run in the virtual machine without any modifications.

  3. Good compatibility, easy to install and use.

  4. Lower performance (because the VMM needs to perform binary translation, replacing sensitive instructions).

Para Virtualization

  1. Paravirtualization requires modifying the kernel of the Guest OS, turning privileged or sensitive instructions that were originally executed on the physical machine into hypercalls to the VMM.

  2. The Guest OS knows it is running in a virtual machine environment; it cannot execute privileged and sensitive instructions directly and instead requests them from the VMM via hypercalls.

  3. Better performance than full virtualization,

  4. But harder to implement (the Guest OS kernel must be modified).

Hardware Assisted Virtualization

CPU manufacturers modify CPUs, introducing new instructions and running modes, helping the VMM efficiently identify and intercept sensitive instructions, supporting virtualization from the hardware level. Usually, the core instructions of the Guest OS can be executed directly by the computer system hardware without going through the VMM. For special instructions, the system will switch to the VMM, letting the VMM handle special instructions.

2. Storage Virtualization

Abstracts and unifies the management of underlying storage devices, providing storage services independently. This can achieve:

  1. High scalability, breaking away from physical capacity limitations.

  2. Hides device complexity behind unified management and service.

  3. Integrates space resources, improving device utilization.

  4. High reliability, high availability.

Technology Types

  1. Host-based virtualization (supports heterogeneous devices and is cost-effective, but occupies host resources and affects performance, impacts host security and stability, and has poor scalability)

  2. Storage device-based virtualization (does not affect host performance, but is vendor-specific and does not support heterogeneous devices)

  3. Network-based virtualization

3. Desktop Virtualization

Remote Desktop Services (RDS)

Virtual Desktop Infrastructure (VDI)

Intelligent Desktop Virtualization (IDV)

4. Network Virtualization

OpenStack core services: compute service nova, storage service swift, image service glance

Virtualization technology flaws

  1. Virtual machine operating systems consume a lot of resources and have long startup times.
  2. The intermediary (hypervisor) reduces the system’s service performance.
  3. Users are more concerned about their deployed applications but have to deploy and maintain the operating system and its dependencies.

Containerization Technology

Containerization is a lightweight virtualization technology on the operating system kernel. Utilizing the shared operating system kernel functions, a series of resource-isolated and enclosed running environments are established. These environments, like containers, deploy and run applications. Its advantages include being lightweight, agile, easy to scale, supporting DevOps, improving resource utilization to save costs, accelerating product iterations, supporting microservices architecture, and achieving operation and maintenance automation.

  1. Containers share the same operating system kernel.

  2. Containers package the application and its runtime environment.

  3. Build once, run everywhere across platforms.

  4. Containers are lightweight, start quickly, and have low overhead.

Container Implementation Principles

  1. Namespace

A namespace defines a closed scope: processes within a namespace can only see resources under that namespace, such as the hostname, network, processes, users, and filesystem. Processes in different namespaces are invisible to each other and do not interfere. Because each container has its own namespaces, applications running inside it appear to be running in a separate operating system, unaffected by one another.

Each process has seven namespaces for isolating different types of resources.

  2. Cgroups (Control Groups)

Namespaces can isolate processes into a specific environment but cannot limit the physical resources a process uses. Cgroups (Control Groups) are a physical resource isolation mechanism provided by the Linux kernel, implementing the ability to limit, isolate, and record the resources for Linux processes or process groups.

Containers use cgroups to isolate, limit, and record the physical resources (CPU, memory, IO, etc.) used by the container. Cgroups treat each container as a regular process. By setting resource limits for a process group or a specific process, cgroups achieve the purpose of isolating container processes from other processes in terms of resource usage.

· A. Namespace implements resource isolation.

· B. cgroups implement resource control.

· C. Each process has 7 types of namespaces, used to isolate different types of resources.

· D. cgroups treat each container as a normal process. By setting the resource limit conditions for a process group or a specific process, it aims to isolate container processes from other processes in terms of resource usage.

Overview of Big Data Processing

Big Data Processing Procedures

Big data processing refers to the collection of tasks such as data acquisition and pre-processing, storage and management, processing and analysis, and visualization presentation of big data.

Data Acquisition and Preprocessing

Data types: Structured, Semi-structured, Unstructured

Data sources: Business data, Internet data, IoT data

Collection methods: Log collection, Web crawlers, APIs, Government and enterprise institution sharing

Data preprocessing includes:

  • Data Cleaning: delete duplicate values, handle missing values, rename columns.

  • Data Integration: logically or physically integrate interrelated, distributed, heterogeneous data sources into one, providing users with transparent access services. The methods of data integration include:

Data consolidation (ETL + Data warehouse, physically unified)

Data federation (establishing a unified logical view)

Data propagation (data spreading across multiple applications)

Hybrid approach

  • Data Transformation: transform data from one form of representation to another:

Smoothing, Aggregation, Generalization, Normalization, Attribute Construction
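Normalization, for example, rescales values into a target range; a minimal min-max sketch (the function name and range defaults are my own):

```python
def min_max_normalize(values, new_min=0.0, new_max=1.0):
    """Min-max normalization: map values linearly onto [new_min, new_max]."""
    lo, hi = min(values), max(values)
    return [new_min + (v - lo) * (new_max - new_min) / (hi - lo) for v in values]
```

For instance, `min_max_normalize([10, 20, 30])` yields `[0.0, 0.5, 1.0]`.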

  • Data Reduction: reduce data size while preserving the quality of data analysis:

Dimensionality reduction: Wavelet Transform, PCA (Principal Component Analysis), Feature Selection

Numerosity reduction: Clustering, Sampling, Logistic Regression

Big Data Processing and Analysis

  • Distributed computing models and frameworks
    • Batch processing: Hadoop, Spark
    • Stream processing: Storm, Flink
    • Graph computing: Pregel, GraphX
  • Big data analysis
    • Interactive query: Hive, Pig, Spark SQL
    • Data mining: Mahout
    • Machine Learning: MLlib

Big Data Storage and Management

Big Data Interpretation and Visualization

Distributed Computing Principles

A distributed system is a collaborative system composed of a group of networked computer nodes working together to complete a common task. It must provide high availability, high performance, scalability, and fault tolerance, and comprises distributed storage and distributed computing. Sharding and replication are its basic means.


How HDFS stores files

Files are split into fixed-size units, which are scattered and stored on different nodes. They are merged when accessed.
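The split-and-merge idea can be sketched in a few lines (an illustration only, not HDFS internals; the 128 MB default mirrors HDFS's default block size):

```python
def split_into_blocks(data: bytes, block_size: int = 128 * 1024 * 1024):
    """Split a file's bytes into fixed-size blocks; the last block may be smaller."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def merge_blocks(blocks) -> bytes:
    """Reassemble the original file by concatenating its blocks in order."""
    return b"".join(blocks)
```

In the real system each block is additionally replicated across several DataNodes; this sketch shows only the split/merge symmetry.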

How HDFS writes files

The Client applies to the NameNode to write a file; the NameNode prepares and then informs the Client it is ready. Upon confirmation, the Client repeats the following steps until all data is written: (1) The Client applies to the NameNode for a Block; the NameNode selects DataNodes according to placement rules and informs the Client. (2) The Client sends data to the designated DataNodes, which receive and write the data locally.


How HDFS reads files

The Client requests to read a file from the NameNode, which prepares and then returns the file's metadata. After receiving the metadata, the Client requests the corresponding Blocks from the DataNodes and finally assembles the complete file content.

The Role of Data Distributed Storage:

  1. Data redundancy to improve availability.

  2. Separation of read and write operations to enhance performance.


Data Sharding

The dataset is divided into mutually independent, orthogonal subsets according to specific rules and distributed to different nodes. Sharding facilitates high performance, horizontal scaling, and high availability.

Sharding Requirements: Uniform distribution, load balancing, minimal data migration (during scaling)

Based on Data Range

Data is divided into different ranges based on keys, with each node responsible for one or more ranges.

Supports range queries; however, balancing is challenging to ensure.

Hash Method

A mapping relationship between hash values and system nodes is established, distributing data with different hash values across different nodes.

It solves the imbalance problem,

Range query performance is affected.

Scaling requires migrating a lot of data.
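The migration cost of modulo placement is easy to demonstrate: adding one node remaps most keys (a sketch; the key names, MD5 hash, and node counts are arbitrary choices of mine):

```python
import hashlib

def shard_of(key: str, num_nodes: int) -> int:
    """Modulo placement: hash the key, then take the hash mod the node count."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_nodes

keys = [f"user{i}" for i in range(1000)]
# Going from 4 to 5 nodes, a key stays put only if its hash has the same
# remainder mod 4 and mod 5 -- about 1 key in 5, so roughly 80% must move.
moved = sum(shard_of(k, 4) != shard_of(k, 5) for k in keys)
```

This is exactly the weakness that consistent hashing, described next, is designed to avoid.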

Consistent Hashing

  • Nodes (servers) are mapped to a hash ring that loops back on itself based on their characteristics.
  • Data is mapped to the same hash ring based on its characteristics.
  • Data is stored on the first node clockwise on the ring.
  • Additionally, each physical node can be assigned virtual nodes, with data hashed to a virtual node actually stored on the corresponding physical node. Virtual nodes are evenly distributed across the hash ring to prevent data skew and node avalanches.

Scaling involves minimal data migration.

Without virtual nodes, data skew might occur,

leading to node avalanches (a skewed node fails, its load shifts to its clockwise successor, which then fails in turn).
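The ring with virtual nodes can be sketched as follows (a sketch under my own assumptions: `vnodes=100` and MD5 are arbitrary choices, and the class name is hypothetical):

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, nodes, vnodes=100):
        # Each physical node contributes `vnodes` virtual positions on the ring.
        ring = sorted(
            (self._hash(f"{node}#vn{v}"), node)
            for node in nodes
            for v in range(vnodes)
        )
        self._keys = [h for h, _ in ring]    # sorted hash positions on the ring
        self._nodes = [n for _, n in ring]   # physical node owning each position

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def node_for(self, key: str) -> str:
        # Data is stored on the first (virtual) node clockwise, wrapping around.
        i = bisect.bisect_right(self._keys, self._hash(key)) % len(self._keys)
        return self._nodes[i]
```

Adding a fourth node to a three-node ring relocates only the keys that now fall before the new node's virtual positions, roughly 1/4 of them, instead of the ~80% that modulo placement would move.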


Data Replication

Establishing redundant replicas is a fundamental approach for fault tolerance and high availability.

Replication Strategies

Single-Master Replication

With single-master replication, there is only one Master replica; all others are backup Slave replicas. The node maintaining the Master replica acts as the central node, responsible for maintaining data updates, concurrency control, and coordinating replica consistency.

Process: If the Master replica fails, a new Master is elected from among the Slaves. Once the failed master recovers, it demotes to a slave and syncs with the new master. If a slave replica fails, after recovery, it resyncs data from the master.

Existing Issues:

  1. Availability issues: Failover operations after master failure, slave election, and service switching to the new master take time, blocking the system from providing service.

  2. Data consistency issues: If the master fails and a slave becomes the new master through election, data synchronization between the new and old masters might not have occurred. When the old master recovers and becomes a new slave, it may have more data than the new master, leading to data inconsistency.

  3. Cost issues: Slave replicas are only used for failover, which can be wasteful.

Multi-Master Replication

All replicas are considered masters, and they serve as master and slave to each other. Write operations can be processed by any master and then synced to other masters.

Multi-master replication can cause data inconsistency issues during concurrent operations.

Masterless Replication

No distinction between master and slave replicas. For data updates, the client issues write requests to multiple replicas; for data queries, it sends read requests to multiple replicas.

The client can perform some data compensation work, but data inconsistency issues still exist.

Replica Synchronization Strategies

Synchronous Replication

Replication is considered complete only when data is copied to all replicas. There is strong consistency among replicas, but the performance is not high.

Asynchronous Replication

Replication is considered complete as soon as data is copied to the master replica, with other replicas processing asynchronously. This approach offers high performance but can result in data loss or dirty reads.

Semi-Synchronous Replication

When data replication reaches a predetermined number of replicas, the replication is considered complete. It balances performance and consistency.

Replica Consistency Model

CAP Theorem

A distributed system cannot simultaneously fulfill consistency, availability, and partition tolerance. At most, only two of these can be satisfied.

  • Consistency: All copies of the data are consistent.
  • Availability: All requests receive a correct response.
  • Partition Tolerance: Even in the event of a network partition, the system can still provide services that satisfy consistency and availability.

A network partition is a problem with network links that isolates cluster nodes into multiple partitions, making networks unreachable between partitions but normal within them.

Distributed systems must guarantee partition tolerance; otherwise, they lose the meaning of being distributed. Hence, a balance must be struck between consistency and availability.

ACID Principle


  • Atomicity: All operations within a transaction either complete fully or are not completed at all. They do not end at an intermediate state.
  • Consistency: The database transitions from one consistent state to another consistent state before and after a transaction, without violating data integrity.
  • Isolation: Multiple concurrent transactions can occur without them interfering with each other, leading to inconsistent data.
  • Durability: After a transaction has been processed, modifications to data are permanent and won’t be lost even in case of a system failure.

BASE Principle

BASE loosens consistency in favor of partition tolerance and availability, representing a different design philosophy from ACID.

  • Basically Available The system should function at a basic level, continuously providing service. In the case of unforeseen failures, partial loss of availability, such as response delays or service degradation, is permitted.

  • Soft State The system allows data to exist in an intermediate state, considering that this state doesn’t affect the overall availability of the system, i.e., temporary inconsistency among replicas in different nodes is allowed.

  • Eventual Consistency Data should not remain in a soft state indefinitely and must eventually become consistent, ensuring data consistency across all replicas.

Consistency Models define the basic constraints for keeping consistency during data replication in distributed systems.

  • Strong Consistency At any time, any user or node can read the most recent successfully updated copy of the data. It requires the highest level of consistency and is the hardest to achieve in practice.

  • Monotonic Consistency At any time, once a user reads a value of some data after a certain update, the user will not read a value older than this. This is weaker than strong consistency.

  • Session Consistency Within a single session, once a user reads a value of some data after a certain update, the user will not read a value older than this for the duration of that session. Weaker than monotonic consistency, it only ensures monotonic modification of data within a single session for a single user, without guaranteeing consistency across different users or sessions for the same user.

  • Eventual Consistency Eventual consistency guarantees that once an update succeeds, the data on all replicas will ultimately reach a fully consistent state, but the time needed to achieve full consistency is not guaranteed.

  • Weak Consistency Once an update succeeds, users cannot read the update within a guaranteed time frame, and even if a new value is read on one replica, it does not guarantee that the new value can be read on other replicas. Weak consistency systems are generally difficult to use in practice, requiring the application to do more work to make the system usable.

Consistency Protocols for Distributed Systems


Lease, 2PC, and Paxos can achieve strong consistency.

Lease Mechanism

A central node saves and maintains metadata, ensuring that the metadata cached on each node remains consistent with the central node.

Usage scenarios:

(1) Client reads metadata from a cache node

Determine whether the metadata is already on the cache node and whether the Lease is within its validity period.

If yes: then the metadata is returned directly.

If no: the read request is sent to the central node. After receiving the request, the central node returns the metadata together with a corresponding Lease.

If the cache node fails to receive or times out, the read fails, and the process exits to retry.

If the reception is successful, the metadata returned by the central node and its Lease will be recorded and returned to the client.

(2) Client modifies metadata

The client sends a request to the central node to modify the metadata.

After receiving the request, the central node blocks all new read data requests, that is, it only receives read requests but does not return data.

The central node waits for all Leases related to the data to timeout, modifies the data, and returns a successful modification message to the client.


Quorum Mechanism

Assuming there are N replicas in total, a write operation is considered successful only if at least W replicas are successfully updated, and a read operation must read at least R replicas to be guaranteed to see the updated data. The requirement is:

W + R > N

W and R can be adjusted according to business needs, thereby balancing reliability and performance.
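The overlap guarantee behind W + R > N can be verified by brute force over all possible read and write sets (a sketch; the function names are my own):

```python
from itertools import combinations

def quorum_ok(n: int, w: int, r: int) -> bool:
    """The quorum condition: W + R > N forces read and write sets to overlap."""
    return w + r > n

def quorums_overlap(n: int, w: int, r: int) -> bool:
    """Brute-force check that every possible W-replica write set
    intersects every possible R-replica read set."""
    nodes = range(n)
    return all(set(ws) & set(rs)
               for ws in combinations(nodes, w)
               for rs in combinations(nodes, r))
```

For N = 3, the setting W = 2, R = 2 satisfies the condition, so every read set contains at least one updated replica; W = 1, R = 2 does not, and a read may miss the single updated replica.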

Two-Phase Commit Protocol (2PC)

A protocol to maintain the consistency of distributed transactions, belonging to a synchronous replication protocol, that is, results are returned to the client only after all replica data have been synchronized.

2PC divides data replication into two stages

Voting stage: The primary node sends the data to all replicas, and each replica votes to commit or roll back. If a replica votes to commit, then it will place the data in a temporary area, waiting for the final commit.

Commit stage: The primary node receives responses from the other replicas. If all replicas vote to commit, then it sends a confirmation to all replicas to commit their updates, and the data is moved from the temporary area to the permanent area. If any replica returns a vote to roll back, then the whole process is rolled back.
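The two stages can be sketched with an in-memory coordinator (an illustration of the voting/commit logic only; real 2PC also needs write-ahead logging and timeouts, and all class and method names here are my own):

```python
class Replica:
    """A replica that stages updates in a temporary area before committing."""
    def __init__(self, vote: bool = True):
        self.vote = vote          # whether this replica will vote to commit
        self.staged = False       # update sitting in the temporary area
        self.committed = False    # update moved to the permanent area

    def prepare(self) -> bool:
        self.staged = True
        return self.vote

    def commit(self):
        self.staged, self.committed = False, True

    def rollback(self):
        self.staged = False

def two_phase_commit(replicas) -> bool:
    # Phase 1 (voting): every replica stages the update and votes.
    votes = [rep.prepare() for rep in replicas]
    # Phase 2 (commit): commit only if ALL voted yes; otherwise roll back everyone.
    if all(votes):
        for rep in replicas:
            rep.commit()
        return True
    for rep in replicas:
        rep.rollback()
    return False
```

A single "no" vote (or, in practice, a timeout) rolls the whole transaction back, which is exactly why a failed or partitioned node blocks progress.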

2PC is a typical CA system that, to ensure consistency and availability, will refuse write operations once there is a network partition or nodes become unavailable, turning the system into read-only.

In case of node failures, 2PC will block the system indefinitely, thus it is not commonly used in data replication scenarios but generally applies to distributed transactions.

Paxos Protocol

The application scenario is multi-master replication to ensure consistency (state machine replication + consensus algorithm).

Three Roles

  • Proposer: proposes proposals (propose); there can be multiple.
  • Acceptor: voter, votes on whether to accept proposals.
  • Learner: synchronizer, synchronizes proposals that have been chosen.

Proposal: a data update request, represented as [proposal number n, proposal content value]


  • Each Proposer, when proposing, first obtains a globally unique, increasing proposal number N and assigns it to the proposal it intends to propose.
  • Each Acceptor, after accepting a proposal, records the proposal's number N locally, with the highest accepted number recorded as MaxN. Each Acceptor will only accept proposals with a number greater than its local MaxN.
  • In an election, out of many proposals, there must be and can only be one proposal finally chosen.
  • Once a proposal is chosen, other nodes will actively synchronize (learn) the proposal locally.
  • If no proposal is made, then no proposal will be chosen.

Message phases: Prepare-Promise, Propose-Accept, Learn.

Basic Paxos's problem is that it can only decide a single value, and reaching a decision requires at least two network round trips; under high concurrency more round trips may be needed, and in extreme cases a livelock may form (two proposers repeatedly preempting each other with ever-higher proposal numbers).

Multi-Paxos improves on Basic Paxos in two ways:

  1. For each value to be determined, run a Paxos algorithm instance (Instance), to form a decision. Each Paxos instance uses a unique Instance ID to identify.
  2. Elect a Leader among all Proposers, and the Leader uniquely submits Proposal to Acceptors for voting. This eliminates Proposer competition, solving the livelock problem. In the system, when there is only one Leader submitting Value, the Prepare phase can be skipped, thereby reducing the phases to one, increasing efficiency. Even in the case of network partitioning with multiple leaders, multi-paxos degrades at most to basic-paxos.



Logical Clocks

Three types of events in distributed systems, each of which can trigger an increase in the clock:

  1. Internal node events
  2. Sending events
  3. Receiving events

Two methods for establishing logical clocks in distributed systems:

Lamport timestamps can represent only causal ordering; they cannot express concurrency.

Vector clock can represent both causality and concurrency relationships

① Lamport timestamp is a logical clock representation, a monotonically increasing integer. By assigning a Lamport timestamp to each event in a distributed system according to certain rules, the partial order relationship of events can be determined by comparing the numerical value of the timestamps.


  • Each node locally has a timestamp, with an initial value of 0.
  • If an event occurs within a node, the local timestamp is incremented by 1.
  • For sending events, the local timestamp is incremented by 1 and included in the message.
  • For receiving events, the local timestamp = Max(local timestamp, timestamp in the message) + 1.

Event ordering: first sort by timestamp; if equal, sort by node identifier (note: the node identifier is given by the problem statement!).
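The four rules above map directly onto a minimal Lamport clock (a sketch; the class and method names are my own):

```python
class LamportClock:
    def __init__(self):
        self.time = 0  # initial local timestamp is 0

    def internal_event(self) -> int:
        """Internal node event: increment the local timestamp by 1."""
        self.time += 1
        return self.time

    def send_event(self) -> int:
        """Sending event: increment, and carry the timestamp in the message."""
        self.time += 1
        return self.time

    def receive_event(self, msg_time: int) -> int:
        """Receiving event: local = max(local, message timestamp) + 1."""
        self.time = max(self.time, msg_time) + 1
        return self.time
```

For example, if process P sends at timestamp 1, the receiver Q (still at 0) jumps to max(0, 1) + 1 = 2, so the receive is always ordered after the send.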

Vector clocks are another logical clock method evolved from Lamport timestamps; they record both the node's own Lamport timestamp and those of other nodes using a vector structure (Vector). This effectively describes both concurrency and causality among events. The vector clock algorithm uses this vector data structure to propagate the logical timestamps of all processes in the system to each process: whenever a process sends an event, it writes the timestamps of all processes it knows into a vector and attaches it to the message.

Event ordering: If Tb[Q] > Ta[Q] and Tb[P] < Ta[P] (i.e., with respect to Q's clock a precedes b, while with respect to P's clock b precedes a), then a and b are considered to happen concurrently, denoted a <-> b. This concurrency relationship cannot be represented by Lamport timestamps.
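A minimal vector clock, with the concurrency test from the ordering rule above (a sketch; class and function names are my own):

```python
class VectorClock:
    """One clock per node; the vector holds this node's view of every node's time."""
    def __init__(self, node: int, num_nodes: int):
        self.node = node
        self.v = [0] * num_nodes

    def tick(self):
        """Internal event: increment this node's own component."""
        self.v[self.node] += 1
        return list(self.v)

    def send(self):
        """Sending event: tick, then attach a copy of the vector to the message."""
        return self.tick()

    def receive(self, msg_v):
        """Receiving event: element-wise max with the message vector, then tick."""
        self.v = [max(a, b) for a, b in zip(self.v, msg_v)]
        self.v[self.node] += 1
        return list(self.v)

def happened_before(a, b) -> bool:
    """a -> b iff a <= b component-wise and a != b."""
    return all(x <= y for x, y in zip(a, b)) and a != b

def concurrent(a, b) -> bool:
    """a <-> b iff neither event happened before the other."""
    return a != b and not happened_before(a, b) and not happened_before(b, a)
```

Two events that each tick only their own node's component (e.g. [1,0] and [0,1]) compare as concurrent, which is exactly the relationship Lamport timestamps cannot capture.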

Big Data System Architecture

Briefly describe what a big data system is, and what considerations are needed when establishing a big data system

Big Data Systems: An integrated software and hardware system with high performance, scalability, high availability, fault tolerance, and ease of use, which consolidates functions for big data processing such as data collection and pre-processing, storage and management, processing and analysis, and visualization; aimed at helping users discover potentially valuable information and knowledge within big data, grasp business realities, and predict business trends.

The structure of a big data system depends on the demands and macro decisions of its construction, including business objectives, data source types and characteristics, performance requirements, batch/stream processing (computing frameworks), technology selection, etc.

Traditional BI Architecture

Data source + ETL + Data warehouse + Analytical reports

  • Centered around structured analysis of data warehouse, lacking unstructured analysis.
  • Complex and bloated ETL data pre-processing functionality.
  • ACID properties affect performance, unable to cope with big data scale.

Batch Processing Architecture

Data source + ETL + Data storage + Batch processing + Analytical reports

  • Advantages: Simple to use, replacing BI components with big data components during technology selection.
  • Disadvantages: ①Lacks the flexibility of data warehouses in supporting business, requiring custom solutions for numerous reports and complex drilling scenarios; ②Primarily batch processing, lacking real-time support.
  • Applicable scenarios: Mainly BI scenarios, suitable for offline analysis of large-scale historical data.

Stream Processing Architecture

Data source + Real-time data channel + Stream processing + Message push

  • Advantages: No bulky ETL process, high data timeliness.
  • Disadvantages: Lacks batch processing, so data replay and historical statistics are poorly supported; analysis is limited to the data within the current window.
  • Applicable scenarios: Alerts, monitoring, situations where data has a validity period requirement.

Lambda Architecture

Lambda Architecture: Batch processing layer + Streaming layer + Service layer. Data is appended and written in parallel to both batch and stream processing systems through two paths. Different data computing logics are provided for batch and streaming processing paths, respectively. Finally, the service layer integrates the computed results for external service output.

  • Advantages: Real-time + offline analysis, covering a wide range of data analysis scenarios.
  • Disadvantages: Needs to maintain two systems for batch and speed layers: Hadoop & Storm. The same business logic needs to be implemented and maintained separately in both layers. Merging query results is complex & maintenance is difficult.
  • Applicable scenarios: Situations with both real-time and offline requirements.

Kappa Architecture

Simplified Lambda architecture, eliminating the batch processing system, with all data following the real-time path. Everything is treated as a stream. The stream processing system manages both real-time and historical data. Data is introduced into a fault-tolerant, distributed, unified log in order, as an event stream. The event stream enters the speed layer for stream processing, producing a real-time view. At the same time, the event stream is saved in long-term storage. When necessary, replaying the event stream through the stream computing engine can recalculate to produce a historical data view.

  • Advantages: Addresses the redundancies in Lambda architecture, designed with data replayability in mind, resulting in a neat architecture.
  • Disadvantages: Implementation difficulty, especially the data replay part.
  • Application scenarios: Situations with both real-time and offline requirements.


Evolution of Hadoop versions:

2.0: added YARN.

3.0: MapReduce based on in-memory computing, support for multiple NameNodes, streamlined kernel.

The three core components of Hadoop: HDFS, MapReduce, YARN


  • HDFS is a distributed storage framework, which stores files distributed across multiple computer nodes, suitable for massive data storage.

  • MapReduce is a distributed computing framework that abstracts the parallel computing process on large clusters into two functions: Map and Reduce. It adopts a “divide and conquer” strategy, where large datasets stored in a distributed file system are split into many independent pieces called splits. These splits are then processed in parallel by multiple Map tasks.

  • Yarn serves as a resource scheduling platform, and it is responsible for adjusting the resources occupied by various computing frameworks based on their load demands, achieving shared cluster resources and elastic resource scaling.


HDFS Architecture

The architecture of HDFS adopts a Master/Slave structural model, typically including:

  1. A NameNode as the central server responsible for managing the file system’s namespace and access by clients.

  2. Several DataNodes, each running a DataNode process, responsible for handling clients' read/write requests under the unified scheduling of the NameNode, and for performing operations such as creation, deletion, and replication of data blocks. A DataNode's data is actually stored in the local Linux file system.

HDFS Storage Principle

To ensure the system's fault tolerance and availability, HDFS adopts redundant storage with multiple copies of each data block, usually distributed across different DataNodes (clients preferentially read the copy on their own rack). Advantages:

  1. Speeds up data transfer

  2. Facilitates data error checking

  3. Ensures data reliability

Replication Storage Strategy:

  1. The first copy: placed on the DataNode uploading the file; if the upload comes from outside the cluster, a node with relatively free disk space and a not-too-busy CPU is chosen at random.

  2. The second copy: placed on a node in a different rack than the first copy.

  3. The third copy: on another node in the same rack as the first copy.

  4. More copies: on random nodes.
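The placement strategy above can be simulated in a few lines (a hypothetical toy model, not HDFS's actual BlockPlacementPolicy; `cluster` maps rack names to DataNode names):

```python
import random

def place_replicas(writer, cluster):
    """Toy simulation of the 3-replica placement rule.

    writer:  the DataNode uploading the file
    cluster: dict mapping rack name -> list of DataNode names
    """
    rack_of = {n: r for r, nodes in cluster.items() for n in nodes}
    first = writer  # 1st copy: on the uploading DataNode itself
    # 2nd copy: a node on a different rack than the first copy
    other_racks = [r for r in cluster if r != rack_of[first]]
    second = random.choice(cluster[random.choice(other_racks)])
    # 3rd copy: another node on the same rack as the first copy
    same_rack = [n for n in cluster[rack_of[first]] if n != first]
    third = random.choice(same_rack)
    return [first, second, third]

cluster = {"rack1": ["dn1", "dn2", "dn3"], "rack2": ["dn4", "dn5"]}
replicas = place_replicas("dn1", cluster)
```

Losing any single rack still leaves at least one live replica, which is the point of spreading copies across racks.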

Data Read and Write Process

Writing Files

The Client asks the NameNode to write a file, and after the NameNode is ready, it informs the Client. The Client then repeatedly follows these steps until the data is fully written:

  1. The Client asks the NameNode for a Block, and the NameNode selects a DataNode based on rules and informs the Client.

  2. The Client sends data to the designated DataNode, which after receiving the data writes it to the local file system.

Reading Files

The Client requests to read a file from the NameNode; once ready, the NameNode returns the file's metadata. The Client then requests the corresponding Block data from the designated DataNodes and finally assembles the complete file content.

Data Error and Recovery

  1. Error in NameNode

The NameNode holds all metadata; if it fails, the entire HDFS cluster becomes unusable. HDFS has a checkpoint mechanism that periodically copies the metadata to a backup server, the SecondaryNameNode. If the NameNode fails, its metadata can be recovered from the SecondaryNameNode.

  2. Error in DataNode
  • Each DataNode regularly sends heartbeat messages to the NameNode, reporting its status.
  • If a DataNode fails or there is a network anomaly and the NameNode does not receive heartbeat messages from the DataNode, the DataNode will be marked as down. All data on it will be marked as unreadable, and the NameNode will not send any I/O requests to it.
  • The NameNode regularly checks the number of copies of data blocks. If it is less than the redundancy factor, data redundancy replication will be initiated.
  3. Data Error

Factors such as network transmission and disk errors can corrupt data. Clients verify data blocks after reading them, using MD5 and SHA-1 checksums to confirm correctness.
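The verification idea is simple: store a digest alongside each block when writing, recompute it after reading, and compare. A minimal sketch (illustrating the principle only, not HDFS's on-disk checksum format):

```python
import hashlib

def checksum(block: bytes) -> str:
    # Compute an MD5 digest for a data block (SHA-1 works the same way).
    return hashlib.md5(block).hexdigest()

# The writer stores the checksum with the block; the reader recomputes it
# after reading and compares, detecting transmission or disk corruption.
block = b"some block data"
stored = checksum(block)
corrupted = b"some block dat4"  # one flipped byte

assert checksum(block) == stored      # intact block passes verification
assert checksum(corrupted) != stored  # corrupted block is detected
```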

MapReduce Architecture

Computational Model

  • Abstracting the parallel computing process on large clusters into two functions: Map and Reduce.
  • Programming is easy, there is no need to master the tedious details of distributed parallel programming to run one’s own program on a distributed system and achieve computation of massive data.
  • Adopting the “divide and conquer” strategy, large datasets stored in the distributed file system are split into many independent shards (split), which are then processed in parallel by multiple Map tasks.
  • The design philosophy is to “move computation to data” rather than “move data to computation,” because moving data requires a large amount of network transmission overhead.
  • Adopted the Master/Slave architecture, including one Master and several Slaves (or Workers). The Master runs JobTracker, responsible for job scheduling, processing, and recovery after failure, while the Slave runs TaskTracker, responsible for receiving job instructions from the JobTracker.
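The divide-and-conquer flow — split, map in parallel, shuffle by key, reduce — can be shown with a toy single-process word count (an illustrative sketch, not Hadoop's actual API):

```python
from collections import defaultdict

def map_fn(split):
    # Map: emit a (word, 1) pair for every word in one input split.
    return [(word, 1) for word in split.split()]

def reduce_fn(word, counts):
    # Reduce: aggregate all counts for one key.
    return word, sum(counts)

# "Divide and conquer": splits are mapped independently (in parallel on a
# real cluster), then intermediate pairs are grouped by key and reduced.
splits = ["big data big", "data systems"]      # two independent splits
intermediate = defaultdict(list)
for split in splits:                            # each split -> one Map task
    for word, one in map_fn(split):
        intermediate[word].append(one)          # shuffle: group by key
result = dict(reduce_fn(w, c) for w, c in intermediate.items())
```

Because each split is processed independently, the Map phase parallelizes naturally, which is what lets the framework "move computation to data".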

Four components

  1. Client:
  • Users write MapReduce programs and submit them to the JobTracker through the Client.
  • Users can view the job status through interfaces provided by the Client.
  2. JobTracker:
  • Responsible for resource monitoring and job scheduling.
  • Monitors the health status of all TaskTrackers and Jobs, transferring tasks to other nodes upon failure detection.
  • Tracks task execution progress, resource usage, etc., and passes this information to the Task Scheduler (pluggable, customizable); the scheduler then allocates freed resources to appropriate tasks.
  3. TaskTracker:
  • Periodically reports the resource usage and task progress on its node to the JobTracker through a "heartbeat", while receiving commands from the JobTracker and executing the corresponding operations (such as starting new tasks, killing tasks, etc.).
  • Uses "slots" to partition the node's resources (CPU, memory, etc.). A Task can run only after obtaining a slot, and the Hadoop task scheduler's job is to assign idle slots on the TaskTrackers to Tasks. (Slots are divided into Map slots and Reduce slots, used by Map Tasks and Reduce Tasks respectively.)
  4. Task: Divided into Map Task and Reduce Task, both started by the TaskTracker.


(1) Program deployment; (2) Allocation of Map and Reduce tasks; (3) Map nodes reading data, executing map tasks, and spilling intermediate results; (4) Reduce nodes receiving intermediate result data and executing reduce tasks; (5) Writing the execution results into HDFS.

Yarn architecture

Resource Manager

Handles client requests, monitors NodeManager, starts and monitors Application Master, and allocates resources

The global resource manager, responsible for the overall resource management and allocation of the system. It includes two components: Scheduler and Applications Manager.

(1) The Scheduler allocates cluster resources to the applying applications in the form of “containers”, often selecting containers based on the location of the data being processed by the applications, thus achieving “move computation to data”. The scheduler is a pluggable component; YARN not only offers several directly usable schedulers but also allows users to redesign the scheduler according to their needs.

(2) The Applications Manager manages all the applications in the system, including application submission, negotiating resources with the scheduler to start ApplicationMaster, monitoring ApplicationMaster’s running status and restarting it when it fails.

Application Master

Main functions are:

(1) After the user’s job submission, ApplicationMaster negotiates with the Resource Manager to obtain resources, which the ResourceManager allocates in the form of containers to ApplicationMaster;

(2) ApplicationMaster allocates the obtained resources further to its internal tasks (Map or Reduce tasks), achieving “second-level allocation” of resources;

(3) Interacts with NodeManager to start, run, monitor, and stop the application, monitors the resource usage it applied for, monitors all tasks’ execution progress and status, and performs failure recovery (re-applies for resources and restarts tasks) when a task fails;

(4) Regularly sends “heartbeat” messages to ResourceManager, reporting the resource usage and application progress information;

(5) When the job is completed, ApplicationMaster deregisters the containers with ResourceManager, completing the cycle.

Node Manager

NodeManager is the agent residing on every node in the YARN cluster, mainly responsible for:

  1. Container lifecycle management, monitoring each container’s resource (CPU, memory, etc.) usage
  2. Tracking node health status, communicating with the ResourceManager through "heartbeats", and reporting the node's resource usage and each container's status to the ResourceManager.
  3. Receiving requests from the ApplicationMaster to start/stop containers.

Note: The NodeManager manages abstract containers and deals only with container-related matters; it is not directly responsible for managing the status of each task (Map or Reduce task). Task status management is done by the ApplicationMaster, which tracks each task's execution state through continuous communication with the NodeManager.

JobHistoryServer: Uniformly manages YARN’s historical tasks.

WebAppProxyServer: Web page proxy during task execution. Responsible for supervising the entire process of specific MapReduce task executions, collecting task execution information from the Container, and displaying it on a Web interface.

Yarn Workflow


Step 1: Users write client applications and submit them to YARN, including the ApplicationMaster program, the command to start ApplicationMaster, the user program, etc.

Step 2: ResourceManager in YARN is responsible for receiving and processing requests from clients, allocating a container for the application, and starting an ApplicationMaster in the container.

Step 3: After being created, the ApplicationMaster first registers with ResourceManager.

Step 4: The ApplicationMaster requests resources from ResourceManager in a polling manner.

Step 5: ResourceManager allocates resources to the requesting ApplicationMaster in the form of “containers”.

Step 6: Tasks are started in the container (running environment, scripts).

Step 7: Each task reports its status and progress to the ApplicationMaster.

Step 8: After the application is finished, the ApplicationMaster deregisters with ResourceManager’s application manager and closes itself.

The Role of Unified Deployment in YARN

  • Unified deployment of the resource scheduling framework YARN in the cluster and deployment of various computing frameworks on top of YARN.
  • YARN provides unified resource scheduling and management services for these computing frameworks and adjusts the resources they occupy according to their load, achieving cluster resource sharing and elastic scaling.
  • Implementing different applications on a cluster, load mixing, effectively improving the utilization of the cluster.
  • Different computing frameworks can share the underlying storage, avoiding cross-cluster data movement.


Distributed applications can implement the following functions through Zookeeper: configuration management, naming service, distributed locks, cluster management



Roles in a Zookeeper cluster:

Leader

  1. The sole scheduler and processor of transaction requests (write operations) in the cluster, ensuring the sequential ordering of cluster transactions.

  2. The scheduler of the various services within the cluster.

Follower

  1. Processes the cluster's non-transaction requests (read operations).

  2. Forwards transaction requests to the Leader.

  3. Participates in proposal voting for transaction requests.

  4. Participates in Leader election voting.

Observer

  1. Processes non-transaction requests from clients.

  2. Forwards transaction requests to the Leader.

  3. Provides read-only data service and does not participate in any form of voting.

The Leader/Followers cluster architecture gives Zookeeper both master-slave and primary-standby capabilities.

(1) Master-slave: The master node assigns tasks, and the slave nodes execute them.

(2) Primary-standby: between the primary and backup nodes; when the primary node fails, a new primary is elected from the Followers as quickly as possible so that the service is not interrupted.

Three Types of Znodes

  1. Persistent Nodes (PERSISTENT): The node remains after the client disconnects from Zookeeper. By default, all znodes are persistent.
  2. Ephemeral Nodes (EPHEMERAL): Effective while the client session is active, ephemeral nodes are automatically deleted upon disconnection with Zookeeper. Ephemeral nodes cannot have children and play a significant role in leader election.
  3. Sequential Nodes (SEQUENTIAL): Nodes with sequential numbering. Sequential nodes can be either persistent or ephemeral, i.e., persistent sequential nodes (PERSISTENT_SEQUENTIAL) or ephemeral sequential nodes (EPHEMERAL_SEQUENTIAL).

Znode primitive operations:

Create node (create)

Delete node (delete)

Update node (set)

Get node information (get)

Permission control (getAcl/setAcl)

Event listening (watch)

Features of znode Operations

  1. When multiple machines attempt to create a node simultaneously, only one succeeds, which can be used as a distributed lock.
  2. The life cycle of ephemeral nodes is consistent with the session; if the session ends, the ephemeral node is deleted, commonly used for heartbeat, monitoring, load balancing, etc.
  3. Sequential nodes ensure the global uniqueness of node names, which can be used for globally auto-incrementing IDs in a distributed environment.
  4. The client registers to listen to interested directory nodes, and Zookeeper notifies the client when there’s a change in the node directory data.

Services Provided by Zookeeper

Configuration management

Unified naming service

Cluster Management

Distributed Locks (Shared Locks, Exclusive Locks)

How Zookeeper Implements Exclusive Locks:

(1) Exclusive Lock Representation: An exclusive lock is represented by a znode, such as /x_lock/lock.

(2) Acquiring the Lock: All clients attempt to create a temporary child node /x_lock/lock under /x_lock by calling the create interface. Of course, ultimately only one client will be successful in creating it, indicating that this client has acquired the exclusive lock. At the same time, other clients that did not acquire the lock register a Watcher to listen for changes to the sub-nodes.

(3) Releasing the Lock: The client that acquired the lock crashes or completes its business logic normally, and deletes the temporary child node, indicating that the exclusive lock has been released. After the temporary sub-node is deleted, other clients start a new round of lock acquisition process.
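The core of the scheme is that only one `create` of the same znode path can succeed. A toy in-memory stand-in makes the race visible (a hypothetical simulation — `FakeZk` is not a real Zookeeper client, and no Watcher logic is modeled):

```python
import threading

class FakeZk:
    """Toy in-memory stand-in for Zookeeper's create()/delete():
    at most one client can create a given znode path at a time."""

    def __init__(self):
        self._nodes = set()
        self._mu = threading.Lock()

    def create(self, path):
        with self._mu:
            if path in self._nodes:
                return False        # node already exists: lock not acquired
            self._nodes.add(path)
            return True             # this client holds the exclusive lock

    def delete(self, path):
        with self._mu:
            self._nodes.discard(path)   # releasing the lock

zk = FakeZk()
winners = []

def client(cid):
    # Every client races to create the same ephemeral lock node.
    if zk.create("/x_lock/lock"):
        winners.append(cid)

threads = [threading.Thread(target=client, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Exactly one of the ten clients acquired the lock.
```

In real Zookeeper the node is ephemeral, so a crashed holder's session expiry deletes it automatically, and the waiting clients' Watchers fire to start the next round of acquisition.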


Two Models of Message Queues

Point-to-Point Model and Publish-Subscribe Model

The Point-to-Point model cannot achieve message broadcasting or multicasting.

Five Application Scenarios of Message Queues

Application decoupling, Asynchronous communication, Traffic shaping, Log processing, Message communication


  • The Producer uses push mode to publish messages to the Broker.
  • The Consumer uses pull mode to subscribe to and consume messages from the Broker.
  • Broker: a Kafka server; multiple Brokers form a Kafka cluster.

How does Kafka organize and store published messages to achieve load balancing and high throughput?

  1. Messages published to the Kafka cluster have a category, known as a Topic.

  2. A Partition is an ordered queue where messages are actually stored; each message is assigned a unique, ordered identifier called an offset.

  3. Each Topic can be stored in one or more Partitions. Messages sent to a Broker are stored in a Partition according to the partitioning rules. If the partitioning rules are set reasonably, messages are evenly distributed across Partitions, balancing the request load across cluster nodes and improving throughput.

  4. Any message published to a Partition is appended to its end. This sequential disk writing is an important reason for Kafka's high throughput.

How does a Kafka cluster use replicas to achieve availability and fault tolerance?

  1. In Kafka, a Partition has multiple replicas (Replication) spread across different Brokers in the cluster.

  2. Among a Partition's replicas, only one is the Leader; all the others are Followers.

  3. The Leader handles all read and write operations for the partition, while Followers passively replicate data from the Leader. When the Leader fails, a Follower is elected as the new Leader through Zookeeper.

How does a Kafka cluster implement the Point-to-Point and Publish-Subscribe models?

  1. Point-to-Point: if all consumers belong to the same consumer group, messages are evenly delivered among them, with each message handled by exactly one consumer — equivalent to the Point-to-Point model.

  2. Publish-Subscribe: if all consumers belong to different consumer groups, messages are broadcast to all consumers, with each message handled by every consumer — equivalent to the Publish/Subscribe model.
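The consumer-group rule — one delivery per message per group — can be simulated in a few lines (a hypothetical toy model of delivery semantics only, not the Kafka protocol; round-robin stands in for partition assignment):

```python
from itertools import cycle

def deliver(messages, consumer_groups):
    """Toy simulation: each message goes to exactly one consumer per group.

    Within one group, members split the messages (point-to-point);
    every group receives all messages (publish-subscribe across groups).
    """
    received = {c: [] for group in consumer_groups for c in group}
    pickers = [cycle(group) for group in consumer_groups]  # round-robin per group
    for msg in messages:
        for picker in pickers:
            received[next(picker)].append(msg)  # one member of each group
    return received

msgs = ["m1", "m2", "m3", "m4"]
# One group of two consumers (P2P sharing) plus a single-member group
# (which therefore sees everything, like a subscriber).
out = deliver(msgs, [["c1", "c2"], ["c3"]])
```

Putting every consumer in its own group degenerates to pure publish-subscribe; putting them all in one group degenerates to pure point-to-point — which is exactly how Kafka unifies the two models.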
