Python

Python 使用 OpenCV 開啟、顯示、處理與儲存 RTSP 串流影片教學與範例

介紹如何在 Python 中使用 OpenCV 搭配多行程(multiprocessing)的方式開啟、顯示、處理與儲存 RTSP 串流影片。

測試用 RTSP 串流

網路上有一些測試用的 RTSP 串流伺服器,可用於程式的開發與測試:

安裝 OpenCV

安裝 Python 的 OpenCV 模組:

# 安裝 Python 的 OpenCV 模組
pip3 install opencv-python

開啟與顯示 RTSP 串流影像

OpenCV 中的 VideoCapture() 函數可以用來開啟各種不同的影片來源,包含影片檔案、網路攝影機、RTSP 串流等,以下是使用 OpenCV 開啟 RTSP 串流影片的範例。

import cv2

if __name__ == '__main__':

    # 開啟 RTSP 串流
    vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

    # 建立視窗
    cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)

    while True:
        # 從 RTSP 串流讀取一張影像
        ret, image = vidCap.read()

        if ret:
            # 顯示影像
            cv2.imshow('image_display', image)
            cv2.waitKey(10)
        else:
            # 若沒有影像跳出迴圈
            break

    # 釋放資源
    vidCap.release()

    # 關閉所有 OpenCV 視窗
    cv2.destroyAllWindows()

上面這段 Python 指令稿執行之後,就會開啟一個 OpenCV 的視窗,顯示 RTSP 串流的影像:

OpenCV 視窗顯示 RTSP 串流影像

多行程版本

若在 CPU 處理速度比較慢的機器上,或是遇到影格率(frame rate)較高的影片,可能會出現單一執行緒無法及時處理的問題,這時候就可以考慮使用多行程(multiprocessing)的方式,以一個行程專門收取 RTSP 串流影像,然後將收進來的影像透過佇列(queue)交給另外一個行程來進行顯示,這樣就可以改善整體程式的處理效能,比較不會發生程式處理速度跟不上影片速度的問題。

from multiprocessing import Process, Queue
import cv2

def image_display(taskqueue):
    # 建立視窗
    cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)

    while True:
        # 從工作佇列取得影像
        image = taskqueue.get()

        # 若沒有影像則終止迴圈
        if image is None: break

        # 顯示影像
        cv2.imshow('image_display', image)
        cv2.waitKey(10)

if __name__ == '__main__':

    # 開啟 RTSP 串流
    vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

    # 建立工作佇列
    taskqueue = Queue()

    # 建立並執行工作行程
    proc = Process(target=image_display, args=(taskqueue,))
    proc.start()

    while True:
        # 從 RTSP 串流讀取一張影像
        ret, image = vidCap.read()

        if ret:
            # 將影像放入工作佇列
            taskqueue.put(image)
        else:
            # 若沒有影像跳出迴圈
            break

    # 傳入 None 終止工作行程
    taskqueue.put(None)

    # 等待工作行程結束
    proc.join()

    # 釋放資源
    vidCap.release()

    # 關閉所有 OpenCV 視窗
    cv2.destroyAllWindows()

儲存 RTSP 串流影像

以下是使用 OpenCV 讀取 RTSP 串流影像之後,將其儲存成影片檔案的範例,在儲存影片時維持原影片的解析度與影格率,而編碼則採用 mp4v,靠著計算影格數的方式錄製 10 秒鐘的 RTSP 串流影像,儲存為 output.mp4

from multiprocessing import Process, Queue
import cv2

def image_save(taskqueue, width, height, fps):

    # 指定影片編碼
    #fourcc = cv2.VideoWriter_fourcc(*'XVID')
    #fourcc = cv2.VideoWriter_fourcc(*'H264')
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # 建立 VideoWriter 物件
    writer = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

    while True:
        # 從工作佇列取得影像
        image = taskqueue.get()

        # 若沒有影像則終止迴圈
        if image is None: break

        # 儲存影像
        writer.write(image)

    # 釋放資源
    writer.release()

if __name__ == '__main__':

    # 開啟 RTSP 串流
    vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

    # 取得影像的尺寸大小
    width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # 取得影格率
    fps = vidCap.get(cv2.CAP_PROP_FPS)

    # 建立工作佇列
    taskqueue = Queue()

    # 建立並執行工作行程
    proc = Process(target=image_save, args=(taskqueue, width, height, fps))
    proc.start()

    # 計數器
    frame_counter = 0

    # 總錄製幀數(10 秒鐘)
    total_frames = fps * 10

    while frame_counter < total_frames:
        # 從 RTSP 串流讀取一張影像
        ret, image = vidCap.read()

        if ret:
            # 將影像放入工作佇列
            taskqueue.put(image)
            frame_counter += 1
        else:
            # 若沒有影像跳出迴圈
            break

    # 傳入 None 終止工作行程
    taskqueue.put(None)

    # 等待工作行程結束
    proc.join()

    # 釋放資源
    vidCap.release()

