什么是MAC地址?基础概念解析

MAC地址(Media Access Control Address)是网络设备在数据链路层的唯一标识符,通常被称为物理地址或硬件地址。它由48位二进制数组成,通常以十六进制格式表示,如00:1A:2B:3C:4D:5E。MAC地址的前24位(前三个字节)由IEEE分配给设备制造商,称为OUI(Organizationally Unique Identifier),后24位由制造商自行分配。

MAC地址的结构和格式

MAC地址的标准格式如下:

  • 格式XX:XX:XX:XX:XX:XX(十六进制)
  • 长度:48位(6字节)
  • 示例00:1A:2B:3C:4D:5E
# Python示例:验证和解析MAC地址
import re

def validate_mac(mac_address):
    """验证MAC地址格式是否正确"""
    pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
    return bool(re.match(pattern, mac_address))

def parse_mac(mac_address):
    """解析MAC地址的OUI部分"""
    if validate_mac(mac_address):
        oui = mac_address[:8].upper()  # 前3字节
        return oui
    return None

# 测试示例
test_mac = "00:1A:2B:3C:4D:5E"
if validate_mac(test_mac):
    print(f"MAC地址 {test_mac} 格式有效")
    print(f"OUI部分: {parse_mac(test_mac)}")
else:
    print("MAC地址格式无效")

MAC地址学习原理的核心机制

交换机的工作原理:CAM表

交换机通过MAC地址学习机制建立和维护一个MAC地址表(也称为CAM表 - Content Addressable Memory)。这个表记录了MAC地址与交换机端口的映射关系。

MAC地址学习过程详解

当交换机接收到一个数据帧时,会执行以下步骤:

  1. 源MAC地址学习:检查帧的源MAC地址,并将其与接收端口关联
  2. 目的MAC地址查找:在CAM表中查找目的MAC地址
  3. 转发决策
    • 如果找到匹配项,从对应端口转发
    • 如果未找到,泛洪到所有其他端口(除接收端口)
    • 如果目的MAC是广播地址(FF:FF:FF:FF:FF:FF),泛洪
# 模拟交换机MAC地址学习过程
class Switch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}  # {mac: port}
        self.port_status = {}  # {port: active}
    
    def learn_mac(self, mac_address, port):
        """学习MAC地址"""
        if mac_address not in self.mac_table:
            print(f"[{self.name}] 学习新MAC: {mac_address} -> 端口 {port}")
        else:
            print(f"[{self.name}] 更新MAC: {mac_address} -> 端口 {port}")
        self.mac_table[mac_address] = port
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """转发数据帧"""
        print(f"\n[{self.name}] 接收帧: SRC={src_mac}, DST={dst_mac}, IN_PORT={in_port}")
        
        # 1. 学习源MAC
        self.learn_mac(src_mac, in_port)
        
        # 2. 处理目的MAC
        if dst_mac == "FF:FF:FF:FF:FF:FF":
            print(f"  -> 广播帧,泛洪到所有端口(除{in_port})")
            return [port for port in self.port_status if port != in_port]
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port == in_port:
                print(f"  -> 目的端口与源端口相同,丢弃")
                return []
            print(f"  -> 单播转发到端口 {out_port}")
            return [out_port]
        else:
            print(f"  -> 未知目的MAC,泛洪到所有端口(除{in_port})")
            return [port for port in self.port_status if port != in_port]

# 创建交换机并模拟通信
switch = Switch("CoreSwitch")
switch.port_status = {"eth0": True, "eth1": True, "eth2": True, "eth3": True}

# 模拟主机通信
print("=== 主机A(00:1A:2B:3C:4D:5E) 发送数据到主机B(00:1A:2B:3C:4D:5F) ===")
switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

print("\n=== 主机B回复主机A ===")
switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")

print("\n=== 主机A再次发送数据到主机B ===")
switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

MAC地址表的老化机制

为了防止MAC地址表被过时的条目占满,交换机使用老化机制:

  • 默认老化时间:300秒(5分钟)
  • 当MAC地址在老化时间内没有被刷新,条目会被删除
  • 这确保了网络拓扑变化时,表项能及时更新
# 带老化机制的交换机实现
import time
from collections import defaultdict

class AdvancedSwitch:
    def __init__(self, name, aging_time=300):
        self.name = name
        self.aging_time = aging_time
        self.mac_table = {}  # {mac: {'port': port, 'last_seen': timestamp}}
        self.port_status = {}
    
    def learn_mac(self, mac_address, port):
        """学习MAC地址并更新时间戳"""
        current_time = time.time()
        self.mac_table[mac_address] = {
            'port': port,
            'last_seen': current_time
        }
        print(f"[{self.name}] 学习/更新 MAC: {mac_address} -> 端口 {port}")
    
    def aging_process(self):
        """老化处理"""
        current_time = time.time()
        expired_macs = []
        for mac, info in self.mac_table.items():
            if current_time - info['last_seen'] > self.aging_time:
                expired_macs.append(mac)
        
        for mac in expired_macs:
            del self.mac_table[mac]
            print(f"[{self.name}] 老化删除 MAC: {mac}")
    
    def show_mac_table(self):
        """显示当前MAC地址表"""
        print(f"\n[{self.name}] 当前MAC地址表:")
        print("-" * 50)
        for mac, info in self.mac_table.items():
            age = time.time() - info['last_seen']
            print(f"MAC: {mac} -> Port: {info['port']} (Age: {age:.1f}s)")
        print("-" * 50)

# 模拟老化过程
print("=== MAC地址表老化演示 ===")
advanced_switch = AdvancedSwitch("CoreSwitch", aging_time=5)  # 设置5秒老化时间用于演示

# 学习MAC地址
advanced_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
advanced_switch.learn_mac("00:1A:2B:3C:4D:5F", "eth1")
advanced_switch.show_mac_table()

print("\n等待3秒...")
time.sleep(3)
advanced_switch.aging_process()
advanced_switch.show_mac_table()

print("\n刷新MAC地址A...")
advanced_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
advanced_switch.show_mac_table()

