引言:理解操作系统作业调度的核心价值
操作系统作业调度是计算机科学中至关重要的概念,它决定了多个进程或作业如何在有限的CPU资源下高效运行。作业调度算法直接影响系统的吞吐量、响应时间、公平性和资源利用率。在实际操作系统中,调度器(Scheduler)负责决定哪个进程获得CPU时间,而在学习阶段,通过编程模拟这些算法可以帮助我们深入理解它们的工作原理和性能差异。
本文将通过Python实现四种经典的作业调度算法:先来先服务(FCFS)、短作业优先(SJF)、优先级调度(PS)和最高响应比优先(HRRN),并进行详细的性能对比分析。我们将构建一个完整的模拟环境,包括作业生成、调度器实现和性能指标计算。
1. 调度算法理论基础
1.1 先来先服务(FCFS)
FCFS是最简单的调度算法,按照作业到达的顺序分配CPU资源。它易于实现但可能导致短作业等待长作业完成,产生”护航效应”。
1.2 短作业优先(SJF)
SJF选择估计运行时间最短的作业优先执行。它能最小化平均等待时间,但可能导致长作业”饥饿”。
1.3 优先级调度(PS)
PS根据作业的优先级分配CPU。优先级可以是静态或动态的。需要注意防止低优先级作业饥饿。
1.4 最高响应比优先(HRRN)
HRRN是FCFS和SJF的折中方案,响应比 = (等待时间 + 服务时间) / 服务时间。它既考虑了等待时间又考虑了服务时间,能平衡长短作业。
2. 模拟环境设计与实现
2.1 作业类设计
首先,我们创建一个Job类来表示系统中的作业:
import random
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Job:
"""作业类,表示一个需要CPU时间的作业"""
job_id: int
arrival_time: int # 到达时间
burst_time: int # 需要的CPU时间
priority: int = 0 # 优先级(数值越小优先级越高)
start_time: Optional[int] = None # 开始执行时间
finish_time: Optional[int] = None # 完成时间
@property
def waiting_time(self) -> int:
"""等待时间 = 开始时间 - 到达时间"""
if self.start_time is None:
return 0
return self.start_time - self.arrival_time
@property
def turnaround_time(self) -> int:
"""周转时间 = 完成时间 - 到达时间"""
if self.finish_time is None:
return 0
return self.finish_time - self.arrival_time
@property
def response_ratio(self) -> float:
"""响应比 = (等待时间 + 服务时间) / 服务时间"""
if self.start_time is None:
return 0.0
return (self.waiting_time + self.burst_time) / self.burst_time
def __repr__(self):
return (f"Job(id={self.job_id}, arrive={self.arrival_time}, "
f"burst={self.burst_time}, priority={self.priority})")
def generate_jobs(num_jobs: int, max_arrival: int = 20,
max_burst: int = 10, max_priority: int = 5) -> List[Job]:
"""生成随机作业列表"""
jobs = []
for i in range(num_jobs):
arrival = random.randint(0, max_arrival)
burst = random.randint(1, max_burst)
priority = random.randint(1, max_priority)
jobs.append(Job(job_id=i+1, arrival_time=arrival,
burst_time=burst, priority=priority))
# 按到达时间排序
jobs.sort(key=lambda x: x.arrival_time)
return jobs
2.2 调度器基类与具体实现
我们创建一个调度器基类,然后为每种算法实现具体的调度器:
from abc import ABC, abstractmethod
from collections import deque
class Scheduler(ABC):
"""调度器抽象基类"""
def __init__(self, jobs: List[Job]):
self.original_jobs = jobs
self.jobs = [job for job in jobs] # 深拷贝
self.current_time = 0
self.completed_jobs = []
self.gantt_chart = [] # 甘特图记录
@abstractmethod
def schedule(self) -> List[Job]:
"""执行调度,返回按完成时间排序的作业列表"""
pass
def run_simulation(self) -> dict:
"""运行完整的模拟并返回性能指标"""
scheduled_jobs = self.schedule()
# 计算性能指标
total_waiting = sum(j.waiting_time for j in scheduled_jobs)
total_turnaround = sum(j.turnaround_time for j in scheduled_jobs)
total_response_ratio = sum(j.response_ratio for j in scheduled_jobs)
n = len(scheduled_jobs)
return {
"algorithm": self.__class__.__name__,
"jobs": scheduled_jobs,
"avg_waiting": total_waiting / n,
"avg_turnaround": total_turnaround / n,
"avg_response_ratio": total_response_ratio / n,
"gantt_chart": self.gantt_chart,
"total_time": max(j.finish_time for j in scheduled_jobs) if scheduled_jobs else 0
}
class FCFSScheduler(Scheduler):
"""先来先服务调度器"""
def schedule(self) -> List[Job]:
# FCFS按到达时间顺序执行
jobs_by_arrival = sorted(self.jobs, key=lambda x: x.arrival_time)
for job in jobs_by_arrival:
# 如果当前时间小于作业到达时间,需要等待
if self.current_time < job.arrival_time:
self.current_time = job.arrival_time
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class SJFScheduler(Scheduler):
"""短作业优先调度器"""
def schedule(self) -> List[Job]:
# 使用列表存储未完成的作业
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
# 如果没有就绪作业,时间跳到下一个作业到达
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 选择burst_time最短的作业
ready_queue = deque(sorted(ready_queue, key=lambda x: x.burst_time))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class PriorityScheduler(Scheduler):
"""优先级调度器"""
def schedule(self) -> List[Job]:
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 选择优先级最高的作业(数值越小优先级越高)
ready_queue = deque(sorted(ready_queue, key=lambda x: x.priority))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class HRRNScheduler(Scheduler):
"""最高响应比优先调度器"""
def schedule(self) -> List[Job]:
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 计算每个作业的响应比
for job in ready_queue:
# 注意:响应比需要实时计算,因为等待时间在变化
job.response_ratio = (self.current_time - job.arrival_time + job.burst_time) / job.burst_time
# 选择响应比最高的作业
ready_queue = deque(sorted(ready_queue, key=lambda x: x.response_ratio, reverse=True))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
2.3 性能分析与可视化模块
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List
def plot_gantt_chart(gantt_data: List[Dict], title: str):
"""绘制甘特图"""
fig, ax = plt.subplots(figsize=(12, 3))
colors = plt.cm.tab10(np.linspace(0, 1, 10))
for i, segment in enumerate(gantt_data):
job_id = segment['job_id']
start = segment['start']
end = segment['end']
ax.barh(y=0, width=end-start, left=start, height=0.5,
color=colors[job_id % 10], edgecolor='black')
# 在条形中间显示作业ID
ax.text((start + end) / 2, 0, f'J{job_id}',
ha='center', va='center', color='white', fontweight='bold')
ax.set_xlabel('时间')
ax.set_title(title)
ax.set_yticks([])
ax.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()
def compare_schedulers(jobs: List[Job], plot_charts: bool = True) -> pd.DataFrame:
"""比较所有调度器的性能"""
import pandas as pd
schedulers = [
FCFSScheduler(jobs),
SJFScheduler(jobs),
PriorityScheduler(jobs),
HRRNScheduler(jobs)
]
results = []
for scheduler in schedulers:
result = scheduler.run_simulation()
results.append({
'算法': result['algorithm'],
'平均等待时间': result['avg_waiting'],
'平均周转时间': result['avg_turnaround'],
'平均响应比': result['avg_response_ratio'],
'总完成时间': result['total_time']
})
if plot_charts:
plot_gantt_chart(result['gantt_chart'], result['algorithm'])
return pd.DataFrame(results)
def run_batch_simulation(num_jobs: int = 10, num_runs: int = 100) -> pd.DataFrame:
"""批量运行模拟以获得统计结果"""
import pandas as pd
algorithms = ['FCFSScheduler', 'SJFScheduler', 'PriorityScheduler', 'HRRNScheduler']
metrics = ['avg_waiting', 'avg_turnaround', 'avg_response_ratio', 'total_time']
# 初始化统计字典
stats = {algo: {metric: [] for metric in metrics} for algo in algorithms}
for run in range(num_runs):
jobs = generate_jobs(num_jobs)
for SchedulerClass in [FCFSScheduler, SJFScheduler, PriorityScheduler, HRRNScheduler]:
scheduler = SchedulerClass(jobs)
result = scheduler.run_simulation()
algo_name = SchedulerClass.__name__
for metric in metrics:
stats[algo_name][metric].append(result[metric])
# 计算平均值和标准差
summary = []
for algo in algorithms:
row = {'算法': algo}
for metric in metrics:
values = stats[algo][metric]
row[f'{metric}_mean'] = np.mean(values)
row[f'{metric}_std'] = np.std(values)
summary.append(row)
return pd.DataFrame(summary)
# 简单的统计分析函数
def analyze_results(df: pd.DataFrame):
"""分析性能对比结果"""
print("=" * 60)
print("调度算法性能对比分析")
print("=" * 60)
# 找出每项指标的最优算法
best_waiting = df.loc[df['平均等待时间'].idxmin()]
best_turnaround = df.loc[df['平均周转时间'].idxmin()]
best_response = df.loc[df['平均响应比'].idxmin()]
best_total = df.loc[df['总完成时间'].idxmin()]
print(f"\n最佳平均等待时间: {best_waiting['算法']} ({best_waiting['平均等待时间']:.2f})")
print(f"最佳平均周转时间: {best_turnaround['算法']} ({best_turnaround['平均周转时间']:.2f})")
print(f"最佳平均响应比: {best_response['算法']} ({best_response['平均响应比']:.2f})")
print(f"最短总完成时间: {best_total['算法']} ({best_total['总完成时间']:.2f})")
print("\n详细对比:")
print(df.to_string(index=False))
3. 完整的模拟运行与结果分析
3.1 单次模拟示例
# 生成测试作业
jobs = generate_jobs(5, max_arrival=10, max_burst=8, max_priority=5)
print("生成的作业列表:")
for job in jobs:
print(job)
# 运行单次模拟
print("\n" + "="*50)
print("单次模拟结果")
print("="*50)
# FCFS
fcfs_result = FCFSScheduler(jobs).run_simulation()
print(f"\nFCFS - 平均等待时间: {fcfs_result['avg_waiting']:.2f}, 平均周转时间: {fcfs_result['avg_turnaround']:.2f}")
# SJF
sjf_result = SJFScheduler(jobs).run_simulation()
print(f"SJF - 平均等待时间: {sjf_result['avg_waiting']:.2f}, 平均周转时间: {sjf_result['avg_turnaround']:.2f}")
# 优先级
ps_result = PriorityScheduler(jobs).run_simulation()
print(f"优先级调度 - 平均等待时间: {ps_result['avg_waiting']:.2f}, 平均周转时间: {ps_result['avg_turnaround']:.2f}")
# HRRN
hrrn_result = HRRNScheduler(jobs).run_simulation()
print(f"HRRN - 平均等待时间: {hrrn_result['avg_waiting']:.2f}, 平均周转时间: {hrrn_result['avg_turnaround']:.2f}")
3.2 批量模拟与统计分析
# 批量模拟100次,每次10个作业
print("\n" + "="*50)
print("批量模拟统计结果(100次运行,每次10个作业)")
print("="*50)
batch_results = run_batch_simulation(num_jobs=10, num_runs=100)
print(batch_results.to_string(index=False))
# 分析结果
analyze_results(batch_results)
4. 性能对比分析与讨论
4.1 理论分析
从理论角度分析,各算法的特点如下:
- FCFS:实现简单,但可能导致平均等待时间较长,特别是当长作业先到达时。
- SJF:理论上能提供最小的平均等待时间,但需要预知作业运行时间,且可能导致长作业饥饿。
- 优先级调度:能体现系统策略,但需要合理设置优先级,否则可能导致低优先级作业饥饿。
- HRRN:平衡了等待时间和服务时间,既避免了长作业饥饿,又不会让短作业等待过久。
4.2 实验结果分析
通过批量模拟我们通常会观察到:
- 平均等待时间:SJF通常最优,FCFS最差,HRRN介于两者之间。
- 平均周转时间:与等待时间趋势相似,SJF表现最好。
- 响应比:HRRN自然最优,因为它就是基于响应比选择的。
- 总完成时间:SJF和HRRN通常能更快完成所有作业。
4.3 算法选择建议
- 批处理系统:SJF或HRRN,追求吞吐量和平均性能。
- 交互式系统:可能需要结合时间片轮转,但作业调度可用HRRN。
- 实时系统:优先级调度更合适,能保证关键任务优先执行。
- 通用系统:现代操作系统通常采用多级反馈队列,结合了多种算法的优点。
5. 扩展与优化方向
5.1 多级反馈队列(MLFQ)
可以扩展为多级反馈队列,结合时间片轮转和优先级调度:
class MLFQScheduler:
"""多级反馈队列调度器"""
def __init__(self, jobs, queues=3, base_time_quantum=2):
self.jobs = jobs
self.queues = queues
self.base_time_quantum = base_time_quantum
self.queue_list = [deque() for _ in range(queues)]
self.time_quantums = [base_time_quantum * (2 ** i) for i in range(queues)]
def schedule(self):
# 实现多级反馈队列逻辑
# 新作业进入最高优先级队列
# 用完时间片未完成则降级
# 高队列优先
pass
5.2 考虑I/O等待时间
实际系统中作业可能有CPU突发和I/O突发,可以扩展Job类:
@dataclass
class RealJob(Job):
cpu_bursts: List[int] = field(default_factory=list)
io_bursts: List[int] = field(default_factory=list)
current_burst: int = 0
5.3 可视化增强
使用Plotly创建交互式甘特图:
import plotly.express as px
def interactive_gantt(gantt_data):
df = pd.DataFrame(gantt_data)
fig = px.timeline(df, x_start="start", x_end="end", y="job_id",
color="job_id", title="调度甘特图")
fig.show()
6. 结论
通过本文的模拟实现,我们深入理解了四种经典作业调度算法的工作原理和性能特点。实验表明:
- SJF在平均等待时间和周转时间上表现最佳,但需要预知作业运行时间。
- HRRN是平衡性最好的算法,既考虑了等待时间又考虑了服务时间。
- FCFS实现简单但性能较差,适合教学或简单系统。
- 优先级调度适合有明确优先级需求的系统。
在实际系统设计中,调度算法的选择需要综合考虑系统目标(吞吐量、响应时间、公平性)、作业特性和实现复杂度。现代操作系统通常采用更复杂的混合策略,但这些经典算法仍然是理解调度问题的基础。
通过编程模拟,我们不仅验证了理论分析,还获得了对调度算法行为的直观理解,这对于系统性能优化和资源管理具有重要指导意义。# 操作系统作业调度模拟实战:从先来先服务到最高响应比优先算法的实现与性能对比分析
引言:理解操作系统作业调度的核心价值
操作系统作业调度是计算机科学中至关重要的概念,它决定了多个进程或作业如何在有限的CPU资源下高效运行。作业调度算法直接影响系统的吞吐量、响应时间、公平性和资源利用率。在实际操作系统中,调度器(Scheduler)负责决定哪个进程获得CPU时间,而在学习阶段,通过编程模拟这些算法可以帮助我们深入理解它们的工作原理和性能差异。
本文将通过Python实现四种经典的作业调度算法:先来先服务(FCFS)、短作业优先(SJF)、优先级调度(PS)和最高响应比优先(HRRN),并进行详细的性能对比分析。我们将构建一个完整的模拟环境,包括作业生成、调度器实现和性能指标计算。
1. 调度算法理论基础
1.1 先来先服务(FCFS)
FCFS是最简单的调度算法,按照作业到达的顺序分配CPU资源。它易于实现但可能导致短作业等待长作业完成,产生”护航效应”。
1.2 短作业优先(SJF)
SJF选择估计运行时间最短的作业优先执行。它能最小化平均等待时间,但可能导致长作业”饥饿”。
1.3 优先级调度(PS)
PS根据作业的优先级分配CPU。优先级可以是静态或动态的。需要注意防止低优先级作业饥饿。
1.4 最高响应比优先(HRRN)
HRRN是FCFS和SJF的折中方案,响应比 = (等待时间 + 服务时间) / 服务时间。它既考虑了等待时间又考虑了服务时间,能平衡长短作业。
2. 模拟环境设计与实现
2.1 作业类设计
首先,我们创建一个Job类来表示系统中的作业:
import random
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Job:
"""作业类,表示一个需要CPU时间的作业"""
job_id: int
arrival_time: int # 到达时间
burst_time: int # 需要的CPU时间
priority: int = 0 # 优先级(数值越小优先级越高)
start_time: Optional[int] = None # 开始执行时间
finish_time: Optional[int] = None # 完成时间
@property
def waiting_time(self) -> int:
"""等待时间 = 开始时间 - 到达时间"""
if self.start_time is None:
return 0
return self.start_time - self.arrival_time
@property
def turnaround_time(self) -> int:
"""周转时间 = 完成时间 - 到达时间"""
if self.finish_time is None:
return 0
return self.finish_time - self.arrival_time
@property
def response_ratio(self) -> float:
"""响应比 = (等待时间 + 服务时间) / 服务时间"""
if self.start_time is None:
return 0.0
return (self.waiting_time + self.burst_time) / self.burst_time
def __repr__(self):
return (f"Job(id={self.job_id}, arrive={self.arrival_time}, "
f"burst={self.burst_time}, priority={self.priority})")
def generate_jobs(num_jobs: int, max_arrival: int = 20,
max_burst: int = 10, max_priority: int = 5) -> List[Job]:
"""生成随机作业列表"""
jobs = []
for i in range(num_jobs):
arrival = random.randint(0, max_arrival)
burst = random.randint(1, max_burst)
priority = random.randint(1, max_priority)
jobs.append(Job(job_id=i+1, arrival_time=arrival,
burst_time=burst, priority=priority))
# 按到达时间排序
jobs.sort(key=lambda x: x.arrival_time)
return jobs
2.2 调度器基类与具体实现
我们创建一个调度器基类,然后为每种算法实现具体的调度器:
from abc import ABC, abstractmethod
from collections import deque
class Scheduler(ABC):
"""调度器抽象基类"""
def __init__(self, jobs: List[Job]):
self.original_jobs = jobs
self.jobs = [job for job in jobs] # 深拷贝
self.current_time = 0
self.completed_jobs = []
self.gantt_chart = [] # 甘特图记录
@abstractmethod
def schedule(self) -> List[Job]:
"""执行调度,返回按完成时间排序的作业列表"""
pass
def run_simulation(self) -> dict:
"""运行完整的模拟并返回性能指标"""
scheduled_jobs = self.schedule()
# 计算性能指标
total_waiting = sum(j.waiting_time for j in scheduled_jobs)
total_turnaround = sum(j.turnaround_time for j in scheduled_jobs)
total_response_ratio = sum(j.response_ratio for j in scheduled_jobs)
n = len(scheduled_jobs)
return {
"algorithm": self.__class__.__name__,
"jobs": scheduled_jobs,
"avg_waiting": total_waiting / n,
"avg_turnaround": total_turnaround / n,
"avg_response_ratio": total_response_ratio / n,
"gantt_chart": self.gantt_chart,
"total_time": max(j.finish_time for j in scheduled_jobs) if scheduled_jobs else 0
}
class FCFSScheduler(Scheduler):
"""先来先服务调度器"""
def schedule(self) -> List[Job]:
# FCFS按到达时间顺序执行
jobs_by_arrival = sorted(self.jobs, key=lambda x: x.arrival_time)
for job in jobs_by_arrival:
# 如果当前时间小于作业到达时间,需要等待
if self.current_time < job.arrival_time:
self.current_time = job.arrival_time
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class SJFScheduler(Scheduler):
"""短作业优先调度器"""
def schedule(self) -> List[Job]:
# 使用列表存储未完成的作业
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
# 如果没有就绪作业,时间跳到下一个作业到达
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 选择burst_time最短的作业
ready_queue = deque(sorted(ready_queue, key=lambda x: x.burst_time))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class PriorityScheduler(Scheduler):
"""优先级调度器"""
def schedule(self) -> List[Job]:
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 选择优先级最高的作业(数值越小优先级越高)
ready_queue = deque(sorted(ready_queue, key=lambda x: x.priority))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
class HRRNScheduler(Scheduler):
"""最高响应比优先调度器"""
def schedule(self) -> List[Job]:
remaining_jobs = sorted(self.jobs, key=lambda x: x.arrival_time)
ready_queue = deque()
while remaining_jobs or ready_queue:
# 将到达的作业加入就绪队列
while remaining_jobs and remaining_jobs[0].arrival_time <= self.current_time:
ready_queue.append(remaining_jobs.pop(0))
if not ready_queue:
if remaining_jobs:
self.current_time = remaining_jobs[0].arrival_time
continue
else:
break
# 计算每个作业的响应比
for job in ready_queue:
# 注意:响应比需要实时计算,因为等待时间在变化
job.response_ratio = (self.current_time - job.arrival_time + job.burst_time) / job.burst_time
# 选择响应比最高的作业
ready_queue = deque(sorted(ready_queue, key=lambda x: x.response_ratio, reverse=True))
job = ready_queue.popleft()
# 记录开始时间
job.start_time = self.current_time
# 记录甘特图
self.gantt_chart.append({
'job_id': job.job_id,
'start': self.current_time,
'end': self.current_time + job.burst_time
})
# 更新当前时间
self.current_time += job.burst_time
# 记录完成时间
job.finish_time = self.current_time
# 完成的作业
self.completed_jobs.append(job)
return self.completed_jobs
2.3 性能分析与可视化模块
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List
def plot_gantt_chart(gantt_data: List[Dict], title: str):
"""绘制甘特图"""
fig, ax = plt.subplots(figsize=(12, 3))
colors = plt.cm.tab10(np.linspace(0, 1, 10))
for i, segment in enumerate(gantt_data):
job_id = segment['job_id']
start = segment['start']
end = segment['end']
ax.barh(y=0, width=end-start, left=start, height=0.5,
color=colors[job_id % 10], edgecolor='black')
# 在条形中间显示作业ID
ax.text((start + end) / 2, 0, f'J{job_id}',
ha='center', va='center', color='white', fontweight='bold')
ax.set_xlabel('时间')
ax.set_title(title)
ax.set_yticks([])
ax.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()
def compare_schedulers(jobs: List[Job], plot_charts: bool = True) -> pd.DataFrame:
"""比较所有调度器的性能"""
import pandas as pd
schedulers = [
FCFSScheduler(jobs),
SJFScheduler(jobs),
PriorityScheduler(jobs),
HRRNScheduler(jobs)
]
results = []
for scheduler in schedulers:
result = scheduler.run_simulation()
results.append({
'算法': result['algorithm'],
'平均等待时间': result['avg_waiting'],
'平均周转时间': result['avg_turnaround'],
'平均响应比': result['avg_response_ratio'],
'总完成时间': result['total_time']
})
if plot_charts:
plot_gantt_chart(result['gantt_chart'], result['algorithm'])
return pd.DataFrame(results)
def run_batch_simulation(num_jobs: int = 10, num_runs: int = 100) -> pd.DataFrame:
"""批量运行模拟以获得统计结果"""
import pandas as pd
algorithms = ['FCFSScheduler', 'SJFScheduler', 'PriorityScheduler', 'HRRNScheduler']
metrics = ['avg_waiting', 'avg_turnaround', 'avg_response_ratio', 'total_time']
# 初始化统计字典
stats = {algo: {metric: [] for metric in metrics} for algo in algorithms}
for run in range(num_runs):
jobs = generate_jobs(num_jobs)
for SchedulerClass in [FCFSScheduler, SJFScheduler, PriorityScheduler, HRRNScheduler]:
scheduler = SchedulerClass(jobs)
result = scheduler.run_simulation()
algo_name = SchedulerClass.__name__
for metric in metrics:
stats[algo_name][metric].append(result[metric])
# 计算平均值和标准差
summary = []
for algo in algorithms:
row = {'算法': algo}
for metric in metrics:
values = stats[algo][metric]
row[f'{metric}_mean'] = np.mean(values)
row[f'{metric}_std'] = np.std(values)
summary.append(row)
return pd.DataFrame(summary)
# 简单的统计分析函数
def analyze_results(df: pd.DataFrame):
"""分析性能对比结果"""
print("=" * 60)
print("调度算法性能对比分析")
print("=" * 60)
# 找出每项指标的最优算法
best_waiting = df.loc[df['平均等待时间'].idxmin()]
best_turnaround = df.loc[df['平均周转时间'].idxmin()]
best_response = df.loc[df['平均响应比'].idxmin()]
best_total = df.loc[df['总完成时间'].idxmin()]
print(f"\n最佳平均等待时间: {best_waiting['算法']} ({best_waiting['平均等待时间']:.2f})")
print(f"最佳平均周转时间: {best_turnaround['算法']} ({best_turnaround['平均周转时间']:.2f})")
print(f"最佳平均响应比: {best_response['算法']} ({best_response['平均响应比']:.2f})")
print(f"最短总完成时间: {best_total['算法']} ({best_total['总完成时间']:.2f})")
print("\n详细对比:")
print(df.to_string(index=False))
3. 完整的模拟运行与结果分析
3.1 单次模拟示例
# 生成测试作业
jobs = generate_jobs(5, max_arrival=10, max_burst=8, max_priority=5)
print("生成的作业列表:")
for job in jobs:
print(job)
# 运行单次模拟
print("\n" + "="*50)
print("单次模拟结果")
print("="*50)
# FCFS
fcfs_result = FCFSScheduler(jobs).run_simulation()
print(f"\nFCFS - 平均等待时间: {fcfs_result['avg_waiting']:.2f}, 平均周转时间: {fcfs_result['avg_turnaround']:.2f}")
# SJF
sjf_result = SJFScheduler(jobs).run_simulation()
print(f"SJF - 平均等待时间: {sjf_result['avg_waiting']:.2f}, 平均周转时间: {sjf_result['avg_turnaround']:.2f}")
# 优先级
ps_result = PriorityScheduler(jobs).run_simulation()
print(f"优先级调度 - 平均等待时间: {ps_result['avg_waiting']:.2f}, 平均周转时间: {ps_result['avg_turnaround']:.2f}")
# HRRN
hrrn_result = HRRNScheduler(jobs).run_simulation()
print(f"HRRN - 平均等待时间: {hrrn_result['avg_waiting']:.2f}, 平均周转时间: {hrrn_result['avg_turnaround']:.2f}")
3.2 批量模拟与统计分析
# 批量模拟100次,每次10个作业
print("\n" + "="*50)
print("批量模拟统计结果(100次运行,每次10个作业)")
print("="*50)
batch_results = run_batch_simulation(num_jobs=10, num_runs=100)
print(batch_results.to_string(index=False))
# 分析结果
analyze_results(batch_results)
4. 性能对比分析与讨论
4.1 理论分析
从理论角度分析,各算法的特点如下:
- FCFS:实现简单,但可能导致平均等待时间较长,特别是当长作业先到达时。
- SJF:理论上能提供最小的平均等待时间,但需要预知作业运行时间,且可能导致长作业饥饿。
- 优先级调度:能体现系统策略,但需要合理设置优先级,否则可能导致低优先级作业饥饿。
- HRRN:平衡了等待时间和服务时间,既避免了长作业饥饿,又不会让短作业等待过久。
4.2 实验结果分析
通过批量模拟我们通常会观察到:
- 平均等待时间:SJF通常最优,FCFS最差,HRRN介于两者之间。
- 平均周转时间:与等待时间趋势相似,SJF表现最好。
- 响应比:HRRN自然最优,因为它就是基于响应比选择的。
- 总完成时间:SJF和HRRN通常能更快完成所有作业。
4.3 算法选择建议
- 批处理系统:SJF或HRRN,追求吞吐量和平均性能。
- 交互式系统:可能需要结合时间片轮转,但作业调度可用HRRN。
- 实时系统:优先级调度更合适,能保证关键任务优先执行。
- 通用系统:现代操作系统通常采用多级反馈队列,结合了多种算法的优点。
5. 扩展与优化方向
5.1 多级反馈队列(MLFQ)
可以扩展为多级反馈队列,结合时间片轮转和优先级调度:
class MLFQScheduler:
"""多级反馈队列调度器"""
def __init__(self, jobs, queues=3, base_time_quantum=2):
self.jobs = jobs
self.queues = queues
self.base_time_quantum = base_time_quantum
self.queue_list = [deque() for _ in range(queues)]
self.time_quantums = [base_time_quantum * (2 ** i) for i in range(queues)]
def schedule(self):
# 实现多级反馈队列逻辑
# 新作业进入最高优先级队列
# 用完时间片未完成则降级
# 高队列优先
pass
5.2 考虑I/O等待时间
实际系统中作业可能有CPU突发和I/O突发,可以扩展Job类:
@dataclass
class RealJob(Job):
cpu_bursts: List[int] = field(default_factory=list)
io_bursts: List[int] = field(default_factory=list)
current_burst: int = 0
5.3 可视化增强
使用Plotly创建交互式甘特图:
import plotly.express as px
def interactive_gantt(gantt_data):
df = pd.DataFrame(gantt_data)
fig = px.timeline(df, x_start="start", x_end="end", y="job_id",
color="job_id", title="调度甘特图")
fig.show()
6. 结论
通过本文的模拟实现,我们深入理解了四种经典作业调度算法的工作原理和性能特点。实验表明:
- SJF在平均等待时间和周转时间上表现最佳,但需要预知作业运行时间。
- HRRN是平衡性最好的算法,既考虑了等待时间又考虑了服务时间。
- FCFS实现简单但性能较差,适合教学或简单系统。
- 优先级调度适合有明确优先级需求的系统。
在实际系统设计中,调度算法的选择需要综合考虑系统目标(吞吐量、响应时间、公平性)、作业特性和实现复杂度。现代操作系统通常采用更复杂的混合策略,但这些经典算法仍然是理解调度问题的基础。
通过编程模拟,我们不仅验证了理论分析,还获得了对调度算法行为的直观理解,这对于系统性能优化和资源管理具有重要指导意义。
