AI大模型教程
一起来学习

YOLOv8 + DeepSORT 实现多目标“越线计数”:从检测、跟踪到状态机(Day 5)

YOLOv8 + DeepSORT 实现多目标“越线计数”:从检测、跟踪到状态机(Day 5)

系列专栏 · 第 5 篇 / 7
目标:在单个脚本里完成“目标检测 → 多目标跟踪 → 越线计数/方向统计 → 可视化与日志输出”,并讲清楚版本选择、参数含义、常见坑性能优化
适用:行人/车辆/动物等的进出统计、通道客流、店铺进店率评估、安防告警前置等。

1|功能说明与效果预期

  • 检测:使用 ultralyticsYOLOv8(可换任意 pt/onnx/engine),类别可过滤(如只统计人/车)。
  • 跟踪:默认用 deep-sort-realtime(Kalman + ReID),稳定生成 track_id
  • 计数:支持 单条越线区域进出。本文主讲越线:通过线段两侧符号 + 状态机(FSM) 防止误计数。
  • 输出:画面叠加框/ID/轨迹/统计面板;同时导出 CSV 日志(时间戳、id、方向)。
  • 性能:1080p 视频在 T4/3060 上 30~60 FPS(视模型大小而定);CPU 也可跑但帧率有限。

2|环境与版本

2.1 依赖(推荐版本)

ultralytics==8.2.103
deep-sort-realtime==1.3.2
opencv-python==4.9.0.80
numpy>=1.24

为什么这样选?

  • ultralytics 8.2+:接口稳定、支持 model(source) 的简写与半精度。
  • deep-sort-realtime 1.3+:API 简单(DeepSort.update_tracks(detections, frame)),无需额外编译。
  • 如果你要 ByteTrack/StrongSORT,可以把跟踪部分替换即可,计数逻辑不变。

2.2 安装

# 建议在虚拟环境
pip install -U ultralytics deep-sort-realtime opencv-python

3|项目结构与样例素材

line-counter/
├─ counter.py              # 主脚本(可直接运行)
├─ config.yaml             # 可选:参数配置(线段/类别/阈值)
├─ samples/
│  ├─ demo.mp4             # 测试视频(自备)
│  └─ roi.json             # 可选:交互标定生成的线段
└─ runs/
   ├─ out.mp4              # 导出视频
   └─ events.csv           # 计数日志

素材准备

  • 任选一个监控视频(建议固定机位,无遮挡过多)。
  • 需要你先目测越线方向(如“上→下”算 down,“下→上”算 up)。

4|计数核心思路(避免误计的关键)

4.1 线段两侧的“符号”判断

给定线段两端点 A(x1,y1)B(x2,y2),目标中心 P(cx,cy)
计算有向面积:
[
s§ = (x2-x1)cdot(cy-y1) – (y2-y1)cdot(cx-x1)
]

  • s(P) > 0:P 在 AB 的左侧
  • s(P) :P 在 AB 的右侧
  • s(P) = 0:在直线上(考虑数值容差)

当同一 track_ids(P)正→负负→正,并且位移越过阈值,即判定一次“越线事件”。符号变化的方向即为 up / down(你可自定义含义)。

4.2 FSM(状态机)消抖

对每个 track_id 维护状态:

  • state = 'idle' | 'armed'
  • last_sign:上次符号(+1/-1/0)
  • last_pos:上次坐标(防抖、位移判定)

规则:

  1. 初次出现 → 记录 last_sign
  2. sign * last_sign == -1(发生符号翻转),并且 位移 > move_thresh,则触发计数,状态转回 idle
  3. 很久未更新(如 1 秒)自动清理,防止内存涨。