print("\n等待3秒...")
time.sleep(3)
advanced_switch.aging_process()
advanced_switch.show_mac_table()

VLAN环境中的MAC地址学习

在VLAN环境中,MAC地址学习是基于VLAN的。每个VLAN维护独立的MAC地址表,确保不同VLAN之间的隔离。

VLAN标签的作用

  • 802.1Q标准:在以太网帧中添加4字节的VLAN标签
  • VLAN ID:12位,范围1-4094
  • 优先级:3位,用于QoS
# VLAN环境中的MAC地址学习模拟
class VLANSwitch:
    def __init__(self, name):
        self.name = name
        self.vlan_mac_tables = defaultdict(dict)  # {vlan_id: {mac: port}}
        self.vlan_port_membership = defaultdict(set)  # {vlan_id: set(ports)}
    
    def add_port_to_vlan(self, port, vlan_id):
        """将端口加入VLAN"""
        self.vlan_port_membership[vlan_id].add(port)
        print(f"[{self.name}] 端口 {port} 加入 VLAN {vlan_id}")
    
    def learn_mac(self, mac_address, port, vlan_id):
        """在指定VLAN中学习MAC地址"""
        if port not in self.vlan_port_membership[vlan_id]:
            print(f"[{self.name}] 错误: 端口 {port} 不在 VLAN {vlan_id} 中")
            return
        
        self.vlan_mac_tables[vlan_id][mac_address] = port
        print(f"[{self.name}] VLAN {vlan_id}: 学习 MAC {mac_address} -> 端口 {port}")
    
    def forward_frame(self, src_mac, dst_mac, in_port, vlan_id):
        """在VLAN内转发帧"""
        print(f"\n[{self.name}] VLAN {vlan_id}: 帧 SRC={src_mac}, DST={dst_mac}, IN_PORT={in_port}")
        
        # 学习源MAC
        self.learn_mac(src_mac, in_port, vlan_id)
        
        # 转发逻辑
        if dst_mac == "FF:FF:FF:FF:FF:FF":
            # 广播:发送到同VLAN的所有其他端口
            out_ports = [p for p in self.vlan_port_membership[vlan_id] 
                        if p != in_port]
            print(f"  -> 广播到VLAN {vlan_id} 端口: {out_ports}")
            return out_ports
        
        if dst_mac in self.vlan_mac_tables[vlan_id]:
            out_port = self.vlan_mac_tables[vlan_id][dst_mac]
            if out_port == in_port:
                print(f"  -> 端口相同,丢弃")
                return []
            print(f"  -> 单播转发到端口 {out_port}")
            return [out_port]
        else:
            out_ports = [p for p in self.vlan_port_membership[vlan_id] 
                        if p != in_port]
            print(f"  -> 未知目的MAC,泛洪到VLAN {vlan_id} 端口: {out_ports}")
            return out_ports
    
    def show_vlan_tables(self):
        """显示所有VLAN的MAC地址表"""
        print(f"\n[{self.name}] VLAN MAC地址表:")
        for vlan_id, mac_table in self.vlan_mac_tables.items():
            print(f"  VLAN {vlan_id}:")
            for mac, port in mac_table.items():
                print(f"    {mac} -> {port}")

# VLAN配置示例
print("=== VLAN环境MAC地址学习演示 ===")
vlan_switch = VLANSwitch("VLAN-Switch")

# 配置VLAN
vlan_switch.add_port_to_vlan("eth0", 10)
vlan_switch.add_port_to_vlan("eth1", 10)
vlan_switch.add_port_to_vlan("eth2", 20)
vlan_switch.add_port_to_vlan("eth3", 20)

# VLAN 10 内通信
vlan_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0", 10)
vlan_switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1", 10)

# VLAN 20 内通信
vlan_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:61", "eth2", 20)

# 跨VLAN通信(应该被隔离)
vlan_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:60", "eth0", 10)

vlan_switch.show_vlan_tables()

高级MAC地址学习技术

1. 端口安全(Port Security)

端口安全限制端口上允许学习的MAC地址数量,防止MAC地址泛洪攻击。

# 端口安全实现
class PortSecuritySwitch:
    def __init__(self, name, max_macs_per_port=3):
        self.name = name
        self.max_macs_per_port = max_macs_per_port
        self.port_security = defaultdict(set)  # {port: set(macs)}
        self.mac_table = {}
    
    def learn_mac(self, mac_address, port):
        """带端口安全的MAC地址学习"""
        # 检查端口安全限制
        if len(self.port_security[port]) >= self.max_macs_per_port:
            if mac_address not in self.port_security[port]:
                print(f"[{self.name}] 端口 {port} 达到MAC地址上限 {self.max_macs_per_port},拒绝学习 {mac_address}")
                return False
        
        self.port_security[port].add(mac_address)
        self.mac_table[mac_address] = port
        print(f"[{self.name}] 端口 {port} 学习 MAC: {mac_address}")
        return True
    
    def show_port_security(self):
        """显示端口安全状态"""
        print(f"\n[{self.name}] 端口安全状态:")
        for port, macs in self.port_security.items():
            print(f"  端口 {port}: {len(macs)}/{self.max_macs_per_port} MAC地址")
            for mac in macs:
                print(f"    - {mac}")

# 端口安全演示
print("=== 端口安全演示 ===")
secure_switch = PortSecuritySwitch("SecureSwitch", max_macs_per_port=2)

# 正常学习
secure_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
secure_switch.learn_mac("00:1A:2B:3C:4D:5F", "eth0")
secure_switch.learn_mac("00:1A:2B:3C:4D:60", "eth0")  # 应该被拒绝
secure_switch.learn_mac("00:1A:2B:3C:4D:61", "eth1")  # 不同端口,应该允许

secure_switch.show_port_security()

2. MAC地址过滤

基于MAC地址的访问控制列表(ACL)可以允许或拒绝特定MAC地址的通信。

