更新时间:2025.9.19
版本:V1.2
本教程实验平台:幻思创新FanciSwarm® 树莓派Zero2W 无人机和个人PC的虚拟机Ubuntu20.04LTS环境,Ubuntu系统安装教程请点击查看。
该项目主要实现了基于FFmpeg的视频流传输、OpenCV的实时视频显示、以及通过OpenCV的Aruco模块进行ArUco码检测和位姿估计。同时,项目中加入了多进程与线程管理,其中使用共享内存来处理并行任务。代码的主要功能包括:
camera_biaoding
├─ biaoding.py
└─ take_pic_for_biaoding.py
Aruco_detection
├─ aruco_detect.py
├─ datalink_serial.py
├─ mavcrc.py
├─ mavcrc.pyc
├─ mavlink.py
├─ mavlink.pyc
└─ __pycache__
├─ datalink_serial.cpython-38.pyc
├─ mavcrc.cpython-36.pyc
├─ mavcrc.cpython-38.pyc
├─ mavlink.cpython-36.pyc
└─ mavlink.cpython-38.pyc
serial_wifi_demo
├─ auto_ap.sh
├─ datalink_serial_wifi.py
└─ README.md

先按照官网教程学会连接无人机的手机APP
在APP中,把串口1波特率配置为460800
相机标定是通过拍摄已知尺寸的棋盘格图像来获取相机的内参矩阵和畸变系数的过程。