5|完整代码(counter.py

配置参数都放在 argparse 中,无需额外文件也能跑。
python counter.py --source samples/demo.mp4 --line 0,300,1280,300 --classes 0
其中 --classes 0 表示只统计 person(COCO 类别 0),--line 为越线坐标。

# -*- coding: utf-8 -*-
import argparse, os, time, csv, math, json
from collections import defaultdict, deque

import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort


# ---------- 工具:几何/绘图 ----------
def line_sign(A, B, P):
    """返回点 P 相对于有向线段 AB 的符号(>0 左侧,
    (x1, y1), (x2, y2) = A, B
    cx, cy = P
    return (x2 - x1) * (cy - y1) - (y2 - y1) * (cx - x1)

def side_to_dir(prev_sign, curr_sign, up_label="up", down_label="down"):
    """把符号翻转映射为方向:正->负 记为down,负->正 记为up(可按需求互换)"""
    if prev_sign > 0 and curr_sign  0:
        return down_label
    if prev_sign  0 and curr_sign > 0:
        return up_label
    return None

def draw_line(frame, A, B, color=(0, 255, 0), thickness=2):
    cv2.line(frame, (int(A[0]), int(A[1])), (int(B[0]), int(B[1])), color, thickness)

def put_text(img, text, org, scale=0.7, color=(255,255,255), thickness=2, bg=True):
    if bg:
        (w, h), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, thickness)
        x, y = org
        cv2.rectangle(img, (x, y-h-6), (x+w+6, y+6), (0,0,0), -1)
    cv2.putText(img, text, org, cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness, cv2.LINE_AA)

def xyxy_to_cxy(box):
    x1, y1, x2, y2 = box
    return (int((x1+x2)/2), int((y1+y2)/2))


# ---------- 计数器(FSM) ----------
class LineCounter:
    def __init__(self, A, B, move_thresh=20, ttl=60):
        self.A, self.B = A, B
        self.move_thresh = move_thresh
        self.ttl = ttl  # 多少帧不更新就清理
        self.states = {}  # tid -> dict
        self.counts = {"up": 0, "down": 0}

    def update(self, tid, center):
        now = time.time()
        s = self.states.get(tid)
        sign = line_sign(self.A, self.B, center)
        sign = 1 if sign > 0 else (-1 if sign  0 else 0)

        if s is None:
            self.states[tid] = {
                "last_sign": sign,
                "last_pos": center,
                "last_t": now
            }
            return None

        # 清理过期
        s["last_t"] = now
        prev_sign = s["last_sign"]
        last_pos = s["last_pos"]
        s["last_sign"] = sign
        s["last_pos"] = center

        # 位移阈值(像素)
        move = math.dist(center, last_pos)
        if prev_sign == 0 or sign == 0 or prev_sign == sign:
            return None
        if move  self.move_thresh:
            return None

        # 发生翻转:计数
        direction = side_to_dir(prev_sign, sign)
        if direction:
            self.counts[direction] += 1
            return direction
        return None

    def gc(self):
        now = time.time()
        drop = [tid for tid, s in self.states.items() if now - s["last_t"] > self.ttl]
        for tid in drop:
            self.states.pop(tid, None)


# ---------- 推理/跟踪流水线 ----------
class Pipeline:
    def __init__(self, args):
        self.args = args
        self.model = YOLO(args.weights)
        self.half = args.half and (self.model.device.type != "cpu")
        self.model.fuse()
        self.tracker = DeepSort(
            max_age=30, n_init=2, max_iou_distance=0.7,
            embedder="mobilenet", half=self.half
        )

        # 线段
        if args.line:
            x1, y1, x2, y2 = [float(x) for x in args.line.split(",")]
            self.A, self.B = (x1, y1), (x2, y2)
        elif args.roi and os.path.exists(args.roi):
            with open(args.roi, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.A = tuple(data["line"]["A"])
            self.B = tuple(data["line"]["B"])
        else:
            self.A, self.B = (0, 300), (1280, 300)  # fallback

        self.counter = LineCounter(self.A, self.B, move_thresh=args.move_thresh, ttl=60)

        # 类别过滤(COCO:person=0,car=2,bus=5,truck=7)
        if args.classes is not None:
            self.allowed = set([int(x) for x in args.classes.split(",")])
        else:
            self.allowed = None

        # 输出
        os.makedirs(args.out_dir, exist_ok=True)
        self.csv_path = os.path.join(args.out_dir, "events.csv")
        self.video_out = None
        self.csv_fp = open(self.csv_path, "w", newline="", encoding="utf-8")
        self.csv_writer = csv.writer(self.csv_fp)
        self.csv_writer.writerow(["ts", "track_id", "direction", "cx", "cy"])

    def close(self):
        if self.video_out:
            self.video_out.release()
        self.csv_fp.close()

    def run(self):
        cap = cv2.VideoCapture(self.args.source)
        assert cap.isOpened(), f"cannot open: {self.args.source}"
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        FPS = cap.get(cv2.CAP_PROP_FPS) or 30.0

        if self.args.save_video:
            out_path = os.path.join(self.args.out_dir, "out.mp4")
            self.video_out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), FPS, (W, H))

        last_fps_t = time.time()
        fcounter = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            fcounter += 1

            # 可选跳帧
            if self.args.skip > 0 and (fcounter % (self.args.skip + 1) != 1):
                continue

            # ---------- 1) 检测 ----------
            res = self.model.predict(
                frame, imgsz=self.args.imgsz, conf=self.args.conf, iou=self.args.iou,
                device=self.model.device, half=self.half, verbose=False
            )[0]
            dets = []
            for *xyxy, conf, cls in res.boxes.data.cpu().numpy():
                cls = int(cls)
                if self.allowed is not None and cls not in self.allowed:
                    continue
                x1, y1, x2, y2 = xyxy
                # DeepSort 需要 (ltrb), confidence, class
                dets.append(([x1, y1, x2, y2], float(conf), cls))

            # ---------- 2) 跟踪 ----------
            tracks = self.tracker.update_tracks(dets, frame=frame)

            # ---------- 3) 可视化 + 计数 ----------
            draw_line(frame, self.A, self.B, color=(50, 220, 50), thickness=2)
            for t in tracks:
                if not t.is_confirmed():
                    continue
                ltrb = t.to_ltrb()
                x1, y1, x2, y2 = map(int, ltrb)
                tid = t.track_id
                cx, cy = xyxy_to_cxy((x1, y1, x2, y2))

                # 计数
                event = self.counter.update(tid, (cx, cy))
                if event:
                    self.csv_writer.writerow([int(time.time()), tid, event, cx, cy])

                # 框与信息
                color = (60, 160, 255)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                put_text(frame, f"id:{tid}", (x1, max(20, y1-8)), scale=0.6, color=(255,255,0))
                cv2.circle(frame, (cx, cy), 3, (0, 255, 255), -1)

            # 右上角面板
            panel = f"UP:{self.counter.counts['up']}  DOWN:{self.counter.counts['down']}"
            put_text(frame, panel, (W-220, 30), scale=0.8, color=(255,255,255))

            # FPS
            now = time.time()
            if now - last_fps_t >= 1.0:
                fps = fcounter / (now - last_fps_t)
                last_fps_t = now
                fcounter = 0
                self.last_fps = fps
            fps = getattr(self, "last_fps", 0.0)
            put_text(frame, f"{fps:.1f} FPS", (10, 30), scale=0.8, color=(0,255,180))

            # 输出/显示
            if self.video_out:
                self.video_out.write(frame)
            if self.args.view:
                cv2.imshow("count", frame)
                if cv2.waitKey(1) & 0xFF == 27:
                    break

            self.counter.gc()

        cap.release()
        self.close()
        cv2.destroyAllWindows()


# ---------- CLI ----------
def parse_args():
    p = argparse.ArgumentParser("YOLOv8 + DeepSORT 越线计数")
    p.add_argument("--source", type=str, default="samples/demo.mp4", help="视频或摄像头(0)")
    p.add_argument("--weights", type=str, default="yolov8n.pt", help="YOLOv8 权重或路径")
    p.add_argument("--imgsz", type=int, default=640)
    p.add_argument("--conf", type=float, default=0.35)
    p.add_argument("--iou", type=float, default=0.5)
    p.add_argument("--classes", type=str, default=None, help="只保留的类别, 如 '0' 或 '0,2,5,7'")
    p.add_argument("--line", type=str, default=None, help="越线坐标 x1,y1,x2,y2")
    p.add_argument("--roi", type=str, default=None, help="JSON 文件: {'line': {'A':[x1,y1],'B':[x2,y2]}}")
    p.add_argument("--move-thresh", type=float, default=20, help="触发计数的最小位移像素")
    p.add_argument("--skip", type=int, default=0, help="跳帧数(0 不跳)")
    p.add_argument("--half", action="store_true", help="半精度推理(GPU)")
    p.add_argument("--view", action="store_true", help="窗口显示")
    p.add_argument("--save-video", action="store_true", help="保存 out.mp4")
    p.add_argument("--out-dir", type=str, default="runs", help="输出目录")
    return p.parse_args()


if __name__ == "__main__":
    args = parse_args()
    if args.source.isdigit():
        args.source = int(args.source)
    os.makedirs(args.out_dir, exist_ok=True)
    Pipeline(args).run()

6|快速上手:两条命令

# 1) 安装依赖
pip install -U ultralytics deep-sort-realtime opencv-python

# 2) 运行
python counter.py --source samples/demo.mp4 --line 0,420,1280,420 --classes 0 --view --save-video

提示

  • --classes 0 表示只计数 COCO 的 person;如果要车流:--classes 2,5,7
  • 摄像头:--source 0
  • 越线方向规则:以 A→B 的方向为上行坐标轴,正→负记为 down负→正记为 up(可在 side_to_dir 里换)。

7|参数解释与调优

参数 说明 建议
--conf 置信度阈值 人流密集可适当降低以减少漏检;过低会增多误检
--iou NMS IOU 阈值 0.5~0.7 合理,太高会残留重复框
--classes 类别过滤 COCO:0 人,2 车,5 巴士,7 卡车
--imgsz 推理输入尺寸 640/736/960;越大检测更准但更慢
--move-thresh 触发位移阈值 20~40 px;摄像头越远、像素越小要减少
--skip 跳帧 省算力;--skip 1 表示每 2 帧跑一次

GPU 提升

  • 轻量模型:yolov8n.pt / yolov8s.pt
  • 半精度:加 --half
  • 固定输入大小,禁用图像增强(默认就是)。

8|常见问题与坑位

  1. ID 抖动导致重复计数
    • 调大 DeepSortmax_age(如 60),允许短时丢失重连。
    • move_thresh 设为 20~40,过滤微小抖动。
  2. 遮挡严重/交汇处漏检
    • 模型换大些:yolov8m.pt
    • 角度不佳时可布设两条线并取“逻辑与”(超出本文范围)。
  3. 画面抖动导致线跟着动
    • 固定机位很重要;移动摄像建议先做视频稳像(OpenCV 的 videostab 或光流)。
  4. 多类别同线计数
    • 可在 events.csv 中加入 cls_name 字段;或建多条计数器分别统计。
  5. 边界穿越(目标只露一半)
    • 使用目标中心点而非底边;必要时可结合速度方向做二次判断。

9|扩展:区域进出计数(思路)

将“线”改为“多边形区域”:

  • 维护 point_in_poly(P, poly)
  • 记录 last_in(布尔),由 False→True 判定“进入”,True→False 判定“离开”;
  • 对抖动边界设缓冲带(形态学膨胀/腐蚀,或扩大/收缩多边形)。

10|导出与部署

  • ONNX/TensorRT:YOLOv8 一行导出
    yolo export model=yolov8n.pt format=onnx  # 或 engine
    

    YOLO('xxx.engine') 作为权重即可加载。

  • 多进程/多路视频:用 multiprocessing 为每路视频开一个进程,或上消息队列;脚本内状态互不干扰。
  • Docker
    • 基础镜像 nvidia/cuda:12.1.0-runtime-ubuntu22.04
    • 安装 Python 3.10 + 依赖;
    • --gpus all 运行。

11|验证与评估

  • 定性:观察 out.mp4 的 ID 连续性、越线计数与肉眼是否一致。
  • 定量:选取 5~10 分钟片段人工标注真值(up/down 次数),对比 events.csv 统计误差。
  • 指标
    • Recall(召回):真实越线中被识别的比例;
    • Precision(准确):识别为越线中正确的比例;
    • IDF1(跟踪一致性):可用 MOTChallenge 工具评估。

12|SEO 关键词

YOLOv8、DeepSORT、目标跟踪、越线计数、客流统计、车辆计数、状态机、Python 实战、OpenCV、TensorRT


13|结语 & CTA

  • 你的视频场景是上/下行还是区域进出?在评论区描述一下(或贴短视频片段),我给你线段坐标和参数定制建议
  • 如果你希望我把脚本改造成 多路 RTSP 实时计数服务(含 REST API、Prometheus 指标),点个赞并评论 “要服务版”,我就更新第 5.5 篇加餐。

附:配置文件模板(可选 config.yaml

# 仅当你不想写命令行时
source: samples/demo.mp4
weights: yolov8n.pt
imgsz: 640
conf: 0.35
iou: 0.5
classes: "0"            # 只统计 person
line: "0,420,1280,420"  # x1,y1,x2,y2
move_thresh: 24
skip: 0
half: true
view: true
save_video: true
out_dir: runs

文章来源于互联网:YOLOv8 + DeepSORT 实现多目标“越线计数”:从检测、跟踪到状态机(Day 5)

赞(0)
未经允许不得转载:5bei.cn大模型教程网 » YOLOv8 + DeepSORT 实现多目标“越线计数”:从检测、跟踪到状态机(Day 5)
分享到: 更多 (0)

AI大模型,我们的未来

小欢软考联系我们