# MAC地址过滤实现
class MACFilterSwitch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}
        self.blacklist = set()  # 黑名单
        self.whitelist = set()  # 白名单
        self.use_whitelist = False  # 是否启用白名单模式
    
    def add_blacklist(self, mac_address):
        """添加到黑名单"""
        self.blacklist.add(mac_address.upper())
        print(f"[{self.name}] 添加黑名单: {mac_address}")
    
    def add_whitelist(self, mac_address):
        """添加到白名单"""
        self.whitelist.add(mac_address.upper())
        print(f"[{self.name}] 添加白名单: {mac_address}")
    
    def enable_whitelist_mode(self):
        """启用白名单模式"""
        self.use_whitelist = True
        print(f"[{self.name}] 启用白名单模式")
    
    def is_allowed(self, mac_address):
        """检查MAC地址是否被允许"""
        mac = mac_address.upper()
        
        if self.use_whitelist:
            return mac in self.whitelist
        
        return mac not in self.blacklist
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """带MAC过滤的帧转发"""
        print(f"\n[{self.name}] 帧 SRC={src_mac}, DST={dst_mac}")
        
        # 检查源MAC
        if not self.is_allowed(src_mac):
            print(f"  -> 源MAC {src_mac} 被拒绝")
            return []
        
        # 检查目的MAC
        if not self.is_allowed(dst_mac):
            print(f"  -> 目的MAC {dst_mac} 被拒绝")
            return []
        
        # 正常转发逻辑
        self.mac_table[src_mac] = in_port
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port != in_port:
                print(f"  -> 转发到端口 {out_port}")
                return [out_port]
        
        print(f"  -> 泛洪")
        return ["eth1", "eth2", "eth3"]  # 简化:泛洪到其他端口

# MAC过滤演示
print("=== MAC地址过滤演示 ===")
filter_switch = MACFilterSwitch("FilterSwitch")

# 配置黑名单
filter_switch.add_blacklist("00:1A:2B:3C:4D:99")

# 测试黑名单
filter_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:99", "eth0")
filter_switch.forward_frame("00:1A:2B:3C:4D:99", "00:1A:2B:3C:4D:5E", "eth0")

# 配置白名单
filter_switch.add_whitelist("00:1A:2B:3C:4D:5E")
filter_switch.add_whitelist("00:1A:2B:3C:4D:5F")
filter_switch.enable_whitelist_mode()

# 测试白名单
filter_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
filter_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:5F", "eth0")

实际网络设备中的MAC地址学习

真实交换机的CAM表结构

真实交换机的CAM表通常包含以下字段:

  • MAC地址:48位硬件地址
  • 端口号:物理端口或端口组
  • VLAN ID:所属VLAN
  • 类型:动态/静态
  • 老化时间:剩余存活时间

Cisco交换机命令示例

# 查看MAC地址表
show mac address-table

# 查看特定VLAN的MAC地址表
show mac address-table vlan 10

# 查看特定MAC地址
show mac address-table address 001a.2b3c.4d5e

# 静态添加MAC地址表项
mac address-table static 001a.2b3c.4d5e vlan 10 interface gigabitethernet0/1

# 设置老化时间
mac address-table aging-time 300

# 清除MAC地址表
clear mac address-table dynamic

Linux bridge的MAC地址学习

Linux bridge使用内核模块实现MAC地址学习:

# 创建bridge
sudo brctl addbr br0

# 添加接口到bridge
sudo brctl addif br0 eth0
sudo brctl addif br0 eth1

# 查看bridge的MAC地址表
bridge fdb show br br0

# 查看bridge信息
brctl show br0

# 设置老化时间(秒)
echo 300 > /sys/class/net/br0/bridge/ageing_time

# 静态添加MAC地址条目
bridge fdb add 00:1a:2b:3c:4d:5e dev eth0 self permanent

Python模拟真实交换机行为

# 真实交换机行为模拟器
class RealisticSwitch:
    def __init__(self, name, max_macs=8192, aging_time=300):
        self.name = name
        self.max_macs = max_macs
        self.aging_time = aging_time
        self.mac_table = {}
        self.stats = {
            'learned': 0,
            'forwarded': 0,
            'flooded': 0,
            'dropped': 0
        }
    
    def forward_frame(self, src_mac, dst_mac, in_port, vlan=1):
        """模拟真实交换机的完整转发流程"""
        current_time = time.time()
        
        # 1. 检查MAC表是否已满
        if len(self.mac_table) >= self.max_macs:
            # 交换机通常会拒绝学习新MAC或删除最老的条目
            if src_mac not in self.mac_table:
                print(f"[{self.name}] MAC表已满,无法学习新MAC: {src_mac}")
                self.stats['dropped'] += 1
                return []
        
        # 2. 学习源MAC
        if src_mac not in self.mac_table:
            self.stats['learned'] += 1
        
        self.mac_table[src_mac] = {
            'port': in_port,
            'vlan': vlan,
            'last_seen': current_time
        }
        
        # 3. 处理目的MAC
        # 广播/组播处理
        if dst_mac.startswith("01:00:5E") or dst_mac == "FF:FF:FF:FF:FF:FF":
            self.stats['flooded'] += 1
            return [p for p in self.port_status if p != in_port]
        
        # 单播处理
        if dst_mac in self.mac_table:
            dst_info = self.mac_table[dst_mac]
            # 检查VLAN匹配
            if dst_info['vlan'] != vlan:
                print(f"[{self.name}] VLAN隔离: {src_mac}({vlan}) -> {dst_mac}({dst_info['vlan']})")
                self.stats['dropped'] += 1
                return []
            
            out_port = dst_info['port']
            if out_port == in_port:
                # 端口隔离或环路保护
                print(f"[{self.name}] 端口相同,丢弃")
                self.stats['dropped'] += 1
                return []
            
            self.stats['forwarded'] += 1
            return [out_port]
        else:
            # 未知单播泛洪
            self.stats['flooded'] += 1
            return [p for p in self.port_status if p != in_port]
    
    def periodic_aging(self):
        """定期老化处理"""
        current_time = time.time()
        expired = []
        for mac, info in self.mac_table.items():
            if current_time - info['last_seen'] > self.aging_time:
                expired.append(mac)
        
        for mac in expired:
            del self.mac_table[mac]
        
        return len(expired)
    
    def show_status(self):
        """显示交换机状态"""
        print(f"\n[{self.name}] 状态统计:")
        print(f"  MAC表大小: {len(self.mac_table)}/{self.max_macs}")
        print(f"  学习: {self.stats['learned']}")
        print(f"  转发: {self.stats['forwarded']}")
        print(f"  泛洪: {self.stats['flooded']}")
        print(f"  丢弃: {self.stats['dropped']}")

