引言

在当今大数据时代,Hadoop和Spark已成为处理海量数据的核心技术栈。Hadoop作为分布式存储和计算的基石,提供了可靠的数据存储和批处理能力;而Spark则凭借其内存计算和流处理能力,显著提升了数据处理的效率和灵活性。本指南将系统性地介绍从数据处理到性能优化的完整实践路径,帮助读者掌握这两项关键技术的实战应用。

一、Hadoop生态系统概述

1.1 Hadoop核心组件

Hadoop生态系统主要由三个核心组件构成:

  • HDFS(Hadoop Distributed File System):分布式文件系统,提供高吞吐量的数据访问,适合大规模数据集的存储。
  • MapReduce:分布式计算框架,将计算任务分解为Map和Reduce两个阶段,实现并行处理。
  • YARN(Yet Another Resource Negotiator):资源管理和作业调度平台,负责集群资源的分配和任务调度。

1.2 Hadoop生态系统扩展组件

除了核心组件,Hadoop生态系统还包括:

  • HBase:分布式列式数据库,支持实时读写访问。
  • Hive:数据仓库工具,提供类SQL查询语言(HQL)。
  • Pig:高级数据流语言和执行框架。
  • ZooKeeper:分布式协调服务。

二、Hadoop实战:数据处理基础

2.1 环境搭建与配置

2.1.1 单节点伪分布式部署

以下是在Linux系统上部署Hadoop伪分布式模式的步骤:

# 1. 安装Java环境
sudo apt-get update
sudo apt-get install openjdk-8-jdk

# 2. 下载Hadoop
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
tar -xzf hadoop-3.3.1.tar.gz
sudo mv hadoop-3.3.1 /usr/local/hadoop

# 3. 配置环境变量
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc
echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
source ~/.bashrc

# 4. 配置Hadoop
cd /usr/local/hadoop/etc/hadoop

# 编辑core-site.xml
cat > core-site.xml << EOF
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
EOF

# 编辑hdfs-site.xml
cat > hdfs-site.xml << EOF
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
EOF

# 5. 格式化HDFS
hdfs namenode -format

# 6. 启动Hadoop
start-dfs.sh
start-yarn.sh

# 7. 验证安装
jps  # 应该看到NameNode, DataNode, ResourceManager, NodeManager等进程

2.1.2 集群部署配置

对于生产环境,需要配置多节点集群。以下是关键配置文件示例:

core-site.xml(集群模式):

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://namenode:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop/tmp</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hadoop/namenode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop/datanode</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>

yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>8192</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>4</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>8192</value>
    </property>
</configuration>

2.2 MapReduce编程实战

2.2.1 WordCount示例

WordCount是MapReduce的经典示例,用于统计文本中每个单词的出现次数。

// WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

编译与运行

# 编译
javac -classpath $(hadoop classpath) WordCount.java

# 打包
jar cf wc.jar WordCount*.class

# 运行
hadoop jar wc.jar WordCount /input /output

2.2.2 自定义InputFormat和OutputFormat

当处理非标准数据格式时,需要自定义InputFormat和OutputFormat。

// 自定义InputFormat示例:处理CSV文件
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CSVInputFormat extends FileInputFormat<Text, Text> {
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, 
                                                      TaskAttemptContext context) 
                                                      throws IOException, InterruptedException {
        return new CSVRecordReader();
    }
    
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return true;
    }
}

// CSVRecordReader实现
class CSVRecordReader extends RecordReader<Text, Text> {
    // 实现RecordReader接口方法
    // ... 具体实现省略
}

2.3 Hadoop Streaming实战

Hadoop Streaming允许使用任何可执行文件或脚本作为Mapper和Reducer,特别适合Python、Ruby等脚本语言。

2.3.1 Python WordCount示例

# mapper.py
#!/usr/bin/env python
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            print(f"{word}\t1")

if __name__ == "__main__":
    main()
# reducer.py
#!/usr/bin/env python
import sys

