Big Data Architecture Course Review Notes
1 Introduction
The requirements of big data systems include data requirements, functional requirements, performance requirements (high performance, high availability, high scalability, high fault tolerance, security, etc.), and computing scenario requirements.
The target requirements of distributed systems/clusters for big data processing are high performance, high availability, fault tolerance, and scalability. High performance covers three metrics: response time (latency), throughput, and resource utilization. High availability is measured by MTTF and MTTR, with availability = MTTF / (MTTF + MTTR).
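The availability formula can be made concrete with a few lines of Python (the MTTF/MTTR figures below are made-up examples, not from the course):

```python
def availability(mttf: float, mttr: float) -> float:
    """Availability = MTTF / (MTTF + MTTR), both in the same time unit."""
    return mttf / (mttf + mttr)

# A node that runs 999 hours between failures and takes 1 hour
# to repair achieves "three nines" availability.
print(availability(999, 1))  # 0.999
```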
The relationship between big data and cloud computing:
- Cloud computing can provide sufficient computing resources for big data processing.
- Big data is a typical application of cloud computing services.
- Big data can be used without cloud computing.

Typical scenarios for big data computing include:
- Batch processing
- Stream computing
- Interactive query

Static data is bounded, persistently stored, and large in capacity, making it suitable for batch processing.
Stream data is unbounded and continuously generated, requiring timely processing of data windows, with no end in sight.
2 Overview of Cloud Computing
2.1 Definition of Cloud Computing
- Cloud computing is a commercial computing model. It distributes computing tasks across a resource pool composed of a large number of computers, enabling various application systems to obtain computing power, storage space, and information services as needed.
- It provides dynamically scalable and inexpensive computing services through the network on demand, and represents a universally applicable way of thinking about and managing resources.
- Cloud computing is likened to ubiquitous clouds, resulting from the development and evolution of technologies such as virtualization, distributed computing, utility computing, load balancing, parallel computing, network storage, and hot backup redundancy.
2.2 Characteristics of Cloud Computing
- Resource virtualization and pooled unified management
- Massive scale, high availability, high scalability
- Elasticity, on-demand, self-service provision
- Ubiquitous access, accurate billing, low cost
2.3 Three Service Models
- Infrastructure as a Service (IaaS)
Provides computing resources services such as servers, storage, and networks
Main functions:
- Users pay for IaaS on demand, without purchasing a complete set of hardware.
- Infrastructure can be expanded according to processing and storage needs.
- Saves the cost of purchasing and maintaining hardware for enterprises.
- Data is located in the cloud, avoiding a single point of failure.
- Platform as a Service (PaaS)
Provides environment software for development, management, and delivery, such as operating systems, databases, middleware, and development platforms.
Main functions
- Provides development platforms and tools for software vendors to quickly develop, test, and deploy applications.
- Software vendors focus on development without worrying about the underlying infrastructure.
- Cloud vendors ensure the security, reliability, and stability of the platform.
- Software as a Service (SaaS)
Provides cloud software services through the network.
Main functions
- Users pay to subscribe to software, accessing application software directly via the internet without managing, installing, or upgrading the software.
- Data is protected in the cloud, preventing loss due to device failure.
- Resource usage can be expanded according to service needs.
2.4 Four Service Forms
Public Cloud: IT infrastructure owned and operated by third-party cloud service providers and provided through the Internet.
Advantages: Low cost, no maintenance, on-demand scalability, high reliability, and availability.
Disadvantages: Uncontrollable security, resources cannot be freely customized.
Private Cloud: Composed of cloud computing resources exclusively for use by one enterprise or organization.
Advantages: More flexible resource customization and higher security compared to public cloud.
Disadvantages: High construction and maintenance costs.
Community Cloud: Composed of cloud computing resources shared by multiple enterprises or organizations.
Hybrid Cloud: Combines public and private cloud environments through secure connections, allowing data and applications to be shared between different cloud environments.
- High controllability, sensitive assets in the private cloud.
- Flexible availability, on-demand use of the public cloud.
- Cost-effective, with the scalability of the public cloud.
2.5 Cloud Computing Architecture
- SOA Construction Layer
Encapsulates cloud computing capabilities as standard Web services and incorporates them into the SOA system.
- Management Middleware Layer
Manages cloud computing resources and schedules numerous application tasks to ensure resources efficiently and securely serve applications.
2.6 Core Technologies of Cloud Computing
The core technologies of cloud computing mainly include virtualization and containerization. Containerization has become popular among developers in recent years for being lightweight, fast, and low-overhead: it shares the operating system kernel and packages applications together with their runtime environments.
2.6.1 Virtualization Technology
Virtualization abstracts and maps computer resources into virtual logical entities, breaking the boundaries of physical resources for unified management, forming the core foundational technology for building cloud computing environments.
◼ Server Virtualization: Virtualizes one computer into multiple logical computers.
◼ Storage Virtualization: Abstracts and unifies management of underlying storage devices, providing independent storage services externally.
◼ Network Virtualization: Virtualizes one physical network card into multiple virtual network cards, isolating different applications through virtual machines.
◼ Desktop Virtualization: Decouples user desktop environments from their terminal devices, storing each person’s complete desktop environment with the service provider, allowing users to access their desktops via the network using terminal devices.
2.6.1.1 Server Virtualization
Virtual Machine (VM)
Virtualizes one computer (physical machine, physical server) into multiple logical computers.
Each virtual machine has independent “hardware.”
The “hardware” of a virtual machine is simulated using the hardware of the physical machine.
The work executed by the virtual machine is actually completed by the hardware of the physical machine.
Virtual Machine Monitor (VMM)
VMM is the operating system or software that implements the virtualization of physical machines into virtual machines. Its main function is to provide virtual hardware resources for virtual machines, manage and allocate these resources, and ensure isolation between virtual machines.
VMM has two working modes:
- Hosted Mode: VMM runs on the operating system of the physical machine, easy to install and use, with lower performance.
In hosted virtualization, the virtualization layer is generally referred to as the virtual machine monitor (VMM). The VMM obtains resources by calling the host OS, achieving virtualization of CPU, memory, and I/O devices. A virtual machine created by the VMM participates in scheduling as a process of the host OS. In hosted mode, the VMM can fully utilize the functions of the host OS to operate hardware devices; however, the intermediate steps lead to significant performance loss.
- Hypervisor Mode: VMM runs directly on the hardware of the physical machine, providing performance close to that of the physical machine.
In this architecture, VMM is an operating system, generally referred to as a Hypervisor. Hypervisor = OS + virtualization—possessing traditional operating system functions and virtualization functions, including mapping virtual resources to physical resources and isolating virtual machine systems. It provides performance close to that of the physical machine but supports limited I/O devices.
Server Virtualization Technology Classification
Based on Different Handling Methods of Critical Instructions
Full Virtualization
Para Virtualization
Hardware-Assisted Virtualization
Full Virtualization
- The VMM simulates complete underlying hardware for the Guest OS, including processor, physical memory, clock, peripherals, etc.; the guest operating system is completely unaware that it is running in a virtual machine.
- The guest operating system and its system software can run in the virtual machine without any modifications.
- Good compatibility, easy to install and use.
- Lower performance (because the VMM needs to translate binary code and replace sensitive instructions).

Para Virtualization
- Para virtualization requires modifying the kernel of the Guest OS, changing the privileged or sensitive instructions that would have executed on the physical machine into hypercalls to the VMM.
- The Guest OS knows it is running in a virtual machine environment and cannot directly execute privileged and sensitive instructions; it reaches the CPU through the Host's kernel.
- Better performance,
- But more difficult to implement.

Hardware-Assisted Virtualization
CPU manufacturers modify the CPU, introducing new instructions and operating modes to help the VMM efficiently identify and intercept sensitive instructions, supporting virtualization at the hardware level. Typically, core instructions of the Guest OS can be executed directly by the hardware without going through the VMM. For special instructions, the system switches to the VMM, which handles them.
2.6.1.2 Storage Virtualization
Abstracts and unifies management of underlying storage devices, providing independent storage services externally. It can achieve:
- High scalability, breaking free from physical capacity limitations.
- Hiding device complexity, unified management and service.
- Integrating space resources, improving device utilization.
- High reliability, high availability.

Technical Types
- Host-Based Virtualization (supports heterogeneous devices and is cost-effective, but occupies host resources, affecting performance, security, and stability, with poor scalability)
- Storage Device-Based Virtualization (host performance is unaffected, but heterogeneous devices are not supported, tying users to specific manufacturers)
- Network-Based Virtualization
2.6.1.3 Desktop Virtualization
Remote Desktop Services (RDS)
Virtual Desktop Infrastructure (VDI)
Intelligent Desktop Virtualization (IDV)
2.6.1.4 Network Virtualization
Core services of OpenStack: computing service nova, storage service swift, image service glance
Defects of Virtualization Technology
- High resource consumption and long startup time for virtual machine operating systems
- Intermediate steps (hypervisor) reduce system service performance
- Users focus more on their deployed applications but have to deploy and maintain operating systems and associated dependencies
2.6.2 Containerization Technology
Containerization is a lightweight virtualization technology built on the operating system kernel. It uses the shared kernel to establish a series of isolated, closed runtime environments, which act like containers in which applications are deployed and run. Its advantages: lightweight and agile, easy to scale, supports DevOps, improves resource utilization, saves costs, accelerates product iteration, supports microservices architecture, and enables automated operations and maintenance.
- Containers share the same operating system kernel.
- Containers package applications and their runtime environments.
- Build once, run anywhere.
- Containers are lightweight, quick to start, and low overhead.

Container Implementation Principles
- Namespace
Namespace defines a closed scope, stipulating that processes within the same namespace can only see resources within that namespace, such as hostname, network, processes, users, filesystem, etc. Processes in different namespaces are invisible to each other and do not affect each other. Containers are processes with separate namespaces, and applications running within containers operate as if they are running in independent operating systems, with containers not affecting each other.
Each process has seven namespaces for isolating different types of resources
- Cgroups (Control Groups)
Namespace can isolate processes into a specific environment but cannot limit the physical resources used by processes. Cgroups (Control Groups) is a physical resource isolation mechanism provided by the Linux kernel, allowing for resource limitation, isolation, and statistics for Linux processes or process groups.
Containers use cgroups to isolate, limit, and record the physical resources (CPU, memory, IO, etc.) used by containers. Cgroups treat each container as a normal process. By setting resource limit conditions for process groups or specific processes, it achieves the purpose of isolating container processes from other processes in terms of resource usage.
In short: Namespace achieves resource isolation, while Cgroups achieve resource control.
3 Overview of Big Data Processing
3.1 Big Data Processing Process
Big data processing encompasses tasks such as data collection and preprocessing, storage and management, processing and analysis, and visualization presentation.
3.1.1 Data Collection and Preprocessing
Data Types: Structured, Semi-Structured, Unstructured
Data Sources: Business Data, Internet Data, IoT Data
Collection Methods: Log Collection, Web Crawlers, API, Government and Enterprise Data Sharing
Data Preprocessing Includes:
- Data Cleaning: removing duplicates, handling missing values, renaming columns.
- Data Integration: logically or physically integrating related distributed heterogeneous data sources to provide transparent access services for users. Data integration methods include:
  - Data Consolidation (ETL + Data Warehouse, physical unification)
  - Data Federation (establishing a unified logical view)
  - Data Propagation (data propagation among multiple applications)
  - Hybrid Approach
- Data Transformation: converting data from one representation to another: smoothing, aggregation, generalization, normalization, attribute construction.
- Data Reduction: reducing the data scale while ensuring data analysis quality:
  - Dimensionality Reduction: Wavelet Transform, PCA (Principal Component Analysis), Feature Selection
  - Numerosity Reduction: Clustering, Sampling, Logistic Regression
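Of the transformations listed above, normalization is easy to make concrete. A minimal sketch of min-max normalization (the function name and sample values are illustrative, not from the course):

```python
def min_max_normalize(xs: list) -> list:
    """Data transformation: rescale numeric values into the range [0, 1]."""
    lo, hi = min(xs), max(xs)
    return [(x - lo) / (hi - lo) for x in xs]

print(min_max_normalize([10, 20, 30]))  # [0.0, 0.5, 1.0]
```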
3.1.2 Big Data Processing and Analysis
- Distributed Computing Models and Frameworks
- Batch Processing: Hadoop, Spark
- Stream Processing: Storm, Flink
- Graph Computing: Pregel, GraphX
- Big Data Analysis
- Interactive Query: Hive, Pig, Spark SQL
- Data Mining: Mahout
- Machine Learning: MLlib
3.1.3 Big Data Storage and Management
3.1.4 Big Data Interpretation and Visualization
3.2 Principles of Distributed Computing
A distributed system is a collaborative working system composed of a group of computer nodes in a network to complete common tasks, requiring high availability, high performance, scalability, and fault tolerance; including distributed storage and distributed computing. Sharding and replication are basic means.
How HDFS Stores Files
Files are split into fixed-size units (blocks), which are stored on different nodes; a file is accessed by retrieving the blocks from each node and merging them.
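The split-and-merge idea can be sketched in a few lines (the block size here is tiny for illustration; HDFS itself uses much larger blocks, 128 MB by default):

```python
def split_into_blocks(data: bytes, block_size: int) -> list:
    """Split a file's bytes into fixed-size blocks; the last may be smaller."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

blocks = split_into_blocks(b"x" * 300, 128)
print([len(b) for b in blocks])          # [128, 128, 44]
print(b"".join(blocks) == b"x" * 300)    # True: merging restores the file
```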
How HDFS Writes Files
The client requests to write a file to the Namenode, the Namenode prepares and informs the client when ready. The client receives confirmation and repeatedly executes the following steps until the data is written: (1) Requests a block from the Namenode, which selects Data Nodes according to rules and informs the client. (2) The client sends data to the specified Datanode, which receives and writes the data locally.
How HDFS Reads Files
The client requests to read a file from the Namenode, which prepares and returns the file’s metadata. The client receives the metadata and requests the corresponding block data from the relevant Datanode, finally merging to form the complete file content.
The Role of Distributed Data Storage:
- Data redundancy to improve availability.
- Read-write separation to improve performance.
3.2.1 Sharding
Data sets are divided into independent orthogonal data subsets according to certain rules and distributed to different nodes. Sharding can achieve high performance, horizontal scaling, and high availability.
Sharding Requirements: Uniform distribution, load balancing, minimal data migration (scaling)
3.2.1.1 Based on Data Range
Data is divided into different intervals according to keys, with each node responsible for one or more intervals.
Supports range queries, but balancing is not easy to ensure.
3.2.1.2 Hash Method
Establishes a mapping relationship between hash values and system nodes, distributing data with different hash values to different nodes.
Can solve imbalance issues, but range query performance is affected, and scaling requires migrating a lot of data.
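The migration cost of plain modulo hashing is easy to demonstrate: when the node count changes, most keys map to a different node. A sketch (key names and node counts are arbitrary examples):

```python
import hashlib

def shard(key: str, n_nodes: int) -> int:
    """Map a key to a node id with a stable hash (md5, unlike Python's hash())."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % n_nodes

# Scaling from 4 to 5 nodes remaps most keys: h % 4 and h % 5 rarely agree.
keys = [f"user{i}" for i in range(1000)]
moved = sum(shard(k, 4) != shard(k, 5) for k in keys)
print(f"{moved} of 1000 keys must migrate when scaling from 4 to 5 nodes")
```

In expectation about 80% of keys move here, which is exactly the problem consistent hashing addresses.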
3.2.1.3 Consistent Hashing
- Maps nodes (servers) to a hash ring based on characteristics.
- Maps data to the same hash ring based on characteristics.
- Saves data to the first node clockwise on the ring.
- Sets virtual nodes for each physical node; data hashed to a virtual node is actually saved to the corresponding physical node, and distributing virtual nodes evenly on the hash ring avoids data skew and node avalanche.

Scaling requires only minimal data migration; however, without virtual nodes the ring may suffer data skew, and data skew can in turn cause a node avalanche.
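The steps above can be sketched with a sorted ring and binary search (class and node names are illustrative; 100 virtual nodes per physical node is an arbitrary choice):

```python
import bisect
import hashlib

def h(s: str) -> int:
    """Stable hash mapping strings onto the ring."""
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, nodes, vnodes=100):
        # Each physical node gets `vnodes` virtual nodes on the ring.
        self.ring = sorted((h(f"{n}#{v}"), n) for n in nodes for v in range(vnodes))
        self.hashes = [pos for pos, _ in self.ring]

    def lookup(self, key: str) -> str:
        # First node clockwise from the key's position (wraps around).
        i = bisect.bisect(self.hashes, h(key)) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(["node-a", "node-b", "node-c"])
old = {k: ring.lookup(k) for k in (f"key{i}" for i in range(1000))}
bigger = ConsistentHashRing(["node-a", "node-b", "node-c", "node-d"])
moved = sum(old[k] != bigger.lookup(k) for k in old)
print(f"{moved} of 1000 keys moved after adding node-d")  # roughly 1/4
```

Note that every key that moves goes to the new node, which is why scaling migrates so little data compared with modulo hashing.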
3.2.2 Replication
Establishing redundant replicas is a basic means to achieve fault tolerance and high availability.
3.2.2.1 Replica Establishment Strategies
Single-Master Replication
Single-master replication has only one Master replica, with other replicas being backup Slave replicas. The node maintaining the Master replica acts as the central node, responsible for data updates, concurrency control, and coordinating replica consistency.
Process: When the Master replica fails, a Slave is elected as the Master, and the failed master, upon recovery, is demoted to a slave, synchronizing with the new master. When a slave replica fails, it resynchronizes data from the master upon recovery.
Problems:
(1) Availability Issue: Failover operations, slave elections, and service switching to the new master require time when the master fails, during which the system is blocked and unable to provide services.
(2) Data Consistency Issue: When the master fails and a slave is elected as the new master, before the new and old masters synchronize, the demoted old master (now a slave) may hold more data than the new master, leading to data inconsistency.
(3) Cost Issue: Slave replicas are only used for failover, resulting in some waste.
Multi-Master Replication
All replicas are masters, mutually serving as masters and slaves. Write operations can be handled by any master and then synchronized to other masters.
Multi-master replication faces data inconsistency issues during concurrent operations.
Masterless Replication
Does not distinguish between master and slave replicas. When updating data, the client sends write requests to multiple replicas; when querying data, the client sends read requests to multiple replicas.
Clients can perform some data compensation work, but data inconsistency issues still exist.
3.2.2.2 Replica Synchronization Strategies
Synchronous Replication
Ensures data replication to all replicas before considering replication complete. Replicas have strong consistency, but performance is not high.
Asynchronous Replication
Considers replication complete once data is replicated to the master replica, with other replicas handling asynchronously. High performance, but may lose data or cause dirty reads.
Semi-Synchronous Replication
Considers replication complete when data is replicated to an agreed number of replicas. Balances performance and consistency.
3.2.2.3 Replica Consistency Models
CAP Theorem
A distributed system cannot simultaneously satisfy consistency, availability, and partition tolerance, and can only satisfy at most two of these.
- Consistency: All data replicas have consistent data.
- Availability: All requests can receive correct responses.
- Partition Tolerance: The system can provide services that satisfy consistency and availability even when network partitions occur.
Network partitioning is when network link issues isolate cluster nodes into multiple partitions, with partitions being unreachable to each other but functioning normally internally.
Distributed systems must ensure partition tolerance; otherwise, they lose the meaning of being distributed, requiring trade-offs between consistency and availability.
ACID
- Atomicity: All operations in a transaction must be completed or not completed at all, without ending in the middle.
- Consistency: The database transitions from one consistent state to another before and after the transaction, without breaking data integrity.
- Isolation: Multiple concurrent transactions do not interfere with each other, preventing data inconsistency.
- Durability: Modifications to data are permanent after transaction processing, even in the event of system failure.

BASE Principle
BASE weakens consistency, pursuing partition tolerance and availability, representing a different design philosophy from ACID.
- Basic Availability: requires the system to run basically and always provide services, allowing partial availability loss, such as response delay or service degradation, under unforeseen failures.
- Soft State: allows data in the system to exist in an intermediate state, considering that this state does not affect the overall availability of the system, allowing temporary inconsistency between replicas.
- Eventual Consistency: requires that data not remain in a soft state indefinitely, eventually achieving consistency across all replicas.
Consistency Models define the basic constraints for maintaining consistency during data replication in distributed systems.
- Strong Consistency: at any time, any user or node can read the most recently successfully updated replica data. It has the highest consistency requirements and is the most challenging to achieve in practice.
- Monotonic Consistency: at any time, once a user reads a value updated by a certain operation, the user will never read an older value again. Weaker than strong consistency.
- Session Consistency: once a user reads a value updated by a certain operation within a session, the user will not read an older value again during that session. Weaker than monotonic consistency: it only ensures monotonic reads within a single user's single session, without guaranteeing consistency between different users or between different sessions of the same user.
- Eventual Consistency: once an update succeeds, data across replicas will eventually reach a completely consistent state, but the time needed to achieve complete consistency cannot be guaranteed.
- Weak Consistency: after an update succeeds, users may be unable to read the updated value for some time, and even if the new value has been read on one replica, reading it on other replicas is not guaranteed. Weak-consistency systems are generally hard to use in practice, requiring applications to do more work to make the system usable.
3.2.3 Consistency Protocols in Distributed Systems
Among them, Lease, 2PC, and PAXOS can achieve complete consistency.
3.2.3.1 Lease Mechanism
The central node stores and maintains metadata, ensuring that cached metadata on each node is always consistent with the metadata on the central node.
Usage Scenario:
(1) Client reads cache node metadata
Determines whether metadata is already on the cache node and if the Lease is valid.
If yes: returns metadata directly.
If no: requests to read data from the central node. Upon receiving the read request, the central node returns metadata with a corresponding Lease.
If the cache node fails to receive or times out, the read fails, and the process retries.
If received successfully, records the metadata and Lease returned by the central node and returns metadata to the client.
(2) Client modifies metadata
The client sends a request to modify metadata to the central node.
Upon receiving the request, the central node blocks all new read data requests, only receiving read requests but not returning data.
The central node waits for all Leases related to the data to time out, modifies the data, and returns modification success information to the client.
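The cache-node side of the read path can be sketched as follows (class name, TTL values, and the `fetch_from_center` callback are illustrative assumptions, not part of the protocol's specification):

```python
import time

class LeasedCache:
    """Cache node that trusts its local metadata only while the lease is valid."""
    def __init__(self):
        self.meta = None      # cached metadata
        self.expiry = 0.0     # lease expiry time (monotonic clock)

    def read(self, fetch_from_center):
        # Lease still valid: serve the cached metadata directly.
        if self.meta is not None and time.monotonic() < self.expiry:
            return self.meta
        # Otherwise request the central node, which returns (metadata, lease TTL).
        self.meta, ttl = fetch_from_center()
        self.expiry = time.monotonic() + ttl
        return self.meta

cache = LeasedCache()
print(cache.read(lambda: ("v1", 10.0)))  # fetches from the central node: v1
print(cache.read(lambda: ("v2", 10.0)))  # lease still valid: still serves v1
```

This also shows why the central node must wait for all leases to expire before modifying metadata: until then, cache nodes keep serving the old value.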
3.2.3.2 Quorum
Assuming a total of N replicas, a write operation must successfully update W replicas to be considered successful, and a read operation must read R replicas to read the updated data. Requirements:
W + R > N
Can adjust W and R based on business needs, balancing reliability and performance.
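The quorum condition reduces to a one-line check; the intuition is that any W written replicas and any R read replicas must overlap in at least one replica (the N/W/R values below are example choices):

```python
def quorum_overlaps(n: int, w: int, r: int) -> bool:
    """W + R > N guarantees every read quorum intersects every write quorum,
    so at least one of the R replicas read holds the latest update."""
    return w + r > n

# N = 3 replicas: W = 2, R = 2 is a common balanced choice.
print(quorum_overlaps(3, 2, 2))  # True
print(quorum_overlaps(3, 1, 1))  # False: a read may miss the updated replica
```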
3.2.3.3 Two-Phase Commit Protocol (2PC)
A protocol for maintaining distributed transaction consistency, belonging to synchronous replication protocols, meaning all replica data must be synchronized before returning results to the client.
2PC divides data replication into two phases:
Voting Phase: The master node sends data to all replicas, and each replica votes to commit or rollback. If a replica votes to commit, it places the data in a temporary area, waiting for final commitment.
Commit Phase: The master node receives responses from other replicas. If all replicas vote to commit, it sends confirmation to commit to all replicas, moving data from the temporary area to the permanent area. If any replica returns a rollback, the entire process rolls back.
2PC is a typical CA system, ensuring consistency and availability. If network partitions or node unavailability occur, it refuses write operations, making the system read-only.
If a node fails, 2PC will block the system indefinitely, so it is rarely used in data replication scenarios, generally used for distributed transactions.
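The two phases can be condensed into a minimal sketch (the coordinator loop and the callable "participants" are illustrative simplifications; real 2PC also handles timeouts and logging):

```python
def two_phase_commit(participants) -> str:
    """participants: callables that vote 'commit' or 'rollback'."""
    # Phase 1 (voting): every replica stages the data and casts a vote.
    votes = [vote() for vote in participants]
    # Phase 2 (commit): commit only if every vote is 'commit';
    # a single 'rollback' vote aborts the whole transaction.
    if all(v == "commit" for v in votes):
        return "commit"
    return "rollback"

print(two_phase_commit([lambda: "commit", lambda: "commit"]))    # commit
print(two_phase_commit([lambda: "commit", lambda: "rollback"]))  # rollback
```

The blocking problem is visible here: if one participant never returns a vote, the coordinator waits forever, which is why 2PC is avoided for data replication.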
3.2.3.4 Paxos Protocol
Application scenario: ensuring consistency in multi-master replication (state machine replication + consensus algorithm).
Three Roles
- Proposer: Proposes proposals (propose), can have multiple.
- Acceptor: Votes on proposals.
- Learner: Synchronizes determined proposals.

Proposal: a data update request, which can be represented as [Proposal Number n, Proposal Content value].
Steps:
- Each Proposer obtains a globally unique, incrementing proposal number N when proposing a proposal, assigning it to the proposal.
- Each Acceptor records the proposal number N locally after accepting a proposal, with the largest proposal number recorded as MaxN. Each Acceptor only accepts proposals with numbers greater than its local MaxN.
- In one election, only one proposal can be selected among many proposals.
- Once a proposal is selected, other nodes will actively synchronize (learn) the proposal locally.
- If no proposal is proposed, no proposal will be selected.

Message flow: prepare-promise, propose-accept, learn.
The problem with Basic Paxos is that it can only reach consensus on one value, requires at least two network round trips to reach a decision, and under high concurrency may require more round trips, potentially leading to livelock (two proposers repeatedly preempting each other while competing to decide one value).
Multi-Paxos improves upon Basic Paxos with two enhancements:
- For each value to be determined, run one instance of the Paxos algorithm (Instance) to reach a decision. Each Paxos instance uses a unique Instance ID.
- Elect a Leader among all Proposers, with the Leader solely submitting Proposals to Acceptors for voting. This eliminates Proposer competition, solving the livelock problem. When only one Leader submits Values in the system, the Prepare phase can be skipped, reducing the two-phase process to one phase, improving efficiency. Thus, even in the case of network partitioning with multiple leaders, multi-paxos at most degrades to basic-paxos.
3.2.4 Reference Article
Clocks
Three types of events in distributed systems, each capable of triggering a clock increment:
- Internal events within a node
- Send events
- Receive events

Two methods for establishing logical clocks in distributed systems:
- Lamport timestamps can only represent causal relationships.
- Vector clocks can represent both causal and concurrent relationships.
① Lamport Timestamp is a logical clock representation, a monotonically increasing integer. By assigning a Lamport timestamp to each event in a distributed system according to certain rules, the partial order of events can be determined by comparing timestamp values.
Rules
- Each node has a local timestamp, initially set to 0.
- If an event occurs within a node, the local timestamp is incremented by 1.
- If it is a sending event, the local timestamp is incremented by 1 and included in the message.
- If it is a receiving event, the local timestamp = Max(local timestamp, message timestamp) + 1.

Event Order: first sort by timestamp; if equal, sort by node number (Note: node numbers are given by the problem statement!).
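The four rules translate directly into code (class and variable names are illustrative):

```python
class LamportClock:
    def __init__(self):
        self.time = 0                        # local timestamp starts at 0

    def internal_event(self) -> int:         # internal event: +1
        self.time += 1
        return self.time

    def send(self) -> int:                   # send event: +1, attach to message
        self.time += 1
        return self.time

    def receive(self, msg_time: int) -> int: # receive: max(local, message) + 1
        self.time = max(self.time, msg_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
b.internal_event(); b.internal_event()  # b is ahead locally (time = 2)
t = a.send()                            # a: time = 1, message carries 1
print(b.receive(t))                     # max(2, 1) + 1 = 3
```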
② Vector Clock is another logical clock method evolved from Lamport Timestamps, recording the Lamport timestamps of the local node and other nodes through a vector structure, effectively describing concurrent relationships and causal relationships between events. The vector clock algorithm uses the vector data structure to broadcast the logical timestamps of all processes globally to each process: each process, when sending an event, writes the known timestamps of all processes into a vector and includes it in the message.
Event Order: if Tb[Q] > Ta[Q] but Tb[P] < Ta[P] (neither timestamp vector dominates the other), then a and b are considered concurrent, denoted a <-> b; this is the concurrent relationship that Lamport Timestamps cannot represent.
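A small sketch of vector clocks with the concurrency test above (class and node names are illustrative; two processes P and Q are assumed):

```python
class VectorClock:
    def __init__(self, node: int, n_nodes: int):
        self.node, self.v = node, [0] * n_nodes

    def tick(self):                    # internal event on this node
        self.v[self.node] += 1

    def send(self) -> list:            # tick, then attach the whole vector
        self.tick()
        return list(self.v)

    def receive(self, msg: list):      # element-wise max with the message, then tick
        self.v = [max(a, b) for a, b in zip(self.v, msg)]
        self.tick()

def concurrent(ta: list, tb: list) -> bool:
    """a <-> b iff neither timestamp vector dominates the other."""
    return any(x < y for x, y in zip(ta, tb)) and any(x > y for x, y in zip(ta, tb))

p, q = VectorClock(0, 2), VectorClock(1, 2)
p.tick(); q.tick()                 # independent events: [1, 0] vs [0, 1]
print(concurrent(p.v, q.v))        # True: concurrent
q.receive(p.send())                # now q causally follows p's event
print(concurrent(p.v, q.v))        # False: causal, not concurrent
```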
3.3 Big Data System Structure
Briefly describe what a big data system is and what needs to be considered and balanced when building a big data system
Big Data System: A high-performance, scalable, highly available, fault-tolerant, secure, and easy-to-use hardware and software system that integrates big data processing functions such as data collection and preprocessing, storage and management, processing and analysis, and visualization presentation; used to help users discover valuable information and knowledge hidden in big data, grasp business realities, and predict business trends.
The structure of a big data system depends on the needs and macro decisions of building the big data system, including business goals, data source types and characteristics, performance requirements, batch processing/stream processing (computing framework), technology selection, etc.
3.3.1 Traditional BI Architecture
Data Source + ETL + Data Warehouse + Analysis Reports
- Focused on structured analysis around data warehouses, lacking unstructured analysis.
- Complex and bloated ETL data preprocessing functions.
- ACID characteristics affect performance, unable to handle large-scale data.
3.3.2 Batch Processing Architecture
Data Source + ETL + Data Storage + Batch Processing + Analysis Reports
- Advantages: Simple and easy to use, replacing BI components with big data components during technology selection.
- Disadvantages: ① Lacks the flexibility of data warehouse support for business, requiring manual customization for a large number of reports and complex drill-down scenarios; ② Primarily batch processing, lacking real-time support.
- Suitable Scenarios: Primarily BI scenarios, suitable for offline analysis of large-scale historical data.
3.3.3 Stream Processing Architecture
Data Source + Real-Time Data Channel + Stream Processing + Message Push
- Advantages: No bloated ETL process, high data timeliness.
- Disadvantages: No batch processing, so it cannot support data replay and historical statistics well; offline analysis is limited to data within the current window.
- Suitable Scenarios: Alerts, monitoring, situations requiring data with validity periods.
3.3.4 Lambda Architecture
Lambda Architecture: Batch Processing Layer + Stream Processing Layer + Service Layer. Data is written in parallel through two paths to batch and stream processing systems. Provides corresponding data computing logic for both batch and stream processing paths. Finally, the service layer integrates the computed result views for external service output.
- Advantages: Real-time + offline analysis, covering a comprehensive range of data analysis scenarios.
- Disadvantages: Requires maintaining two sets of systems (e.g., Hadoop for the batch layer and Storm for the speed layer). The same business computing logic must be implemented and maintained in both layers, and merging query results from the two paths adds further complexity.
- Suitable Scenarios: Situations with both real-time and offline needs.
3.3.5 Kappa Architecture
Simplifies the Lambda architecture by removing the batch processing system, with all data going through the real-time path, treating all data as streams. Processes real-time and historical data entirely through stream processing systems. Data is introduced as events in order into a fault-tolerant distributed unified log. The event stream enters the speed layer for stream processing, generating real-time views. The event stream is also stored in long-term storage. When necessary, replay the event stream to recalculate and generate historical data views through the stream processing engine.
- Advantages: Solves the redundant parts of the Lambda architecture, designed with the idea of data replay, with a simple architecture.
- Disadvantages: High implementation difficulty, especially in data replay.
- Application Scenarios: Situations with both real-time and offline needs.
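The core Kappa idea — one stream-processing code path serving both real-time views and historical recomputation via log replay — can be shown with a minimal in-memory sketch (the log, event data, and `process` function are all illustrative, not a real framework API):

```python
from collections import defaultdict

log = []  # the fault-tolerant, ordered, unified event log (simplified as a list)

def process(events):
    """The single computing logic: count page views per user."""
    view = defaultdict(int)
    for user, page in events:
        view[user] += 1
    return dict(view)

# Real-time path: events are appended to the log as they arrive.
for event in [("alice", "/home"), ("bob", "/cart"), ("alice", "/pay")]:
    log.append(event)

realtime_view = process(log)

# Historical path: replay the same log through the same logic when needed.
replayed_view = process(log)
assert realtime_view == replayed_view  # one code path; no Lambda-style duplication
print(realtime_view)  # {'alice': 2, 'bob': 1}
```

Contrast with Lambda: there, `process` would have to be written twice, once for the batch engine and once for the stream engine, and the service layer would merge the two views.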
4 Hadoop
Hadoop Version Evolution:
2.0: added YARN
3.0: MapReduce with in-memory computing support, multiple NameNodes, streamlined kernel
Three Core Components of Hadoop: HDFS, MapReduce, YARN
◼ HDFS is a distributed storage framework that distributes files across multiple computing nodes, suitable for storing massive data.
◼ MapReduce is a distributed computing framework that abstracts the parallel computing process on large-scale clusters into two functions: Map and Reduce, adopting a “divide and conquer” strategy, where large-scale datasets stored in the distributed file system are split into many independent splits, which are processed in parallel by multiple Map tasks.
◼ Yarn serves as a resource scheduling platform and is responsible for adjusting the resources occupied by various computing frameworks based on their load demands, achieving cluster resource sharing and elastic resource scaling.
4.1 HDFS
4.1.1 HDFS Architecture
HDFS architecture adopts a master-slave structure model, with an HDFS cluster typically including:
(1) A NameNode, which acts as the central server responsible for managing the file system’s namespace and client access to files.
(2) Multiple DataNodes, each running a datanode process responsible for handling client read/write requests and performing operations such as block creation, deletion, and replication under the unified scheduling of the NameNode. The data of DataNodes is actually stored in the local Linux file system.
4.1.2 HDFS Storage Principles
To ensure system fault tolerance and availability, HDFS uses multiple replicas to redundantly store data, with multiple replicas of a data block typically distributed across different DataNodes. Clients prioritize using data on the same rack. Advantages:
(1) Speeds up data transmission
(2) Easily checks data errors
(3) Ensures data reliability
Replica Storage Strategy:
(1) First replica: Placed on the DataNode uploading the file; if the upload comes from outside the cluster, a node whose disk is not too full and whose CPU is not too busy is chosen at random;
(2) Second replica: Placed on a node in a different rack from the first replica;
(3) Third replica: Placed on another node in the same rack as the first replica;
(4) Further replicas: Random nodes.
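The placement rule can be sketched as a small function. This is an illustrative simulation only (the function name and rack layout are assumptions, not HDFS code):

```python
import random

def place_replicas(writer_node, nodes_by_rack, n=3):
    """Pick DataNodes for n replicas following the strategy above.
    nodes_by_rack: {rack: [node, ...]}; writer_node must appear in it."""
    rack_of = {node: rack for rack, members in nodes_by_rack.items()
               for node in members}
    first = writer_node                                   # replica 1: writing node
    other_racks = [r for r in nodes_by_rack if r != rack_of[first]]
    second = random.choice(nodes_by_rack[random.choice(other_racks)])  # replica 2: other rack
    same_rack_rest = [nd for nd in nodes_by_rack[rack_of[first]] if nd != first]
    third = random.choice(same_rack_rest)                 # replica 3: same rack as replica 1
    chosen = [first, second, third]
    remaining = [nd for nd in rack_of if nd not in chosen]
    chosen += random.sample(remaining, n - 3)             # further replicas: random
    return chosen

racks = {"rack1": ["dn1", "dn2"], "rack2": ["dn3", "dn4"]}
print(place_replicas("dn1", racks))  # e.g. ['dn1', 'dn3', 'dn2']
```

Note the trade-off the rule encodes: one off-rack replica survives a rack failure, while two same-rack replicas keep most write traffic inside one rack.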
4.1.3 Data Read and Write Process
Writing Files
The client requests to write a file to the NameNode, which prepares and informs the client when ready. The client then repeats the following steps until all data is written:
- Request a block from the NameNode, which selects DataNodes according to the placement rules and returns them to the client.
- The client sends the data to the designated DataNodes, which receive and write it locally.
Reading Files
The client requests to read a file from the NameNode, which returns the file's metadata. Using the metadata, the client requests the corresponding blocks from the relevant DataNodes and merges them to form the complete file content.
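Behind both paths, HDFS splits files into fixed-size blocks (128 MB by default since Hadoop 2.x) and stores each block with the configured replication factor. A quick back-of-the-envelope calculation:

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024   # default HDFS block size since Hadoop 2.x
REPLICATION = 3                  # default replication factor

def storage_for(file_bytes):
    """Number of blocks, and total raw bytes consumed across the cluster."""
    blocks = math.ceil(file_bytes / BLOCK_SIZE)
    return blocks, file_bytes * REPLICATION

blocks, raw = storage_for(300 * 1024 * 1024)  # a 300 MB file
print(blocks)                 # 3 blocks (128 + 128 + 44 MB)
print(raw / (1024 * 1024))    # 900.0 MB of raw storage with 3 replicas
```

The last block only occupies its actual size on disk, which is why the raw total is file size × replication rather than blocks × block size.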
4.1.4 Data Errors and Recovery
- NameNode Error
The NameNode stores all metadata information, and if it fails, the entire HDFS cluster will fail. HDFS sets up a checkpoint mechanism to periodically replicate this metadata to the backup server SecondaryNameNode. When the NameNode fails, NameNode metadata can be recovered based on the SecondaryNameNode.
- DataNode Error
- Each DataNode periodically sends heartbeat information to the NameNode, reporting its status.
- When a DataNode fails or experiences network anomalies, and the NameNode cannot receive heartbeat information from the DataNode, the DataNode is marked as down, and all data on the node is marked as unreadable, with the NameNode no longer sending any I/O requests to them.
- The NameNode periodically checks the number of replicas of data blocks, and if it is less than the redundancy factor, it initiates data redundancy replication.
- Data Error
Network transmission and disk errors can cause data errors. The client verifies the data block using md5 and sha1 after reading it to ensure correct data is read.
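The verify-on-read idea mentioned above can be illustrated with `hashlib` (a simplified sketch of client-side verification, not actual HDFS code — the variable names and error handling are assumptions):

```python
import hashlib

def checksum(block: bytes) -> str:
    return hashlib.md5(block).hexdigest()

# Storage side: keep the block together with its checksum.
block = b"some HDFS block contents"
stored = (block, checksum(block))

# Client side: after reading, recompute and compare before trusting the data.
def read_verified(stored):
    data, expected = stored
    if checksum(data) != expected:
        raise IOError("block corrupted; fetch a replica from another DataNode")
    return data

print(read_verified(stored) == block)  # -> True

# A corrupted transfer is detected:
corrupted = (block[:-1] + b"X", stored[1])
try:
    read_verified(corrupted)
except IOError as e:
    print("detected:", e)
```

On a mismatch the real client would report the bad replica to the NameNode and read from another DataNode, and the NameNode would re-replicate the block.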
4.2 MapReduce Architecture
4.2.1 Computing Model
- Abstracts the parallel computing process on large-scale clusters into two functions: Map and Reduce.
- Programming is easy, allowing users to run their programs on distributed systems without mastering the cumbersome details of distributed parallel programming, achieving massive data computation.
- Adopts a “divide and conquer” strategy, where large-scale datasets stored in the distributed file system are split into many independent splits, which are processed in parallel by multiple Map tasks.
- The design philosophy is “bringing computation closer to data” rather than “bringing data closer to computation,” as moving data requires significant network transmission overhead.
- Adopts a Master/Slave architecture, including a Master and several Slaves (or Workers). The Master runs JobTracker, responsible for job scheduling, processing, and failure recovery, while Slaves run TaskTracker, responsible for receiving job instructions from JobTracker.
4.2.2 Four Components
- Client:
- a. Users write MapReduce programs and submit them to the JobTracker via the Client.
- b. Users can view job running status through interfaces provided by the Client.
- Job Tracker:
- a. Responsible for resource monitoring and job scheduling,
- b. Monitors the health status of all Task Trackers and Jobs, transferring tasks to other nodes upon failure.
- c. JobTracker tracks task execution progress, resource usage, and other information, and informs the task scheduler (TaskScheduler, pluggable, customizable), which selects suitable tasks to use available resources when resources become available.
- Task Tracker:
- a. Task Tracker periodically reports resource usage and task running progress on the local node to the JobTracker via “heartbeat,” while receiving commands from the JobTracker and executing corresponding operations (e.g., starting new tasks, killing tasks).
- b. Task Tracker uses “slots” to equally divide resources (CPU, memory, etc.) on the local node. A Task can only run after obtaining a slot, and the Hadoop task scheduler’s role is to allocate available slots on each TaskTracker to Tasks. (Slots are divided into Map slots and Reduce slots, used by MapTasks and Reduce Tasks, respectively.)
- Task: Divided into Map Task and Reduce Task, both started by Task Tracker.
Working Process
(1) Program deployment; (2) Assigning Map tasks and Reduce tasks; (3) Map nodes read data, execute map tasks, and spill intermediate results; (4) Reduce nodes receive intermediate result data and execute reduce tasks; (5) Write execution results to HDFS.
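Steps (3)–(5) of this process can be mimicked entirely in memory. The following word-count sketch is illustrative (the function names and the in-memory "shuffle" are simplifications of what the framework does between map and reduce):

```python
from collections import defaultdict
from itertools import chain

def map_task(split):
    """Map: one input split -> list of (key, value) pairs."""
    return [(word, 1) for word in split.split()]

def shuffle(pairs):
    """Shuffle: group intermediate pairs by key (done by the framework)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_task(key, values):
    """Reduce: (key, [values]) -> (key, aggregated value)."""
    return key, sum(values)

splits = ["big data big", "data systems"]              # independent input splits
intermediate = chain.from_iterable(map_task(s) for s in splits)  # step (3)
grouped = shuffle(intermediate)                        # between (3) and (4)
result = dict(reduce_task(k, v) for k, v in grouped.items())     # step (4)
print(result)  # {'big': 2, 'data': 2, 'systems': 1}
```

In a real cluster each `map_task` call runs on the node holding its split ("bringing computation closer to data"), and the shuffle moves only the intermediate pairs across the network.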
4.3 Yarn Architecture
4.3.0.1 Resource Manager
Handles client requests, monitors NodeManagers, starts and monitors ApplicationMasters, and performs resource scheduling and allocation.
Global resource manager, responsible for resource management and allocation in the entire system. Includes two components: Scheduler and Applications Manager.
(1) Scheduler allocates cluster resources in the form of “containers” to applications that request them. Container selection usually considers the location of data to be processed by the application, choosing nearby locations to achieve “bringing computation closer to data.” The scheduler is a pluggable component, and YARN provides many directly usable schedulers while allowing users to design their own schedulers based on their needs.
(2) Applications Manager manages all applications in the system, including application submission, negotiating resources with the scheduler to start ApplicationMaster, monitoring ApplicationMaster running status, and restarting in case of failure.
4.3.0.2 Application Master
Main functions:
(1) After the user job is submitted, the ApplicationMaster negotiates with the Resource Manager to obtain resources, and the ResourceManager allocates resources to the Application Master in the form of containers.
(2) The ApplicationMaster further allocates the obtained resources to internal tasks (Map or Reduce tasks), achieving “secondary allocation” of resources.
(3) Maintains interactive communication with NodeManager for application startup, operation, monitoring, and stopping, monitors the usage of allocated resources, monitors the execution progress and status of all tasks, and performs failure recovery (i.e., reapplying resources to restart tasks) in case of task failure.
(4) Periodically sends “heartbeat” messages to the ResourceManager, reporting resource usage and application progress information.
(5) When the job is completed, the ApplicationMaster deregisters the container from the ResourceManager and closes itself, completing the execution cycle.
4.3.0.3 Node Manager
NodeManager is an agent residing on each node in the YARN cluster, mainly responsible for:
- Container lifecycle management, monitoring resource (CPU, memory, etc.) usage of each container.
- Tracking node health status, maintaining communication with the ResourceManager via "heartbeat," reporting job resource usage and container running status.
- Receiving various requests from the ApplicationMaster to start/stop containers.
Note: NodeManager mainly manages abstract containers, only handling container-related matters, and does not specifically manage the state of each task (Map task or Reduce task). Task state management is completed by the ApplicationMaster, which continuously communicates with NodeManager to grasp the execution status of each task.
JobHistoryServer: Unified management of YARN historical tasks.
WebAppProxyServer: Web page proxy during task execution. Responsible for monitoring the entire execution process of specific MapReduce tasks, collecting task execution information from Containers, and displaying it on a Web interface.
4.3.0.4 Yarn Working Process
Step 1: Users write client applications and submit them to YARN, including the ApplicationMaster program, commands to start the ApplicationMaster, user programs, etc.
Step 2: The ResourceManager in YARN is responsible for receiving and processing requests from clients, allocating a container for the application, and starting an ApplicationMaster in that container.
Step 3: After being created, the ApplicationMaster first registers with the ResourceManager.
Step 4: The ApplicationMaster uses polling to request resources from the ResourceManager.
Step 5: The ResourceManager allocates resources in the form of “containers” to the ApplicationMaster that requested them.
Step 6: Start tasks (runtime environment, scripts) in containers.
Step 7: Each task reports its status and progress to the ApplicationMaster.
Step 8: After the application runs, the ApplicationMaster deregisters from the ResourceManager’s application manager and closes itself.
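The eight steps can be condensed into a toy walk-through. Both classes below are illustrative stand-ins, not YARN APIs; they only trace the container-accounting flow:

```python
class ResourceManager:
    def __init__(self, total_containers):
        self.free = total_containers
        self.registered = []

    def submit_application(self, app):      # steps 1-2: allocate a container
        self.free -= 1                      # ... and start the AM inside it
        return ApplicationMaster(app, self)

    def register(self, am):                 # step 3
        self.registered.append(am)

    def allocate(self, n):                  # steps 4-5: hand out containers
        granted = min(n, self.free)
        self.free -= granted
        return granted

    def deregister(self, am):               # step 8
        self.registered.remove(am)

class ApplicationMaster:
    def __init__(self, app, rm):
        self.app, self.rm = app, rm
        rm.register(self)

    def run(self, tasks):
        containers = self.rm.allocate(tasks)                 # steps 4-5
        done = [f"{self.app}-task{i}" for i in range(containers)]  # steps 6-7
        self.rm.deregister(self)                             # step 8
        self.rm.free += containers + 1   # task containers and the AM's return
        return done

rm = ResourceManager(total_containers=4)
am = rm.submit_application("wordcount")
print(am.run(tasks=3))  # ['wordcount-task0', 'wordcount-task1', 'wordcount-task2']
print(rm.free)          # 4 -- all containers returned after completion
```

The point of the sketch is the "secondary allocation": the ResourceManager hands containers to the ApplicationMaster, which then distributes them to its own tasks.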
4.3.0.5 The Role of Unified Deployment of YARN
- Deploying a unified resource scheduling framework YARN in the cluster, deploying various computing frameworks on top of YARN.
- YARN provides unified resource scheduling management services for these computing frameworks and adjusts the resources occupied by each according to their load demands, achieving cluster resource sharing and elastic resource scaling.
- Achieves different applications and load mixing on a single cluster, effectively improving cluster utilization.
- Different computing frameworks can share underlying storage, avoiding cross-cluster data set movement.
5 ZooKeeper
Distributed applications can achieve the following functions through Zookeeper: configuration management, naming service, distributed lock, cluster management.
5.1 Architecture
Leader
- The only scheduler and processor of transaction requests (write operations) in the cluster, ensuring the order of transactions in the cluster.
- The scheduler of various services within the cluster.
Follower
- Handles non-transaction requests (read operations) in the cluster.
- Forwards transaction requests to the Leader.
- Participates in voting on transaction request proposals.
- Participates in Leader election voting.
Observer
- Handles non-transaction requests from clients.
- Forwards transaction requests to the Leader.
- Provides read-only data services and does not participate in any form of voting.
The Leader/Followers cluster architecture gives Zookeeper master-slave and master-backup capabilities.
(1) Master-Slave: The master node assigns tasks, and the slave nodes execute tasks.
(2) Master-Backup: The master node and backup nodes, when the master node fails, quickly elect a new master node from Followers to ensure the master node does not crash.
5.2 Three Types of Znodes
- Persistent Node (PERSISTENT): The node remains even after the client disconnects from Zookeeper. By default, all znodes are persistent.
- Ephemeral Node (EPHEMERAL): The node is valid when the client is active, and it is automatically deleted when the client disconnects from Zookeeper. Ephemeral nodes are not allowed to have child nodes and play an important role in leader election.
- Sequential Node (SEQUENTIAL): Nodes with sequential numbering. Sequential nodes can be persistent or ephemeral, thus they can be persistent sequential nodes (PERSISTENT_SEQUENTIAL) or ephemeral sequential nodes (EPHEMERAL_SEQUENTIAL).
Znode Primitive Operations:
Create Node (create)
Delete Node (delete)
Update Node (set)
Get Node Information (get)
Access Control (getAcl/setAcl)
Event Watching (watch)
5.3 Znode Node Operation Characteristics
- When multiple machines simultaneously create a node, only one will succeed, which can be used as a distributed lock.
- The lifecycle of an ephemeral node is consistent with the session, and the node is deleted when the session ends, commonly used for heartbeat, monitoring, load, etc.
- Sequential nodes ensure globally unique node names, which can be used as a globally incrementing ID in distributed environments.
- Clients register to watch concerned directory nodes, and when the directory node data changes, Zookeeper notifies the client.
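The globally incrementing ID property of sequential nodes comes from a per-parent counter that the server appends to the requested name. A minimal in-memory simulation of that server-side behavior (the class and method names are illustrative; a real client would call `create` with the sequence flag set):

```python
class ZnodeDir:
    """Simulates one parent znode that hands out sequential child names."""
    def __init__(self):
        self.counter = 0
        self.children = []

    def create_sequential(self, prefix):
        name = f"{prefix}{self.counter:010d}"  # server appends a 10-digit sequence
        self.counter += 1
        self.children.append(name)
        return name

ids = ZnodeDir()
print(ids.create_sequential("id-"))  # id-0000000000
print(ids.create_sequential("id-"))  # id-0000000001
```

Because the counter lives on the server and is updated atomically, concurrent clients never receive the same name — which is exactly what makes it usable as a distributed ID generator.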
5.4 Services Provided by Zookeeper
Configuration Management
Unified Naming Service
Cluster Management
Distributed Lock (Shared Lock, Exclusive Lock)
How Zookeeper Implements Exclusive Lock:
(1) Representation of Exclusive Lock: An exclusive lock is represented by a znode, such as /x_lock/lock.
(2) Acquiring the Lock: All clients attempt to create a temporary child node /x_lock/lock under /x_lock by calling the create interface. Of course, only one client will ultimately succeed, indicating that the client has acquired the exclusive lock. At the same time, other clients that did not acquire the lock register a Watcher to monitor changes in child nodes.
(3) Releasing the Lock: The client that acquired the lock crashes or completes its business logic normally, deleting the temporary child node, indicating that the exclusive lock has been released. After the temporary child node is deleted, other clients begin a new round of lock acquisition.
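The three-step protocol above can be simulated without a real ZooKeeper cluster. The sketch below is only the protocol shape — `FakeZk` is an in-memory stand-in, not a client library — showing first-create-wins, watch registration, and re-acquisition on delete:

```python
class FakeZk:
    def __init__(self):
        self.nodes = {}      # path -> owner
        self.watchers = {}   # path -> [callback, ...]

    def create(self, path, owner):
        if path in self.nodes:
            return False     # node exists: someone else holds the lock
        self.nodes[path] = owner
        return True

    def delete(self, path):
        self.nodes.pop(path, None)
        for cb in self.watchers.pop(path, []):
            cb()             # notify watchers: the lock was released

    def watch(self, path, callback):
        self.watchers.setdefault(path, []).append(callback)

zk = FakeZk()
LOCK = "/x_lock/lock"

events = []
assert zk.create(LOCK, "client-A")       # (2) A acquires the exclusive lock
assert not zk.create(LOCK, "client-B")   # (2) B fails ...
zk.watch(LOCK, lambda: events.append(zk.create(LOCK, "client-B")))  # ... and watches

zk.delete(LOCK)                          # (3) A finishes; ephemeral node removed
print(events)          # [True] -> B re-acquired the lock on notification
print(zk.nodes[LOCK])  # client-B
```

In real ZooKeeper the ephemeral flag is what makes step (3) crash-safe: if client A dies, its session ends and the server deletes the node automatically, so the lock cannot be held forever.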
6 Kafka
6.1 Two Models of Message Queues
Point-to-Point Model and Publish-Subscribe Model
Point-to-point cannot achieve message broadcasting and multicasting.
6.2 Five Application Scenarios of Message Queues
Application Decoupling, Asynchronous Communication, Traffic Shaving, Log Processing, Message Communication
6.3 Architecture
- Producer uses push mode to publish messages to the broker.
- Consumer uses pull mode to subscribe to and consume messages from the broker.
- Broker (Kafka server; multiple brokers form a Kafka cluster)
How are messages published to Kafka organized and stored to achieve load balancing and high throughput?
- Messages published to the Kafka cluster have a category called Topic.
- A Partition is an actual ordered queue storing messages, with each message assigned a unique identifier (called an offset).
- Each Topic can be stored in one or more Partitions. Messages sent to a Broker are stored in a Partition according to partitioning rules. If the rules are set reasonably, messages are evenly distributed across Partitions, balancing request load across cluster nodes and improving throughput.
- Any message published to a Partition is appended to the end of that Partition. This sequential disk write is a key reason for Kafka's high throughput.
How does the Kafka cluster use replicas to achieve availability and fault tolerance?
- In Kafka, a Partition has multiple replicas (Replication) on different Brokers in the cluster.
- Among a Partition's replicas, only one is the Leader; the others are Followers.
- The Leader handles all read and write operations for the partition, while Followers passively replicate data from the Leader. When the Leader fails, a Follower is elected as the new Leader through Zookeeper.
How does the Kafka cluster implement the point-to-point and publish-subscribe models?
- Point-to-point: if all consumers belong to the same consumer group, messages are evenly delivered among them, so each message is processed by only one consumer.
- Publish-subscribe: if all consumers belong to different consumer groups, every message is broadcast to all consumers, so each message is processed by every consumer.
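Both the key-based partitioning rule and the consumer-group behavior can be sketched in a few lines. This is a deliberately simplified model (the function names and hash-based assignment are illustrative, not Kafka's actual assignment protocol):

```python
def partition_for(key, num_partitions):
    """Simplified partitioner: the same key always lands in the same partition."""
    return hash(key) % num_partitions

def deliver(message, consumers_by_group):
    """Each message goes to exactly one consumer per consumer group."""
    return {group: members[hash(message) % len(members)]
            for group, members in consumers_by_group.items()}

# Point-to-point: all consumers in ONE group -> one consumer gets each message.
p2p = {"g1": ["c1", "c2"]}
print(deliver("order-42", p2p))     # one of c1/c2, under group g1

# Publish-subscribe: each consumer in its OWN group -> every consumer gets it.
pubsub = {"g1": ["c1"], "g2": ["c2"]}
print(deliver("order-42", pubsub))  # {'g1': 'c1', 'g2': 'c2'}

print(partition_for("user-7", 4) in range(4))  # partition index is stable per key
```

The same delivery mechanism thus covers both messaging models: the grouping of consumers, not the broker, decides whether a message is load-balanced or broadcast.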