# 真实场景模拟
print("=== 真实交换机场景模拟 ===")
real_switch = RealisticSwitch("RealSwitch", max_macs=5)
real_switch.port_status = {"eth0": True, "eth1": True, "eth2": True}

# 模拟正常通信
real_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
real_switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")
real_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

# 模拟MAC表满
real_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:61", "eth2")
real_switch.forward_frame("00:1A:2B:3C:4D:62", "00:1A:2B:3C:4D:63", "eth2")
real_switch.forward_frame("00:1A:2B:3C:4D:64", "00:1A:2B:3C:4D:65", "eth2")  # 应该失败

real_switch.show_status()

MAC地址学习的安全考虑

1. MAC地址欺骗攻击

攻击者可以伪造源MAC地址来:

  • 绕过MAC地址过滤
  • 进行中间人攻击
  • 耗尽交换机的MAC表空间
# MAC地址欺骗检测示例
class SecureSwitch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}  # {mac: {'port': port, 'timestamp': ts}}
        self.port_mac_history = defaultdict(set)  # {port: set(macs)}
        self.suspicious_ports = set()
    
    def detect_spoofing(self, mac_address, port):
        """检测MAC地址欺骗"""
        # 检查该端口是否已经学习过不同的MAC地址
        if port in self.port_mac_history:
            learned_macs = self.port_mac_history[port]
            if len(learned_macs) > 0 and mac_address not in learned_macs:
                # 同一端口出现多个MAC地址,可能欺骗
                print(f"[{self.name}] 警告: 端口 {port} 出现MAC地址变化!")
                print(f"  历史MAC: {learned_macs}")
                print(f"  新MAC: {mac_address}")
                self.suspicious_ports.add(port)
                return True
        
        self.port_mac_history[port].add(mac_address)
        return False
    
    def forward_frame_secure(self, src_mac, dst_mac, in_port):
        """安全的帧转发"""
        # 检测欺骗
        if self.detect_spoofing(src_mac, in_port):
            print(f"  -> 检测到MAC欺骗,丢弃帧")
            return []
        
        # 正常转发
        self.mac_table[src_mac] = {'port': in_port, 'timestamp': time.time()}
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]['port']
            if out_port != in_port:
                return [out_port]
        
        return [p for p in ["eth0", "eth1", "eth2"] if p != in_port]

# 欺骗检测演示
print("=== MAC地址欺骗检测演示 ===")
secure_switch = SecureSwitch("SecureSwitch")

# 正常通信
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")

# 欺骗尝试
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:99", "00:1A:2B:3C:4D:5E", "eth0")

2. MAC地址泛洪攻击防护

MAC地址泛洪攻击通过发送大量伪造的源MAC地址来耗尽交换机的MAC表,导致交换机退化为集线器模式。

# MAC地址泛洪防护
class AntiFloodSwitch:
    def __init__(self, name, max_macs_per_port=10):
        self.name = name
        self.max_macs_per_port = max_macs_per_port
        self.port_macs = defaultdict(set)
        self.mac_table = {}
    
    def learn_mac(self, mac_address, port):
        """防止MAC地址泛洪"""
        # 检查端口MAC地址数量限制
        if len(self.port_macs[port]) >= self.max_macs_per_port:
            if mac_address not in self.port_macs[port]:
                print(f"[{self.name}] 端口 {port} MAC地址数量超限,拒绝学习 {mac_address}")
                return False
        
        self.port_macs[port].add(mac_address)
        self.mac_table[mac_address] = port
        return True
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """防泛洪的帧转发"""
        # 学习源MAC(带防护)
        if not self.learn_mac(src_mac, in_port):
            # 拒绝学习,但仍尝试转发(可能已存在)
            pass
        
        # 正常转发逻辑
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port != in_port:
                return [out_port]
        
        return [p for p in ["eth0", "eth1", "eth2"] if p != in_port]

# 防泛洪演示
print("=== MAC地址泛洪防护演示 ===")
anti_flood = AntiFloodSwitch("AntiFloodSwitch", max_macs_per_port=3)

# 模拟泛洪攻击
for i in range(10):
    fake_mac = f"00:1A:2B:3C:4D:{i:02x}"
    print(f"尝试学习 MAC: {fake_mac}")
    anti_flood.learn_mac(fake_mac, "eth0")

print(f"\n端口 eth0 学习到的MAC地址数量: {len(anti_flood.port_macs['eth0'])}")

总结

MAC地址学习是现代网络设备的核心功能,它确保了数据帧在数据链路层的正确转发。通过理解以下关键点,您可以从入门到精通掌握这一机制:

  1. 基础原理:交换机通过源MAC地址学习建立CAM表,通过目的MAC地址查找进行转发决策
  2. 老化机制:定期清理过时条目,适应网络拓扑变化
  3. VLAN隔离:每个VLAN维护独立的MAC地址表
  4. 安全防护:端口安全、MAC过滤、欺骗检测和泛洪防护
  5. 实际应用:从真实设备命令到软件模拟,全面理解实现细节

掌握MAC地址学习原理对于网络故障排查、安全防护和性能优化都至关重要。在实际网络环境中,合理配置相关参数可以显著提升网络的安全性和稳定性。# MAC地址学习原理揭秘:从入门到精通详解网络设备如何识别设备地址并实现高效通信

什么是MAC地址?基础概念解析

MAC地址(Media Access Control Address)是网络设备在数据链路层的唯一标识符,通常被称为物理地址或硬件地址。它由48位二进制数组成,通常以十六进制格式表示,如00:1A:2B:3C:4D:5E。MAC地址的前24位(前三个字节)由IEEE分配给设备制造商,称为OUI(Organizationally Unique Identifier),后24位由制造商自行分配。