def main():
    current_word = None
    current_count = 0
    
    for line in sys.stdin:
        line = line.strip()
        word, count = line.split('\t', 1)
        
        if current_word == word:
            current_count += int(count)
        else:
            if current_word:
                print(f"{current_word}\t{current_count}")
            current_word = word
            current_count = int(count)
    
    if current_word:
        print(f"{current_word}\t{current_count}")

if __name__ == "__main__":
    main()

运行命令

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -input /input \
    -output /output \
    -mapper "python mapper.py" \
    -reducer "python reducer.py"

三、Spark实战:高性能数据处理

3.1 Spark架构与核心概念

3.1.1 Spark架构组件

  • Driver:应用程序的主进程,负责创建SparkContext、DAGScheduler和TaskScheduler。
  • Executor:工作节点上的进程,负责执行任务。
  • Cluster Manager:集群资源管理器(如YARN、Mesos、Standalone)。
  • RDD(弹性分布式数据集):Spark的基本数据抽象,支持并行操作。

3.1.2 Spark核心概念

  • DAG(有向无环图):Spark将作业转换为DAG,优化执行计划。
  • Stage:DAG中的阶段,包含一组并行任务。
  • Task:在Executor上执行的最小工作单元。

3.2 Spark环境搭建

3.2.1 本地模式部署

# 1. 安装Java(同Hadoop)
# 2. 下载Spark
wget https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar -xzf spark-3.3.0-bin-hadoop3.tgz
sudo mv spark-3.3.0-bin-hadoop3 /usr/local/spark

# 3. 配置环境变量
echo 'export SPARK_HOME=/usr/local/spark' >> ~/.bashrc
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' >> ~/.bashrc
source ~/.bashrc

# 4. 配置Spark(可选)
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh
echo 'export SPARK_MASTER_HOST=localhost' >> spark-env.sh
echo 'export SPARK_WORKER_CORES=2' >> spark-env.sh
echo 'export SPARK_WORKER_MEMORY=2g' >> spark-env.sh

# 5. 启动Spark
start-master.sh
start-worker.sh spark://localhost:7077

# 6. 验证安装
spark-shell --master local[2]

3.2.2 Spark on YARN部署

# 配置spark-env.sh
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh

# 添加以下配置
echo 'export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop' >> spark-env.sh
echo 'export SPARK_HOME=/usr/local/spark' >> spark-env.sh
echo 'export SPARK_LOG_DIR=/opt/spark/logs' >> spark-env.sh
echo 'export SPARK_WORKER_DIR=/opt/spark/work' >> spark-env.sh

# 运行Spark on YARN
spark-submit --master yarn --deploy-mode cluster \
    --class org.apache.spark.examples.SparkPi \
    $SPARK_HOME/examples/jars/spark-examples_2.12-3.3.0.jar 10

3.3 Spark编程实战

3.3.1 WordCount示例

// Scala版本
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    
    val textFile = sc.textFile("hdfs://namenode:9000/input")
    val wordCounts = textFile
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    
    wordCounts.saveAsTextFile("hdfs://namenode:9000/output")
    sc.stop()
  }
}
# Python版本(PySpark)
from pyspark import SparkContext, SparkConf

def main():
    conf = SparkConf().setAppName("WordCount")
    sc = SparkContext(conf=conf)
    
    text_file = sc.textFile("hdfs://namenode:9000/input")
    word_counts = text_file \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    
    word_counts.saveAsTextFile("hdfs://namenode:9000/output")
    sc.stop()

if __name__ == "__main__":
    main()

运行命令

# Scala版本编译运行
sbt package
spark-submit --class WordCount target/scala-2.12/wordcount_2.12-1.0.jar

# Python版本运行
spark-submit wordcount.py

3.3.2 Spark SQL实战

Spark SQL提供了DataFrame API和SQL接口,支持结构化数据处理。

# PySpark DataFrame示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg

def main():
    spark = SparkSession.builder \
        .appName("SparkSQLExample") \
        .getOrCreate()
    
    # 读取JSON数据
    df = spark.read.json("hdfs://namenode:9000/data/users.json")
    
    # 显示数据结构
    df.printSchema()
    df.show(5)
    
    # SQL查询
    df.createOrReplaceTempView("users")
    result = spark.sql("""
        SELECT country, 
               COUNT(*) as user_count,
               AVG(age) as avg_age
        FROM users
        GROUP BY country
        ORDER BY user_count DESC
    """)
    result.show()
    
    # DataFrame API
    result_df = df.groupBy("country") \
        .agg(
            count("*").alias("user_count"),
            avg("age").alias("avg_age")
        ) \
        .orderBy(col("user_count").desc())
    
    result_df.show()
    
    spark.stop()

