引言:作业管理的核心概念
操作系统作为计算机系统的核心软件,负责管理和协调硬件资源与软件程序的交互。其中,作业管理是操作系统最重要的功能之一。作业(Job)是指用户在一次计算或事务处理过程中要求计算机系统所做工作的集合,它从用户提交开始,到最终完成结束,整个生命周期都受到操作系统的严密监控和调度。
作业管理的目标是提高系统吞吐量、缩短响应时间、公平分配资源,并确保系统的稳定性和安全性。现代操作系统通常采用多道程序设计技术,允许多个作业同时驻留在内存中,通过精心的调度算法实现资源的最优利用。
作业的生命周期概述
一个完整的作业生命周期通常包含以下五个主要阶段:
- 提交阶段(Submission):用户将作业输入到计算机系统
- 收容阶段(Holding):作业进入后备队列,等待进入内存
- 执行阶段(Execution):作业被调入内存并开始运行
- 完成阶段(Completion):作业运行结束,系统回收资源
- 善后阶段(Post-processing):输出结果,清理临时资源
接下来,我们将详细解析每个阶段的操作系统管理机制。
1. 提交阶段:作业进入系统
1.1 作业的输入方式
用户可以通过多种方式提交作业:
- 命令行方式:在终端输入命令,如
./program arg1 arg2 - 批处理文件:将多个命令写入脚本文件,如
batch_job.sh - 图形界面:通过GUI应用程序提交任务
- 网络提交:通过远程登录或Web服务提交作业
1.2 作业控制语言(JCL)
在批处理系统中,用户通常使用作业控制语言(Job Control Language, JCL)来描述作业的要求。例如,在IBM大型机系统中:
//JOB1 JOB (ACCT),'USER',CLASS=A,MSGCLASS=H
//STEP1 EXEC PGM=PROGA
//SYSPRINT DD SYSOUT=*
//SYSIN DD *
input data here
/*
//STEP2 EXEC PGM=PROGB
//SYSPRINT DD SYSOUT=*
//SYSIN DD *
more input
/*
//EOF
这段JCL定义了一个包含两个步骤的作业,每个步骤运行不同的程序,并指定了输入输出设备。
1.3 操作系统接收作业
当作业提交后,操作系统通过以下方式接收:
- 命令解释器(Shell):在交互式系统中,Shell接收用户命令并创建进程
- 作业提交守护进程:在批处理系统中,专门的守护进程(如Linux的
atd或cron)接收作业 - 系统调用接口:用户程序通过
fork()、exec()等系统调用创建新进程
在Linux系统中,一个简单的作业提交过程如下:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:执行作业
execlp("ls", "ls", "-l", NULL);
perror("execlp failed");
return 1;
} else if (pid > 0) {
// 父进程:等待子进程完成
wait(NULL);
printf("作业完成\n");
} else {
perror("fork failed");
}
return 0;
}
2. 收容阶段:作业进入后备队列
2.1 作业控制块(JCB)
当作业被接受后,操作系统会为其创建一个作业控制块(Job Control Block, JCB)。JCB是作业在系统中的唯一标识,包含了作业的所有元数据。
JCB通常包含以下信息:
| 字段 | 说明 |
|---|---|
| 作业标识符 | 唯一的作业ID |
| 用户名 | 提交作业的用户 |
| 优先级 | 作业的优先级别 |
| 作业类型 | CPU密集型、I/O密集型等 |
| 资源需求 | CPU时间、内存大小、I/O设备等 |
| 作业状态 | 提交、后备、运行、完成等 |
| 创建时间 | 作业提交的时间戳 |
| 预计运行时间 | 作业预估的执行时间 |
| 内存需求 | 所需的内存空间大小 |
2.2 后备队列管理
作业进入系统后,首先被放入后备队列(Job Queue),等待作业调度程序将其调入内存。后备队列通常按某种策略排序:
- 先来先服务(FCFS):按提交时间排序
- 优先级调度:按优先级高低排序
- 短作业优先(SJF):按预计运行时间排序
2.3 作业状态转换
作业在收容阶段的状态为提交(Submit)或后备(Holding)。当作业调度程序选择该作业时,状态转换为就绪(Ready)。
状态转换图如下:
提交 → 后备 → 就绪 → 运行 → 完成
3. 执行阶段:作业在内存中运行
3.1 进程创建与内存分配
当作业被作业调度程序选中后,操作系统执行以下步骤:
- 创建进程:为作业创建一个或多个进程
- 分配内存:为进程分配内存空间
- 加载程序:将程序代码和数据加载到内存
- 初始化:设置进程控制块(PCB)、打开文件表等
在Linux中,这个过程通过fork()和exec()系统调用实现:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/resource.h>
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:设置资源限制
struct rlimit rl;
rl.rlim_cur = 100; // CPU时间限制100秒
rl.rlim_max = 200;
setrlimit(RLIMIT_CPU, &rl);
// 设置内存限制
rl.rlim_cur = 1024*1024*100; // 100MB
rl.rlim_max = 1024*1024*200;
setrlimit(RLIMIT_AS, &rl);
// 执行作业程序
execlp("./myjob", "myjob", NULL);
perror("execlp failed");
return 1;
} else if (pid > 0) {
// 父进程:记录作业信息
printf("作业PID: %d\n", pid);
int status;
waitpid(pid, &status, 0);
printf("作业完成,状态: %d\n", status);
}
**进程调度**:操作系统使用进程调度算法(如轮转法、优先级调度)为进程分配CPU时间片
### 3.2 多道程序设计与资源管理
在执行阶段,操作系统管理多个并发执行的作业,确保它们不会相互干扰:
- **内存保护**:每个进程有独立的虚拟地址空间
- **CPU调度**:通过时间片轮转实现并发执行
- **I/O管理**:当进程等待I/O时,CPU可以切换到其他进程
### 3.3 作业内部的多任务处理
一个作业可能包含多个任务(进程/线程)。操作系统通过以下方式管理:
```c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
// 作业的多个任务函数
void* task1(void* arg) {
printf("任务1开始执行\n");
sleep(2);
printf("任务1完成\n");
return NULL;
}
void* task2(void* arg) {
printf("任务2开始执行\n");
sleep(3);
printf("任务2完成\n");
return NULL;
}
int main() {
pthread_t t1, t2;
// 创建两个线程作为作业的子任务
pthread_create(&t1, NULL, task1, NULL);
pthread_create(&t2, NULL, task2, NULL);
// 等待所有任务完成
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("作业所有任务完成\n");
return 0;
}
4. 完成阶段:作业结束运行
4.1 作业完成的条件
作业完成可能由以下情况触发:
- 正常结束:程序执行到最后一条指令
- 异常终止:遇到错误或异常(如段错误)
- 用户终止:用户通过Ctrl+C或kill命令终止
- 系统终止:系统资源不足或超时
4.2 资源回收
当作业完成时,操作系统执行以下清理工作:
- 回收内存:释放进程占用的虚拟内存
- 关闭文件:关闭所有打开的文件描述符
- 释放I/O设备:释放占用的设备资源
- 清理进程表项:从进程表中移除PCB
- 通知父进程:通过SIGCHLD信号通知父进程
在Linux中,资源回收过程:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/resource.h>
void check_resources() {
struct rusage usage;
if (getrusage(RUSAGE_CHILDREN, &usage) == 0) {
printf("CPU时间: %ld.%06ld秒\n",
usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
printf("内存使用: %ld KB\n", usage.ru_maxrss);
printf("页面错误: %ld\n", usage.ru_majflt);
}
}
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:执行一些内存操作
int* arr = malloc(1000000 * sizeof(int));
for (int i = 0; i < 1000000; i++) {
arr[i] = i;
}
free(arr);
return 0;
} else if (pid > 0) {
int status;
waitpid(pid, &status, 0);
// 检查资源使用情况
check_resources();
if (WIFEXITED(status)) {
printf("作业正常退出,退出码: %d\n", WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
printf("作业被信号终止,信号: %d\n", WTERMSIG(status));
}
}
**退出状态处理**:操作系统保存作业的退出状态,供父进程查询
### 4.3 作业状态更新
作业完成时,其状态从**运行(Running)**转换为**完成(Complete)**,并从活动队列移除,但仍可能保留在系统日志中供审计。
## 5. 善后阶段:输出处理与清理
### 5.1 结果输出
操作系统处理作业的输出:
- **标准输出**:默认输出到终端或重定向到文件
- **标准错误**:错误信息输出
- **输出重定向**:通过Shell重定向机制
示例:
```bash
# 输出重定向
./myjob > output.txt 2>&1
# 管道处理
./myjob | grep "error" | tee error.log
5.2 临时文件清理
作业可能创建临时文件,操作系统在作业完成后需要清理:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
char temp_filename[256];
void cleanup(int sig) {
printf("收到信号 %d,清理临时文件\n", sig);
remove(temp_filename);
exit(0);
}
int main() {
// 设置信号处理器
signal(SIGINT, cleanup);
signal(SIGTERM, cleanup);
// 创建临时文件
snprintf(temp_filename, sizeof(temp_filename),
"/tmp/job_%d_XXXXXX", getpid());
int fd = mkstemp(temp_filename);
if (fd == -1) {
perror("mkstemp failed");
return 1;
}
printf("临时文件: %s\n", temp_filename);
// 写入数据
write(fd, "temporary data", 14);
close(fd);
// 模拟长时间运行
printf("作业运行中...按Ctrl+C测试清理\n");
while(1) {
sleep(1);
}
return 0;
}
**日志记录**:系统记录作业的完成时间、资源使用情况等信息
## 6. 作业调度算法详解
### 6.1 作业调度与进程调度的区别
| 特性 | 作业调度(高级调度) | 进程调度(低级调度) |
|------|---------------------|---------------------|
| 调度对象 | 作业(Job) | 进程(Process) |
| 发生频率 | 较低(几分钟一次) | 很高(毫秒级) |
| 调度粒度 | 粗粒度 | 细粒度 |
| 决策依据 | 作业优先级、资源需求 | CPU时间片、优先级 |
### 6.2 常见作业调度算法
#### 6.2.1 先来先服务(FCFS)
```python
# FCFS调度算法示例
def fcfs_scheduling(jobs):
# 按提交时间排序
sorted_jobs = sorted(jobs, key=lambda x: x['submit_time'])
current_time = 0
schedule = []
for job in sorted_jobs:
# 如果作业到达时间晚于当前时间,等待
if job['submit_time'] > current_time:
current_time = job['submit_time']
# 计算开始时间、完成时间和周转时间
start_time = current_time
finish_time = current_time + job['run_time']
turnaround_time = finish_time - job['submit_time']
schedule.append({
'job_id': job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time
})
current_time = finish_time
return schedule
# 示例作业
jobs = [
{'id': 'J1', 'submit_time': 0, 'run_time': 7},
{'id': 'J2', 'submit_time': 2, 'run_time': 4},
{'id': 'J3', 'submit_time': 4, 'run_time': 1},
{'id': 'J4', 'submit_time': 5, 'run_time': 4}
]
schedule = fcfs_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 开始={item['start_time']}, 完成={item['finish_time']}, 周转={item['turnaround_time']}")
6.2.2 短作业优先(SJF)
# SJF调度算法示例
def sjf_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
# 找出所有已提交且未调度的作业
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
# 没有可运行的作业,时间前进到下一个作业提交
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 选择运行时间最短的作业
shortest_job = min(available_jobs, key=lambda x: x['run_time'])
# 调度该作业
start_time = current_time
finish_time = current_time + shortest_job['run_time']
turnaround_time = finish_time - shortest_job['submit_time']
schedule.append({
'job_id': shortest_job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time
})
current_time = finish_time
remaining_jobs.remove(shortest_job)
return schedule
# 使用相同的作业示例
schedule = sjf_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 开始={item['start_time']}, 完成={item['finish_time']}, 周转={item['turnaround_time']}")
6.2.3 优先级调度
# 优先级调度算法示例
def priority_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 选择优先级最高的作业(数值越小优先级越高)
highest_priority_job = min(available_jobs, key=lambda x: x['priority'])
start_time = current_time
finish_time = current_time + highest_priority_job['run_time']
turnaround_time = finish_time - highest_priority_job['submit_time']
schedule.append({
'job_id': highest_priority_job['id'],
'start_time': start_time,
**finish_time**: finish_time,
'turnaround_time': turnaround_time,
'priority': highest_priority_job['priority']
})
current_time = finish_time
remaining_jobs.remove(highest_priority_job)
return schedule
# 带优先级的作业
priority_jobs = [
{'id': 'J1', 'submit_time': 0, 'run_time': 7, 'priority': 3},
{'id': 'J2', 'submit_time': 2, 'run_time': 4, 'priority': 1},
{'id': 'J3', 'submit_time': 4, 'run_time': 1, 'priority': 2},
{'id': 'J4', 'submit_time': 5, 'run_time': 4, 'priority': 4}
]
schedule = priority_scheduling(priority_jobs)
for item in schedule:
print(f"作业{item['job_id']}: 优先级={item['priority']}, 开始={item['start_time']}, 完成={item['finish_time']}")
6.2.4 最高响应比优先(HRRN)
# HRRN调度算法示例
def hrrn_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 计算每个作业的响应比
for job in available_jobs:
waiting_time = current_time - job['submit_time']
job['response_ratio'] = (waiting_time + job['run_time']) / job['run_time']
# 选择响应比最高的作业
highest_ratio_job = max(available_jobs, key=lambda x: x['response_ratio'])
start_time = current_time
finish_time = current_time + highest_ratio_job['run_time']
turnaround_time = finish_time - highest_ratio_job['submit_time']
schedule.append({
'job_id': highest_ratio_job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time,
'response_ratio': highest_ratio_job['response_ratio']
})
current_time = finish_time
remaining_jobs.remove(highest_ratio_job)
return schedule
schedule = hrrn_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 响应比={item['response_ratio']:.2f}, 开始={item['start_time']}, 完成={item['finish_time']}")
6.3 现代操作系统的混合调度策略
现代操作系统通常采用多级反馈队列(MLFQ)等混合策略:
// 多级反馈队列概念示例
#define QUEUE_LEVELS 3
#define TIME_QUANTUM_0 10 // 最高优先级队列时间片
#define TIME_QUANTUM_1 20 // 中等优先级队列时间片
#define TIME_QUANTUM_2 40 // 低优先级队列时间片
typedef struct {
int queue_level; // 所在队列级别
int time_used; // 已使用CPU时间
int priority; // 动态优先级
} ProcessInfo;
void mlfq_schedule(ProcessInfo* processes, int count) {
// 简化的MLFQ逻辑
for (int level = 0; level < QUEUE_LEVELS; level++) {
// 处理当前队列的所有进程
for (int i = 0; i < count; i++) {
if (processes[i].queue_level == level) {
int time_slice = (level == 0) ? TIME_QUANTUM_0 :
(level == 1) ? TIME_QUANTUM_1 : TIME_QUANTUM_2;
printf("调度进程: 级别=%d, 时间片=%d\n", level, time_slice);
// 如果进程未用完时间片,保持在当前队列
// 如果用完时间片,降级到下一级队列
if (processes[i].time_used >= time_slice) {
if (level < QUEUE_LEVELS - 1) {
processes[i].queue_level++;
printf("进程降级到级别%d\n", level + 1);
}
}
}
}
}
}
7. 现代操作系统的作业管理实践
7.1 Linux系统的作业管理
7.1.1 Shell作业控制
# 后台运行作业
./long_running_job &
# 查看后台作业
jobs -l
# 挂起作业(Ctrl+Z)
# 恢复作业到前台
fg %1
# 恢复作业到后台
bg %1
# 终止作业
kill %1
# 等待作业完成
wait %1
7.1.2 系统级作业管理
// 使用waitid系统调用等待作业完成
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
void handle_child_exit(int sig, siginfo_t* info, void* context) {
printf("子进程%d退出,状态: %d\n", info->si_pid, info->si_status);
}
int main() {
struct sigaction sa;
sa.sa_sigaction = handle_child_exit;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
sigaction(SIGCHLD, &sa, NULL);
pid_t pid = fork();
if (pid == 0) {
// 子进程
printf("子进程运行中...\n");
sleep(3);
return 42;
} else if (pid > 0) {
printf("父进程等待子进程完成...\n");
// 父进程可以继续其他工作
sleep(5);
}
return 0;
}
7.2 Windows系统的作业管理
Windows使用作业对象(Job Object)来管理一组进程:
#include <windows.h>
#include <stdio.h>
int main() {
// 创建作业对象
HANDLE hJob = CreateJobObject(NULL, "MyJob");
if (hJob == NULL) {
printf("创建作业失败\n");
return 1;
}
// 设置作业限制
JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = {0};
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
SetInformationJobObject(hJob, JobObjectExtendedLimitInformation, &jeli, sizeof(jeli));
// 创建进程并关联到作业
STARTUPINFO si = {sizeof(si)};
PROCESS_INFORMATION pi;
if (CreateProcess(NULL, "notepad.exe", NULL, NULL, FALSE,
CREATE_SUSPENDED, NULL, NULL, &si, &pi)) {
// 将进程添加到作业
AssignProcessToJobObject(hJob, pi.hProcess);
// 恢复进程执行
ResumeThread(pi.hThread);
printf("进程已添加到作业\n");
// 等待作业完成
WaitForSingleObject(hJob, INFINITE);
printf("作业完成\n");
CloseHandle(pi.hProcess);
CloseHandle(pi.hThread);
}
CloseHandle(hJob);
return 0;
}
7.3 分布式作业调度
在集群和云计算环境中,作业管理扩展到分布式系统:
# 简化的分布式作业调度器概念
import threading
import time
from queue import Queue
class DistributedJobScheduler:
def __init__(self):
self.job_queue = Queue()
self.worker_nodes = []
self.completed_jobs = []
def add_worker(self, node_id, capacity):
self.worker_nodes.append({
'id': node_id,
'capacity': capacity,
'current_load': 0,
'status': 'idle'
})
def submit_job(self, job):
self.job_queue.put(job)
print(f"作业{job['id']}已提交")
def schedule(self):
while not self.job_queue.empty():
job = self.job_queue.get()
# 选择负载最小的节点
available_nodes = [n for n in self.worker_nodes
if n['current_load'] < n['capacity']]
if not available_nodes:
print("没有可用节点,等待...")
time.sleep(1)
self.job_queue.put(job) # 重新入队
continue
selected_node = min(available_nodes, key=lambda x: x['current_load'])
# 分配作业
selected_node['current_load'] += job['resource_need']
selected_node['status'] = 'busy'
print(f"作业{job['id']}分配到节点{selected_node['id']}")
# 模拟执行
def execute_job(node, job):
time.sleep(job['run_time'])
node['current_load'] -= job['resource_need']
node['status'] = 'idle' if node['current_load'] == 0 else 'busy'
self.completed_jobs.append(job)
print(f"作业{job['id']}在节点{node['id']}完成")
threading.Thread(target=execute_job, args=(selected_node, job)).start()
def wait_all(self):
while not self.job_queue.empty() or any(n['status'] == 'busy' for n in self.worker_nodes):
time.sleep(0.5)
print("所有作业完成")
# 使用示例
scheduler = DistributedJobScheduler()
scheduler.add_worker('node1', 10)
scheduler.add_worker('node2', 10)
jobs = [
{'id': 'J1', 'run_time': 2, 'resource_need': 3},
{'id': 'J2', 'run_time': 1, 'resource_need': 5},
{'id': 'J3', 'run_time': 3, 'resource_need': 2},
{'id': 'J4', 'run_time': 2, 'resource_need': 4}
]
for job in jobs:
scheduler.submit_job(job)
scheduler.schedule()
scheduler.wait_all()
8. 作业管理的性能指标
8.1 关键性能指标
周转时间(Turnaround Time):作业从提交到完成的总时间
- 公式:完成时间 - 提交时间
等待时间(Waiting Time):作业在后备队列中等待的时间
- 公式:开始时间 - 提交时间
响应时间(Response Time):首次得到响应的时间
- 公式:首次获得CPU时间 - 提交时间
吞吐量(Throughput):单位时间完成的作业数
- 公式:完成作业数 / 总时间
CPU利用率:CPU忙于执行作业的时间比例
- 公式:(总时间 - 空闲时间) / 总时间
8.2 性能分析示例
def calculate_metrics(schedule):
"""计算作业调度的性能指标"""
total_turnaround = 0
total_waiting = 0
total_response = 0
for job in schedule:
turnaround = job['finish_time'] - job['submit_time']
waiting = job['start_time'] - job['submit_time']
response = job['start_time'] - job['submit_time']
total_turnaround += turnaround
total_waiting += waiting
total_response += response
print(f"作业{job['job_id']}: 周转={turnaround}, 等待={waiting}, 响应={response}")
n = len(schedule)
avg_turnaround = total_turnaround / n
avg_waiting = total_waiting / n
avg_response = total_response / n
# 吞吐量 = 作业数 / 总时间
total_time = max(job['finish_time'] for job in schedule)
throughput = n / total_time
print(f"\n平均周转时间: {avg_turnaround:.2f}")
print(f"平均等待时间: {avg_waiting:.2f}")
print(f"平均响应时间: {avg_response:.2f}")
print(f"吞吐量: {throughput:.2f} 作业/单位时间")
return {
'avg_turnaround': avg_turnaround,
'avg_waiting': avg_waiting,
'avg_response': avg_response,
'throughput': throughput
}
# 使用之前的调度结果
schedule = [
{'job_id': 'J1', 'submit_time': 0, 'start_time': 0, 'finish_time': 7},
{'job_id': 'J2', 'submit_time': 2, 'start_time': 7, 'finish_time': 11},
{'job_id': 'J3', 'submit_time': 4, 'start_time': 11, 'finish_time': 12},
{'job_id': 'J4', 'submit_time': 5, 'start_time': 12, 'finish_time': 16}
]
calculate_metrics(schedule)
9. 作业管理的挑战与解决方案
9.1 资源竞争与死锁
问题:多个作业竞争有限资源可能导致死锁。
解决方案:
- 资源预留:作业提交时声明所需资源,调度器确保资源可用才调度
- 死锁检测与恢复:定期检测死锁,通过终止进程或回滚解决
- 银行家算法:在分配资源前检查系统是否仍处于安全状态
// 简化的银行家算法示例
#define MAX_RESOURCES 5
#define MAX_JOBS 3
int available[MAX_RESOURCES] = {3, 3, 2};
int max_demand[MAX_JOBS][MAX_RESOURCES] = {
{7, 5, 3},
{3, 2, 2},
{9, 0, 2}
};
int allocation[MAX_JOBS][MAX_RESOURCES] = {0};
int need[MAX_JOBS][MAX_RESOURCES];
void init_need() {
for (int i = 0; i < MAX_JOBS; i++) {
for (int j = 0; j < MAX_RESOURCES; j++) {
need[i][j] = max_demand[i][j] - allocation[i][j];
}
}
}
int is_safe() {
int work[MAX_RESOURCES];
int finish[MAX_JOBS] = {0};
for (int i = 0; i < MAX_RESOURCES; i++) {
work[i] = available[i];
}
int safe_seq[MAX_JOBS];
int count = 0;
while (count < MAX_JOBS) {
int found = 0;
for (int i = 0; i < MAX_JOBS; i++) {
if (finish[i] == 0) {
int can_allocate = 1;
for (int j = 0; j < MAX_RESOURCES; j++) {
if (need[i][j] > work[j]) {
can_allocate = 0;
break;
}
}
if (can_allocate) {
// 分配资源并模拟完成
for (int j = 0; j < MAX_RESOURCES; j++) {
work[j] += allocation[i][j];
}
safe_seq[count++] = i;
finish[i] = 1;
found = 1;
}
}
}
if (found == 0) {
printf("系统不安全\n");
return 0;
}
}
printf("系统安全,安全序列: ");
for (int i = 0; i < MAX_JOBS; i++) {
printf("J%d ", safe_seq[i]);
}
printf("\n");
return 1;
}
// 检查资源请求
int request_resources(int job_id, int request[]) {
for (int i = 0; i < MAX_RESOURCES; i++) {
if (request[i] > need[job_id][i]) {
printf("错误:请求超过最大需求\n");
return -1;
}
if (request[i] > available[i]) {
printf("错误:资源不足\n");
return -1;
}
}
// 临时分配
for (int i = 0; i < MAX_RESOURCES; i++) {
available[i] -= request[i];
allocation[job_id][i] += request[i];
need[job_id][i] -= request[i];
}
// 检查安全性
if (is_safe()) {
printf("请求批准\n");
return 0;
} else {
// 回滚
for (int i = 0; i < MAX_RESOURCES; i++) {
available[i] += request[i];
allocation[job_id][i] -= request[i];
need[job_id][i] += request[i];
}
printf("请求拒绝,系统将进入不安全状态\n");
return -1;
}
}
9.2 作业饿死(Starvation)
问题:低优先级作业长期得不到执行。
解决方案:
- 优先级老化(Aging):随时间增加等待作业的优先级
- 公平调度:确保每个作业都能获得CPU时间
- 多级反馈队列:动态调整优先级
// 优先级老化示例
#define MAX_PRIORITY 100
#define MIN_PRIORITY 0
typedef struct {
int job_id;
int current_priority;
int wait_time;
int run_time;
} Job;
void aging(Job* jobs, int count) {
for (int i = 0; i < count; i++) {
if (jobs[i].wait_time > 0) {
// 每等待1个时间单位,优先级提升1
jobs[i].current_priority =
MAX_PRIORITY - jobs[i].wait_time;
if (jobs[i].current_priority < MIN_PRIORITY) {
jobs[i].current_priority = MIN_PRIORITY;
}
}
}
}
9.3 资源碎片化
问题:内存碎片导致大作业无法分配。
解决方案:
- 动态分区分配:使用首次适应、最佳适应等算法
- 紧凑技术:移动内存中的作业减少碎片
- 虚拟内存:使用分页机制避免外部碎片
10. 作业管理的未来发展趋势
10.1 云计算环境下的作业管理
现代云平台(如AWS Batch、Azure Batch、Google Cloud Dataflow)提供了强大的作业管理能力:
- 弹性伸缩:根据作业负载自动调整资源
- 容器化:使用Docker/Kubernetes打包作业环境
- 无服务器计算:作业以函数形式执行,无需管理服务器
# Kubernetes Job 示例
apiVersion: batch/v1
kind: Job
metadata:
name: data-processing-job
spec:
parallelism: 3 # 并行执行3个实例
completions: 10 # 总共需要完成10次
template:
spec:
containers:
- name: processor
image: my-processor:latest
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
restartPolicy: OnFailure
10.2 AI驱动的智能调度
机器学习被用于预测作业行为,优化调度决策:
- 作业运行时间预测:基于历史数据预测新作业的运行时间
- 资源需求预测:预测作业的CPU、内存、I/O需求
- 异常检测:识别异常作业行为,提前干预
10.3 边缘计算中的作业管理
在边缘计算环境中,作业管理需要考虑:
- 异构资源:不同边缘节点的计算能力差异
- 网络延迟:作业迁移和数据传输的成本
- 隐私保护:敏感数据的本地处理
11. 实际案例分析:HPC集群作业管理
11.1 Slurm工作负载管理器
Slurm是高性能计算(HPC)领域广泛使用的作业调度器:
# 提交作业脚本
#!/bin/bash
#SBATCH --job-name=mpi_job
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=8
#SBATCH --time=00:30:00
#SBATCH --partition=compute
module load openmpi/4.0.5
mpirun -np 32 ./my_mpi_program
# 作业管理命令
sbatch job_script.sh # 提交作业
squeue # 查看作业队列
scontrol show job 12345 # 查看作业详情
scancel 12345 # 取消作业
sacct # 查看作业历史
11.2 作业调度器的实现架构
// 简化的作业调度器架构
typedef struct {
int job_id;
char user[32];
int priority;
int nodes;
int tasks_per_node;
int time_limit;
char state[16]; // PENDING, RUNNING, COMPLETED, FAILED
} SlurmJob;
typedef struct {
SlurmJob* jobs;
int job_count;
int* node_states; // 0=空闲, 1=占用
int node_count;
} SchedulerState;
void schedule_loop(SchedulerState* state) {
while (1) {
// 1. 检查新提交的作业
check_new_jobs(state);
// 2. 从等待队列中选择作业
for (int i = 0; i < state->job_count; i++) {
if (strcmp(state->jobs[i].state, "PENDING") == 0) {
// 检查资源是否足够
if (can_allocate(state, &state->jobs[i])) {
allocate_resources(state, &state->jobs[i]);
strcpy(state->jobs[i].state, "RUNNING");
launch_job(&state->jobs[i]);
}
}
}
// 3. 检查运行中的作业是否完成
check_completed_jobs(state);
sleep(1); // 每秒调度一次
}
}
12. 总结
操作系统的作业管理是一个复杂而精密的系统,它涵盖了从作业提交到完成的整个生命周期。通过作业控制块、调度算法、资源管理等机制,操作系统实现了对计算任务的高效、公平、安全的管理。
关键要点总结:
- 作业生命周期:提交 → 收容 → 执行 → 完成 → 善后
- 核心数据结构:JCB(作业控制块)、PCB(进程控制块)
- 调度算法:FCFS、SJF、优先级、HRRN、多级反馈队列
- 资源管理:内存分配、CPU调度、I/O管理
- 现代发展:云计算、容器化、AI调度、边缘计算
随着计算技术的不断发展,作业管理也在持续演进,从传统的批处理系统到现代的云原生环境,从单机调度到分布式协调,作业管理始终是操作系统的核心功能之一,为各种计算场景提供基础支撑。# 操作系统如何管理作业从提交到完成的全过程详解
引言:作业管理的核心概念
操作系统作为计算机系统的核心软件,负责管理和协调硬件资源与软件程序的交互。其中,作业管理是操作系统最重要的功能之一。作业(Job)是指用户在一次计算或事务处理过程中要求计算机系统所做工作的集合,它从用户提交开始,到最终完成结束,整个生命周期都受到操作系统的严密监控和调度。
作业管理的目标是提高系统吞吐量、缩短响应时间、公平分配资源,并确保系统的稳定性和安全性。现代操作系统通常采用多道程序设计技术,允许多个作业同时驻留在内存中,通过精心的调度算法实现资源的最优利用。
作业的生命周期概述
一个完整的作业生命周期通常包含以下五个主要阶段:
- 提交阶段(Submission):用户将作业输入到计算机系统
- 收容阶段(Holding):作业进入后备队列,等待进入内存
- 执行阶段(Execution):作业被调入内存并开始运行
- 完成阶段(Completion):作业运行结束,系统回收资源
- 善后阶段(Post-processing):输出结果,清理临时资源
接下来,我们将详细解析每个阶段的操作系统管理机制。
1. 提交阶段:作业进入系统
1.1 作业的输入方式
用户可以通过多种方式提交作业:
- 命令行方式:在终端输入命令,如
./program arg1 arg2 - 批处理文件:将多个命令写入脚本文件,如
batch_job.sh - 图形界面:通过GUI应用程序提交任务
- 网络提交:通过远程登录或Web服务提交作业
1.2 作业控制语言(JCL)
在批处理系统中,用户通常使用作业控制语言(Job Control Language, JCL)来描述作业的要求。例如,在IBM大型机系统中:
//JOB1 JOB (ACCT),'USER',CLASS=A,MSGCLASS=H
//STEP1 EXEC PGM=PROGA
//SYSPRINT DD SYSOUT=*
//SYSIN DD *
input data here
/*
//STEP2 EXEC PGM=PROGB
//SYSPRINT DD SYSOUT=*
//SYSIN DD *
more input
/*
//EOF
这段JCL定义了一个包含两个步骤的作业,每个步骤运行不同的程序,并指定了输入输出设备。
1.3 操作系统接收作业
当作业提交后,操作系统通过以下方式接收:
- 命令解释器(Shell):在交互式系统中,Shell接收用户命令并创建进程
- 作业提交守护进程:在批处理系统中,专门的守护进程(如Linux的
atd或cron)接收作业 - 系统调用接口:用户程序通过
fork()、exec()等系统调用创建新进程
在Linux系统中,一个简单的作业提交过程如下:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:执行作业
execlp("ls", "ls", "-l", NULL);
perror("execlp failed");
return 1;
} else if (pid > 0) {
// 父进程:等待子进程完成
wait(NULL);
printf("作业完成\n");
} else {
perror("fork failed");
}
return 0;
}
2. 收容阶段:作业进入后备队列
2.1 作业控制块(JCB)
当作业被接受后,操作系统会为其创建一个作业控制块(Job Control Block, JCB)。JCB是作业在系统中的唯一标识,包含了作业的所有元数据。
JCB通常包含以下信息:
| 字段 | 说明 |
|---|---|
| 作业标识符 | 唯一的作业ID |
| 用户名 | 提交作业的用户 |
| 优先级 | 作业的优先级别 |
| 作业类型 | CPU密集型、I/O密集型等 |
| 资源需求 | CPU时间、内存大小、I/O设备等 |
| 作业状态 | 提交、后备、运行、完成等 |
| 创建时间 | 作业提交的时间戳 |
| 预计运行时间 | 作业预估的执行时间 |
| 内存需求 | 所需的内存空间大小 |
2.2 后备队列管理
作业进入系统后,首先被放入后备队列(Job Queue),等待作业调度程序将其调入内存。后备队列通常按某种策略排序:
- 先来先服务(FCFS):按提交时间排序
- 优先级调度:按优先级高低排序
- 短作业优先(SJF):按预计运行时间排序
2.3 作业状态转换
作业在收容阶段的状态为提交(Submit)或后备(Holding)。当作业调度程序选择该作业时,状态转换为就绪(Ready)。
状态转换图如下:
提交 → 后备 → 就绪 → 运行 → 完成
3. 执行阶段:作业在内存中运行
3.1 进程创建与内存分配
当作业被作业调度程序选中后,操作系统执行以下步骤:
- 创建进程:为作业创建一个或多个进程
- 分配内存:为进程分配内存空间
- 加载程序:将程序代码和数据加载到内存
- 初始化:设置进程控制块(PCB)、打开文件表等
在Linux中,这个过程通过fork()和exec()系统调用实现:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/resource.h>
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:设置资源限制
struct rlimit rl;
rl.rlim_cur = 100; // CPU时间限制100秒
rl.rlim_max = 200;
setrlimit(RLIMIT_CPU, &rl);
// 设置内存限制
rl.rlim_cur = 1024*1024*100; // 100MB
rl.rlim_max = 1024*1024*200;
setrlimit(RLIMIT_AS, &rl);
// 执行作业程序
execlp("./myjob", "myjob", NULL);
perror("execlp failed");
return 1;
} else if (pid > 0) {
// 父进程:记录作业信息
printf("作业PID: %d\n", pid);
int status;
waitpid(pid, &status, 0);
printf("作业完成,状态: %d\n", status);
}
**进程调度**:操作系统使用进程调度算法(如轮转法、优先级调度)为进程分配CPU时间片
### 3.2 多道程序设计与资源管理
在执行阶段,操作系统管理多个并发执行的作业,确保它们不会相互干扰:
- **内存保护**:每个进程有独立的虚拟地址空间
- **CPU调度**:通过时间片轮转实现并发执行
- **I/O管理**:当进程等待I/O时,CPU可以切换到其他进程
### 3.3 作业内部的多任务处理
一个作业可能包含多个任务(进程/线程)。操作系统通过以下方式管理:
```c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
// 作业的多个任务函数
void* task1(void* arg) {
printf("任务1开始执行\n");
sleep(2);
printf("任务1完成\n");
return NULL;
}
void* task2(void* arg) {
printf("任务2开始执行\n");
sleep(3);
printf("任务2完成\n");
return NULL;
}
int main() {
pthread_t t1, t2;
// 创建两个线程作为作业的子任务
pthread_create(&t1, NULL, task1, NULL);
pthread_create(&t2, NULL, task2, NULL);
// 等待所有任务完成
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("作业所有任务完成\n");
return 0;
}
4. 完成阶段:作业结束运行
4.1 作业完成的条件
作业完成可能由以下情况触发:
- 正常结束:程序执行到最后一条指令
- 异常终止:遇到错误或异常(如段错误)
- 用户终止:用户通过Ctrl+C或kill命令终止
- 系统终止:系统资源不足或超时
4.2 资源回收
当作业完成时,操作系统执行以下清理工作:
- 回收内存:释放进程占用的虚拟内存
- 关闭文件:关闭所有打开的文件描述符
- 释放I/O设备:释放占用的设备资源
- 清理进程表项:从进程表中移除PCB
- 通知父进程:通过SIGCHLD信号通知父进程
在Linux中,资源回收过程:
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/resource.h>
void check_resources() {
struct rusage usage;
if (getrusage(RUSAGE_CHILDREN, &usage) == 0) {
printf("CPU时间: %ld.%06ld秒\n",
usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
printf("内存使用: %ld KB\n", usage.ru_maxrss);
printf("页面错误: %ld\n", usage.ru_majflt);
}
}
int main() {
pid_t pid = fork();
if (pid == 0) {
// 子进程:执行一些内存操作
int* arr = malloc(1000000 * sizeof(int));
for (int i = 0; i < 1000000; i++) {
arr[i] = i;
}
free(arr);
return 0;
} else if (pid > 0) {
int status;
waitpid(pid, &status, 0);
// 检查资源使用情况
check_resources();
if (WIFEXITED(status)) {
printf("作业正常退出,退出码: %d\n", WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
printf("作业被信号终止,信号: %d\n", WTERMSIG(status));
}
}
**退出状态处理**:操作系统保存作业的退出状态,供父进程查询
### 4.3 作业状态更新
作业完成时,其状态从**运行(Running)**转换为**完成(Complete)**,并从活动队列移除,但仍可能保留在系统日志中供审计。
## 5. 善后阶段:输出处理与清理
### 5.1 结果输出
操作系统处理作业的输出:
- **标准输出**:默认输出到终端或重定向到文件
- **标准错误**:错误信息输出
- **输出重定向**:通过Shell重定向机制
示例:
```bash
# 输出重定向
./myjob > output.txt 2>&1
# 管道处理
./myjob | grep "error" | tee error.log
5.2 临时文件清理
作业可能创建临时文件,操作系统在作业完成后需要清理:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
char temp_filename[256];
void cleanup(int sig) {
printf("收到信号 %d,清理临时文件\n", sig);
remove(temp_filename);
exit(0);
}
int main() {
// 设置信号处理器
signal(SIGINT, cleanup);
signal(SIGTERM, cleanup);
// 创建临时文件
snprintf(temp_filename, sizeof(temp_filename),
"/tmp/job_%d_XXXXXX", getpid());
int fd = mkstemp(temp_filename);
if (fd == -1) {
perror("mkstemp failed");
return 1;
}
printf("临时文件: %s\n", temp_filename);
// 写入数据
write(fd, "temporary data", 14);
close(fd);
// 模拟长时间运行
printf("作业运行中...按Ctrl+C测试清理\n");
while(1) {
sleep(1);
}
return 0;
}
5.3 日志记录
系统记录作业的完成时间、资源使用情况等信息,用于审计和性能分析。
6. 作业调度算法详解
6.1 作业调度与进程调度的区别
| 特性 | 作业调度(高级调度) | 进程调度(低级调度) |
|---|---|---|
| 调度对象 | 作业(Job) | 进程(Process) |
| 发生频率 | 较低(几分钟一次) | 很高(毫秒级) |
| 调度粒度 | 粗粒度 | 细粒度 |
| 决策依据 | 作业优先级、资源需求 | CPU时间片、优先级 |
6.2 常见作业调度算法
6.2.1 先来先服务(FCFS)
# FCFS调度算法示例
def fcfs_scheduling(jobs):
# 按提交时间排序
sorted_jobs = sorted(jobs, key=lambda x: x['submit_time'])
current_time = 0
schedule = []
for job in sorted_jobs:
# 如果作业到达时间晚于当前时间,等待
if job['submit_time'] > current_time:
current_time = job['submit_time']
# 计算开始时间、完成时间和周转时间
start_time = current_time
finish_time = current_time + job['run_time']
turnaround_time = finish_time - job['submit_time']
schedule.append({
'job_id': job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time
})
current_time = finish_time
return schedule
# 示例作业
jobs = [
{'id': 'J1', 'submit_time': 0, 'run_time': 7},
{'id': 'J2', 'submit_time': 2, 'run_time': 4},
{'id': 'J3', 'submit_time': 4, 'run_time': 1},
{'id': 'J4', 'submit_time': 5, 'run_time': 4}
]
schedule = fcfs_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 开始={item['start_time']}, 完成={item['finish_time']}, 周转={item['turnaround_time']}")
6.2.2 短作业优先(SJF)
# SJF调度算法示例
def sjf_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
# 找出所有已提交且未调度的作业
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
# 没有可运行的作业,时间前进到下一个作业提交
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 选择运行时间最短的作业
shortest_job = min(available_jobs, key=lambda x: x['run_time'])
# 调度该作业
start_time = current_time
finish_time = current_time + shortest_job['run_time']
turnaround_time = finish_time - shortest_job['submit_time']
schedule.append({
'job_id': shortest_job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time
})
current_time = finish_time
remaining_jobs.remove(shortest_job)
return schedule
# 使用相同的作业示例
schedule = sjf_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 开始={item['start_time']}, 完成={item['finish_time']}, 周转={item['turnaround_time']}")
6.2.3 优先级调度
# 优先级调度算法示例
def priority_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 选择优先级最高的作业(数值越小优先级越高)
highest_priority_job = min(available_jobs, key=lambda x: x['priority'])
start_time = current_time
finish_time = current_time + highest_priority_job['run_time']
turnaround_time = finish_time - highest_priority_job['submit_time']
schedule.append({
'job_id': highest_priority_job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time,
'priority': highest_priority_job['priority']
})
current_time = finish_time
remaining_jobs.remove(highest_priority_job)
return schedule
# 带优先级的作业
priority_jobs = [
{'id': 'J1', 'submit_time': 0, 'run_time': 7, 'priority': 3},
{'id': 'J2', 'submit_time': 2, 'run_time': 4, 'priority': 1},
{'id': 'J3', 'submit_time': 4, 'run_time': 1, 'priority': 2},
{'id': 'J4', 'submit_time': 5, 'run_time': 4, 'priority': 4}
]
schedule = priority_scheduling(priority_jobs)
for item in schedule:
print(f"作业{item['job_id']}: 优先级={item['priority']}, 开始={item['start_time']}, 完成={item['finish_time']}")
6.2.4 最高响应比优先(HRRN)
# HRRN调度算法示例
def hrrn_scheduling(jobs):
current_time = 0
schedule = []
remaining_jobs = jobs.copy()
while remaining_jobs:
available_jobs = [job for job in remaining_jobs
if job['submit_time'] <= current_time]
if not available_jobs:
next_submit = min(job['submit_time'] for job in remaining_jobs)
current_time = next_submit
continue
# 计算每个作业的响应比
for job in available_jobs:
waiting_time = current_time - job['submit_time']
job['response_ratio'] = (waiting_time + job['run_time']) / job['run_time']
# 选择响应比最高的作业
highest_ratio_job = max(available_jobs, key=lambda x: x['response_ratio'])
start_time = current_time
finish_time = current_time + highest_ratio_job['run_time']
turnaround_time = finish_time - highest_ratio_job['submit_time']
schedule.append({
'job_id': highest_ratio_job['id'],
'start_time': start_time,
'finish_time': finish_time,
'turnaround_time': turnaround_time,
'response_ratio': highest_ratio_job['response_ratio']
})
current_time = finish_time
remaining_jobs.remove(highest_ratio_job)
return schedule
schedule = hrrn_scheduling(jobs)
for item in schedule:
print(f"作业{item['job_id']}: 响应比={item['response_ratio']:.2f}, 开始={item['start_time']}, 完成={item['finish_time']}")
6.3 现代操作系统的混合调度策略
现代操作系统通常采用多级反馈队列(MLFQ)等混合策略:
// 多级反馈队列概念示例
#define QUEUE_LEVELS 3
#define TIME_QUANTUM_0 10 // 最高优先级队列时间片
#define TIME_QUANTUM_1 20 // 中等优先级队列时间片
#define TIME_QUANTUM_2 40 // 低优先级队列时间片
typedef struct {
int queue_level; // 所在队列级别
int time_used; // 已使用CPU时间
int priority; // 动态优先级
} ProcessInfo;
void mlfq_schedule(ProcessInfo* processes, int count) {
// 简化的MLFQ逻辑
for (int level = 0; level < QUEUE_LEVELS; level++) {
// 处理当前队列的所有进程
for (int i = 0; i < count; i++) {
if (processes[i].queue_level == level) {
int time_slice = (level == 0) ? TIME_QUANTUM_0 :
(level == 1) ? TIME_QUANTUM_1 : TIME_QUANTUM_2;
printf("调度进程: 级别=%d, 时间片=%d\n", level, time_slice);
// 如果进程未用完时间片,保持在当前队列
// 如果用完时间片,降级到下一级队列
if (processes[i].time_used >= time_slice) {
if (level < QUEUE_LEVELS - 1) {
processes[i].queue_level++;
printf("进程降级到级别%d\n", level + 1);
}
}
}
}
}
}
7. 现代操作系统的作业管理实践
7.1 Linux系统的作业管理
7.1.1 Shell作业控制
# 后台运行作业
./long_running_job &
# 查看后台作业
jobs -l
# 挂起作业(Ctrl+Z)
# 恢复作业到前台
fg %1
# 恢复作业到后台
bg %1
# 终止作业
kill %1
# 等待作业完成
wait %1
7.1.2 系统级作业管理
// 使用waitid系统调用等待作业完成
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
void handle_child_exit(int sig, siginfo_t* info, void* context) {
printf("子进程%d退出,状态: %d\n", info->si_pid, info->si_status);
}
int main() {
struct sigaction sa;
sa.sa_sigaction = handle_child_exit;
sa.sa_flags = SA_SIGINFO | SA_RESTART;
sigaction(SIGCHLD, &sa, NULL);
pid_t pid = fork();
if (pid == 0) {
// 子进程
printf("子进程运行中...\n");
sleep(3);
return 42;
} else if (pid > 0) {
printf("父进程等待子进程完成...\n");
// 父进程可以继续其他工作
sleep(5);
}
return 0;
}
7.2 Windows系统的作业管理
Windows使用作业对象(Job Object)来管理一组进程:
#include <windows.h>
#include <stdio.h>
int main() {
// 创建作业对象
HANDLE hJob = CreateJobObject(NULL, "MyJob");
if (hJob == NULL) {
printf("创建作业失败\n");
return 1;
}
// 设置作业限制
JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = {0};
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
SetInformationJobObject(hJob, JobObjectExtendedLimitInformation, &jeli, sizeof(jeli));
// 创建进程并关联到作业
STARTUPINFO si = {sizeof(si)};
PROCESS_INFORMATION pi;
if (CreateProcess(NULL, "notepad.exe", NULL, NULL, FALSE,
CREATE_SUSPENDED, NULL, NULL, &si, &pi)) {
// 将进程添加到作业
AssignProcessToJobObject(hJob, pi.hProcess);
// 恢复进程执行
ResumeThread(pi.hThread);
printf("进程已添加到作业\n");
// 等待作业完成
WaitForSingleObject(hJob, INFINITE);
printf("作业完成\n");
CloseHandle(pi.hProcess);
CloseHandle(pi.hThread);
}
CloseHandle(hJob);
return 0;
}
7.3 分布式作业调度
在集群和云计算环境中,作业管理扩展到分布式系统:
# 简化的分布式作业调度器概念
import threading
import time
from queue import Queue
class DistributedJobScheduler:
def __init__(self):
self.job_queue = Queue()
self.worker_nodes = []
self.completed_jobs = []
def add_worker(self, node_id, capacity):
self.worker_nodes.append({
'id': node_id,
'capacity': capacity,
'current_load': 0,
'status': 'idle'
})
def submit_job(self, job):
self.job_queue.put(job)
print(f"作业{job['id']}已提交")
def schedule(self):
while not self.job_queue.empty():
job = self.job_queue.get()
# 选择负载最小的节点
available_nodes = [n for n in self.worker_nodes
if n['current_load'] < n['capacity']]
if not available_nodes:
print("没有可用节点,等待...")
time.sleep(1)
self.job_queue.put(job) # 重新入队
continue
selected_node = min(available_nodes, key=lambda x: x['current_load'])
# 分配作业
selected_node['current_load'] += job['resource_need']
selected_node['status'] = 'busy'
print(f"作业{job['id']}分配到节点{selected_node['id']}")
# 模拟执行
def execute_job(node, job):
time.sleep(job['run_time'])
node['current_load'] -= job['resource_need']
node['status'] = 'idle' if node['current_load'] == 0 else 'busy'
self.completed_jobs.append(job)
print(f"作业{job['id']}在节点{node['id']}完成")
threading.Thread(target=execute_job, args=(selected_node, job)).start()
def wait_all(self):
while not self.job_queue.empty() or any(n['status'] == 'busy' for n in self.worker_nodes):
time.sleep(0.5)
print("所有作业完成")
# 使用示例
scheduler = DistributedJobScheduler()
scheduler.add_worker('node1', 10)
scheduler.add_worker('node2', 10)
jobs = [
{'id': 'J1', 'run_time': 2, 'resource_need': 3},
{'id': 'J2', 'run_time': 1, 'resource_need': 5},
{'id': 'J3', 'run_time': 3, 'resource_need': 2},
{'id': 'J4', 'run_time': 2, 'resource_need': 4}
]
for job in jobs:
scheduler.submit_job(job)
scheduler.schedule()
scheduler.wait_all()
8. 作业管理的性能指标
8.1 关键性能指标
周转时间(Turnaround Time):作业从提交到完成的总时间
- 公式:完成时间 - 提交时间
等待时间(Waiting Time):作业在后备队列中等待的时间
- 公式:开始时间 - 提交时间
响应时间(Response Time):首次得到响应的时间
- 公式:首次获得CPU时间 - 提交时间
吞吐量(Throughput):单位时间完成的作业数
- 公式:完成作业数 / 总时间
CPU利用率:CPU忙于执行作业的时间比例
- 公式:(总时间 - 空闲时间) / 总时间
8.2 性能分析示例
def calculate_metrics(schedule):
"""计算作业调度的性能指标"""
total_turnaround = 0
total_waiting = 0
total_response = 0
for job in schedule:
turnaround = job['finish_time'] - job['submit_time']
waiting = job['start_time'] - job['submit_time']
response = job['start_time'] - job['submit_time']
total_turnaround += turnaround
total_waiting += waiting
total_response += response
print(f"作业{job['job_id']}: 周转={turnaround}, 等待={waiting}, 响应={response}")
n = len(schedule)
avg_turnaround = total_turnaround / n
avg_waiting = total_waiting / n
avg_response = total_response / n
# 吞吐量 = 作业数 / 总时间
total_time = max(job['finish_time'] for job in schedule)
throughput = n / total_time
print(f"\n平均周转时间: {avg_turnaround:.2f}")
print(f"平均等待时间: {avg_waiting:.2f}")
print(f"平均响应时间: {avg_response:.2f}")
print(f"吞吐量: {throughput:.2f} 作业/单位时间")
return {
'avg_turnaround': avg_turnaround,
'avg_waiting': avg_waiting,
'avg_response': avg_response,
'throughput': throughput
}
# 使用之前的调度结果
schedule = [
{'job_id': 'J1', 'submit_time': 0, 'start_time': 0, 'finish_time': 7},
{'job_id': 'J2', 'submit_time': 2, 'start_time': 7, 'finish_time': 11},
{'job_id': 'J3', 'submit_time': 4, 'start_time': 11, 'finish_time': 12},
{'job_id': 'J4', 'submit_time': 5, 'start_time': 12, 'finish_time': 16}
]
calculate_metrics(schedule)
9. 作业管理的挑战与解决方案
9.1 资源竞争与死锁
问题:多个作业竞争有限资源可能导致死锁。
解决方案:
- 资源预留:作业提交时声明所需资源,调度器确保资源可用才调度
- 死锁检测与恢复:定期检测死锁,通过终止进程或回滚解决
- 银行家算法:在分配资源前检查系统是否仍处于安全状态
// 简化的银行家算法示例
#define MAX_RESOURCES 5
#define MAX_JOBS 3
int available[MAX_RESOURCES] = {3, 3, 2};
int max_demand[MAX_JOBS][MAX_RESOURCES] = {
{7, 5, 3},
{3, 2, 2},
{9, 0, 2}
};
int allocation[MAX_JOBS][MAX_RESOURCES] = {0};
int need[MAX_JOBS][MAX_RESOURCES];
void init_need() {
for (int i = 0; i < MAX_JOBS; i++) {
for (int j = 0; j < MAX_RESOURCES; j++) {
need[i][j] = max_demand[i][j] - allocation[i][j];
}
}
}
int is_safe() {
int work[MAX_RESOURCES];
int finish[MAX_JOBS] = {0};
for (int i = 0; i < MAX_RESOURCES; i++) {
work[i] = available[i];
}
int safe_seq[MAX_JOBS];
int count = 0;
while (count < MAX_JOBS) {
int found = 0;
for (int i = 0; i < MAX_JOBS; i++) {
if (finish[i] == 0) {
int can_allocate = 1;
for (int j = 0; j < MAX_RESOURCES; j++) {
if (need[i][j] > work[j]) {
can_allocate = 0;
break;
}
}
if (can_allocate) {
// 分配资源并模拟完成
for (int j = 0; j < MAX_RESOURCES; j++) {
work[j] += allocation[i][j];
}
safe_seq[count++] = i;
finish[i] = 1;
found = 1;
}
}
}
if (found == 0) {
printf("系统不安全\n");
return 0;
}
}
printf("系统安全,安全序列: ");
for (int i = 0; i < MAX_JOBS; i++) {
printf("J%d ", safe_seq[i]);
}
printf("\n");
return 1;
}
// 检查资源请求
int request_resources(int job_id, int request[]) {
for (int i = 0; i < MAX_RESOURCES; i++) {
if (request[i] > need[job_id][i]) {
printf("错误:请求超过最大需求\n");
return -1;
}
if (request[i] > available[i]) {
printf("错误:资源不足\n");
return -1;
}
}
// 临时分配
for (int i = 0; i < MAX_RESOURCES; i++) {
available[i] -= request[i];
allocation[job_id][i] += request[i];
need[job_id][i] -= request[i];
}
// 检查安全性
if (is_safe()) {
printf("请求批准\n");
return 0;
} else {
// 回滚
for (int i = 0; i < MAX_RESOURCES; i++) {
available[i] += request[i];
allocation[job_id][i] -= request[i];
need[job_id][i] += request[i];
}
printf("请求拒绝,系统将进入不安全状态\n");
return -1;
}
}
9.2 作业饿死(Starvation)
问题:低优先级作业长期得不到执行。
解决方案:
- 优先级老化(Aging):随时间增加等待作业的优先级
- 公平调度:确保每个作业都能获得CPU时间
- 多级反馈队列:动态调整优先级
// 优先级老化示例
#define MAX_PRIORITY 100
#define MIN_PRIORITY 0
typedef struct {
int job_id;
int current_priority;
int wait_time;
int run_time;
} Job;
void aging(Job* jobs, int count) {
for (int i = 0; i < count; i++) {
if (jobs[i].wait_time > 0) {
// 每等待1个时间单位,优先级提升1
jobs[i].current_priority =
MAX_PRIORITY - jobs[i].wait_time;
if (jobs[i].current_priority < MIN_PRIORITY) {
jobs[i].current_priority = MIN_PRIORITY;
}
}
}
}
9.3 资源碎片化
问题:内存碎片导致大作业无法分配。
解决方案:
- 动态分区分配:使用首次适应、最佳适应等算法
- 紧凑技术:移动内存中的作业减少碎片
- 虚拟内存:使用分页机制避免外部碎片
10. 作业管理的未来发展趋势
10.1 云计算环境下的作业管理
现代云平台(如AWS Batch、Azure Batch、Google Cloud Dataflow)提供了强大的作业管理能力:
- 弹性伸缩:根据作业负载自动调整资源
- 容器化:使用Docker/Kubernetes打包作业环境
- 无服务器计算:作业以函数形式执行,无需管理服务器
# Kubernetes Job 示例
apiVersion: batch/v1
kind: Job
metadata:
name: data-processing-job
spec:
parallelism: 3 # 并行执行3个实例
completions: 10 # 总共需要完成10次
template:
spec:
containers:
- name: processor
image: my-processor:latest
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
restartPolicy: OnFailure
10.2 AI驱动的智能调度
机器学习被用于预测作业行为,优化调度决策:
- 作业运行时间预测:基于历史数据预测新作业的运行时间
- 资源需求预测:预测作业的CPU、内存、I/O需求
- 异常检测:识别异常作业行为,提前干预
10.3 边缘计算中的作业管理
在边缘计算环境中,作业管理需要考虑:
- 异构资源:不同边缘节点的计算能力差异
- 网络延迟:作业迁移和数据传输的成本
- 隐私保护:敏感数据的本地处理
11. 实际案例分析:HPC集群作业管理
11.1 Slurm工作负载管理器
Slurm是高性能计算(HPC)领域广泛使用的作业调度器:
# 提交作业脚本
#!/bin/bash
#SBATCH --job-name=mpi_job
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=8
#SBATCH --time=00:30:00
#SBATCH --partition=compute
module load openmpi/4.0.5
mpirun -np 32 ./my_mpi_program
# 作业管理命令
sbatch job_script.sh # 提交作业
squeue # 查看作业队列
scontrol show job 12345 # 查看作业详情
scancel 12345 # 取消作业
sacct # 查看作业历史
11.2 作业调度器的实现架构
// 简化的作业调度器架构
typedef struct {
int job_id;
char user[32];
int priority;
int nodes;
int tasks_per_node;
int time_limit;
char state[16]; // PENDING, RUNNING, COMPLETED, FAILED
} SlurmJob;
typedef struct {
SlurmJob* jobs;
int job_count;
int* node_states; // 0=空闲, 1=占用
int node_count;
} SchedulerState;
void schedule_loop(SchedulerState* state) {
while (1) {
// 1. 检查新提交的作业
check_new_jobs(state);
// 2. 从等待队列中选择作业
for (int i = 0; i < state->job_count; i++) {
if (strcmp(state->jobs[i].state, "PENDING") == 0) {
// 检查资源是否足够
if (can_allocate(state, &state->jobs[i])) {
allocate_resources(state, &state->jobs[i]);
strcpy(state->jobs[i].state, "RUNNING");
launch_job(&state->jobs[i]);
}
}
}
// 3. 检查运行中的作业是否完成
check_completed_jobs(state);
sleep(1); // 每秒调度一次
}
}
12. 总结
操作系统的作业管理是一个复杂而精密的系统,它涵盖了从作业提交到完成的整个生命周期。通过作业控制块、调度算法、资源管理等机制,操作系统实现了对计算任务的高效、公平、安全的管理。
关键要点总结:
- 作业生命周期:提交 → 收容 → 执行 → 完成 → 善后
- 核心数据结构:JCB(作业控制块)、PCB(进程控制块)
- 调度算法:FCFS、SJF、优先级、HRRN、多级反馈队列
- 资源管理:内存分配、CPU调度、I/O管理
- 现代发展:云计算、容器化、AI调度、边缘计算
随着计算技术的不断发展,作业管理也在持续演进,从传统的批处理系统到现代的云原生环境,从单机调度到分布式协调,作业管理始终是操作系统的核心功能之一,为各种计算场景提供基础支撑。