儲存長時間串流影片

若需要儲存長時間的 RTSP 串流影片,可以利用以下範例,將串流切成固定長度的影片,依照編號或時間戳記來命名儲存的影片檔案,方便後續處理。

from multiprocessing import Process, Queue
import cv2
from datetime import datetime

def image_save(taskqueue, width, height, fps, frames_per_file):

    # 指定影片編碼
    #fourcc = cv2.VideoWriter_fourcc(*'XVID')
    #fourcc = cv2.VideoWriter_fourcc(*'H264')
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    writer = None

    while True:
        # 從工作佇列取得影像
        image, frame_counter = taskqueue.get()

        # 若沒有影像則終止迴圈
        if image is None: break

        if frame_counter % frames_per_file == 0:

            if writer: writer.release()

            # 建立 VideoWriter 物件(以數字編號)
            # index = int(frame_counter // frames_per_file)
            # writer = cv2.VideoWriter(f'output-{index}.mp4', fourcc, fps, (width, height))

            # 建立 VideoWriter 物件(以時間命名)
            now = datetime.now()
            timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
            writer = cv2.VideoWriter(f'output-{timestamp}.mp4', fourcc, fps, (width, height))

        # 儲存影像
        writer.write(image)

    # 釋放資源
    writer.release()

if __name__ == '__main__':

    # 開啟 RTSP 串流
    vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')

    # 取得影像的尺寸大小
    width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # 取得影格率
    fps = vidCap.get(cv2.CAP_PROP_FPS)

    # 建立工作佇列
    taskqueue = Queue()

    # 計數器
    frame_counter = 0

    # 總錄製幀數(30 秒鐘)
    total_frames = fps * 30

    # 每個檔案的幀數(10 秒鐘)
    frames_per_file = fps * 10

    # 建立並執行工作行程
    proc = Process(target=image_save, args=(taskqueue, width, height, fps, frames_per_file))
    proc.start()

    while frame_counter < total_frames:
        # 從 RTSP 串流讀取一張影像
        ret, image = vidCap.read()

        if ret:
            # 將影像放入工作佇列
            taskqueue.put((image, frame_counter))
            frame_counter += 1
        else:
            # 若沒有影像跳出迴圈
            break

    # 傳入 None 終止工作行程
    taskqueue.put((None, None))

    # 等待工作行程結束
    proc.join()

    # 釋放資源
    vidCap.release()

這裡為了方便起見,檔案名稱用的時間戳記是放在工作行程中產生的,所以時間可能會有一些誤差,如果需要非常精確的時間,就要將時間戳記改為接收串流時同時取得時間。

錄製機器作動影片

這裡我的應用場景是將一台網路攝影機架設在工廠內,鏡頭對準特定的機台,然後我希望透過 RTSP 串流影片監看機台的狀況,當機台有動作時自動將作動過程的影片錄製下來,以利後續的 AI 分析,以下是我在開發過程中所用的基本架構範例。

from multiprocessing import Process, Queue
import cv2
from datetime import datetime
import numpy as np

# 開發模式
DEV_MODE = False

def printLogMsg(msg):
    now = datetime.now()
    timestamp = now.strftime("%Y/%m/%d %H:%M:%S")
    print("[{}] {}".format(timestamp, msg))

def image_save(taskqueue, width, height, fps):

    # 指定影片編碼
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    #fourcc = cv2.VideoWriter_fourcc(*'H264')
    #fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    writer = None

    while True:
        # 從工作佇列取得影像
        image, frameCounter = taskqueue.get()

        # 若沒有影像則終止迴圈
        if image is None: break

        if frameCounter == 0:

            if writer: writer.release()

            # 建立 VideoWriter 物件(以時間命名)
            now = datetime.now()
            timestamp = now.strftime("%Y%m%d-%H%M%S")
            writer = cv2.VideoWriter(f'test-output-{timestamp}.avi', fourcc, fps, (width, height))

        # 儲存影像
        writer.write(image)

    # 釋放資源
    if writer: writer.release()