以下是使用OpenCV对摄像头进行标定的步骤:
import cv2
import numpy as np
import glob
# 棋盘格规格(内角点的数量) {#棋盘格规格内角点的数量 }
chessboard_size = (9, 6)
# 准备对象点,例如(0,0,0), (1,0,0), (2,0,0) ..., (8,5,0) {#准备对象点例如000-100-200--850 }
objp = np.zeros((chessboard_size[0] * chessboard_size[1], 3), np.float32)
objp[:, :2] = np.mgrid[0:chessboard_size[0], 0:chessboard_size[1]].T.reshape(-1, 2)
# 存储所有的对象点和图像点 {#存储所有的对象点和图像点 }
objpoints = [] # 在真实世界坐标系中的3D点
imgpoints = [] # 在图像平面的2D点
# 获取棋盘格图片 {#获取棋盘格图片 }
images = glob.glob('calibration_images/*.jpg')
for fname in images:
img = cv2.imread(fname)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 找到棋盘格的角点
ret, corners = cv2.findChessboardCorners(gray, chessboard_size, None)
# 如果找到足够的角点,添加对象点和图像点
if ret:
objpoints.append(objp)
imgpoints.append(corners)
# 显示角点
img = cv2.drawChessboardCorners(img, chessboard_size, corners, ret)
cv2.imshow('img', img)
cv2.waitKey(500)
cv2.destroyAllWindows()
# 标定摄像头 {#标定摄像头 }
ret, camera_matrix, dist_coeffs, rvecs, tvecs = cv2.calibrateCamera(objpoints, imgpoints, gray.shape[::-1], None, None)
print("相机内参矩阵:\n", camera_matrix)
print("畸变系数:\n", dist_coeffs)
# 保存标定结果 {#保存标定结果 }
np.savez('calibration_data.npz', camera_matrix=camera_matrix, dist_coeffs=dist_coeffs)
import cv2
import os
import datetime
# 设置保存图片的文件夹 {#设置保存图片的文件夹 }
output_dir = "captured_images"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 设置树莓派摄像头的视频流URL(替换为你的实际URL) {#设置树莓派摄像头的视频流url替换为你的实际url }
video_stream_url = "tcp://192.168.0.201:2024/stream" # 替换为实际的树莓派摄像头视频流URL
# 打开摄像头视频流 {#打开摄像头视频流 }
cap = cv2.VideoCapture(video_stream_url)
if not cap.isOpened():
print("无法打开视频流")
exit()
print("按下 's' 键保存当前帧,按 'q' 键退出程序")
while True:
# 捕获当前帧
ret, frame = cap.read()
if not ret:
print("无法读取视频帧")
break
# 显示实时视频流
cv2.imshow("Raspberry Pi Camera Stream", frame)
# 监听键盘输入
key = cv2.waitKey(1) & 0xFF
if key == ord('s'):
# 生成唯一的文件名并保存图片
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
image_path = os.path.join(output_dir, f"image_{timestamp}.jpg")
cv2.imwrite(image_path, frame)
print(f"图片已保存: {image_path}")
elif key == ord('q'):
# 按下 'q' 键退出程序
print("退出程序")
break
# 释放摄像头资源并关闭窗口 {#释放摄像头资源并关闭窗口 }
cap.release()
cv2.destroyAllWindows()
提供一个在线网站:https://chev.me/arucogen/
需要根据Aruco字典和检测参数来生成不同规格的Aruco码,并需要在检测程序中对应修改。具体更改位置参考下文。
树莓派端作为中转,主要有如下功能:
sudo gedit /etc/rc.local
su pi -c "sh ~/auto_ap.sh"



HOST_IP = "192.168.0.201" # IP地址不固定,要考虑路由器可用IP范围
sudo ifconfig wlan0 192.168.0.201
更详细的树莓派配置说明在serial_wifi_demo的README文件中安装OpenCV-python依赖库,要注意下载的python包版本,不同版本的函数会有差别。
aruco-detection-new v0.1.0
└── opencv-contrib-python v4.6.0.66
└── numpy v1.24.4
pip install opencv-contrib-python==4.6.0.66
pip install numpy==1.24
pip install pynput
更改IP端口
datalink_serial.py第14行中
HOST_IP = "192.168.0.204" # IP地址不固定,要考虑路由器可用IP范围
aruco_detect.py中
ffmpeg_command = [
'ffmpeg',
'-i', 'tcp://192.168.0.204:2024', # IP摄像头地址
'-f', 'image2pipe',
'-pix_fmt', 'bgr24',
'-vcodec', 'rawvideo',
'-bufsize', '500k',
'-'
]
更改完IP之后就可以运行例程程序了
python3 aruco_detect.py
启动后可以看见两个窗口,一个为实时显示,另一个为Aruco码检测窗口,出现Aruco码时候会检测并打印其位置信息。
# 加载相机标定数据 {#加载相机标定数据 }
with np.load('/home/wsg/桌面/calib_raspi/calibration_data.npz') as data:
camera_matrix = data['camera_matrix']
dist_coeffs = data['dist_coeffs']
f_x = camera_matrix[0, 0] # 获取水平焦距
f_y = camera_matrix[1, 1] # 获取垂直焦距
cx = camera_matrix[0, 2] # 图像中心的 x 坐标
cy = camera_matrix[1, 2] # 图像中心的 y 坐标
aruco_dict = aruco.Dictionary_get(aruco.DICT_6X6_250)
parameters = aruco.DetectorParameters_create()
if __name__ == '__main__':
W_img = 640
H_img = 480
shared_array = mp.Array('B', W_img * H_img * 3) # 共享内存,大小为图片像素总数
lock = mp.Lock() # 互斥锁,保证对共享内存的访问是线程安全的
ffmpeg_command = [
'ffmpeg',
'-i', 'tcp://192.168.137.234:2024', # IP摄像头地址,替换为你的摄像头IP
'-f', 'image2pipe',
'-pix_fmt', 'bgr24',
'-vcodec', 'rawvideo',
'-bufsize', '500k', # 增加缓冲区
'-'
]
p1 = mp.Process(target=video_display, args=(ffmpeg_command, shared_array, lock))
p2 = mp.Process(target=qr_code_detection, args=(shared_array, lock))
p1.start()
p2.start()
p1.join()
p2.join()
入口参数:ffmpeg_command: ffmpeg命令处理视频流数据
shared_array: 多进程共享内存对象,用于在多个进程间共享图像数据。
lock: 用于保护共享内存访问的线程/进程锁(防止读写冲突)。
def video_display(ffmpeg_command, shared_array, lock):
process = subprocess.Popen(ffmpeg_command, stdout=subprocess.PIPE)
W_img = 640
H_img = 480
while True:
raw_frame = process.stdout.read(W_img * H_img * 3)
if len(raw_frame) != W_img * H_img * 3:
break
frame = np.frombuffer(raw_frame, np.uint8).reshape((H_img, W_img, 3))
with lock:
np.copyto(np.frombuffer(shared_array.get_obj(), dtype=np.uint8).reshape(H_img, W_img, 3), frame)
cv2.imshow('Real-Time Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
process.terminate()
def qr_code_detection(shared_array, lock):
W_img = 640
H_img = 480
aruco_size = 0.1 # ArUco码的实际边长,单位:米
kp_x = 0.6 #pid参数
kp_y = 0.6
kp_alt = 0.6
kp_yaw = 0.3
dl = datalink() # 实例化 datalink
data_thread = threading.Thread(target=dl.main)
data_thread.start()
while True:
with lock:
frame = np.frombuffer(shared_array.get_obj(), dtype=np.uint8).reshape(H_img, W_img, 3).copy()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, rejected = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
if ids is not None:
# 估计ArUco码的位姿
rvecs, tvecs, _ = aruco.estimatePoseSingleMarkers(corners, aruco_size, camera_matrix, dist_coeffs)
for i in range(len(ids)):
# 获取ArUco码的旋转向量和位移向量
rvec = rvecs[i][0]
tvec = tvecs[i][0]
# 绘制检测到的ArUco码的边框和轴
aruco.drawDetectedMarkers(frame, corners)
#aruco.drawAxis(frame, camera_matrix, dist_coeffs, rvec, tvec, 0.1)
# 将位姿转换为控制指令,例如距离(dz_m)、左右偏移量(dx_m)和高度差(dy_m)
dz_m = tvec[2] # 前后距离
dx_m = tvec[0] # 左右偏移
dy_m = -tvec[1] # 高度偏移
print(f"Translation vector: {tvec}")
print(f"Rotation vector: {rvec}")
# ArUco码的中心点 (计算其相对于图像中心的偏移)
corner_points = corners[i][0]
x1, y1 = corner_points[0]
x2, y2 = corner_points[2]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
# 计算相对于图像中心的偏移
dx_pixel = center_x - cx # 水平偏移量(像素)
dy_pixel = center_y - cy # 垂直偏移量(像素)
# 根据焦距计算偏航角 (yaw)(左右偏移量转换成弧度)
d_yaw = np.arctan(dx_pixel / f_x) # 水平偏移转为弧度
# 假设我们将无人机的目标距离设定为1.5米
dx_1 = dz_m - 1.5 # 距离与目标距离之差
dy_1 = dx_m
d_alt_1 = dy_m
#d_yaw = np.arctan(dx_m / f_x) # 获取yaw角
dl.set_pose(kp_x * dx_1, kp_y * dy_1, kp_alt * d_alt_1, kp_yaw * d_yaw)
label = f'dx: {dx_m:.2f} m, dy: {dy_m:.2f} m, dz: {dz_m:.2f} m, d_yaw:{d_yaw:.2f} rad'
cv2.putText(frame, label, (10, 30 + i * 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
else:
print("No ArUco marker detected.")
cv2.imshow('ArUco Code Detection', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
dl.socket_tcp.close()
data_thread.join()
estimatePoseSingleMarkers()
函数作用:估计marker在相机坐标系中的3D位姿。
输出参数:
注意:这里是相机坐标系是以左上角为零点,xy平面平行于相机2D平面,z轴垂直于相机2D平面。

# 估计ArUco码的位姿
rvecs, tvecs, _ = aruco.estimatePoseSingleMarkers(corners, aruco_size, camera_matrix, dist_coeffs)
for i in range(len(ids)):
# 获取ArUco码的旋转向量和位移向量
rvec = rvecs[i][0]
tvec = tvecs[i][0]
# 将位姿转换为控制指令,例如距离(dz_m)、左右偏移量(dx_m)和高度差(dy_m)
dz_m = tvec[2] # 前后距离
dx_m = tvec[0] # 左右偏移
dy_m = -tvec[1] # 高度偏移
print(f"Translation vector: {tvec}")
print(f"Rotation vector: {rvec}")
# ArUco码的中心点 (计算其相对于图像中心的偏移)
corner_points = corners[i][0]
x1, y1 = corner_points[0]
x2, y2 = corner_points[2]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
# 计算相对于图像中心的偏移
dx_pixel = center_x - cx # 水平偏移量(像素)
dy_pixel = center_y - cy # 垂直偏移量(像素)
# 根据焦距计算偏航角 (yaw)(左右偏移量转换成弧度)
d_yaw = np.arctan(dx_pixel / f_x) # 水平偏移转为弧度
# 假设我们将无人机的目标距离设定为1.5米
dx_1 = dz_m - 1.5 # 距离与目标距离之差
dy_1 = dx_m
d_alt_1 = dy_m
#d_yaw = np.arctan(dx_m / f_x) # 获取yaw角
# 设置位姿函数
dl.set_pose(kp_x * dx_1, kp_y * dy_1, kp_alt * d_alt_1, kp_yaw * d_yaw)
# dx:m dy:m d_alt:m d_yaw:rad
def set_pose(self, dx, dy, d_alt, d_yaw):
if (self.pos_x != 0) and (self.pos_y != 0) and (self.att_yaw != 0):
global_dx = dx * math.cos(self.att_yaw) - dy * math.sin(self.att_yaw)
global_dy = dx * math.sin(self.att_yaw) + dy * math.cos(self.att_yaw)
self.x = self.pos_x + global_dx
self.y = self.pos_y + global_dy
self.z = self.relative_alt + d_alt
self.yaw = self.att_yaw + d_yaw
self.mav_drone.set_position_target_local_ned_send(0, 0, 0, 0, 0, self.x, self.y, self.z, 0, 0, 0, 0, 0, 0, self.yaw, 0)
# !/usr/bin/env python
# coding=utf-8
import time
import sys, os
import threading
import math
import struct, array, time, json
from mavlink import *
from socket import *
from pynput import keyboard # 导入键盘监听库文件
pynput 是一个用于监控和控制鼠标、键盘输入的 Python 库。def on_press(self,key):
try:
# 获取按下的按键
print('You pressed {0}'.format(key.char))
if key.char=='a' :
self.set_arm()
print('解锁')
if key.char=='d' :
self.set_disarm()
print('锁定')
if key.char=='t' :
self.set_takeoff()
self.set_xy_pose(0, 0, 0)
print('起飞')
if key.char=='l' :
self.set_land()
print('降落')
except AttributeError:
print('You pressed {0}'.format(key))
if format(key) == 'Key.up':
self.set_xy_pose(0.2, 0, 0)
print('前进')
if format(key) == 'Key.down':
self.set_xy_pose(-0.2, 0, 0)
print('后退')
if format(key) == 'Key.right':
self.set_xy_pose(0, 0.2, 0)
print('右移')
if format(key) == 'Key.left':
self.set_xy_pose(0, -0.2, 0)
print('左移')
定义了一个按键事件处理函数,用于监听键盘输入,根据不同的按键执行对应的操作,如解锁、锁定、起飞、降落及前后左右移动。def main(self):
print('data link is starting...\n')
self.socket_tcp.connect(self.host_addr)
drone_thread = threading.Thread(target=self.drone)
try:
drone_thread.start()
listener = keyboard.Listener(on_press=self.on_press) # 创建键盘监听对象 --------
listener.start() # 开始监听 --------
while True:
self.mav_drone.heartbeat_send(MAV_TYPE_GCS, MAV_AUTOPILOT_INVALID, MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, 0, MAV_STATE_STANDBY, 1)
time.sleep(1)
except Exception as e:
print("Error sending message:", e)
finally:
drone_thread.join()
self.socket_tcp.close()
listener.join() # 等待监听线程 --------
首先我们创建了一个键盘监听对象,并开始进行键盘按键的监听,最后我们需要监听程序不断运行。