介紹如何在 Python 中使用 OpenCV 搭配多行程(multiprocessing)的方式開啟、顯示、處理與儲存 RTSP 串流影片。
測試用 RTSP 串流
網路上有一些測試用的 RTSP 串流伺服器,可用於程式的開發與測試:
安裝 OpenCV
安裝 Python 的 OpenCV 模組:
# 安裝 Python 的 OpenCV 模組
pip3 install opencv-python
開啟與顯示 RTSP 串流影像
OpenCV 中的 VideoCapture()
函數可以用來開啟各種不同的影片來源,包含影片檔案、網路攝影機、RTSP 串流等,以下是使用 OpenCV 開啟 RTSP 串流影片的範例。
import cv2 if __name__ == '__main__': # 開啟 RTSP 串流 vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars') # 建立視窗 cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE) while True: # 從 RTSP 串流讀取一張影像 ret, image = vidCap.read() if ret: # 顯示影像 cv2.imshow('image_display', image) cv2.waitKey(10) else: # 若沒有影像跳出迴圈 break # 釋放資源 vidCap.release() # 關閉所有 OpenCV 視窗 cv2.destroyAllWindows()
上面這段 Python 指令稿執行之後,就會開啟一個 OpenCV 的視窗,顯示 RTSP 串流的影像:
多行程版本
若在 CPU 處理速度比較慢的機器上,或是遇到影格率(frame rate)較高的影片,可能會出現單一執行緒無法及時處理的問題,這時候就可以考慮使用多行程(multiprocessing)的方式,以一個行程專門收取 RTSP 串流影像,然後將收進來的影像透過佇列(queue)交給另外一個行程來進行顯示,這樣就可以改善整體程式的處理效能,比較不會發生程式處理速度跟不上影片速度的問題。
from multiprocessing import Process, Queue import cv2 def image_display(taskqueue): # 建立視窗 cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE) while True: # 從工作佇列取得影像 image = taskqueue.get() # 若沒有影像則終止迴圈 if image is None: break # 顯示影像 cv2.imshow('image_display', image) cv2.waitKey(10) if __name__ == '__main__': # 開啟 RTSP 串流 vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars') # 建立工作佇列 taskqueue = Queue() # 建立並執行工作行程 proc = Process(target=image_display, args=(taskqueue,)) proc.start() while True: # 從 RTSP 串流讀取一張影像 ret, image = vidCap.read() if ret: # 將影像放入工作佇列 taskqueue.put(image) else: # 若沒有影像跳出迴圈 break # 傳入 None 終止工作行程 taskqueue.put(None) # 等待工作行程結束 proc.join() # 釋放資源 vidCap.release() # 關閉所有 OpenCV 視窗 cv2.destroyAllWindows()
儲存 RTSP 串流影像
以下是使用 OpenCV 讀取 RTSP 串流影像之後,將其儲存成影片檔案的範例,在儲存影片時維持原影片的解析度與影格率,而編碼則採用 mp4v
,靠著計算影格數的方式錄製 10 秒鐘的 RTSP 串流影像,儲存為 output.mp4
:
from multiprocessing import Process, Queue import cv2 def image_save(taskqueue, width, height, fps): # 指定影片編碼 #fourcc = cv2.VideoWriter_fourcc(*'XVID') #fourcc = cv2.VideoWriter_fourcc(*'H264') fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 建立 VideoWriter 物件 writer = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height)) while True: # 從工作佇列取得影像 image = taskqueue.get() # 若沒有影像則終止迴圈 if image is None: break # 儲存影像 writer.write(image) # 釋放資源 writer.release() if __name__ == '__main__': # 開啟 RTSP 串流 vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars') # 取得影像的尺寸大小 width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 取得影格率 fps = vidCap.get(cv2.CAP_PROP_FPS) # 建立工作佇列 taskqueue = Queue() # 建立並執行工作行程 proc = Process(target=image_save, args=(taskqueue, width, height, fps)) proc.start() # 計數器 frame_counter = 0 # 總錄製幀數(10 秒鐘) total_frames = fps * 10 while frame_counter < total_frames: # 從 RTSP 串流讀取一張影像 ret, image = vidCap.read() if ret: # 將影像放入工作佇列 taskqueue.put(image) frame_counter += 1 else: # 若沒有影像跳出迴圈 break # 傳入 None 終止工作行程 taskqueue.put(None) # 等待工作行程結束 proc.join() # 釋放資源 vidCap.release()
儲存長時間串流影片
若需要儲存長時間的 RTSP 串流影片,可以利用以下範例,將串流切成固定長度的影片,依照編號或時間戳記來命名儲存的影片檔案,方便後續處理。
from multiprocessing import Process, Queue import cv2 from datetime import datetime def image_save(taskqueue, width, height, fps, frames_per_file): # 指定影片編碼 #fourcc = cv2.VideoWriter_fourcc(*'XVID') #fourcc = cv2.VideoWriter_fourcc(*'H264') fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = None while True: # 從工作佇列取得影像 image, frame_counter = taskqueue.get() # 若沒有影像則終止迴圈 if image is None: break if frame_counter % frames_per_file == 0: if writer: writer.release() # 建立 VideoWriter 物件(以數字編號) # index = int(frame_counter // frames_per_file) # writer = cv2.VideoWriter(f'output-{index}.mp4', fourcc, fps, (width, height)) # 建立 VideoWriter 物件(以時間命名) now = datetime.now() timestamp = now.strftime("%Y-%m-%d-%H-%M-%S") writer = cv2.VideoWriter(f'output-{timestamp}.mp4', fourcc, fps, (width, height)) # 儲存影像 writer.write(image) # 釋放資源 writer.release() if __name__ == '__main__': # 開啟 RTSP 串流 vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars') # 取得影像的尺寸大小 width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 取得影格率 fps = vidCap.get(cv2.CAP_PROP_FPS) # 建立工作佇列 taskqueue = Queue() # 計數器 frame_counter = 0 # 總錄製幀數(30 秒鐘) total_frames = fps * 30 # 每個檔案的幀數(10 秒鐘) frames_per_file = fps * 10 # 建立並執行工作行程 proc = Process(target=image_save, args=(taskqueue, width, height, fps, frames_per_file)) proc.start() while frame_counter < total_frames: # 從 RTSP 串流讀取一張影像 ret, image = vidCap.read() if ret: # 將影像放入工作佇列 taskqueue.put((image, frame_counter)) frame_counter += 1 else: # 若沒有影像跳出迴圈 break # 傳入 None 終止工作行程 taskqueue.put((None, None)) # 等待工作行程結束 proc.join() # 釋放資源 vidCap.release()
這裡為了方便起見,檔案名稱用的時間戳記是放在工作行程中產生的,所以時間可能會有一些誤差,如果需要非常精確的時間,就要將時間戳記改為接收串流時同時取得時間。
錄製機器作動影片
這裡我的應用場景是將一台網路攝影機架設在工廠內,鏡頭對準特定的機台,然後我希望透過 RTSP 串流影片監看機台的狀況,當機台有動作時自動將作動過程的影片錄製下來,以利後續的 AI 分析,以下是我在開發過程中所用的基本架構範例。
from multiprocessing import Process, Queue import cv2 from datetime import datetime import numpy as np # 開發模式 DEV_MODE = False def printLogMsg(msg): now = datetime.now() timestamp = now.strftime("%Y/%m/%d %H:%M:%S") print("[{}] {}".format(timestamp, msg)) def image_save(taskqueue, width, height, fps): # 指定影片編碼 fourcc = cv2.VideoWriter_fourcc(*'XVID') #fourcc = cv2.VideoWriter_fourcc(*'H264') #fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = None while True: # 從工作佇列取得影像 image, frameCounter = taskqueue.get() # 若沒有影像則終止迴圈 if image is None: break if frameCounter == 0: if writer: writer.release() # 建立 VideoWriter 物件(以時間命名) now = datetime.now() timestamp = now.strftime("%Y%m%d-%H%M%S") writer = cv2.VideoWriter(f'test-output-{timestamp}.avi', fourcc, fps, (width, height)) # 儲存影像 writer.write(image) # 釋放資源 if writer: writer.release() def captureRTSP(src): # 開啟 RTSP 串流 vidCap = cv2.VideoCapture(src) # 取得影像的尺寸大小 imgWidth = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH)) imgHeight = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT)) printLogMsg(f"Size: {imgWidth} x {imgHeight}") # 取得影格率 fps = vidCap.get(cv2.CAP_PROP_FPS) printLogMsg(f"FPS: {fps}") # 建立工作佇列 taskqueue = Queue() # 目標影像 FPS desired_fps = 15 # 建立並執行工作行程 proc = Process(target=image_save, args=(taskqueue, imgWidth, imgHeight, desired_fps)) proc.start() avgImage = None avgImageFloat = None # 計數器 frameCounter = 0 while vidCap.isOpened(): # 從 RTSP 串流讀取一張影像 ret, image = vidCap.read() if ret: # 忽略黑白影像(紅外線) if np.array_equal(image[:,:,0], image[:,:,1]) and np.array_equal(image[:,:,0], image[:,:,2]): avgImage = None avgImageFloat = None if frameCounter: printLogMsg(f"Stop recording (frame counter = {frameCounter})") frameCounter = 0 continue # 取出中央部分影像,作為變動依據 cropImage = image[(imgHeight//4):(imgHeight//4*3), (imgWidth//8*3):(imgWidth//8*5)] centerImage = cv2.resize(cropImage, (imgHeight//4, imgWidth//8)) centerArea = imgHeight//4 * imgWidth//8 # 初始化平均影像 if avgImage is None: avgImage = cv2.blur(centerImage, (4, 4)) avgImageFloat = np.float32(avgImage) printLogMsg("avgImage initialization") continue # 模糊處理 blurImage = cv2.blur(centerImage, (4, 4)) # 計算目前影格與平均影像的差異值 diffImage = cv2.absdiff(avgImage, blurImage) # 將圖片轉為灰階 grayImage = cv2.cvtColor(diffImage, cv2.COLOR_BGR2GRAY) # 篩選出變動程度大於門檻值的區域 ret, binImage = cv2.threshold(grayImage, 5, 255, cv2.THRESH_BINARY) # 使用型態轉換函數去除雜訊 kernel = np.ones((5, 5), np.uint8) binImage = cv2.morphologyEx(binImage, cv2.MORPH_OPEN, kernel, iterations=2) binImage = cv2.morphologyEx(binImage, cv2.MORPH_CLOSE, kernel, iterations=2) # 產生等高線 countours, _ = cv2.findContours(binImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) detected = False for c in countours: if cv2.contourArea(c) > centerArea / 5: detected = True if DEV_MODE: # 計算等高線的外框範圍 (x, y, w, h) = cv2.boundingRect(c) # 畫出外框 cv2.rectangle(centerImage, (x, y), (x + w, y + h), (0, 255, 0), 2) else: break if DEV_MODE: # 顯示偵測結果影像 cv2.imshow('image', centerImage) cv2.imshow('avgImage', avgImage) cv2.imshow('blurImage', blurImage) if cv2.waitKey(1) & 0xFF == ord('q'): # 傳入 None 終止工作行程 taskqueue.put((None, None)) break # 更新平均影像 cv2.accumulateWeighted(blurImage, avgImageFloat, 0.1) avgImage = cv2.convertScaleAbs(avgImageFloat) # 將影像放入工作佇列 if detected: if DEV_MODE == False: taskqueue.put((image, frameCounter)) if frameCounter == 0: printLogMsg("Start recording") frameCounter += 1 else: if frameCounter: printLogMsg(f"Stop recording (frame counter = {frameCounter})") frameCounter = 0 continue else: # 若沒有影像跳出迴圈 printLogMsg("no frame") break # 傳入 None 終止工作行程 taskqueue.put((None, None)) # 等待工作行程結束 proc.join() # 釋放資源 vidCap.release() # 關閉所有 OpenCV 視窗 cv2.destroyAllWindows() if __name__ == '__main__': captureRTSP('rtsp://ipcam.stream:8554/bars')
在現場的網路攝影機是 24 小時全天運作的,但是機器作動的頻率很低,大部分的時間都是處於閒置狀態,而在機器閒置的時間,現場的燈也都是關閉的。正常開燈的時候網路攝影機使用一般的攝像頭,對應的影格率(FPS)為 15
,但當網路攝影機遇到關燈的狀態時,會自動遷換為紅外線攝像頭,影像雖然還是 RGB 格式,但是色彩會轉為灰階,而此時對應的影格率(FPS)就會降為 10
。
由於機器只會在開燈的時候作動,所以我在程式中加上判斷影像色彩的條件,若整張影像的 RGB 三個 channels 都完全有相同的值,就判定為紅外線影像,將其直接忽略,減少不必要的計算。
當取得串流影像之後,我們將畫面中央的部分取出來,並降低解析度(節省計算量),參考 Python 與 OpenCV 實作移動偵測程式教學,偵測影像中央是否有變動,當出現影像大幅度變動時,就將串流影片儲存下來。
另外為了方便開發與測試,我們靠著 DEV_MODE
變數設定開發模式,當程式處於開發模式的時候,會開啟監看視窗,顯示幾種關鍵的影像內容,同時開發模式也不會儲存任何影像。
在儲存影片檔案時,會自動將獨立的影片片段分開儲存,檔案名稱則自動以開始錄製的時間來命名,影片的解析度、影格率都維持跟來源影片相同。