引言
在当今大数据时代,Hadoop和Spark已成为处理海量数据的核心技术栈。Hadoop作为分布式存储和计算的基石,提供了可靠的数据存储和批处理能力;而Spark则凭借其内存计算和流处理能力,显著提升了数据处理的效率和灵活性。本指南将系统性地介绍从数据处理到性能优化的完整实践路径,帮助读者掌握这两项关键技术的实战应用。
一、Hadoop生态系统概述
1.1 Hadoop核心组件
Hadoop生态系统主要由三个核心组件构成:
- HDFS(Hadoop Distributed File System):分布式文件系统,提供高吞吐量的数据访问,适合大规模数据集的存储。
- MapReduce:分布式计算框架,将计算任务分解为Map和Reduce两个阶段,实现并行处理。
- YARN(Yet Another Resource Negotiator):资源管理和作业调度平台,负责集群资源的分配和任务调度。
1.2 Hadoop生态系统扩展组件
除了核心组件,Hadoop生态系统还包括:
- HBase:分布式列式数据库,支持实时读写访问。
- Hive:数据仓库工具,提供类SQL查询语言(HQL)。
- Pig:高级数据流语言和执行框架。
- ZooKeeper:分布式协调服务。
二、Hadoop实战:数据处理基础
2.1 环境搭建与配置
2.1.1 单节点伪分布式部署
以下是在Linux系统上部署Hadoop伪分布式模式的步骤:
# 1. 安装Java环境
sudo apt-get update
sudo apt-get install openjdk-8-jdk
# 2. 下载Hadoop
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
tar -xzf hadoop-3.3.1.tar.gz
sudo mv hadoop-3.3.1 /usr/local/hadoop
# 3. 配置环境变量
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc
echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
source ~/.bashrc
# 4. 配置Hadoop
cd /usr/local/hadoop/etc/hadoop
# 编辑core-site.xml
cat > core-site.xml << EOF
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
EOF
# 编辑hdfs-site.xml
cat > hdfs-site.xml << EOF
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
EOF
# 5. 格式化HDFS
hdfs namenode -format
# 6. 启动Hadoop
start-dfs.sh
start-yarn.sh
# 7. 验证安装
jps # 应该看到NameNode, DataNode, ResourceManager, NodeManager等进程
2.1.2 集群部署配置
对于生产环境,需要配置多节点集群。以下是关键配置文件示例:
core-site.xml(集群模式):
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop/tmp</value>
</property>
</configuration>
hdfs-site.xml:
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/hadoop/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/opt/hadoop/datanode</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
yarn-site.xml:
<configuration>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>8192</value>
</property>
</configuration>
2.2 MapReduce编程实战
2.2.1 WordCount示例
WordCount是MapReduce的经典示例,用于统计文本中每个单词的出现次数。
// WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译与运行:
# 编译
javac -classpath $(hadoop classpath) WordCount.java
# 打包
jar cf wc.jar WordCount*.class
# 运行
hadoop jar wc.jar WordCount /input /output
2.2.2 自定义InputFormat和OutputFormat
当处理非标准数据格式时,需要自定义InputFormat和OutputFormat。
// 自定义InputFormat示例:处理CSV文件
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class CSVInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
return new CSVRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
return true;
}
}
// CSVRecordReader实现
class CSVRecordReader extends RecordReader<Text, Text> {
// 实现RecordReader接口方法
// ... 具体实现省略
}
2.3 Hadoop Streaming实战
Hadoop Streaming允许使用任何可执行文件或脚本作为Mapper和Reducer,特别适合Python、Ruby等脚本语言。
2.3.1 Python WordCount示例
# mapper.py
#!/usr/bin/env python
import sys
def main():
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print(f"{word}\t1")
if __name__ == "__main__":
main()
# reducer.py
#!/usr/bin/env python
import sys
def main():
current_word = None
current_count = 0
for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
if current_word == word:
current_count += int(count)
else:
if current_word:
print(f"{current_word}\t{current_count}")
current_word = word
current_count = int(count)
if current_word:
print(f"{current_word}\t{current_count}")
if __name__ == "__main__":
main()
运行命令:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input /input \
-output /output \
-mapper "python mapper.py" \
-reducer "python reducer.py"
三、Spark实战:高性能数据处理
3.1 Spark架构与核心概念
3.1.1 Spark架构组件
- Driver:应用程序的主进程,负责创建SparkContext、DAGScheduler和TaskScheduler。
- Executor:工作节点上的进程,负责执行任务。
- Cluster Manager:集群资源管理器(如YARN、Mesos、Standalone)。
- RDD(弹性分布式数据集):Spark的基本数据抽象,支持并行操作。
3.1.2 Spark核心概念
- DAG(有向无环图):Spark将作业转换为DAG,优化执行计划。
- Stage:DAG中的阶段,包含一组并行任务。
- Task:在Executor上执行的最小工作单元。
3.2 Spark环境搭建
3.2.1 本地模式部署
# 1. 安装Java(同Hadoop)
# 2. 下载Spark
wget https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar -xzf spark-3.3.0-bin-hadoop3.tgz
sudo mv spark-3.3.0-bin-hadoop3 /usr/local/spark
# 3. 配置环境变量
echo 'export SPARK_HOME=/usr/local/spark' >> ~/.bashrc
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' >> ~/.bashrc
source ~/.bashrc
# 4. 配置Spark(可选)
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh
echo 'export SPARK_MASTER_HOST=localhost' >> spark-env.sh
echo 'export SPARK_WORKER_CORES=2' >> spark-env.sh
echo 'export SPARK_WORKER_MEMORY=2g' >> spark-env.sh
# 5. 启动Spark
start-master.sh
start-worker.sh spark://localhost:7077
# 6. 验证安装
spark-shell --master local[2]
3.2.2 Spark on YARN部署
# 配置spark-env.sh
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh
# 添加以下配置
echo 'export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop' >> spark-env.sh
echo 'export SPARK_HOME=/usr/local/spark' >> spark-env.sh
echo 'export SPARK_LOG_DIR=/opt/spark/logs' >> spark-env.sh
echo 'export SPARK_WORKER_DIR=/opt/spark/work' >> spark-env.sh
# 运行Spark on YARN
spark-submit --master yarn --deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/jars/spark-examples_2.12-3.3.0.jar 10
3.3 Spark编程实战
3.3.1 WordCount示例
// Scala版本
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
val textFile = sc.textFile("hdfs://namenode:9000/input")
val wordCounts = textFile
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs://namenode:9000/output")
sc.stop()
}
}
# Python版本(PySpark)
from pyspark import SparkContext, SparkConf
def main():
conf = SparkConf().setAppName("WordCount")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs://namenode:9000/input")
word_counts = text_file \
.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
word_counts.saveAsTextFile("hdfs://namenode:9000/output")
sc.stop()
if __name__ == "__main__":
main()
运行命令:
# Scala版本编译运行
sbt package
spark-submit --class WordCount target/scala-2.12/wordcount_2.12-1.0.jar
# Python版本运行
spark-submit wordcount.py
3.3.2 Spark SQL实战
Spark SQL提供了DataFrame API和SQL接口,支持结构化数据处理。
# PySpark DataFrame示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg
def main():
spark = SparkSession.builder \
.appName("SparkSQLExample") \
.getOrCreate()
# 读取JSON数据
df = spark.read.json("hdfs://namenode:9000/data/users.json")
# 显示数据结构
df.printSchema()
df.show(5)
# SQL查询
df.createOrReplaceTempView("users")
result = spark.sql("""
SELECT country,
COUNT(*) as user_count,
AVG(age) as avg_age
FROM users
GROUP BY country
ORDER BY user_count DESC
""")
result.show()
# DataFrame API
result_df = df.groupBy("country") \
.agg(
count("*").alias("user_count"),
avg("age").alias("avg_age")
) \
.orderBy(col("user_count").desc())
result_df.show()
spark.stop()
if __name__ == "__main__":
main()
3.3.3 Spark Streaming实战
Spark Streaming是Spark的流处理模块,支持微批处理。
# PySpark Streaming示例:实时词频统计
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def main():
# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1) # 批处理间隔1秒
# 创建DStream,监听本地9999端口
lines = ssc.socketTextStream("localhost", 9999)
# 处理逻辑:词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)
# 打印结果
wordCounts.pprint()
# 启动流处理
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
测试命令:
# 在终端1启动netcat服务器
nc -lk 9999
# 在终端2运行Spark Streaming程序
spark-submit streaming_wordcount.py
四、Hadoop与Spark集成实战
4.1 Spark读取HDFS数据
# PySpark读取HDFS数据
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder \
.appName("SparkReadHDFS") \
.config("spark.sql.warehouse.dir", "hdfs://namenode:9000/spark-warehouse") \
.getOrCreate()
# 读取HDFS上的Parquet文件
df = spark.read.parquet("hdfs://namenode:9000/data/sales.parquet")
# 读取HDFS上的CSV文件
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("hdfs://namenode:9000/data/customers.csv")
# 处理数据
result = df.groupBy("product_id") \
.agg({"amount": "sum", "quantity": "sum"}) \
.orderBy("sum(amount)", ascending=False)
# 写回HDFS
result.write \
.mode("overwrite") \
.parquet("hdfs://namenode:9000/output/sales_summary.parquet")
spark.stop()
if __name__ == "__main__":
main()
4.2 Spark与Hive集成
# PySpark与Hive集成
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder \
.appName("SparkHiveIntegration") \
.enableHiveSupport() \
.getOrCreate()
# 查询Hive表
result = spark.sql("SELECT * FROM hive_db.sales_table WHERE year = 2023")
# 创建Hive表
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_db.spark_result (
product_id STRING,
total_sales DOUBLE,
total_quantity INT
)
USING PARQUET
LOCATION 'hdfs://namenode:9000/hive/spark_result'
""")
# 将结果写入Hive表
result.write.mode("overwrite").saveAsTable("hive_db.spark_result")
spark.stop()
if __name__ == "__main__":
main()
五、性能优化策略
5.1 Hadoop性能优化
5.1.1 MapReduce优化技巧
- Combiner优化:在Map端进行局部聚合,减少网络传输。
// 在WordCount中使用Combiner
job.setCombinerClass(IntSumReducer.class); // 使用Reducer作为Combiner
- 压缩优化:使用压缩减少I/O开销。
<!-- mapred-site.xml配置 -->
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
- 数据本地化:尽量将计算任务分配到数据所在的节点。
<!-- yarn-site.xml配置 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>
5.1.2 HDFS优化
- 块大小调整:根据数据特性调整HDFS块大小。
<!-- hdfs-site.xml配置 -->
<property>
<name>dfs.blocksize</name>
<value>268435456</value> <!-- 256MB -->
</property>
- 副本因子调整:根据数据重要性和集群规模调整。
<property>
<name>dfs.replication</name>
<value>3</value> <!-- 生产环境通常为3 -->
</property>
5.2 Spark性能优化
5.2.1 内存管理优化
- RDD缓存策略:合理使用缓存级别。
# PySpark缓存示例
from pyspark.storagelevel import StorageLevel
# 使用MEMORY_ONLY缓存
rdd.cache() # 等价于 rdd.persist(StorageLevel.MEMORY_ONLY)
# 使用MEMORY_AND_DISK缓存(当内存不足时溢出到磁盘)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 使用MEMORY_ONLY_SER(序列化存储,节省空间)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
- Shuffle优化:减少Shuffle数据量。
# 使用reduceByKey代替groupByKey
# 不推荐:groupByKey会导致全量数据Shuffle
rdd.groupByKey().mapValues(sum)
# 推荐:reduceByKey在Map端进行局部聚合
rdd.reduceByKey(lambda a, b: a + b)
# 使用mapPartitions代替map
def process_partition(iterator):
# 在分区级别进行处理,减少函数调用开销
for item in iterator:
yield item * 2
rdd.mapPartitions(process_partition)
5.2.2 数据倾斜处理
数据倾斜是Spark性能问题的常见原因,以下是解决方案:
# 解决数据倾斜的几种方法
# 方法1:加盐(Salting)
def add_salt(key, salt_count=10):
import random
salt = random.randint(0, salt_count - 1)
return f"{key}_{salt}"
# 对倾斜的key加盐
rdd_with_salt = rdd.map(lambda x: (add_salt(x[0]), x[1]))
# 聚合后去除盐
result = rdd_with_salt.reduceByKey(lambda a, b: a + b) \
.map(lambda x: (x[0].split('_')[0], x[1])) \
.reduceByKey(lambda a, b: a + b)
# 方法2:广播小表
# 假设有一个倾斜的大表和一个小表
large_rdd = ... # 倾斜的大表
small_rdd = ... # 小表(可以广播)
# 将小表广播到所有Executor
small_broadcast = sc.broadcast(small_rdd.collectAsMap())
# 使用广播变量进行join
result = large_rdd.map(lambda x: (x[0], (x[1], small_broadcast.value.get(x[0], None))))
5.2.3 配置参数优化
# Spark配置参数优化示例
from pyspark import SparkConf, SparkContext
def create_optimized_spark_context():
conf = SparkConf()
# 内存相关配置
conf.set("spark.executor.memory", "4g") # 每个Executor内存
conf.set("spark.executor.memoryOverhead", "512m") # 额外内存
conf.set("spark.driver.memory", "2g") # Driver内存
# 并行度配置
conf.set("spark.default.parallelism", "200") # 默认并行度
conf.set("spark.sql.shuffle.partitions", "200") # Shuffle分区数
# 序列化配置
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "false")
# 动态资源分配
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.dynamicAllocation.minExecutors", "2")
conf.set("spark.dynamicAllocation.maxExecutors", "10")
# Shuffle优化
conf.set("spark.shuffle.service.enabled", "true")
conf.set("spark.shuffle.compress", "true")
conf.set("spark.shuffle.spill.compress", "true")
# 数据本地化
conf.set("spark.locality.wait", "3s")
return SparkContext(conf=conf)
5.3 Hadoop与Spark性能对比与选择
| 特性 | Hadoop MapReduce | Spark |
|---|---|---|
| 计算模型 | 批处理,磁盘I/O为主 | 内存计算,支持批处理、流处理、交互式查询 |
| 延迟 | 高(分钟级) | 低(秒级) |
| 容错机制 | 任务重试,数据复制 | RDD血统,检查点 |
| 适用场景 | 大规模批处理,ETL任务 | 实时分析,迭代算法,交互式查询 |
| 编程模型 | MapReduce API | RDD、DataFrame、Dataset API |
| 生态系统 | Hive、Pig、HBase等 | Spark SQL、MLlib、GraphX、Spark Streaming |
选择建议:
- 选择Hadoop:当需要处理PB级数据,对延迟要求不高,且已有Hadoop基础设施时。
- 选择Spark:当需要低延迟处理,迭代算法,或需要流处理和批处理统一时。
- 混合使用:使用Hadoop存储(HDFS)和Spark计算,发挥各自优势。
六、实战案例:电商数据分析系统
6.1 案例背景
某电商平台需要分析用户行为数据,包括:
- 用户浏览日志
- 订单数据
- 商品信息
- 用户画像数据
6.2 数据处理流程
6.2.1 数据采集与存储
# 使用Flume采集日志数据到HDFS
# flume.conf配置示例
agent.sources = r1
agent.channels = c1
agent.sinks = k1
# 定义source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /var/log/nginx/access.log
agent.sources.r1.channels = c1
# 定义channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
# 定义sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://namenode:9000/flume/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
agent.sinks.k1.channel = c1
6.2.2 数据清洗与转换
# PySpark数据清洗示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract, to_date
def clean_user_logs(spark, input_path, output_path):
# 读取原始日志
raw_logs = spark.read.text(input_path)
# 解析日志格式(假设为Nginx日志格式)
log_pattern = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'
parsed_logs = raw_logs.select(
regexp_extract('value', log_pattern, 1).alias('ip'),
regexp_extract('value', log_pattern, 2).alias('user_id'),
regexp_extract('value', log_pattern, 3).alias('session_id'),
regexp_extract('value', log_pattern, 4).alias('timestamp'),
regexp_extract('value', log_pattern, 5).alias('method'),
regexp_extract('value', log_pattern, 6).alias('url'),
regexp_extract('value', log_pattern, 7).alias('protocol'),
regexp_extract('value', log_pattern, 8).cast('int').alias('status'),
regexp_extract('value', log_pattern, 9).cast('int').alias('size')
)
# 数据清洗
cleaned_logs = parsed_logs \
.filter(col('status').isNotNull()) \
.filter(col('user_id') != '-') \
.withColumn('date', to_date(col('timestamp'), 'dd/MMM/yyyy:HH:mm:ss')) \
.withColumn('url_category',
when(col('url').contains('/product/'), 'product')
.when(col('url').contains('/category/'), 'category')
.when(col('url').contains('/cart'), 'cart')
.when(col('url').contains('/checkout'), 'checkout')
.otherwise('other')
)
# 写入Parquet格式(列式存储,压缩)
cleaned_logs.write \
.mode('overwrite') \
.partitionBy('date') \
.parquet(output_path)
return cleaned_logs
# 使用示例
if __name__ == "__main__":
spark = SparkSession.builder \
.appName("DataCleaning") \
.getOrCreate()
clean_user_logs(
spark,
"hdfs://namenode:9000/raw/logs",
"hdfs://namenode:9000/cleaned/logs"
)
spark.stop()
6.2.3 用户行为分析
# 用户行为分析:计算用户访问频率和转化率
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, sum as spark_sum
def analyze_user_behavior(spark, logs_path, orders_path):
# 读取清洗后的日志
logs = spark.read.parquet(logs_path)
# 读取订单数据
orders = spark.read.parquet(orders_path)
# 计算用户访问频率
user_activity = logs.groupBy("user_id") \
.agg(
count("session_id").alias("visit_count"),
countDistinct("date").alias("active_days"),
spark_sum(when(col("url_category") == "product", 1).otherwise(0)).alias("product_views")
)
# 计算转化率(访问到购买的转化)
user_conversions = logs.join(orders, "user_id", "left") \
.groupBy("user_id") \
.agg(
count("session_id").alias("total_visits"),
spark_sum(when(col("order_id").isNotNull(), 1).otherwise(0)).alias("order_count")
) \
.withColumn("conversion_rate",
when(col("total_visits") > 0, col("order_count") / col("total_visits"))
.otherwise(0)
)
# 用户分群(RFM模型)
user_rfm = user_conversions \
.withColumn("recency",
when(col("order_count") > 0, 1).otherwise(0)
) \
.withColumn("frequency", col("order_count")) \
.withColumn("monetary",
when(col("order_count") > 0, 100).otherwise(0) # 假设每单平均100元
)
# 保存结果
user_activity.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_activity")
user_conversions.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_conversions")
user_rfm.write.mode("overwrite").parquet("hdfs://namenode:9000/results/user_rfm")
return user_activity, user_conversions, user_rfm
# 使用示例
if __name__ == "__main__":
spark = SparkSession.builder \
.appName("UserBehaviorAnalysis") \
.getOrCreate()
analyze_user_behavior(
spark,
"hdfs://namenode:9000/cleaned/logs",
"hdfs://namenode:9000/data/orders"
)
spark.stop()
6.2.4 实时推荐系统
# 基于Spark Streaming的实时推荐
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.recommendation import ALS, Rating
import json
def real_time_recommendation():
# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "RealTimeRecommendation")
ssc = StreamingContext(sc, 5) # 批处理间隔5秒
# 读取历史数据(用户-商品评分)
historical_data = sc.textFile("hdfs://namenode:9000/data/ratings.csv") \
.map(lambda line: line.split(",")) \
.map(lambda parts: Rating(int(parts[0]), int(parts[1]), float(parts[2])))
# 训练ALS模型
model = ALS.train(historical_data, rank=10, iterations=10, lambda_=0.01)
# 广播模型
model_broadcast = sc.broadcast(model)
# 处理实时用户行为流
user_actions = ssc.socketTextStream("localhost", 9999)
def recommend_for_user(user_id):
"""为用户生成推荐"""
model = model_broadcast.value
# 获取用户未评分的商品
user_products = sc.parallelize(range(1, 1000)) # 假设商品ID范围
user_ratings = user_products.map(lambda p: (user_id, p))
predictions = model.predictAll(user_ratings)
# 返回Top 10推荐
return predictions.sortBy(lambda x: x.rating, False).take(10)
# 处理每个批次
def process_batch(batch_rdd):
if not batch_rdd.isEmpty():
# 解析用户行为
user_actions = batch_rdd.map(lambda x: json.loads(x))
# 为每个用户生成推荐
recommendations = user_actions.flatMap(
lambda action: recommend_for_user(action['user_id'])
)
# 输出推荐结果
recommendations.pprint()
# 应用处理逻辑
user_actions.foreachRDD(process_batch)
# 启动流处理
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
real_time_recommendation()
七、监控与运维
7.1 Hadoop监控
7.1.1 使用Hadoop Metrics
# 查看Hadoop Metrics
# 1. 启动Metrics系统
hadoop-daemon.sh start metrics
# 2. 配置Metrics输出到文件
# hadoop-metrics2.properties
*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
*.sink.file.filename=/var/log/hadoop/metrics.log
*.sink.file.interval=60
# 3. 查看Metrics
tail -f /var/log/hadoop/metrics.log
7.1.2 使用Ganglia监控
# 安装Ganglia
sudo apt-get install ganglia-monitor ganglia-webfrontend
# 配置Hadoop Metrics
# hadoop-metrics2.properties
*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink
*.sink.ganglia.period=10
*.sink.ganglia.supportsparse=true
*.sink.ganglia.server=ganglia-server:8649
*.sink.ganglia.slope=both
*.sink.ganglia.unit=seconds
*.sink.ganglia.tmax=60
*.sink.ganglia.dmax=0
7.2 Spark监控
7.2.1 Spark UI使用
Spark Web UI提供实时监控:
- Application UI:http://
:4040 - History Server:http://
:18080
7.2.2 使用Spark Metrics System
# 配置Spark Metrics
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
conf.set("spark.metrics.conf.*.sink.console.period", "10")
conf.set("spark.metrics.conf.*.sink.console.unit", "seconds")
# 启用详细日志
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "hdfs://namenode:9000/spark-logs")
conf.set("spark.history.fs.logDirectory", "hdfs://namenode:9000/spark-logs")
7.2.3 使用Prometheus和Grafana
# prometheus.yml配置
scrape_configs:
- job_name: 'spark'
static_configs:
- targets: ['spark-master:8080', 'spark-worker1:8081', 'spark-worker2:8081']
# Spark配置Prometheus
conf.set("spark.metrics.conf.*.sink.prometheus.class", "org.apache.spark.metrics.sink.PrometheusSink")
conf.set("spark.metrics.conf.*.sink.prometheus.port", "8090")
八、故障排查与最佳实践
8.1 常见问题与解决方案
8.1.1 Hadoop常见问题
NameNode无法启动 “`bash
检查日志
tail -f $HADOOP_HOME/logs/hadoop--namenode-.log
# 检查磁盘空间 df -h
# 检查端口占用 netstat -tuln | grep 9000
2. **DataNode无法启动**
```bash
# 检查DataNode日志
tail -f $HADOOP_HOME/logs/hadoop-*-datanode-*.log
# 检查数据目录权限
ls -ld /opt/hadoop/datanode
# 检查磁盘空间
df -h /opt/hadoop/datanode
8.1.2 Spark常见问题
Executor OOM “`python
解决方案1:增加Executor内存
conf.set(“spark.executor.memory”, “8g”)
# 解决方案2:增加分区数 conf.set(“spark.sql.shuffle.partitions”, “500”)
# 解决方案3:使用Kryo序列化 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
2. **数据倾斜**
```python
# 检测数据倾斜
rdd.mapPartitionsWithIndex(lambda idx, it: [(idx, len(list(it)))]) \
.collect()
# 解决方案:加盐
salted_rdd = rdd.map(lambda x: (f"{x[0]}_{random.randint(0, 9)}", x[1]))
8.2 最佳实践总结
8.2.1 Hadoop最佳实践
数据存储:
- 使用Parquet/ORC格式存储,提高查询性能
- 合理设置块大小(256MB-1GB)
- 定期清理临时文件
作业优化:
- 使用Combiner减少Shuffle数据
- 合理设置Map和Reduce任务数
- 使用压缩减少I/O
集群管理:
- 定期备份NameNode元数据
- 监控集群健康状态
- 合理分配资源
8.2.2 Spark最佳实践
数据处理:
- 优先使用DataFrame API而非RDD
- 合理使用缓存和持久化
- 避免使用collect()收集大量数据
性能调优:
- 合理设置并行度
- 优化Shuffle操作
- 使用广播变量减少数据传输
资源管理:
- 使用动态资源分配
- 合理设置Executor和Driver内存
- 监控资源使用情况
九、未来趋势与扩展学习
9.1 技术演进
- Hadoop 3.x:支持Erasure Coding,提高存储效率
- Spark 3.x:自适应查询执行(AQE),动态分区裁剪
- 云原生部署:Kubernetes上的Hadoop和Spark
- 实时流处理:Structured Streaming成为主流
9.2 扩展学习路径
高级主题:
- Spark MLlib机器学习
- Spark GraphX图计算
- Delta Lake数据湖
云平台:
- AWS EMR
- Google Dataproc
- Azure HDInsight
相关技术:
- Apache Flink(流处理)
- Apache Kafka(消息队列)
- Apache Iceberg(表格式)
十、总结
本指南系统性地介绍了Hadoop和Spark的实战应用,从环境搭建、数据处理到性能优化,涵盖了完整的实践路径。通过具体的代码示例和实战案例,读者可以掌握这两项关键技术的核心应用。
关键要点回顾:
- Hadoop:适合大规模批处理,提供可靠的存储和计算基础
- Spark:适合低延迟处理,支持多种计算模式
- 集成应用:HDFS作为存储层,Spark作为计算层,发挥各自优势
- 性能优化:从配置参数、数据处理策略到集群管理,全方位优化
- 监控运维:确保系统稳定运行,及时发现和解决问题
实践建议:
- 从单节点部署开始,逐步扩展到集群
- 从简单示例(如WordCount)开始,逐步增加复杂度
- 关注官方文档和社区,保持技术更新
- 在生产环境中,务必进行充分的测试和性能调优
通过本指南的学习和实践,读者将能够构建高效、可靠的大数据处理系统,应对各种数据处理挑战。
