ble_fota_tool.py 8.7 KB


  1. #!/usr/bin/env python3
  2. import asyncio
  3. import struct
  4. import time
  5. from bleak import BleakScanner, BleakClient
  6. # 完整UUID定义
  7. FOTA_SERVICE_UUID = "0000f000-0000-1000-8000-00805f9b34fb" # 完整服务UUID
  8. FOTA_CMD_CHAR_UUID = "0000f001-0000-1000-8000-00805f9b34fb" # 完整命令特征UUID
  9. FOTA_DATA_CHAR_UUID = "0000f002-0000-1000-8000-00805f9b34fb" # 完整数据特征UUID
  10. # Command definitions
  11. CMD_START_UPGRADE = 0x01
  12. CMD_END_UPGRADE = 0x02
  13. # 每包数据大小
  14. MAX_PACKET_SIZE = 200
  15. class SimpleFotaTool:
  16. def __init__(self, device_name, firmware_path):
  17. self.device_name = device_name
  18. self.firmware_path = firmware_path
  19. self.client = None
  20. self.firmware_data = None
  21. self.total_size = 0
  22. self.target_device = None
  23. async def load_firmware(self):
  24. """Load firmware file into memory"""
  25. try:
  26. with open(self.firmware_path, 'rb') as f:
  27. self.firmware_data = f.read()
  28. self.total_size = len(self.firmware_data)
  29. print(f" 固件加载完成,大小: {self.total_size} 字节")
  30. return True
  31. except Exception as e:
  32. print(f" 加载固件失败: {e}")
  33. return False
  34. async def scan_device(self):
  35. """Scan for the target device"""
  36. print("\n2. 扫描目标设备...")
  37. print(" 正在扫描,请等待...")
  38. try:
  39. devices = await BleakScanner.discover(timeout=10.0)
  40. found_devices = []
  41. for device in devices:
  42. if device.name and self.device_name in device.name:
  43. found_devices.append(device)
  44. print(f" 找到匹配设备: {device.name} (地址: {device.address})")
  45. if not found_devices:
  46. print(f" 未找到设备: {self.device_name}")
  47. return None
  48. # 选择第一个匹配的设备
  49. self.target_device = found_devices[0]
  50. print(f" 选择设备: {self.target_device.name} (地址: {self.target_device.address})")
  51. return self.target_device
  52. except Exception as e:
  53. print(f" 扫描失败: {e}")
  54. return None
  55. async def connect_device(self, device):
  56. """Connect to the target device"""
  57. print("\n3. 建立BLE连接...")
  58. try:
  59. self.client = BleakClient(device.address)
  60. await self.client.connect(timeout=30.0)
  61. print(f" 连接成功,状态: {self.client.is_connected}")
  62. # 调试:打印所有服务和特征值
  63. print("\n4. 发现服务和特征值...")
  64. # 兼容不同版本的Bleak库
  65. try:
  66. # 新版本Bleak
  67. services = self.client.services
  68. except AttributeError:
  69. # 旧版本Bleak
  70. services = await self.client.get_services()
  71. fota_service_found = False
  72. for service in services:
  73. if service.uuid.lower() == FOTA_SERVICE_UUID.lower():
  74. fota_service_found = True
  75. print(f" 找到FOTA服务: {service.uuid}")
  76. for char in service.characteristics:
  77. print(f" 特征值: {char.uuid} - 属性: {char.properties}")
  78. if char.uuid.lower() == FOTA_CMD_CHAR_UUID.lower():
  79. print(f" -> 命令特征值 (可写)")
  80. elif char.uuid.lower() == FOTA_DATA_CHAR_UUID.lower():
  81. print(f" -> 数据特征值 (可写)")
  82. if not fota_service_found:
  83. print(" 警告: 未找到FOTA服务,但继续尝试...")
  84. return True
  85. except Exception as e:
  86. print(f" 连接失败: {e}")
  87. return False
  88. async def write_characteristic(self, uuid, data):
  89. """写入特征值"""
  90. try:
  91. await self.client.write_gatt_char(uuid, data, response=True)
  92. # 正确提取短UUID(从完整UUID中提取f001/f002部分)
  93. # 完整UUID格式: "0000f001-0000-1000-8000-00805f9b34fb"
  94. # 我们想要提取 "f001" 部分
  95. short_uuid = uuid.split('-')[0][-4:]
  96. print(f" 写入特征值 {short_uuid},数据长度: {len(data)} 字节")
  97. return True
  98. except Exception as e:
  99. print(f" 写入特征值失败: {e}")
  100. return False
  101. async def send_start_command(self):
  102. """发送开始升级命令"""
  103. print("\n5. 发送开始升级命令...")
  104. # 连接成功后短暂延时
  105. print(" 连接成功,等待1秒...")
  106. await asyncio.sleep(1)
  107. # 发送开始升级命令
  108. start_cmd = struct.pack("<BI", CMD_START_UPGRADE, self.total_size)
  109. if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, start_cmd):
  110. return False
  111. print(" 开始命令发送完成")
  112. await asyncio.sleep(1) # 等待设备准备
  113. return True
  114. async def send_firmware_data(self):
  115. """Send firmware data in chunks with optimized delay"""
  116. print("\n6. 分块传输固件数据...")
  117. sent_bytes = 0
  118. start_time = time.time()
  119. packet_count = 0
  120. # 优化延时:减少到100ms以提高速度
  121. PACKET_DELAY = 0.1
  122. while sent_bytes < self.total_size:
  123. chunk_size = min(MAX_PACKET_SIZE, self.total_size - sent_bytes)
  124. chunk = self.firmware_data[sent_bytes:sent_bytes + chunk_size]
  125. if not await self.write_characteristic(FOTA_DATA_CHAR_UUID, chunk):
  126. return False
  127. sent_bytes += chunk_size
  128. packet_count += 1
  129. # 短暂延时,避免数据丢失
  130. await asyncio.sleep(PACKET_DELAY)
  131. # 每20个数据包显示一次进度
  132. if packet_count % 20 == 0 or sent_bytes >= self.total_size:
  133. progress = (sent_bytes / self.total_size) * 100
  134. elapsed = time.time() - start_time
  135. speed = sent_bytes / elapsed / 1024 if elapsed > 0 else 0
  136. remaining_time = (self.total_size - sent_bytes) / (sent_bytes / elapsed) if sent_bytes > 0 else 0
  137. print(f" 进度: {progress:.1f}% - {speed:.1f} KB/s - 已发送 {packet_count} 包 - 预计剩余: {remaining_time:.1f}s")
  138. total_time = time.time() - start_time
  139. avg_speed = self.total_size / total_time / 1024
  140. print(f" 数据传输完成! 总时间: {total_time:.1f}s, 平均速度: {avg_speed:.1f} KB/s")
  141. return True
  142. async def end_upgrade(self):
  143. """Send end upgrade command"""
  144. print("\n7. 发送结束升级命令...")
  145. end_cmd = struct.pack("B", CMD_END_UPGRADE)
  146. if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, end_cmd):
  147. return False
  148. print(" 结束命令发送完成")
  149. # 等待设备处理
  150. print("\n8. 等待设备处理升级...")
  151. await asyncio.sleep(5) # 给设备足够时间处理
  152. return True
  153. async def run(self):
  154. """Main execution flow"""
  155. # 1. 加载固件文件
  156. print("\n1. 加载固件文件...")
  157. if not await self.load_firmware():
  158. return False
  159. # 2. 扫描目标设备
  160. device = await self.scan_device()
  161. if not device:
  162. return False
  163. # 3. 连接设备
  164. if not await self.connect_device(device):
  165. return False
  166. try:
  167. # 4. 发送开始命令
  168. if not await self.send_start_command():
  169. return False
  170. # 5. 发送固件数据
  171. if not await self.send_firmware_data():
  172. return False
  173. # 6. 结束升级
  174. if not await self.end_upgrade():
  175. return False
  176. print("\n" + "="*50)
  177. print("升级流程完成! 设备应该正在重启...")
  178. print("="*50)
  179. return True
  180. except Exception as e:
  181. print(f" 升级过程中出现错误: {e}")
  182. return False
  183. finally:
  184. # 断开连接
  185. if self.client and self.client.is_connected:
  186. await self.client.disconnect()
  187. print(" 已断开连接")
  188. async def main():
  189. import argparse
  190. parser = argparse.ArgumentParser(description="蓝牙FOTA升级工具")
  191. parser.add_argument("-f", "--firmware", required=True, help="固件文件路径")
  192. parser.add_argument("-d", "--device", default="Air8000_FOTA", help="设备名称")
  193. args = parser.parse_args()
  194. tool = SimpleFotaTool(args.device, args.firmware)
  195. success = await tool.run()
  196. return success
  197. if __name__ == "__main__":
  198. success = asyncio.run(main())
  199. exit(0 if success else 1)