if __name__ == "__main__":
    main()

3.3.3 Spark Streaming实战

Spark Streaming是Spark的流处理模块,支持微批处理。

# PySpark Streaming示例:实时词频统计
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def main():
    # 创建SparkContext和StreamingContext
    sc = SparkContext("local[2]", "NetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 批处理间隔1秒
    
    # 创建DStream,监听本地9999端口
    lines = ssc.socketTextStream("localhost", 9999)
    
    # 处理逻辑:词频统计
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda a, b: a + b)
    
    # 打印结果
    wordCounts.pprint()
    
    # 启动流处理
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

测试命令

# 在终端1启动netcat服务器
nc -lk 9999

# 在终端2运行Spark Streaming程序
spark-submit streaming_wordcount.py

四、Hadoop与Spark集成实战

4.1 Spark读取HDFS数据

# PySpark读取HDFS数据
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder \
        .appName("SparkReadHDFS") \
        .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/spark-warehouse") \
        .getOrCreate()
    
    # 读取HDFS上的Parquet文件
    df = spark.read.parquet("hdfs://namenode:9000/data/sales.parquet")
    
    # 读取HDFS上的CSV文件
    df_csv = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("hdfs://namenode:9000/data/customers.csv")
    
    # 处理数据
    result = df.groupBy("product_id") \
        .agg({"amount": "sum", "quantity": "sum"}) \
        .orderBy("sum(amount)", ascending=False)
    
    # 写回HDFS
    result.write \
        .mode("overwrite") \
        .parquet("hdfs://namenode:9000/output/sales_summary.parquet")
    
    spark.stop()

if __name__ == "__main__":
    main()

4.2 Spark与Hive集成

# PySpark与Hive集成
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder \
        .appName("SparkHiveIntegration") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # 查询Hive表
    result = spark.sql("SELECT * FROM hive_db.sales_table WHERE year = 2023")
    
    # 创建Hive表
    spark.sql("""
        CREATE TABLE IF NOT EXISTS hive_db.spark_result (
            product_id STRING,
            total_sales DOUBLE,
            total_quantity INT
        )
        USING PARQUET
        LOCATION 'hdfs://namenode:9000/hive/spark_result'
    """)
    
    # 将结果写入Hive表
    result.write.mode("overwrite").saveAsTable("hive_db.spark_result")
    
    spark.stop()

if __name__ == "__main__":
    main()

五、性能优化策略

5.1 Hadoop性能优化

5.1.1 MapReduce优化技巧

  1. Combiner优化:在Map端进行局部聚合,减少网络传输。
// 在WordCount中使用Combiner
job.setCombinerClass(IntSumReducer.class);  // 使用Reducer作为Combiner
  1. 压缩优化:使用压缩减少I/O开销。
<!-- mapred-site.xml配置 -->
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
  1. 数据本地化:尽量将计算任务分配到数据所在的节点。
<!-- yarn-site.xml配置 -->
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>8192</value>
</property>
<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>4</value>
</property>

5.1.2 HDFS优化

  1. 块大小调整:根据数据特性调整HDFS块大小。
<!-- hdfs-site.xml配置 -->
<property>
    <name>dfs.blocksize</name>
    <value>268435456</value>  <!-- 256MB -->
</property>
  1. 副本因子调整:根据数据重要性和集群规模调整。
<property>
    <name>dfs.replication</name>
    <value>3</value>  <!-- 生产环境通常为3 -->
</property>

5.2 Spark性能优化

5.2.1 内存管理优化

  1. RDD缓存策略:合理使用缓存级别。
# PySpark缓存示例
from pyspark.storagelevel import StorageLevel

# 使用MEMORY_ONLY缓存
rdd.cache()  # 等价于 rdd.persist(StorageLevel.MEMORY_ONLY)

# 使用MEMORY_AND_DISK缓存(当内存不足时溢出到磁盘)
rdd.persist(StorageLevel.MEMORY_AND_DISK)

# 使用MEMORY_ONLY_SER(序列化存储,节省空间)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  1. Shuffle优化:减少Shuffle数据量。
# 使用reduceByKey代替groupByKey
# 不推荐:groupByKey会导致全量数据Shuffle
rdd.groupByKey().mapValues(sum)

# 推荐:reduceByKey在Map端进行局部聚合
rdd.reduceByKey(lambda a, b: a + b)

# 使用mapPartitions代替map
def process_partition(iterator):
    # 在分区级别进行处理,减少函数调用开销
    for item in iterator:
        yield item * 2

rdd.mapPartitions(process_partition)

5.2.2 数据倾斜处理

数据倾斜是Spark性能问题的常见原因,以下是解决方案:

# 解决数据倾斜的几种方法

# 方法1:加盐(Salting)
def add_salt(key, salt_count=10):
    import random
    salt = random.randint(0, salt_count - 1)
    return f"{key}_{salt}"

# 对倾斜的key加盐
rdd_with_salt = rdd.map(lambda x: (add_salt(x[0]), x[1]))

# 聚合后去除盐
result = rdd_with_salt.reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[0].split('_')[0], x[1])) \
    .reduceByKey(lambda a, b: a + b)

# 方法2:广播小表
# 假设有一个倾斜的大表和一个小表
large_rdd = ...  # 倾斜的大表
small_rdd = ...  # 小表(可以广播)

# 将小表广播到所有Executor
small_broadcast = sc.broadcast(small_rdd.collectAsMap())

# 使用广播变量进行join
result = large_rdd.map(lambda x: (x[0], (x[1], small_broadcast.value.get(x[0], None))))

5.2.3 配置参数优化

# Spark配置参数优化示例
from pyspark import SparkConf, SparkContext

def create_optimized_spark_context():
    conf = SparkConf()
    
    # 内存相关配置
    conf.set("spark.executor.memory", "4g")  # 每个Executor内存
    conf.set("spark.executor.memoryOverhead", "512m")  # 额外内存
    conf.set("spark.driver.memory", "2g")  # Driver内存
    
    # 并行度配置
    conf.set("spark.default.parallelism", "200")  # 默认并行度
    conf.set("spark.sql.shuffle.partitions", "200")  # Shuffle分区数
    
    # 序列化配置
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "false")
    
    # 动态资源分配
    conf.set("spark.dynamicAllocation.enabled", "true")
    conf.set("spark.dynamicAllocation.minExecutors", "2")
    conf.set("spark.dynamicAllocation.maxExecutors", "10")
    
    # Shuffle优化
    conf.set("spark.shuffle.service.enabled", "true")
    conf.set("spark.shuffle.compress", "true")
    conf.set("spark.shuffle.spill.compress", "true")
    
    # 数据本地化
    conf.set("spark.locality.wait", "3s")
    
    return SparkContext(conf=conf)

5.3 Hadoop与Spark性能对比与选择

特性 Hadoop MapReduce Spark
计算模型 批处理,磁盘I/O为主 内存计算,支持批处理、流处理、交互式查询
延迟 高(分钟级) 低(秒级)
容错机制 任务重试,数据复制 RDD血统,检查点
适用场景 大规模批处理,ETL任务 实时分析,迭代算法,交互式查询
编程模型 MapReduce API RDD、DataFrame、Dataset API
生态系统 Hive、Pig、HBase等 Spark SQL、MLlib、GraphX、Spark Streaming

选择建议

  • 选择Hadoop:当需要处理PB级数据,对延迟要求不高,且已有Hadoop基础设施时。
  • 选择Spark:当需要低延迟处理,迭代算法,或需要流处理和批处理统一时。
  • 混合使用:使用Hadoop存储(HDFS)和Spark计算,发挥各自优势。

六、实战案例:电商数据分析系统

6.1 案例背景

某电商平台需要分析用户行为数据,包括:

  • 用户浏览日志
  • 订单数据
  • 商品信息
  • 用户画像数据

6.2 数据处理流程

6.2.1 数据采集与存储

# 使用Flume采集日志数据到HDFS
# flume.conf配置示例
agent.sources = r1
agent.channels = c1
agent.sinks = k1

# 定义source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /var/log/nginx/access.log
agent.sources.r1.channels = c1

# 定义channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

# 定义sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://namenode:9000/flume/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
agent.sinks.k1.channel = c1

6.2.2 数据清洗与转换

# PySpark数据清洗示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract, to_date

def clean_user_logs(spark, input_path, output_path):
    # 读取原始日志
    raw_logs = spark.read.text(input_path)
    
    # 解析日志格式(假设为Nginx日志格式)
    log_pattern = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'
    
    parsed_logs = raw_logs.select(
        regexp_extract('value', log_pattern, 1).alias('ip'),
        regexp_extract('value', log_pattern, 2).alias('user_id'),
        regexp_extract('value', log_pattern, 3).alias('session_id'),
        regexp_extract('value', log_pattern, 4).alias('timestamp'),
        regexp_extract('value', log_pattern, 5).alias('method'),
        regexp_extract('value', log_pattern, 6).alias('url'),
        regexp_extract('value', log_pattern, 7).alias('protocol'),
        regexp_extract('value', log_pattern, 8).cast('int').alias('status'),
        regexp_extract('value', log_pattern, 9).cast('int').alias('size')
    )
    
    # 数据清洗
    cleaned_logs = parsed_logs \
        .filter(col('status').isNotNull()) \
        .filter(col('user_id') != '-') \
        .withColumn('date', to_date(col('timestamp'), 'dd/MMM/yyyy:HH:mm:ss')) \
        .withColumn('url_category', 
            when(col('url').contains('/product/'), 'product')
            .when(col('url').contains('/category/'), 'category')
            .when(col('url').contains('/cart'), 'cart')
            .when(col('url').contains('/checkout'), 'checkout')
            .otherwise('other')
        )
    
    # 写入Parquet格式(列式存储,压缩)
    cleaned_logs.write \
        .mode('overwrite') \
        .partitionBy('date') \
        .parquet(output_path)
    
    return cleaned_logs

# 使用示例
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("DataCleaning") \
        .getOrCreate()
    
    clean_user_logs(
        spark,
        "hdfs://namenode:9000/raw/logs",
        "hdfs://namenode:9000/cleaned/logs"
    )
    
    spark.stop()

6.2.3 用户行为分析

# 用户行为分析:计算用户访问频率和转化率
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, sum as spark_sum

def analyze_user_behavior(spark, logs_path, orders_path):
    # 读取清洗后的日志
    logs = spark.read.parquet(logs_path)
    
    # 读取订单数据
    orders = spark.read.parquet(orders_path)
    
    # 计算用户访问频率
    user_activity = logs.groupBy("user_id") \
        .agg(
            count("session_id").alias("visit_count"),
            countDistinct("date").alias("active_days"),
            spark_sum(when(col("url_category") == "product", 1).otherwise(0)).alias("product_views")
        )
    
    # 计算转化率(访问到购买的转化)
    user_conversions = logs.join(orders, "user_id", "left") \
        .groupBy("user_id") \
        .agg(
            count("session_id").alias("total_visits"),
            spark_sum(when(col("order_id").isNotNull(), 1).otherwise(0)).alias("order_count")
        ) \
        .withColumn("conversion_rate", 
            when(col("total_visits") > 0, col("order_count") / col("total_visits"))
            .otherwise(0)
        )
    
    # 用户分群(RFM模型)
    user_rfm = user_conversions \
        .withColumn("recency", 
            when(col("order_count") > 0, 1).otherwise(0)
        ) \
        .withColumn("frequency", col("order_count")) \
        .withColumn("monetary", 
            when(col("order_count") > 0, 100).otherwise(0)  # 假设每单平均100元
        )
    
    # 保存结果
    user_activity.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_activity")
    user_conversions.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_conversions")
    user_rfm.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_rfm")
    
    return user_activity, user_conversions, user_rfm

