电力自动化通信入门:手把手教你用Python模拟IEC104协议的数据采集与遥控
2026/5/31 3:41:08 网站建设 项目流程

电力自动化通信实战:Python模拟IEC104协议的数据采集与遥控

在工业自动化领域,电力系统的远程监控与控制是保障电网稳定运行的关键技术。IEC 60870-5-104(简称IEC104)作为电力自动化系统中广泛采用的通信协议,实现了控制站与受控站之间的高效数据交互。本文将带您用Python构建一个完整的IEC104协议模拟环境,无需真实硬件设备即可深入理解协议核心机制。

1. IEC104协议基础与环境搭建

IEC104协议采用客户端/服务器架构,基于TCP/IP实现实时数据传输。控制站(客户端)负责发送控制命令和接收数据,而受控站(服务器)则执行命令并上报设备状态。协议支持两种传输模式:

  • 非平衡模式:仅控制站可发起通信
  • 平衡模式(常用):双方均可主动传输数据

开发环境配置

# 安装必要库 pip install socket struct time random

协议帧结构由APCI(控制信息)和ASDU(应用数据)组成。关键帧类型包括:

帧类型功能描述典型应用场景
I格式信息传输遥测、遥信数据上报
S格式确认帧接收序号确认
U格式控制帧启动/停止数据传输

2. TCP连接与基础控制帧实现

建立可靠通信的第一步是TCP连接管理。以下是Python实现的服务器端基础框架:

import socket class IEC104Server: def __init__(self, host='0.0.0.0', port=2404): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) print(f"Server listening on {host}:{port}") def handle_connection(self): conn, addr = self.server_socket.accept() print(f"Connected by {addr}") # 默认处于STOPDT状态 data_transmission_active = False while True: data = conn.recv(255) # APDU最大长度255字节 if not data: break # 解析APCI start_byte, length = data[0], data[1] if start_byte != 0x68: print("Invalid start byte") continue control_field = data[2:6] frame_type = control_field[0] & 0x03 # 处理U格式帧 if frame_type == 3: if control_field[0] == 0x07: # STARTDT激活 print("Received STARTDT activate") conn.send(bytes([0x68, 0x04, 0x0B, 0x00, 0x00, 0x00])) # STARTDT确认 data_transmission_active = True elif control_field[0] == 0x13: # TESTFR激活 print("Received TESTFR activate") conn.send(bytes([0x68, 0x04, 0x83, 0x00, 0x00, 0x00])) # TESTFR确认

注意:实际实现中需要添加超时处理和心跳机制,确保连接稳定性

3. 遥信数据采集与上报

单点遥信(SIQ)是电力系统中最基础的状态量,表示开关、断路器等设备的开合状态。其数据结构包含状态值和质量标志:

SIQ字节结构: bit 0: 状态值 (0=分/开, 1=合/关) bit 1-3: 保留 bit 4: 封锁标志 (BL) bit 5: 取代标志 (SB) bit 6: 刷新标志 (NT) bit 7: 有效标志 (IV)

实现周期性数据上报的服务器端代码:

def send_spontaneous_data(conn, io_address, value): # 构建ASDU type_id = 0x01 # 单点遥信 vsq = 0x01 # 单个信息对象 cot = 0x03 # 突发传输原因 asdu_addr = 0x0001 io_addr_bytes = io_address.to_bytes(3, 'little') # 构建SIQ (假设质量标志全正常) siq = value & 0x01 # 组装APDU apci = bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式,序号0 asdu = bytes([type_id, vsq, cot, asdu_addr & 0xFF, (asdu_addr >> 8) & 0xFF]) + io_addr_bytes + bytes([siq]) conn.send(apci + asdu) # 在服务器主循环中添加 if data_transmission_active and random.random() < 0.3: # 30%概率上报 io_address = random.randint(1, 100) state = random.randint(0, 1) send_spontaneous_data(conn, io_address, state)

客户端解析SIQ数据的示例:

def parse_siq(asdu): type_id = asdu[0] if type_id != 0x01: return None io_address = int.from_bytes(asdu[6:9], 'little') siq = asdu[9] state = siq & 0x01 iv = (siq >> 7) & 0x01 # 有效性标志 return { 'address': io_address, 'state': 'ON' if state else 'OFF', 'valid': iv == 0, 'timestamp': time.time() }

4. 遥控命令实现与安全机制

遥控命令(SCO)允许控制站改变受控站设备状态,典型应用包括断路器分合闸操作。IEC104支持两种控制模式:

  1. 直接执行模式:单步完成命令下发与执行
  2. 选择-执行模式:先选择验证,再确认执行

选择-执行模式实现

def handle_control_command(conn, data): type_id = data[6] if type_id != 0x2D: # 单命令C_SC_NA_1 return False cot = data[8] io_address = int.from_bytes(data[10:13], 'little') sco = data[13] # 解析控制命令 execute = (sco & 0x80) == 0 command = sco & 0x01 if cot == 0x06: # 选择命令 print(f"Select command for address {io_address}") # 返回确认帧(肯定确认) confirm_asdu = bytes([0x2D, 0x01, 0x07, 0x01, 0x00]) + data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True elif cot == 0x08: # 执行命令 print(f"Execute {'ON' if command else 'OFF'} for address {io_address}") # 更新设备状态并返回执行确认 confirm_asdu = bytes([0x2D, 0x01, 0x0A, 0x01, 0x00]) + data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True return False

安全提示:实际系统中应实现操作权限验证、防误操作闭锁等安全机制

客户端发送遥控命令的示例流程:

def send_control_command(conn, io_address, command, select_execute=True): # 构建ASDU type_id = 0x2D # 单命令 vsq = 0x01 # 单个信息对象 cot = 0x06 if select_execute else 0x08 # 选择/执行阶段 asdu_addr = 0x0001 io_addr_bytes = io_address.to_bytes(3, 'little') # 构建SCO (选择阶段设置最高位为1) sco = 0x80 if select_execute else 0x00 sco |= (command & 0x01) # 组装APDU apci = bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式,序号0 asdu = bytes([type_id, vsq, cot, asdu_addr & 0xFF, (asdu_addr >> 8) & 0xFF]) + io_addr_bytes + bytes([sco]) conn.send(apci + asdu) # 等待确认 response = conn.recv(255) if len(response) < 6: return False # 验证确认帧 return response[6] == type_id and response[8] == (cot + 1)

5. 高级功能与性能优化

完整的IEC104实现还需要考虑以下高级功能:

时钟同步机制

def handle_time_sync(conn, data): type_id = data[6] if type_id != 0x67: # 时钟同步命令C_CS_NA_1 return False # 解析时间 (CP56Time2a格式) milliseconds = int.from_bytes(data[10:12], 'little') minutes = data[12] & 0x3F hours = data[13] & 0x1F day = data[14] & 0x1F month = data[15] & 0x0F year = data[16] & 0x7F # 返回带时标的确认 current_time = time.time() time_bytes = convert_to_cp56time2a(current_time) confirm_asdu = bytes([0x67, 0x01, 0x07, 0x01, 0x00]) + time_bytes conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) + confirm_asdu) return True

性能优化技巧

  1. 使用缓冲区减少小包传输
  2. 实现帧序号自动管理
  3. 异步处理耗时操作
  4. 连接状态监控与自动恢复
class IEC104Buffer: def __init__(self): self.send_seq = 0 self.recv_seq = 0 self.send_buffer = [] def add_frame(self, apdu): self.send_buffer.append(apdu) self.send_seq = (self.send_seq + 1) % 32768 def get_ack_frames(self, ack_seq): # 返回已确认的帧 acked = [] while self.send_buffer and self.send_buffer[0]['seq'] <= ack_seq: acked.append(self.send_buffer.pop(0)) return acked

通过这个Python实现,我们构建了一个完整的IEC104协议模拟环境,涵盖了从基础连接建立到数据采集、遥控命令等核心功能。这种模拟方法不仅适用于学习协议原理,也可用于自动化测试系统的开发。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询