以下作品由安信可社區(qū)用戶WT_0213制作
通過小安Moss+AiPi-PalChatV1+AiPi-BW21+機器視覺項目,讓家居更加智能,可玩性更高!更有樂趣!
先上視頻看看效果:
【電子DIY作品】小安Moss+AiPi-PalChatV1+AiPi-BW21+機器視覺_嗶哩嗶哩_bilibili

一、硬件
選用AiPi-PalChatV1 + AiPi-BW21 / AiPi-Cam-D200,由于上期做的基于BW21-CBV-Kit火災隱患警報器剛好符合條件且功能未完全開發(fā)出來,所以這次選擇AiPi-PalChatV1 + AiPi-BW21組合來做這個項目。
二、背景
最近刷B站看到流浪地球的Moss,感覺非常帥,而且B站也有很多使用小智實現(xiàn)的Moss。
看到這筆者也想要一個Moss了,由于當前技術(shù)有限,無法實現(xiàn)完整的類似AiPi-PalChatV1的功能,所以借助AiPi-PalChatV1實現(xiàn)語音功能,通過小智MCP功能做視覺識別。

三、設備

還記得它嗎?
是的,這次主角還是它,是不是和Moss有那么一丟丟像?

●BW21-CBV-Kit:可以尋找物品,對當前環(huán)境進行識別分析。
●硬件利用 AiPi-PalChatV1 + AiPi-BW21 組合,實現(xiàn)為AiPi-PalChatV1添加視覺系統(tǒng):可以識別當前環(huán)境信息,例如:房間環(huán)境,物品位置,陳設等等。視覺模型支持的它都可以實現(xiàn)。
由于AiPi-BW21的rtsp視頻流有一定延遲,所以檢測靜態(tài)環(huán)境或?qū)嵤┞什桓叩牡胤绞褂煤芊奖?;也可以將AiPi-BW21替換為小安派-Cam-D200,提供rtsp視頻流就可以。
●智譜glm-4v-plus-0111 視覺模型:支持base64的圖像,壞處是它收費,好在費用不高。另外一個是glm-4v-flash模型,好處是免費,壞處是不支持base64圖像,必須將圖片上傳到服務器,然后將url給大模型。(各有利弊,自己取舍使用的模型可以根據(jù)自己的需求作調(diào)整。很多免費的模型。)
#include #include #include #include "RTSP.h" #include "StreamIO.h" #include "VideoStream.h" #include "VideoStreamOverlay.h" RTSP rtsp; IPAddress ip; int rtsp_portnum; StreamIO videoStreamer(1, 1); VideoSetting config(VIDEO_FHD, 30, VIDEO_H264, 0); #define CHANNEL 0 // 定義紅外模塊引腳 const int infraredPin = 20; // 定義MQ - 2煙霧模塊引腳 const int mq2Pin = A0; // 定義蜂鳴器引腳 const int buzzerPin = 8; // 定義煙霧傳感器閾值 const int smokeThreshold = 500; char ssid[] = "SSID"; // your network SSID (name) char pass[] = "PASSWORD"; // your network password int status = WL_IDLE_STATUS; // Indicator of Wifi status char mqttServer[] = "192.168.50.19"; // broker.mqttgo.io char clientId[] = "alerm"; char publishTopicMsg[] = "homeassistant/alermMsg"; char publishTopicImg[] = "homeassistant/alermImg"; char publishPayload[] = "alarm device"; char subscribeTopic[] = "homeassistant/alermMsg"; void callback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (unsigned int i = 0; i < length; i++) { Serial.print((char)(payload[i])); } Serial.println(); } WiFiClient wifiClient; PubSubClient client(wifiClient); void reconnect() { // Loop until we're reconnected while (!(client.connected())) { Serial.print("rnAttempting MQTT connection..."); // Attempt to connect if (client.connect(clientId)) { Serial.println("connected"); // Once connected, publish an announcement and resubscribe client.publish(publishTopicMsg, publishPayload); client.subscribe(subscribeTopic); } else { Serial.println("failed, rc="); Serial.print(client.state()); Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying delay(5000); } } } void play() { for(int note = 0; note < 3; note++){ // 升調(diào)(200Hz→800Hz) for(int i=600; i<=800; i++) { tone(buzzerPin, i); delay(5); } // 降調(diào)(800Hz→200Hz) for(int i=800; i?>=600; i--) { tone(buzzerPin, i); delay(5); } } noTone(buzzerPin); } void setup() { Serial.begin(115200); // 將紅外引腳設置為輸入模式 pinMode(infraredPin, INPUT); // 將蜂鳴器引腳設置為輸出模式 // pinMode(buzzerPin, OUTPUT); // 初始化蜂鳴器為關(guān)閉狀態(tài) digitalWrite(buzzerPin, LOW); // wait for serial port to connect. while (!Serial) { ; } // Attempt to connect to WiFi network while (status != WL_CONNECTED) { Serial.print("rnAttempting to connect to SSID: "); Serial.println(ssid); // Connect to WPA/WPA2 network. Change this line if using open or WEP network: status = WiFi.begin(ssid, pass); // wait 10 seconds for connection: delay(10000); } ip = WiFi.localIP(); wifiClient.setNonBlockingMode(); // 這里需要注意一下,如果沒有MQTT服務需要注釋 client.setServer(mqttServer, 1883); client.setCallback(callback); delay(1500); if (!(client.connected())) { reconnect(); } // 這里需要注意一下,如果沒有MQTT服務需要注釋 // config.setBitrate(2 * 1024 * 1024); // Re Camera.configVideoChannel(CHANNEL, config); Camera.videoInit(); // Configure RTSP with corresponding video format information rtsp.configVideo(config); rtsp.begin(); rtsp_portnum = rtsp.getPort(); // Configure StreamIO object to stream data from video channel to RTSP videoStreamer.registerInput(Camera.getStream(CHANNEL)); videoStreamer.registerOutput(rtsp); if (videoStreamer.begin() != 0) { Serial.println("StreamIO link start failed"); } Camera.channelBegin(CHANNEL); Camera.printInfo(); // Start OSD drawing on RTSP video channel OSD.configVideo(CHANNEL, config); OSD.begin(); delay(5000); } void loop() { // 讀取紅外模塊狀態(tài) int infraredValue = digitalRead(infraredPin); // 讀取MQ - 2煙霧模塊模擬值 int mq2Value = analogRead(mq2Pin); // 打印傳感器數(shù)值 Serial.print("Infrared: "); Serial.print(infraredValue); Serial.print(", Smoke: "); Serial.println(mq2Value); JsonDocument doc; doc["fire"] = infraredValue; doc["mq2"] = mq2Value; char json_string[256]; serializeJson(doc, json_string); Serial.print("Publishing: "); Serial.println(json_string); // 這里需要注意一下,如果沒有MQTT服務需要注釋 client.publish(publishTopicMsg, json_string); // 這里需要注意一下,如果沒有MQTT服務需要注釋 // 判斷是否觸發(fā)報警條件 if (infraredValue == LOW && mq2Value > smokeThreshold) { // 觸發(fā)報警,打開蜂鳴器 // digitalWrite(buzzerPin, HIGH); Serial.println("Alarm triggered!"); // 短暫延遲,避免頻繁讀取 play(); delay(4500); } // client.loop(); // 短暫延遲,避免頻繁讀取 delay(500); }
!?。?strong>沒有MQTT服務,需要將MQTT相關(guān)代碼注釋掉才行!??!
以上代碼已經(jīng)實現(xiàn)的rtsp功能,獲取到對應的rtsp地址就可以了。
可以參考:
【教程】小安派BW21-CBV-Kit——RTSP音頻推流
獲取rtsp地址,* 由于 RTSP 被用作串流協(xié)議,輸入 “rtsp://{IPaddress}:{port}”' 作為網(wǎng)絡 URL,將 {IPaddress} 替換為 BW21-CBV-Kit 的 IP 地址。
AiPi-PalChatV2 好像還支持攝像頭,用AiPi-PalChatV2實現(xiàn)可能會更加小巧,集成度更高。
四、準備工作
拉取代碼
拉取MCP代碼
git clone https://gitee.com/lazy-ai/xiaozi-vision-mcp.git
拉取代碼后,可以使用VSCode打開目錄結(jié)構(gòu)為:

MCP 主要代碼
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ RTSP視頻流接收器 該模塊提供了一個用于接收和處理RTSP視頻流的類 """ import cv2 import numpy as np import threading import time import logging from typing import Optional, Tuple, Callable, Union, List, Dict, Any # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('RTSPReceiver') class RTSPReceiver: """ RTSP視頻流接收器類 該類用于連接到RTSP視頻流,讀取視頻幀,并提供各種控制和處理功能。 屬性: rtsp_url (str): RTSP流的URL buffer_size (int): 幀緩沖區(qū)大小 reconnect_attempts (int): 連接斷開時的重連嘗試次數(shù) reconnect_delay (float): 重連嘗試之間的延遲(秒) """ def __init__(self, rtsp_url: str, buffer_size: int = 10, reconnect_attempts: int = 5, reconnect_delay: float = 2.0): """ 初始化RTSP接收器 參數(shù): rtsp_url (str): RTSP流的URL buffer_size (int, 可選): 幀緩沖區(qū)大小,默認為10 reconnect_attempts (int, 可選): 連接斷開時的重連嘗試次數(shù),默認為5 reconnect_delay (float, 可選): 重連嘗試之間的延遲(秒),默認為2.0 """ self.rtsp_url = rtsp_url self.buffer_size = buffer_size self.reconnect_attempts = reconnect_attempts self.reconnect_delay = reconnect_delay # 內(nèi)部屬性 self._cap = None # OpenCV VideoCapture對象 self._is_running = False # 指示接收器是否正在運行 self._is_paused = False # 指示接收器是否暫停 self._frame_buffer = [] # 幀緩沖區(qū) self._current_frame = None # 當前幀 self._frame_count = 0 # 接收的幀計數(shù) self._last_frame_time = 0 # 上一幀的時間戳 self._fps = 0 # 當前幀率 self._lock = threading.Lock() # 用于線程安全操作的鎖 self._thread = None # 視頻接收線程 self._callbacks = [] # 幀處理回調(diào)函數(shù)列表 self._connection_status = False # 連接狀態(tài) self._last_error = None # 最后一個錯誤 def connect(self) -> bool: """ 連接到RTSP流 返回: bool: 連接成功返回True,否則返回False """ try: logger.info(f"正在連接到RTSP流: {self.rtsp_url}") # 設置OpenCV的RTSP相關(guān)參數(shù) self._cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) # 設置緩沖區(qū)大小 self._cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size) # 檢查連接是否成功 if not self._cap.isOpened(): logger.error("無法連接到RTSP流") self._connection_status = False return False # 獲取視頻流信息 self._width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self._height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self._fps = self._cap.get(cv2.CAP_PROP_FPS) logger.info(f"成功連接到RTSP流,分辨率: {self._width}x{self._height}, FPS: {self._fps}") self._connection_status = True return True except Exception as e: logger.error(f"連接RTSP流時發(fā)生錯誤: {str(e)}") self._last_error = str(e) self._connection_status = False return False def disconnect(self) -> None: """ 斷開與RTSP流的連接 """ self.stop() if self._cap is not None: self._cap.release() self._cap = None self._connection_status = False logger.info("已斷開與RTSP流的連接") def start(self) -> bool: """ 開始接收視頻流 返回: bool: 成功啟動返回True,否則返回False """ if self._is_running: logger.warning("接收器已經(jīng)在運行") return True if not self._connection_status: success = self.connect() if not success: return False self._is_running = True self._is_paused = False self._thread = threading.Thread(target=self._receive_frames, daemon=True) self._thread.start() logger.info("開始接收視頻流") return True def stop(self) -> None: """ 停止接收視頻流 """ self._is_running = False if self._thread is not None and self._thread.is_alive(): self._thread.join(timeout=1.0) logger.info("停止接收視頻流") def pause(self) -> None: """ 暫停接收視頻流 """ self._is_paused = True logger.info("暫停接收視頻流") def resume(self) -> None: """ 恢復接收視頻流 """ self._is_paused = False logger.info("恢復接收視頻流") def is_connected(self) -> bool: """ 檢查是否已連接到RTSP流 返回: bool: 已連接返回True,否則返回False """ return self._connection_status def is_running(self) -> bool: """ 檢查接收器是否正在運行 返回: bool: 正在運行返回True,否則返回False """ return self._is_running def is_paused(self) -> bool: """ 檢查接收器是否已暫停 返回: bool: 已暫停返回True,否則返回False """ return self._is_paused def get_current_frame(self) -> Optional[np.ndarray]: """ 獲取當前幀 返回: Optional[np.ndarray]: 當前幀,如果沒有可用幀則返回None """ with self._lock: return self._current_frame.copy() if self._current_frame is not None else None def get_frame_info(self) -> Dict[str, Any]: """ 獲取幀信息 返回: Dict[str, Any]: 包含幀信息的字典 """ return { 'width': self._width if hasattr(self, '_width') else None, 'height': self._height if hasattr(self, '_height') else None, 'fps': self._fps, 'frame_count': self._frame_count, 'is_running': self._is_running, 'is_paused': self._is_paused, 'connection_status': self._connection_status, 'last_error': self._last_error } def add_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: """ 添加幀處理回調(diào)函數(shù) 參數(shù): callback (Callable[[np.ndarray], None]): 接收幀作為參數(shù)的回調(diào)函數(shù) """ self._callbacks.append(callback) logger.info(f"添加了幀處理回調(diào)函數(shù),當前回調(diào)函數(shù)數(shù)量: {len(self._callbacks)}") def remove_frame_callback(self, callback: Callable[[np.ndarray], None]) -> bool: """ 移除幀處理回調(diào)函數(shù) 參數(shù): callback (Callable[[np.ndarray], None]): 要移除的回調(diào)函數(shù) 返回: bool: 成功移除返回True,否則返回False """ if callback in self._callbacks: self._callbacks.remove(callback) logger.info(f"移除了幀處理回調(diào)函數(shù),當前回調(diào)函數(shù)數(shù)量: {len(self._callbacks)}") return True return False def save_frame(self, filename: str, frame: Optional[np.ndarray] = None) -> bool: """ 保存幀為圖像文件 參數(shù): filename (str): 文件名 frame (Optional[np.ndarray], 可選): 要保存的幀,默認為當前幀 返回: bool: 成功保存返回True,否則返回False """ try: if frame is None: frame = self.get_current_frame() if frame is None: logger.error("沒有可用的幀可保存") return False cv2.imwrite(filename, frame) logger.info(f"幀已保存到: {filename}") return True except Exception as e: logger.error(f"保存幀時發(fā)生錯誤: {str(e)}") self._last_error = str(e) return False def _receive_frames(self) -> None: """ 接收幀的內(nèi)部方法(在單獨的線程中運行) """ reconnect_count = 0 while self._is_running: try: # 如果暫停,則等待 if self._is_paused: time.sleep(0.1) continue # 檢查連接狀態(tài) if not self._connection_status or self._cap is None: if reconnect_count < self.reconnect_attempts: logger.info(f"嘗試重新連接 ({reconnect_count + 1}/{self.reconnect_attempts})") success = self.connect() if success: reconnect_count = 0 else: reconnect_count += 1 time.sleep(self.reconnect_delay) continue else: logger.error(f"重連失敗,已達到最大嘗試次數(shù): {self.reconnect_attempts}") self._is_running = False break # 讀取幀 ret, frame = self._cap.read() # 計算當前幀率 current_time = time.time() if self._last_frame_time > 0: time_diff = current_time - self._last_frame_time if time_diff > 0: self._fps = 0.8 * self._fps + 0.2 * (1.0 / time_diff) # 平滑幀率 self._last_frame_time = current_time if not ret: logger.warning("無法讀取幀,可能是流結(jié)束或連接問題") self._connection_status = False continue # 更新當前幀和幀計數(shù) with self._lock: self._current_frame = frame self._frame_count += 1 # 更新幀緩沖區(qū) if len(self._frame_buffer) >= self.buffer_size: self._frame_buffer.pop(0) self._frame_buffer.append(frame) # 處理回調(diào)函數(shù) for callback in self._callbacks: try: callback(frame.copy()) except Exception as e: logger.error(f"執(zhí)行幀回調(diào)函數(shù)時發(fā)生錯誤: {str(e)}") except Exception as e: logger.error(f"接收幀時發(fā)生錯誤: {str(e)}") self._last_error = str(e) self._connection_status = False time.sleep(0.1) # 避免在錯誤情況下的快速循環(huán) def __enter__(self): """ 上下文管理器入口 """ self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """ 上下文管理器出口 """ self.disconnect() def __del__(self): """ 析構(gòu)函數(shù) """ self.disconnect() # 示例用法 if __name__ == "__main__": # RTSP流URL示例 rtsp_url = "rtsp://stream.strba.sk:1935/strba/VYHLAD_JAZERO.stream" # 創(chuàng)建接收器實例 receiver = RTSPReceiver(rtsp_url) try: # 連接并開始接收 if receiver.connect(): receiver.start() # 定義一個簡單的幀處理回調(diào)函數(shù) def process_frame(frame): # 在這里可以添加自定義的幀處理邏輯 # 例如:檢測、識別、轉(zhuǎn)換等 pass # 添加回調(diào)函數(shù) receiver.add_frame_callback(process_frame) # 顯示視頻流 window_name = "RTSP Stream" cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) print("按 'q' 鍵退出") try: while True: frame = receiver.get_current_frame() if frame is not None: cv2.imshow(window_name, frame) # 檢查鍵盤輸入 key = cv2.waitKey(1) & 0xFF if key == ord('q'): break elif key == ord('s'): # 按's'鍵保存當前幀 receiver.save_frame(f"frame_{receiver._frame_count}.jpg") elif key == ord('p'): # 按'p'鍵暫停/恢復 if receiver.is_paused(): receiver.resume() else: receiver.pause() finally: cv2.destroyAllWindows() else: print("無法連接到RTSP流") finally: # 確保資源被正確釋放 receiver.disconnect()
測試rtsp可以在rtsp目錄下執(zhí)行:
python rtsp_reiver.py
效果如圖:

rtsp視頻流用的網(wǎng)上的一個地址:
rtsp://stream.strba.sk:1935/strba/VYHLAD_JAZERO.stream
五、注冊智譜
創(chuàng)建API_KEY。這里可以通過筆者專屬邀請鏈接注冊即可獲得額外GLM-4-Air 2000萬Tokens好友專屬福利,鏈接:智譜AI開放平臺
1、登錄智譜

2、控制

添加新的API Key

填寫API key名稱,確定后創(chuàng)建

創(chuàng)建成功后會在列表中展示出來,點擊“復制”。
3、附加(非必要,但建議)
實名認證,贈送免費資源。

進入個人中心,點擊“認證”。

個人實名認證。

填寫實名信息。

支付寶掃碼,進行人臉認證。

認證完成后,點擊“已完成刷臉認證”。

這時會發(fā)現(xiàn),多了500萬的免費tokens,還是很棒的。
?。。?注意?。?!筆者就是沒有領(lǐng)取免費的資源包,直接調(diào)用付費模型,被扣費了。

智譜客服確認了下問題不大,并且費用也不高。






問答就是產(chǎn)生的欠費可以不用在意,也不用補繳。如果用到余額就需要交,并且欠費金額有上限,不用害怕無限欠費,或者欠費過多問題,欠費到上限后調(diào)用接口會報錯。
六、小智MCP接入點
打開 小智 AI 聊天機器人。

點擊控制臺,登錄。

點擊配置角色,拉到屏幕最下方。

右下角MCP接入點。

復制接入點地址即可,也可以參考:
安信可AiPi-PalChatV1 + MCP通過HomeAssistant自動化控制設備
七、配置
修改配置文件。

填好執(zhí)行
python mcp_pipe.py mcp_moss.py

現(xiàn)實如上信息,表示MCP節(jié)點已經(jīng)啟動完成。
RTSP視頻流:

使用小智PC客戶端執(zhí)行結(jié)果,效果與AiPi-PalChatV1 是一致的。

MCP調(diào)用結(jié)果示例:

小智智能體記憶:

審核編輯 黃宇
-
機器人
+關(guān)注
關(guān)注
213文章
30309瀏覽量
218437 -
AI
+關(guān)注
關(guān)注
88文章
37213瀏覽量
291961 -
開發(fā)板
+關(guān)注
關(guān)注
25文章
6027瀏覽量
110726
發(fā)布評論請先 登錄

【小智AI語音開發(fā)板】做個自己的Moss機器人?
評論