MAC地址的结构和格式

MAC地址的标准格式如下:

  • 格式XX:XX:XX:XX:XX:XX(十六进制)
  • 长度:48位(6字节)
  • 示例00:1A:2B:3C:4D:5E
# Python示例:验证和解析MAC地址
import re

def validate_mac(mac_address):
    """验证MAC地址格式是否正确"""
    pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
    return bool(re.match(pattern, mac_address))

def parse_mac(mac_address):
    """解析MAC地址的OUI部分"""
    if validate_mac(mac_address):
        oui = mac_address[:8].upper()  # 前3字节
        return oui
    return None

# 测试示例
test_mac = "00:1A:2B:3C:4D:5E"
if validate_mac(test_mac):
    print(f"MAC地址 {test_mac} 格式有效")
    print(f"OUI部分: {parse_mac(test_mac)}")
else:
    print("MAC地址格式无效")

MAC地址学习原理的核心机制

交换机的工作原理:CAM表

交换机通过MAC地址学习机制建立和维护一个MAC地址表(也称为CAM表 - Content Addressable Memory)。这个表记录了MAC地址与交换机端口的映射关系。

MAC地址学习过程详解

当交换机接收到一个数据帧时,会执行以下步骤:

  1. 源MAC地址学习:检查帧的源MAC地址,并将其与接收端口关联
  2. 目的MAC地址查找:在CAM表中查找目的MAC地址
  3. 转发决策
    • 如果找到匹配项,从对应端口转发
    • 如果未找到,泛洪到所有其他端口(除接收端口)
    • 如果目的MAC是广播地址(FF:FF:FF:FF:FF:FF),泛洪
# 模拟交换机MAC地址学习过程
class Switch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}  # {mac: port}
        self.port_status = {}  # {port: active}
    
    def learn_mac(self, mac_address, port):
        """学习MAC地址"""
        if mac_address not in self.mac_table:
            print(f"[{self.name}] 学习新MAC: {mac_address} -> 端口 {port}")
        else:
            print(f"[{self.name}] 更新MAC: {mac_address} -> 端口 {port}")
        self.mac_table[mac_address] = port
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """转发数据帧"""
        print(f"\n[{self.name}] 接收帧: SRC={src_mac}, DST={dst_mac}, IN_PORT={in_port}")
        
        # 1. 学习源MAC
        self.learn_mac(src_mac, in_port)
        
        # 2. 处理目的MAC
        if dst_mac == "FF:FF:FF:FF:FF:FF":
            print(f"  -> 广播帧,泛洪到所有端口(除{in_port})")
            return [port for port in self.port_status if port != in_port]
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port == in_port:
                print(f"  -> 目的端口与源端口相同,丢弃")
                return []
            print(f"  -> 单播转发到端口 {out_port}")
            return [out_port]
        else:
            print(f"  -> 未知目的MAC,泛洪到所有端口(除{in_port})")
            return [port for port in self.port_status if port != in_port]

# 创建交换机并模拟通信
switch = Switch("CoreSwitch")
switch.port_status = {"eth0": True, "eth1": True, "eth2": True, "eth3": True}

# 模拟主机通信
print("=== 主机A(00:1A:2B:3C:4D:5E) 发送数据到主机B(00:1A:2B:3C:4D:5F) ===")
switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

print("\n=== 主机B回复主机A ===")
switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")

print("\n=== 主机A再次发送数据到主机B ===")
switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

MAC地址表的老化机制

为了防止MAC地址表被过时的条目占满,交换机使用老化机制:

  • 默认老化时间:300秒(5分钟)
  • 当MAC地址在老化时间内没有被刷新,条目会被删除
  • 这确保了网络拓扑变化时,表项能及时更新
# 带老化机制的交换机实现
import time
from collections import defaultdict

class AdvancedSwitch:
    def __init__(self, name, aging_time=300):
        self.name = name
        self.aging_time = aging_time
        self.mac_table = {}  # {mac: {'port': port, 'last_seen': timestamp}}
        self.port_status = {}
    
    def learn_mac(self, mac_address, port):
        """学习MAC地址并更新时间戳"""
        current_time = time.time()
        self.mac_table[mac_address] = {
            'port': port,
            'last_seen': current_time
        }
        print(f"[{self.name}] 学习/更新 MAC: {mac_address} -> 端口 {port}")
    
    def aging_process(self):
        """老化处理"""
        current_time = time.time()
        expired_macs = []
        for mac, info in self.mac_table.items():
            if current_time - info['last_seen'] > self.aging_time:
                expired_macs.append(mac)
        
        for mac in expired_macs:
            del self.mac_table[mac]
            print(f"[{self.name}] 老化删除 MAC: {mac}")
    
    def show_mac_table(self):
        """显示当前MAC地址表"""
        print(f"\n[{self.name}] 当前MAC地址表:")
        print("-" * 50)
        for mac, info in self.mac_table.items():
            age = time.time() - info['last_seen']
            print(f"MAC: {mac} -> Port: {info['port']} (Age: {age:.1f}s)")
        print("-" * 50)

# 模拟老化过程
print("=== MAC地址表老化演示 ===")
advanced_switch = AdvancedSwitch("CoreSwitch", aging_time=5)  # 设置5秒老化时间用于演示

# 学习MAC地址
advanced_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
advanced_switch.learn_mac("00:1A:2B:3C:4D:5F", "eth1")
advanced_switch.show_mac_table()

print("\n等待3秒...")
time.sleep(3)
advanced_switch.aging_process()
advanced_switch.show_mac_table()

print("\n刷新MAC地址A...")
advanced_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
advanced_switch.show_mac_table()

print("\n等待3秒...")
time.sleep(3)
advanced_switch.aging_process()
advanced_switch.show_mac_table()

VLAN环境中的MAC地址学习

在VLAN环境中,MAC地址学习是基于VLAN的。每个VLAN维护独立的MAC地址表,确保不同VLAN之间的隔离。

