YOLOv8 + DeepSORT 实现多目标“越线计数”:从检测、跟踪到状态机(Day 5)
系列专栏 · 第 5 篇 / 7
目标:在单个脚本里完成“目标检测 → 多目标跟踪 → 越线计数/方向统计 → 可视化与日志输出”,并讲清楚版本选择、参数含义、常见坑与性能优化。
适用:行人/车辆/动物等的进出统计、通道客流、店铺进店率评估、安防告警前置等。
1|功能说明与效果预期
-
检测:使用
ultralytics的 YOLOv8(可换任意 pt/onnx/engine),类别可过滤(如只统计人/车)。 -
跟踪:默认用
deep-sort-realtime(Kalman + ReID),稳定生成track_id。 - 计数:支持 单条越线 或 区域进出。本文主讲越线:通过线段两侧符号 + 状态机(FSM) 防止误计数。
- 输出:画面叠加框/ID/轨迹/统计面板;同时导出 CSV 日志(时间戳、id、方向)。
- 性能:1080p 视频在 T4/3060 上 30~60 FPS(视模型大小而定);CPU 也可跑但帧率有限。
2|环境与版本
2.1 依赖(推荐版本)
ultralytics==8.2.103
deep-sort-realtime==1.3.2
opencv-python==4.9.0.80
numpy>=1.24
为什么这样选?
ultralytics 8.2+:接口稳定、支持model(source)的简写与半精度。deep-sort-realtime 1.3+:API 简单(DeepSort.update_tracks(detections, frame)),无需额外编译。- 如果你要 ByteTrack/StrongSORT,可以把跟踪部分替换即可,计数逻辑不变。
2.2 安装
# 建议在虚拟环境
pip install -U ultralytics deep-sort-realtime opencv-python
3|项目结构与样例素材
line-counter/
├─ counter.py # 主脚本(可直接运行)
├─ config.yaml # 可选:参数配置(线段/类别/阈值)
├─ samples/
│ ├─ demo.mp4 # 测试视频(自备)
│ └─ roi.json # 可选:交互标定生成的线段
└─ runs/
├─ out.mp4 # 导出视频
└─ events.csv # 计数日志
素材准备:
- 任选一个监控视频(建议固定机位,无遮挡过多)。
- 需要你先目测越线方向(如“上→下”算
down,“下→上”算up)。
4|计数核心思路(避免误计的关键)
4.1 线段两侧的“符号”判断
给定线段两端点 A(x1,y1)、B(x2,y2),目标中心 P(cx,cy),
计算有向面积:
[
s§ = (x2-x1)cdot(cy-y1) – (y2-y1)cdot(cx-x1)
]
-
s(P) > 0:P 在 AB 的左侧 -
s(P) :P 在 AB 的右侧 -
s(P) = 0:在直线上(考虑数值容差)
当同一 track_id 的 s(P) 从正→负或负→正,并且位移越过阈值,即判定一次“越线事件”。符号变化的方向即为 up / down(你可自定义含义)。
4.2 FSM(状态机)消抖
对每个 track_id 维护状态:
state = 'idle' | 'armed'-
last_sign:上次符号(+1/-1/0) -
last_pos:上次坐标(防抖、位移判定)
规则:
- 初次出现 → 记录
last_sign。 - 若
sign * last_sign == -1(发生符号翻转),并且位移 > move_thresh,则触发计数,状态转回idle。 - 很久未更新(如 1 秒)自动清理,防止内存涨。
5|完整代码(counter.py)
配置参数都放在 argparse 中,无需额外文件也能跑。
python counter.py --source samples/demo.mp4 --line 0,300,1280,300 --classes 0
其中--classes 0表示只统计 person(COCO 类别 0),--line为越线坐标。
# -*- coding: utf-8 -*-
import argparse, os, time, csv, math, json
from collections import defaultdict, deque
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
# ---------- 工具:几何/绘图 ----------
def line_sign(A, B, P):
"""返回点 P 相对于有向线段 AB 的符号(>0 左侧,
(x1, y1), (x2, y2) = A, B
cx, cy = P
return (x2 - x1) * (cy - y1) - (y2 - y1) * (cx - x1)
def side_to_dir(prev_sign, curr_sign, up_label="up", down_label="down"):
"""把符号翻转映射为方向:正->负 记为down,负->正 记为up(可按需求互换)"""
if prev_sign > 0 and curr_sign 0:
return down_label
if prev_sign 0 and curr_sign > 0:
return up_label
return None
def draw_line(frame, A, B, color=(0, 255, 0), thickness=2):
cv2.line(frame, (int(A[0]), int(A[1])), (int(B[0]), int(B[1])), color, thickness)
def put_text(img, text, org, scale=0.7, color=(255,255,255), thickness=2, bg=True):
if bg:
(w, h), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, thickness)
x, y = org
cv2.rectangle(img, (x, y-h-6), (x+w+6, y+6), (0,0,0), -1)
cv2.putText(img, text, org, cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness, cv2.LINE_AA)
def xyxy_to_cxy(box):
x1, y1, x2, y2 = box
return (int((x1+x2)/2), int((y1+y2)/2))
# ---------- 计数器(FSM) ----------
class LineCounter:
def __init__(self, A, B, move_thresh=20, ttl=60):
self.A, self.B = A, B
self.move_thresh = move_thresh
self.ttl = ttl # 多少帧不更新就清理
self.states = {} # tid -> dict
self.counts = {"up": 0, "down": 0}
def update(self, tid, center):
now = time.time()
s = self.states.get(tid)
sign = line_sign(self.A, self.B, center)
sign = 1 if sign > 0 else (-1 if sign 0 else 0)
if s is None:
self.states[tid] = {
"last_sign": sign,
"last_pos": center,
"last_t": now
}
return None
# 清理过期
s["last_t"] = now
prev_sign = s["last_sign"]
last_pos = s["last_pos"]
s["last_sign"] = sign
s["last_pos"] = center
# 位移阈值(像素)
move = math.dist(center, last_pos)
if prev_sign == 0 or sign == 0 or prev_sign == sign:
return None
if move self.move_thresh:
return None
# 发生翻转:计数
direction = side_to_dir(prev_sign, sign)
if direction:
self.counts[direction] += 1
return direction
return None
def gc(self):
now = time.time()
drop = [tid for tid, s in self.states.items() if now - s["last_t"] > self.ttl]
for tid in drop:
self.states.pop(tid, None)
# ---------- 推理/跟踪流水线 ----------
class Pipeline:
def __init__(self, args):
self.args = args
self.model = YOLO(args.weights)
self.half = args.half and (self.model.device.type != "cpu")
self.model.fuse()
self.tracker = DeepSort(
max_age=30, n_init=2, max_iou_distance=0.7,
embedder="mobilenet", half=self.half
)
# 线段
if args.line:
x1, y1, x2, y2 = [float(x) for x in args.line.split(",")]
self.A, self.B = (x1, y1), (x2, y2)
elif args.roi and os.path.exists(args.roi):
with open(args.roi, "r", encoding="utf-8") as f:
data = json.load(f)
self.A = tuple(data["line"]["A"])
self.B = tuple(data["line"]["B"])
else:
self.A, self.B = (0, 300), (1280, 300) # fallback
self.counter = LineCounter(self.A, self.B, move_thresh=args.move_thresh, ttl=60)
# 类别过滤(COCO:person=0,car=2,bus=5,truck=7)
if args.classes is not None:
self.allowed = set([int(x) for x in args.classes.split(",")])
else:
self.allowed = None
# 输出
os.makedirs(args.out_dir, exist_ok=True)
self.csv_path = os.path.join(args.out_dir, "events.csv")
self.video_out = None
self.csv_fp = open(self.csv_path, "w", newline="", encoding="utf-8")
self.csv_writer = csv.writer(self.csv_fp)
self.csv_writer.writerow(["ts", "track_id", "direction", "cx", "cy"])
def close(self):
if self.video_out:
self.video_out.release()
self.csv_fp.close()
def run(self):
cap = cv2.VideoCapture(self.args.source)
assert cap.isOpened(), f"cannot open: {self.args.source}"
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 30.0
if self.args.save_video:
out_path = os.path.join(self.args.out_dir, "out.mp4")
self.video_out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), FPS, (W, H))
last_fps_t = time.time()
fcounter = 0
while True:
ret, frame = cap.read()
if not ret:
break
fcounter += 1
# 可选跳帧
if self.args.skip > 0 and (fcounter % (self.args.skip + 1) != 1):
continue
# ---------- 1) 检测 ----------
res = self.model.predict(
frame, imgsz=self.args.imgsz, conf=self.args.conf, iou=self.args.iou,
device=self.model.device, half=self.half, verbose=False
)[0]
dets = []
for *xyxy, conf, cls in res.boxes.data.cpu().numpy():
cls = int(cls)
if self.allowed is not None and cls not in self.allowed:
continue
x1, y1, x2, y2 = xyxy
# DeepSort 需要 (ltrb), confidence, class
dets.append(([x1, y1, x2, y2], float(conf), cls))
# ---------- 2) 跟踪 ----------
tracks = self.tracker.update_tracks(dets, frame=frame)
# ---------- 3) 可视化 + 计数 ----------
draw_line(frame, self.A, self.B, color=(50, 220, 50), thickness=2)
for t in tracks:
if not t.is_confirmed():
continue
ltrb = t.to_ltrb()
x1, y1, x2, y2 = map(int, ltrb)
tid = t.track_id
cx, cy = xyxy_to_cxy((x1, y1, x2, y2))
# 计数
event = self.counter.update(tid, (cx, cy))
if event:
self.csv_writer.writerow([int(time.time()), tid, event, cx, cy])
# 框与信息
color = (60, 160, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
put_text(frame, f"id:{tid}", (x1, max(20, y1-8)), scale=0.6, color=(255,255,0))
cv2.circle(frame, (cx, cy), 3, (0, 255, 255), -1)
# 右上角面板
panel = f"UP:{self.counter.counts['up']} DOWN:{self.counter.counts['down']}"
put_text(frame, panel, (W-220, 30), scale=0.8, color=(255,255,255))
# FPS
now = time.time()
if now - last_fps_t >= 1.0:
fps = fcounter / (now - last_fps_t)
last_fps_t = now
fcounter = 0
self.last_fps = fps
fps = getattr(self, "last_fps", 0.0)
put_text(frame, f"{fps:.1f} FPS", (10, 30), scale=0.8, color=(0,255,180))
# 输出/显示
if self.video_out:
self.video_out.write(frame)
if self.args.view:
cv2.imshow("count", frame)
if cv2.waitKey(1) & 0xFF == 27:
break
self.counter.gc()
cap.release()
self.close()
cv2.destroyAllWindows()
# ---------- CLI ----------
def parse_args():
p = argparse.ArgumentParser("YOLOv8 + DeepSORT 越线计数")
p.add_argument("--source", type=str, default="samples/demo.mp4", help="视频或摄像头(0)")
p.add_argument("--weights", type=str, default="yolov8n.pt", help="YOLOv8 权重或路径")
p.add_argument("--imgsz", type=int, default=640)
p.add_argument("--conf", type=float, default=0.35)
p.add_argument("--iou", type=float, default=0.5)
p.add_argument("--classes", type=str, default=None, help="只保留的类别, 如 '0' 或 '0,2,5,7'")
p.add_argument("--line", type=str, default=None, help="越线坐标 x1,y1,x2,y2")
p.add_argument("--roi", type=str, default=None, help="JSON 文件: {'line': {'A':[x1,y1],'B':[x2,y2]}}")
p.add_argument("--move-thresh", type=float, default=20, help="触发计数的最小位移像素")
p.add_argument("--skip", type=int, default=0, help="跳帧数(0 不跳)")
p.add_argument("--half", action="store_true", help="半精度推理(GPU)")
p.add_argument("--view", action="store_true", help="窗口显示")
p.add_argument("--save-video", action="store_true", help="保存 out.mp4")
p.add_argument("--out-dir", type=str, default="runs", help="输出目录")
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
if args.source.isdigit():
args.source = int(args.source)
os.makedirs(args.out_dir, exist_ok=True)
Pipeline(args).run()
6|快速上手:两条命令
# 1) 安装依赖
pip install -U ultralytics deep-sort-realtime opencv-python
# 2) 运行
python counter.py --source samples/demo.mp4 --line 0,420,1280,420 --classes 0 --view --save-video
提示:
--classes 0表示只计数 COCO 的 person;如果要车流:--classes 2,5,7。- 摄像头:
--source 0。- 越线方向规则:以 A→B 的方向为上行坐标轴,正→负记为
down,负→正记为up(可在side_to_dir里换)。
7|参数解释与调优
| 参数 | 说明 | 建议 |
|---|---|---|
--conf |
置信度阈值 | 人流密集可适当降低以减少漏检;过低会增多误检 |
--iou |
NMS IOU 阈值 | 0.5~0.7 合理,太高会残留重复框 |
--classes |
类别过滤 | COCO:0 人,2 车,5 巴士,7 卡车 |
--imgsz |
推理输入尺寸 | 640/736/960;越大检测更准但更慢 |
--move-thresh |
触发位移阈值 | 20~40 px;摄像头越远、像素越小要减少 |
--skip |
跳帧 | 省算力;--skip 1 表示每 2 帧跑一次 |
GPU 提升
- 轻量模型:
yolov8n.pt / yolov8s.pt; - 半精度:加
--half; - 固定输入大小,禁用图像增强(默认就是)。
8|常见问题与坑位
-
ID 抖动导致重复计数
- 调大
DeepSort的max_age(如 60),允许短时丢失重连。 - 把
move_thresh设为 20~40,过滤微小抖动。
- 调大
-
遮挡严重/交汇处漏检
- 模型换大些:
yolov8m.pt; - 角度不佳时可布设两条线并取“逻辑与”(超出本文范围)。
- 模型换大些:
-
画面抖动导致线跟着动
- 固定机位很重要;移动摄像建议先做视频稳像(OpenCV 的
videostab或光流)。
- 固定机位很重要;移动摄像建议先做视频稳像(OpenCV 的
-
多类别同线计数
- 可在
events.csv中加入cls_name字段;或建多条计数器分别统计。
- 可在
-
边界穿越(目标只露一半)
- 使用目标中心点而非底边;必要时可结合速度方向做二次判断。
9|扩展:区域进出计数(思路)
将“线”改为“多边形区域”:
- 维护
point_in_poly(P, poly); - 记录
last_in(布尔),由False→True判定“进入”,True→False判定“离开”; - 对抖动边界设缓冲带(形态学膨胀/腐蚀,或扩大/收缩多边形)。
10|导出与部署
-
ONNX/TensorRT:YOLOv8 一行导出
yolo export model=yolov8n.pt format=onnx # 或 engine把
YOLO('xxx.engine')作为权重即可加载。 -
多进程/多路视频:用
multiprocessing为每路视频开一个进程,或上消息队列;脚本内状态互不干扰。 -
Docker:
- 基础镜像
nvidia/cuda:12.1.0-runtime-ubuntu22.04; - 安装 Python 3.10 + 依赖;
-
--gpus all运行。
- 基础镜像
11|验证与评估
- 定性:观察 out.mp4 的 ID 连续性、越线计数与肉眼是否一致。
-
定量:选取 5~10 分钟片段人工标注真值(up/down 次数),对比
events.csv统计误差。 -
指标:
- Recall(召回):真实越线中被识别的比例;
- Precision(准确):识别为越线中正确的比例;
- IDF1(跟踪一致性):可用 MOTChallenge 工具评估。
12|SEO 关键词
YOLOv8、DeepSORT、目标跟踪、越线计数、客流统计、车辆计数、状态机、Python 实战、OpenCV、TensorRT
13|结语 & CTA
- 你的视频场景是上/下行还是区域进出?在评论区描述一下(或贴短视频片段),我给你线段坐标和参数的定制建议。
- 如果你希望我把脚本改造成 多路 RTSP 实时计数服务(含 REST API、Prometheus 指标),点个赞并评论 “要服务版”,我就更新第 5.5 篇加餐。
附:配置文件模板(可选 config.yaml)
# 仅当你不想写命令行时
source: samples/demo.mp4
weights: yolov8n.pt
imgsz: 640
conf: 0.35
iou: 0.5
classes: "0" # 只统计 person
line: "0,420,1280,420" # x1,y1,x2,y2
move_thresh: 24
skip: 0
half: true
view: true
save_video: true
out_dir: runs
5bei.cn大模型教程网