# 使用示例
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("UserBehaviorAnalysis") \
        .getOrCreate()
    
    analyze_user_behavior(
        spark,
        "hdfs://namenode:9000/cleaned/logs",
        "hdfs://namenode:9000/data/orders"
    )
    
    spark.stop()

6.2.4 实时推荐系统

# 基于Spark Streaming的实时推荐
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.recommendation import ALS, Rating
import json

def real_time_recommendation():
    # 创建SparkContext和StreamingContext
    sc = SparkContext("local[2]", "RealTimeRecommendation")
    ssc = StreamingContext(sc, 5)  # 批处理间隔5秒
    
    # 读取历史数据(用户-商品评分)
    historical_data = sc.textFile("hdfs://namenode:9000/data/ratings.csv") \
        .map(lambda line: line.split(",")) \
        .map(lambda parts: Rating(int(parts[0]), int(parts[1]), float(parts[2])))
    
    # 训练ALS模型
    model = ALS.train(historical_data, rank=10, iterations=10, lambda_=0.01)
    
    # 广播模型
    model_broadcast = sc.broadcast(model)
    
    # 处理实时用户行为流
    user_actions = ssc.socketTextStream("localhost", 9999)
    
    def recommend_for_user(user_id):
        """为用户生成推荐"""
        model = model_broadcast.value
        # 获取用户未评分的商品
        user_products = sc.parallelize(range(1, 1000))  # 假设商品ID范围
        user_ratings = user_products.map(lambda p: (user_id, p))
        predictions = model.predictAll(user_ratings)
        # 返回Top 10推荐
        return predictions.sortBy(lambda x: x.rating, False).take(10)
    
    # 处理每个批次
    def process_batch(batch_rdd):
        if not batch_rdd.isEmpty():
            # 解析用户行为
            user_actions = batch_rdd.map(lambda x: json.loads(x))
            # 为每个用户生成推荐
            recommendations = user_actions.flatMap(
                lambda action: recommend_for_user(action['user_id'])
            )
            # 输出推荐结果
            recommendations.pprint()
    
    # 应用处理逻辑
    user_actions.foreachRDD(process_batch)
    
    # 启动流处理
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    real_time_recommendation()

七、监控与运维

7.1 Hadoop监控

7.1.1 使用Hadoop Metrics

# 查看Hadoop Metrics
# 1. 启动Metrics系统
hadoop-daemon.sh start metrics

# 2. 配置Metrics输出到文件
# hadoop-metrics2.properties
*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
*.sink.file.filename=/var/log/hadoop/metrics.log
*.sink.file.interval=60

# 3. 查看Metrics
tail -f /var/log/hadoop/metrics.log

7.1.2 使用Ganglia监控

# 安装Ganglia
sudo apt-get install ganglia-monitor ganglia-webfrontend

# 配置Hadoop Metrics
# hadoop-metrics2.properties
*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink
*.sink.ganglia.period=10
*.sink.ganglia.supportsparse=true
*.sink.ganglia.server=ganglia-server:8649
*.sink.ganglia.slope=both
*.sink.ganglia.unit=seconds
*.sink.ganglia.tmax=60
*.sink.ganglia.dmax=0

7.2 Spark监控

7.2.1 Spark UI使用

Spark Web UI提供实时监控:

  • Application UI:http://:4040
  • History Server:http://:18080

7.2.2 使用Spark Metrics System

# 配置Spark Metrics
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
conf.set("spark.metrics.conf.*.sink.console.period", "10")
conf.set("spark.metrics.conf.*.sink.console.unit", "seconds")

# 启用详细日志
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "hdfs://namenode:9000/spark-logs")
conf.set("spark.history.fs.logDirectory", "hdfs://namenode:9000/spark-logs")

7.2.3 使用Prometheus和Grafana

# prometheus.yml配置
scrape_configs:
  - job_name: 'spark'
    static_configs:
      - targets: ['spark-master:8080', 'spark-worker1:8081', 'spark-worker2:8081']
# Spark配置Prometheus
conf.set("spark.metrics.conf.*.sink.prometheus.class", "org.apache.spark.metrics.sink.PrometheusSink")
conf.set("spark.metrics.conf.*.sink.prometheus.port", "8090")

八、故障排查与最佳实践

8.1 常见问题与解决方案

8.1.1 Hadoop常见问题

  1. NameNode无法启动 “`bash

    检查日志

    tail -f $HADOOP_HOME/logs/hadoop--namenode-.log

# 检查磁盘空间 df -h

# 检查端口占用 netstat -tuln | grep 9000


2. **DataNode无法启动**
   ```bash
   # 检查DataNode日志
   tail -f $HADOOP_HOME/logs/hadoop-*-datanode-*.log
   
   # 检查数据目录权限
   ls -ld /opt/hadoop/datanode
   
   # 检查磁盘空间
   df -h /opt/hadoop/datanode

8.1.2 Spark常见问题

  1. Executor OOM “`python

    解决方案1:增加Executor内存

    conf.set(“spark.executor.memory”, “8g”)

# 解决方案2:增加分区数 conf.set(“spark.sql.shuffle.partitions”, “500”)

# 解决方案3:使用Kryo序列化 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)


2. **数据倾斜**
   ```python
   # 检测数据倾斜
   rdd.mapPartitionsWithIndex(lambda idx, it: [(idx, len(list(it)))]) \
      .collect()
   
   # 解决方案:加盐
   salted_rdd = rdd.map(lambda x: (f"{x[0]}_{random.randint(0, 9)}", x[1]))

8.2 最佳实践总结

8.2.1 Hadoop最佳实践

  1. 数据存储

    • 使用Parquet/ORC格式存储,提高查询性能
    • 合理设置块大小(256MB-1GB)
    • 定期清理临时文件
  2. 作业优化

    • 使用Combiner减少Shuffle数据
    • 合理设置Map和Reduce任务数
    • 使用压缩减少I/O
  3. 集群管理

    • 定期备份NameNode元数据
    • 监控集群健康状态
    • 合理分配资源

8.2.2 Spark最佳实践

  1. 数据处理

    • 优先使用DataFrame API而非RDD
    • 合理使用缓存和持久化
    • 避免使用collect()收集大量数据
  2. 性能调优

    • 合理设置并行度
    • 优化Shuffle操作
    • 使用广播变量减少数据传输
  3. 资源管理

    • 使用动态资源分配
    • 合理设置Executor和Driver内存
    • 监控资源使用情况

九、未来趋势与扩展学习

9.1 技术演进

  • Hadoop 3.x:支持Erasure Coding,提高存储效率
  • Spark 3.x:自适应查询执行(AQE),动态分区裁剪
  • 云原生部署:Kubernetes上的Hadoop和Spark
  • 实时流处理:Structured Streaming成为主流

9.2 扩展学习路径

  1. 高级主题

    • Spark MLlib机器学习
    • Spark GraphX图计算
    • Delta Lake数据湖
  2. 云平台

    • AWS EMR
    • Google Dataproc
    • Azure HDInsight
  3. 相关技术

    • Apache Flink(流处理)
    • Apache Kafka(消息队列)
    • Apache Iceberg(表格式)

十、总结

本指南系统性地介绍了Hadoop和Spark的实战应用,从环境搭建、数据处理到性能优化,涵盖了完整的实践路径。通过具体的代码示例和实战案例,读者可以掌握这两项关键技术的核心应用。

关键要点回顾

  1. Hadoop:适合大规模批处理,提供可靠的存储和计算基础
  2. Spark:适合低延迟处理,支持多种计算模式
  3. 集成应用:HDFS作为存储层,Spark作为计算层,发挥各自优势
  4. 性能优化:从配置参数、数据处理策略到集群管理,全方位优化
  5. 监控运维:确保系统稳定运行,及时发现和解决问题

实践建议

  • 从单节点部署开始,逐步扩展到集群
  • 从简单示例(如WordCount)开始,逐步增加复杂度
  • 关注官方文档和社区,保持技术更新
  • 在生产环境中,务必进行充分的测试和性能调优

通过本指南的学习和实践,读者将能够构建高效、可靠的大数据处理系统,应对各种数据处理挑战。