VLAN标签的作用

  • 802.1Q标准:在以太网帧中添加4字节的VLAN标签
  • VLAN ID:12位,范围1-4094
  • 优先级:3位,用于QoS
# VLAN环境中的MAC地址学习模拟
class VLANSwitch:
    def __init__(self, name):
        self.name = name
        self.vlan_mac_tables = defaultdict(dict)  # {vlan_id: {mac: port}}
        self.vlan_port_membership = defaultdict(set)  # {vlan_id: set(ports)}
    
    def add_port_to_vlan(self, port, vlan_id):
        """将端口加入VLAN"""
        self.vlan_port_membership[vlan_id].add(port)
        print(f"[{self.name}] 端口 {port} 加入 VLAN {vlan_id}")
    
    def learn_mac(self, mac_address, port, vlan_id):
        """在指定VLAN中学习MAC地址"""
        if port not in self.vlan_port_membership[vlan_id]:
            print(f"[{self.name}] 错误: 端口 {port} 不在 VLAN {vlan_id} 中")
            return
        
        self.vlan_mac_tables[vlan_id][mac_address] = port
        print(f"[{self.name}] VLAN {vlan_id}: 学习 MAC {mac_address} -> 端口 {port}")
    
    def forward_frame(self, src_mac, dst_mac, in_port, vlan_id):
        """在VLAN内转发帧"""
        print(f"\n[{self.name}] VLAN {vlan_id}: 帧 SRC={src_mac}, DST={dst_mac}, IN_PORT={in_port}")
        
        # 学习源MAC
        self.learn_mac(src_mac, in_port, vlan_id)
        
        # 转发逻辑
        if dst_mac == "FF:FF:FF:FF:FF:FF":
            # 广播:发送到同VLAN的所有其他端口
            out_ports = [p for p in self.vlan_port_membership[vlan_id] 
                        if p != in_port]
            print(f"  -> 广播到VLAN {vlan_id} 端口: {out_ports}")
            return out_ports
        
        if dst_mac in self.vlan_mac_tables[vlan_id]:
            out_port = self.vlan_mac_tables[vlan_id][dst_mac]
            if out_port == in_port:
                print(f"  -> 端口相同,丢弃")
                return []
            print(f"  -> 单播转发到端口 {out_port}")
            return [out_port]
        else:
            out_ports = [p for p in self.vlan_port_membership[vlan_id] 
                        if p != in_port]
            print(f"  -> 未知目的MAC,泛洪到VLAN {vlan_id} 端口: {out_ports}")
            return out_ports
    
    def show_vlan_tables(self):
        """显示所有VLAN的MAC地址表"""
        print(f"\n[{self.name}] VLAN MAC地址表:")
        for vlan_id, mac_table in self.vlan_mac_tables.items():
            print(f"  VLAN {vlan_id}:")
            for mac, port in mac_table.items():
                print(f"    {mac} -> {port}")

# VLAN配置示例
print("=== VLAN环境MAC地址学习演示 ===")
vlan_switch = VLANSwitch("VLAN-Switch")

# 配置VLAN
vlan_switch.add_port_to_vlan("eth0", 10)
vlan_switch.add_port_to_vlan("eth1", 10)
vlan_switch.add_port_to_vlan("eth2", 20)
vlan_switch.add_port_to_vlan("eth3", 20)

# VLAN 10 内通信
vlan_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0", 10)
vlan_switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1", 10)

# VLAN 20 内通信
vlan_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:61", "eth2", 20)

# 跨VLAN通信(应该被隔离)
vlan_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:60", "eth0", 10)

vlan_switch.show_vlan_tables()

高级MAC地址学习技术

1. 端口安全(Port Security)

端口安全限制端口上允许学习的MAC地址数量,防止MAC地址泛洪攻击。

# 端口安全实现
class PortSecuritySwitch:
    def __init__(self, name, max_macs_per_port=3):
        self.name = name
        self.max_macs_per_port = max_macs_per_port
        self.port_security = defaultdict(set)  # {port: set(macs)}
        self.mac_table = {}
    
    def learn_mac(self, mac_address, port):
        """带端口安全的MAC地址学习"""
        # 检查端口安全限制
        if len(self.port_security[port]) >= self.max_macs_per_port:
            if mac_address not in self.port_security[port]:
                print(f"[{self.name}] 端口 {port} 达到MAC地址上限 {self.max_macs_per_port},拒绝学习 {mac_address}")
                return False
        
        self.port_security[port].add(mac_address)
        self.mac_table[mac_address] = port
        print(f"[{self.name}] 端口 {port} 学习 MAC: {mac_address}")
        return True
    
    def show_port_security(self):
        """显示端口安全状态"""
        print(f"\n[{self.name}] 端口安全状态:")
        for port, macs in self.port_security.items():
            print(f"  端口 {port}: {len(macs)}/{self.max_macs_per_port} MAC地址")
            for mac in macs:
                print(f"    - {mac}")

# 端口安全演示
print("=== 端口安全演示 ===")
secure_switch = PortSecuritySwitch("SecureSwitch", max_macs_per_port=2)

# 正常学习
secure_switch.learn_mac("00:1A:2B:3C:4D:5E", "eth0")
secure_switch.learn_mac("00:1A:2B:3C:4D:5F", "eth0")
secure_switch.learn_mac("00:1A:2B:3C:4D:60", "eth0")  # 应该被拒绝
secure_switch.learn_mac("00:1A:2B:3C:4D:61", "eth1")  # 不同端口,应该允许

secure_switch.show_port_security()

2. MAC地址过滤

基于MAC地址的访问控制列表(ACL)可以允许或拒绝特定MAC地址的通信。

# MAC地址过滤实现
class MACFilterSwitch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}
        self.blacklist = set()  # 黑名单
        self.whitelist = set()  # 白名单
        self.use_whitelist = False  # 是否启用白名单模式
    
    def add_blacklist(self, mac_address):
        """添加到黑名单"""
        self.blacklist.add(mac_address.upper())
        print(f"[{self.name}] 添加黑名单: {mac_address}")
    
    def add_whitelist(self, mac_address):
        """添加到白名单"""
        self.whitelist.add(mac_address.upper())
        print(f"[{self.name}] 添加白名单: {mac_address}")
    
    def enable_whitelist_mode(self):
        """启用白名单模式"""
        self.use_whitelist = True
        print(f"[{self.name}] 启用白名单模式")
    
    def is_allowed(self, mac_address):
        """检查MAC地址是否被允许"""
        mac = mac_address.upper()
        
        if self.use_whitelist:
            return mac in self.whitelist
        
        return mac not in self.blacklist
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """带MAC过滤的帧转发"""
        print(f"\n[{self.name}] 帧 SRC={src_mac}, DST={dst_mac}")
        
        # 检查源MAC
        if not self.is_allowed(src_mac):
            print(f"  -> 源MAC {src_mac} 被拒绝")
            return []
        
        # 检查目的MAC
        if not self.is_allowed(dst_mac):
            print(f"  -> 目的MAC {dst_mac} 被拒绝")
            return []
        
        # 正常转发逻辑
        self.mac_table[src_mac] = in_port
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port != in_port:
                print(f"  -> 转发到端口 {out_port}")
                return [out_port]
        
        print(f"  -> 泛洪")
        return ["eth1", "eth2", "eth3"]  # 简化:泛洪到其他端口

# MAC过滤演示
print("=== MAC地址过滤演示 ===")
filter_switch = MACFilterSwitch("FilterSwitch")

# 配置黑名单
filter_switch.add_blacklist("00:1A:2B:3C:4D:99")

# 测试黑名单
filter_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:99", "eth0")
filter_switch.forward_frame("00:1A:2B:3C:4D:99", "00:1A:2B:3C:4D:5E", "eth0")

# 配置白名单
filter_switch.add_whitelist("00:1A:2B:3C:4D:5E")
filter_switch.add_whitelist("00:1A:2B:3C:4D:5F")
filter_switch.enable_whitelist_mode()

# 测试白名单
filter_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
filter_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:5F", "eth0")

实际网络设备中的MAC地址学习

真实交换机的CAM表结构

真实交换机的CAM表通常包含以下字段:

  • MAC地址:48位硬件地址
  • 端口号:物理端口或端口组
  • VLAN ID:所属VLAN
  • 类型:动态/静态
  • 老化时间:剩余存活时间

Cisco交换机命令示例

# 查看MAC地址表
show mac address-table

# 查看特定VLAN的MAC地址表
show mac address-table vlan 10

# 查看特定MAC地址
show mac address-table address 001a.2b3c.4d5e

# 静态添加MAC地址表项
mac address-table static 001a.2b3c.4d5e vlan 10 interface gigabitethernet0/1

# 设置老化时间
mac address-table aging-time 300

# 清除MAC地址表
clear mac address-table dynamic

Linux bridge的MAC地址学习

Linux bridge使用内核模块实现MAC地址学习:

# 创建bridge
sudo brctl addbr br0

# 添加接口到bridge
sudo brctl addif br0 eth0
sudo brctl addif br0 eth1

# 查看bridge的MAC地址表
bridge fdb show br br0

# 查看bridge信息
brctl show br0

# 设置老化时间(秒)
echo 300 > /sys/class/net/br0/bridge/ageing_time

# 静态添加MAC地址条目
bridge fdb add 00:1a:2b:3c:4d:5e dev eth0 self permanent

Python模拟真实交换机行为

# 真实交换机行为模拟器
class RealisticSwitch:
    def __init__(self, name, max_macs=8192, aging_time=300):
        self.name = name
        self.max_macs = max_macs
        self.aging_time = aging_time
        self.mac_table = {}
        self.stats = {
            'learned': 0,
            'forwarded': 0,
            'flooded': 0,
            'dropped': 0
        }
    
    def forward_frame(self, src_mac, dst_mac, in_port, vlan=1):
        """模拟真实交换机的完整转发流程"""
        current_time = time.time()
        
        # 1. 检查MAC表是否已满
        if len(self.mac_table) >= self.max_macs:
            # 交换机通常会拒绝学习新MAC或删除最老的条目
            if src_mac not in self.mac_table:
                print(f"[{self.name}] MAC表已满,无法学习新MAC: {src_mac}")
                self.stats['dropped'] += 1
                return []
        
        # 2. 学习源MAC
        if src_mac not in self.mac_table:
            self.stats['learned'] += 1
        
        self.mac_table[src_mac] = {
            'port': in_port,
            'vlan': vlan,
            'last_seen': current_time
        }
        
        # 3. 处理目的MAC
        # 广播/组播处理
        if dst_mac.startswith("01:00:5E") or dst_mac == "FF:FF:FF:FF:FF:FF":
            self.stats['flooded'] += 1
            return [p for p in self.port_status if p != in_port]
        
        # 单播处理
        if dst_mac in self.mac_table:
            dst_info = self.mac_table[dst_mac]
            # 检查VLAN匹配
            if dst_info['vlan'] != vlan:
                print(f"[{self.name}] VLAN隔离: {src_mac}({vlan}) -> {dst_mac}({dst_info['vlan']})")
                self.stats['dropped'] += 1
                return []
            
            out_port = dst_info['port']
            if out_port == in_port:
                # 端口隔离或环路保护
                print(f"[{self.name}] 端口相同,丢弃")
                self.stats['dropped'] += 1
                return []
            
            self.stats['forwarded'] += 1
            return [out_port]
        else:
            # 未知单播泛洪
            self.stats['flooded'] += 1
            return [p for p in self.port_status if p != in_port]
    
    def periodic_aging(self):
        """定期老化处理"""
        current_time = time.time()
        expired = []
        for mac, info in self.mac_table.items():
            if current_time - info['last_seen'] > self.aging_time:
                expired.append(mac)
        
        for mac in expired:
            del self.mac_table[mac]
        
        return len(expired)
    
    def show_status(self):
        """显示交换机状态"""
        print(f"\n[{self.name}] 状态统计:")
        print(f"  MAC表大小: {len(self.mac_table)}/{self.max_macs}")
        print(f"  学习: {self.stats['learned']}")
        print(f"  转发: {self.stats['forwarded']}")
        print(f"  泛洪: {self.stats['flooded']}")
        print(f"  丢弃: {self.stats['dropped']}")

# 真实场景模拟
print("=== 真实交换机场景模拟 ===")
real_switch = RealisticSwitch("RealSwitch", max_macs=5)
real_switch.port_status = {"eth0": True, "eth1": True, "eth2": True}

# 模拟正常通信
real_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
real_switch.forward_frame("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")
real_switch.forward_frame("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")

# 模拟MAC表满
real_switch.forward_frame("00:1A:2B:3C:4D:60", "00:1A:2B:3C:4D:61", "eth2")
real_switch.forward_frame("00:1A:2B:3C:4D:62", "00:1A:2B:3C:4D:63", "eth2")
real_switch.forward_frame("00:1A:2B:3C:4D:64", "00:1A:2B:3C:4D:65", "eth2")  # 应该失败

real_switch.show_status()

MAC地址学习的安全考虑

1. MAC地址欺骗攻击

攻击者可以伪造源MAC地址来:

  • 绕过MAC地址过滤
  • 进行中间人攻击
  • 耗尽交换机的MAC表空间
# MAC地址欺骗检测示例
class SecureSwitch:
    def __init__(self, name):
        self.name = name
        self.mac_table = {}  # {mac: {'port': port, 'timestamp': ts}}
        self.port_mac_history = defaultdict(set)  # {port: set(macs)}
        self.suspicious_ports = set()
    
    def detect_spoofing(self, mac_address, port):
        """检测MAC地址欺骗"""
        # 检查该端口是否已经学习过不同的MAC地址
        if port in self.port_mac_history:
            learned_macs = self.port_mac_history[port]
            if len(learned_macs) > 0 and mac_address not in learned_macs:
                # 同一端口出现多个MAC地址,可能欺骗
                print(f"[{self.name}] 警告: 端口 {port} 出现MAC地址变化!")
                print(f"  历史MAC: {learned_macs}")
                print(f"  新MAC: {mac_address}")
                self.suspicious_ports.add(port)
                return True
        
        self.port_mac_history[port].add(mac_address)
        return False
    
    def forward_frame_secure(self, src_mac, dst_mac, in_port):
        """安全的帧转发"""
        # 检测欺骗
        if self.detect_spoofing(src_mac, in_port):
            print(f"  -> 检测到MAC欺骗,丢弃帧")
            return []
        
        # 正常转发
        self.mac_table[src_mac] = {'port': in_port, 'timestamp': time.time()}
        
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]['port']
            if out_port != in_port:
                return [out_port]
        
        return [p for p in ["eth0", "eth1", "eth2"] if p != in_port]

# 欺骗检测演示
print("=== MAC地址欺骗检测演示 ===")
secure_switch = SecureSwitch("SecureSwitch")

# 正常通信
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:5E", "00:1A:2B:3C:4D:5F", "eth0")
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:5F", "00:1A:2B:3C:4D:5E", "eth1")

# 欺骗尝试
secure_switch.forward_frame_secure("00:1A:2B:3C:4D:99", "00:1A:2B:3C:4D:5E", "eth0")

2. MAC地址泛洪攻击防护

MAC地址泛洪攻击通过发送大量伪造的源MAC地址来耗尽交换机的MAC表,导致交换机退化为集线器模式。

# MAC地址泛洪防护
class AntiFloodSwitch:
    def __init__(self, name, max_macs_per_port=10):
        self.name = name
        self.max_macs_per_port = max_macs_per_port
        self.port_macs = defaultdict(set)
        self.mac_table = {}
    
    def learn_mac(self, mac_address, port):
        """防止MAC地址泛洪"""
        # 检查端口MAC地址数量限制
        if len(self.port_macs[port]) >= self.max_macs_per_port:
            if mac_address not in self.port_macs[port]:
                print(f"[{self.name}] 端口 {port} MAC地址数量超限,拒绝学习 {mac_address}")
                return False
        
        self.port_macs[port].add(mac_address)
        self.mac_table[mac_address] = port
        return True
    
    def forward_frame(self, src_mac, dst_mac, in_port):
        """防泛洪的帧转发"""
        # 学习源MAC(带防护)
        if not self.learn_mac(src_mac, in_port):
            # 拒绝学习,但仍尝试转发(可能已存在)
            pass
        
        # 正常转发逻辑
        if dst_mac in self.mac_table:
            out_port = self.mac_table[dst_mac]
            if out_port != in_port:
                return [out_port]
        
        return [p for p in ["eth0", "eth1", "eth2"] if p != in_port]

# 防泛洪演示
print("=== MAC地址泛洪防护演示 ===")
anti_flood = AntiFloodSwitch("AntiFloodSwitch", max_macs_per_port=3)

# 模拟泛洪攻击
for i in range(10):
    fake_mac = f"00:1A:2B:3C:4D:{i:02x}"
    print(f"尝试学习 MAC: {fake_mac}")
    anti_flood.learn_mac(fake_mac, "eth0")

print(f"\n端口 eth0 学习到的MAC地址数量: {len(anti_flood.port_macs['eth0'])}")

总结

MAC地址学习是现代网络设备的核心功能,它确保了数据帧在数据链路层的正确转发。通过理解以下关键点,您可以从入门到精通掌握这一机制:

  1. 基础原理:交换机通过源MAC地址学习建立CAM表,通过目的MAC地址查找进行转发决策
  2. 老化机制:定期清理过时条目,适应网络拓扑变化
  3. VLAN隔离:每个VLAN维护独立的MAC地址表
  4. 安全防护:端口安全、MAC过滤、欺骗检测和泛洪防护
  5. 实际应用:从真实设备命令到软件模拟,全面理解实现细节

掌握MAC地址学习原理对于网络故障排查、安全防护和性能优化都至关重要。在实际网络环境中,合理配置相关参数可以显著提升网络的安全性和稳定性。