def captureRTSP(src):
    # 開啟 RTSP 串流
    vidCap = cv2.VideoCapture(src)

    # 取得影像的尺寸大小
    imgWidth = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
    imgHeight = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    printLogMsg(f"Size: {imgWidth} x {imgHeight}")

    # 取得影格率
    fps = vidCap.get(cv2.CAP_PROP_FPS)
    printLogMsg(f"FPS: {fps}")

    # 建立工作佇列
    taskqueue = Queue()

    # 目標影像 FPS
    desired_fps = 15

    # 建立並執行工作行程
    proc = Process(target=image_save, args=(taskqueue, imgWidth, imgHeight, desired_fps))
    proc.start()

    avgImage = None
    avgImageFloat = None

    # 計數器
    frameCounter = 0

    while vidCap.isOpened():
        # 從 RTSP 串流讀取一張影像
        ret, image = vidCap.read()

        if ret:
            # 忽略黑白影像(紅外線)
            if np.array_equal(image[:,:,0], image[:,:,1]) and np.array_equal(image[:,:,0], image[:,:,2]):
                avgImage = None
                avgImageFloat = None
                if frameCounter:
                    printLogMsg(f"Stop recording (frame counter = {frameCounter})")
                frameCounter = 0
                continue

            # 取出中央部分影像,作為變動依據
            cropImage = image[(imgHeight//4):(imgHeight//4*3), (imgWidth//8*3):(imgWidth//8*5)]
            centerImage = cv2.resize(cropImage, (imgHeight//4, imgWidth//8))
            centerArea = imgHeight//4 * imgWidth//8

            # 初始化平均影像
            if avgImage is None:
                avgImage = cv2.blur(centerImage, (4, 4))
                avgImageFloat = np.float32(avgImage)
                printLogMsg("avgImage initialization")
                continue

            # 模糊處理
            blurImage = cv2.blur(centerImage, (4, 4))

            # 計算目前影格與平均影像的差異值
            diffImage = cv2.absdiff(avgImage, blurImage)

            # 將圖片轉為灰階
            grayImage = cv2.cvtColor(diffImage, cv2.COLOR_BGR2GRAY)

            # 篩選出變動程度大於門檻值的區域
            ret, binImage = cv2.threshold(grayImage, 5, 255, cv2.THRESH_BINARY)

            # 使用型態轉換函數去除雜訊
            kernel = np.ones((5, 5), np.uint8)
            binImage = cv2.morphologyEx(binImage, cv2.MORPH_OPEN, kernel, iterations=2)
            binImage = cv2.morphologyEx(binImage, cv2.MORPH_CLOSE, kernel, iterations=2)

            # 產生等高線
            countours, _ = cv2.findContours(binImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            detected = False
            for c in countours:
                if cv2.contourArea(c) > centerArea / 5:
                    detected = True
                    if DEV_MODE:
                        # 計算等高線的外框範圍
                        (x, y, w, h) = cv2.boundingRect(c)

                        # 畫出外框
                        cv2.rectangle(centerImage, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    else:
                        break

            if DEV_MODE:
                # 顯示偵測結果影像
                cv2.imshow('image', centerImage)
                cv2.imshow('avgImage', avgImage)
                cv2.imshow('blurImage', blurImage)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    # 傳入 None 終止工作行程
                    taskqueue.put((None, None))
                    break

            # 更新平均影像
            cv2.accumulateWeighted(blurImage, avgImageFloat, 0.1)
            avgImage = cv2.convertScaleAbs(avgImageFloat)

            # 將影像放入工作佇列
            if detected:
                if DEV_MODE == False:
                    taskqueue.put((image, frameCounter))
                if frameCounter == 0:
                    printLogMsg("Start recording")
                frameCounter += 1
            else:
                if frameCounter:
                    printLogMsg(f"Stop recording (frame counter = {frameCounter})")
                frameCounter = 0
                continue
        else:
            # 若沒有影像跳出迴圈
            printLogMsg("no frame")
            break

    # 傳入 None 終止工作行程
    taskqueue.put((None, None))

    # 等待工作行程結束
    proc.join()

    # 釋放資源
    vidCap.release()

    # 關閉所有 OpenCV 視窗
    cv2.destroyAllWindows()

if __name__ == '__main__':
    captureRTSP('rtsp://ipcam.stream:8554/bars')

在現場的網路攝影機是 24 小時全天運作的,但是機器作動的頻率很低,大部分的時間都是處於閒置狀態,而在機器閒置的時間,現場的燈也都是關閉的。正常開燈的時候網路攝影機使用一般的攝像頭,對應的影格率(FPS)為 15,但當網路攝影機遇到關燈的狀態時,會自動遷換為紅外線攝像頭,影像雖然還是 RGB 格式,但是色彩會轉為灰階,而此時對應的影格率(FPS)就會降為 10

由於機器只會在開燈的時候作動,所以我在程式中加上判斷影像色彩的條件,若整張影像的 RGB 三個 channels 都完全有相同的值,就判定為紅外線影像,將其直接忽略,減少不必要的計算。

當取得串流影像之後,我們將畫面中央的部分取出來,並降低解析度(節省計算量),參考 Python 與 OpenCV 實作移動偵測程式教學,偵測影像中央是否有變動,當出現影像大幅度變動時,就將串流影片儲存下來。

另外為了方便開發與測試,我們靠著 DEV_MODE 變數設定開發模式,當程式處於開發模式的時候,會開啟監看視窗,顯示幾種關鍵的影像內容,同時開發模式也不會儲存任何影像。

在儲存影片檔案時,會自動將獨立的影片片段分開儲存,檔案名稱則自動以開始錄製的時間來命名,影片的解析度、影格率都維持跟來源影片相同。

參考資料

Share
Published by
Office Guide
Tags: OpenCV

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago