馴服佇列:如何管理資料管道

有許多數據驅動的應用程式需要在線處理數據以及存儲原始數據。例如推薦引擎、物聯網處理器、事件驅動服務等。如果您曾經需要建立資料處理管道或工作流程,您可能熟悉處理多種資料類型的挑戰,同時考慮到在系統生命週期中出現新資料類型的可能性。 

在本文中,我將討論我為最近一個使用 AWS DocumentDB、MSK 和 Lambda 函數的專案提出的解決方案,並將提供有關部署簡單管道的說明以及有用的 Go 程式碼片段。

什麼是數據管道?

關於資料管道的快速說明:它們允許您處理、管理和儲存資料。就像管道可以幫助您自動管理水流一樣,資料管道也是資訊流動的管道。您建立的這些管道越好,您的企業技術就能更好地支援您的所有業務流程。

資料處理管道的設計

在建立資料處理管道時,通常需要兩個元件:資料傳輸和處理區塊。處理區塊是按照給定序列應用於輸入資料的操作,該序列可能會根據資料類型而變化,而資料傳輸是資料在各個處理區塊之間移動的方式。您可以將其視為一條生產線,其中傳送帶在各個工位之間移動不同階段的產品,直到最終產品從生產線末端出來。 

傳送帶是我們的資料傳輸,工作站是處理區塊。通常,您還需要一種儲存原始資料的方法,因為您永遠不知道以後可能想用它做什麼,或者某些錯誤是否會迫使您再次通過管道運行資料。我的設計要求很簡單:

  1. 建立完全託管的系統
  2. 保留所有原始資料的副本
  3. 近乎即時且低延遲地處理數據
  4. 考慮隨著時間的推移新資料類型出現的可能性,而無需系統停機

在研究了幾個選項之後,我為我的資料管道選擇了以下設計:

架構圖

讓我們先回顧一下我們將使用的建置區塊,以了解它們是什麼以及如何使用它們:AWS DocumentDB 是 Amazon基於 MongoDB 3.6 或 4.0的託管MongoDB服務。因此,它可以儲存、查詢和索引 JSON 和 BSON 資料。   

MongoDB 是一個可用來源的 NoSQL JSON 資料庫,它使用JavaScript作為其查詢語言的基礎,因此允許您在伺服器端執行 JavaScript 函數。在 MongoDB 部署中,可以定義多個資料庫,每個資料庫中有多個集合。 Mongo 3.6 中引入的一個實用功能是Change Streams。更改流允許應用程式透過註冊特定集合上的事件來存取即時資料變更。    

事件通知可以配置為包括增量或完整文件並捕獲“插入”、“更改”和“刪除”事件。AWS MSK(Apache Kafka 的託管流) 是 Amazon 的託管 Kafka 服務。Apache Kafka是一種廣泛使用的開源分散式事件儲存和串流處理平台。 Kafka 專為高吞吐量、低延遲的即時資料處理而設計。使用 Kafka,您可以定義“主題”來發布鍵/值訊息。多個發布者可以發布一個主題,多個消費者可以使用它。 Kafka 主題使用ZooKeeper叢集進行管理,而發布和消費則透過 Kafka 代理完成。     

AWS Lambda 是一種無伺服器、事件驅動的運算服務,可讓您執行程式碼來回應各種事件和觸發器,同時自動管理程式碼所需的底層運算資源。 Lambda 支援多種程式語言,包括 Node.js、Python、Java、Ruby、C# 和 Go。 

Lambda 的一個非常有用的功能是能夠觸發 Kafka 主題上的函數,這使其非常適合用作資料處理管道的一部分。AWS VPC 是一種在 AWS 中建立虛擬私人網 台灣電話號碼資料 路的方法。這使您可以將服務包含在更安全的環境中,並讓您完全控制誰和什麼可以存取網路及其資源。AWS EC2 是 Amazon 的彈性運算雲,您可以在其中部署虛擬或實體執行個體(電腦)並管理其安全性和網路屬性。 

AWS S3 是一種簡單的儲存服務。使用 S3 物件存儲,您可以建立儲存桶並在其中儲存檔案和資料夾。 S3 為您提供全面的存取控制和安全性。AWS IAM 是亞馬遜的身份和存取管理系統。 IAM 可讓您管理使用者、角色和策略,以便您可以實現對資源的細粒度存取控制,並以安全的方式向其他使用者和 AWS 帳戶授予存取權限。AWS CloudWatch 是 Amazon 的可觀察性平台,您可以在其中聚合來自 AWS 服務的日誌並輕鬆過濾和搜尋它們。

電話號碼數據

把事情放在一起

這個想法是,DocumentDB 用作管道的入口點,而 MSK 用作資料傳輸。處理區塊之間的每個路徑都是使用 Kafka 主題實現的。一個處理區塊將其輸出發佈到主題,而下一個處理區塊則使用該主題來取得其輸入。 

第一個處理區塊將充當“路由器”,分析新資料並決定其類型。它將資料發佈到該資料類型的專用主題,以便可以對該資料使用適當的處理區塊。我先將一條新的原始資料插入 DocumentDB 中。接下來,我使用 MSK 連接器註冊到 DocumentDB 集合的變更流,並將新插入的文件推送到初始 MSK 主題,該主題是「路由器」的輸入。 

然後,我配置一個基於 Lambda 的「路由器」函數來使用初始 MSK 主題、分析訊息並將 享受充滿螢幕的生活嗎? negi opticals 的眼睛舒適秘訣  每個訊息發佈到專用 MSK 主題。最後,對於每個資料類型專用主題,我都會有一個特定的 Lambda 函數,它知道如何處理該資料。然後,我可以根據需要繼續在管道上建立更多處理元素。 

一旦所有部分就位,我所要做的就是透過管道運行新數據,只需將其插入到我的 DocumentDB 集合中即可。從那時起,一切都會自動發生。此外,透過結合 Kafka 主題和 Lambda 函數,我可以為新資料訊息動態建立主題,然後定義處理程序來處理它們。訊息將在主題中等待,直到我建立可以處理它們的處理器,一旦我部署新處理器,它就可以開始處理訊息,這意味著訊息永遠不會丟失。 

這種設計還允許我隨著時間的推移動態更改處理管道的佈局。為了配置 DocumentDB 和 MSK,我使用部署在 EC2 上的堡壘執行個體。此實例允許我使用安全 SSH 連線以及連接埠轉送來連線到我的 VPC,以使我的本機環境能夠存取 VPC。我使用 S3 儲存桶來儲存 Kafka 連接器包和 Lambda 函數的程式碼包。此外,我使用 IAM 為 Kafka 連接器和 Lambda 函數建立所需的執行角色。 

最後,我使用 CloudWatch 透過將 Lambda 日誌匯集到 CloudWatch 日誌群組來了解 Lambda 函數正在執行的操作。現在讓我們檢查每個元件並了解如何配置和/或部署它們。

專有網路網關

DocumentDB和MSK都只部署在VPC中。要從本機電腦連接到它們以進行開發、測試和調試,您需要建立一個進入 VPC 的網關。我們將使用預設 VPC,但也可以使用任何 VPC。 

請參閱AWS 文檔,以了解有關如何建立新 VPC 的信息,以防預設 VPC 不合適。我們首先 資料庫資料庫 建立一個將用作網關的 EC2 執行個體。只需在預設 VPC 中啟動一個新的 EC2 執行個體並選擇 Ubuntu 20.04 作為作業系統映像(在撰寫本文時,Ubuntu 22.04 中不支援 mongo CLI):  

啟動新的EC2實例

 

接下來,建立一個 SSH 金鑰對,您將使用它從本機電腦存取新實例。點擊「建立新金鑰對」以建立新金鑰對並下載公鑰,或選擇現有金鑰對:

新的EC2實例SSH密鑰對

接下來,我們來看看「網路設定」部分。確保選擇您要使用的 VPC 和安全群組:

新的 EC2 實例網路設定

最後,啟動您的新實例。執行個體啟動後,您可以透過 SSH 連線到新執行個體:

透過 SSH 連接到新實例

文件資料庫

現在我們有了 VPC,我們可以開始考慮部署 DocumentDB

架構——DocumentDB

首先進入 Amazon DocumentDB 儀表板並點擊「建立叢集」。為您的叢集命名,確保所選引擎版本(MongoDB 版本)為“4.0.0”,然後選擇所需的實例類別。我們將用來讓 MSK 註冊到 DocumentDB 更改流的連接器要求 MongoDB 部署是副本集的一部分,因此請確保實例數量大於 1:DocumentDB — 啟動新實例 

接下來,在身分驗證部分下,填寫您要在叢集中使用的管理員使用者名稱和密碼。現在,點擊底部的「顯示進階設定」開關以開啟網路設置,並確保所選的 VPC 與您部署 EC2 執行個體的 VPC 相同:

DocumentDB — 新實例網路設定

調整任何其他設置,然後點擊底部的“建立叢集”按鈕以啟動新叢集。該過程需要幾分鐘,然後您將根據您的選擇看到以下或類似內容:

AWS 文件資料庫叢集

為了測試我們的新集群,我們需要在 EC2 網關執行個體中安裝 mongo 用戶端。請依照以下說明進行:

接下來,進入 AWS 控制台中的叢集詳細信息,並按照說明將 CA 憑證下載到您的 EC2 執行個體。接下來,運行新安裝的 mongo 用戶端以連接到新叢集:

如果連線失敗,您可能需要管理 DocumentDB 叢集的安全性群組以允許存取您的 EC2 執行個體的安全群組。為此,請進入叢集的詳細資訊並向下捲動至「安全群組」部分:

AWS 安全群組

選擇安全性群組,然後進入「入站規則」標籤:

AWS 安全群組入站規則

按一下「編輯入站規則」按鈕編輯入站規則,然後新增一條規則,允許來自「自訂」來源的所需流量類型(如果需要指定端口,請使用 27017)。在搜尋框中,搜尋並選擇您用於 EC2 執行個體的安全性群組:

將入站規則新增至AWS安全群組

最後,保存規則。您現在應該可以從您的實例存取 DocumentDB。為了讓開發和調試更容易,您可能需要使用Robo 3T等工具,它可以讓您使用直覺且易於使用的漂亮 GUI 存取 MongoDB,並且可以輕鬆查看和管理資料。您需要使用 SSH 透過 EC2 執行個體將連接埠 27017 從本機轉送至 DocumentDB: 

SSH 連接到網關,連接埠轉送到 DocumentDB

現在您可以設定本機 Robo 3T 或 mongo 用戶端以存取本機電腦上的連接埠 27017:

Robo 3T — 新連接

對於 Robo 3T,請確保允許無效主機名,因為您的本機主機名稱與 CA 憑證中的主機名稱不同:

Robo 3T — 設定 CA 憑證

現在,我們可以建立一個名為「pipeline」的新資料庫,並在其中建立一個名為「intake」的集合。我們也建立一個名為「puser」的新用戶,該用戶具有「pipeline」資料庫的讀取權限:

我們需要做的最後一件事是在新集合上啟用更改流。為此,我們需要像上面那樣連接到 DocumentDB,然後執行以下命令:

如果您像我一樣使用 Robo 3T,請右鍵單擊左側樹中的「管道」資料庫,然後選擇「開啟 Shell」。現在您可以輸入上面的命令並使用CTRL+ENTER來執行它。  

現在我們已經設定了 DocumentDB,我們可以繼續使用 MSK。

架構-MSK

我們將使用「快速建立」選項來部署 MSK 叢集。對於這個小演示,我們將使用“kafka.t3.small”風格並僅分配 1GB 空間。如果您需要變更網路設定以選擇不同的 VPC、區域和子網,則必須從「快速建立」切換為「自訂建立」。 ,您必須開始設定VPC 或子網路之間的路由,這在本文中不會介紹。 =“1400”]MSK集群[/標題] 

為了測試我們的集群,我們需要取得代理的位址。按一下集群,選擇“屬性”選項卡,然後向下捲動到“代理”部分。在那裡,您將找到已作為叢集一部分部署的代理程式的清單:

MSK 叢集代理

管理 Kafka 叢集是使用 Kafka CLI 工具完成的。 CLI 工具需要 Java 執行時,因為 Kafka 是用 Scala 寫的,在 JVM(Java 虛擬機器)上執行。我們使用的是 openjdk-8-jre。現在,從下載 Kafka 套件到 EC2 執行個體並解壓縮。對於本文檔,我們使用 Kafka 2.13–3.1.0 。接下來,使用「kafka-topics」指令取得現有主題的清單。您需要提供一個引導伺服器,它可以是叢集中的任何代理(我們使用第一個):  

列出 Kafka 主題

請注意,連接埠 9092 不使用 TLS。如果您希望使用安全的 TLS 連接,則應按照以下步驟操作:

  1. 建立客戶檔案:
  2. 建立初始信任儲存:

請注意,檔案的位置將根據您電腦上安裝的 JRE 進行變更。

  1. 最後運行命令如下:

現在我們已經部署並可存取 MSK 集群,我們可以建立初始主題:

讓我們向新主題發布一條測試訊息:

我們可以看到我們的新消息確實已發布並且也可以被使用。但我們如何明確話題呢?無法直接從主題中刪除訊息。相反,我們必須更改保留策略並等待 Kafka 為我們刪除所有過期的訊息,然後才能恢復原來的保留策略。首先,我們取得目前設置,發現沒有設定策略:

新策略需要一段時間才能生效,但一旦生效,我們可以運行我們的消費者並看到主題中沒有任何內容:

最後,我們可以恢復預設策略:

如果為了建立 Kafka MongoDB 連接器並更輕鬆地重複使用 Lambda 程式碼,我們需要建 立一個用於保存程式碼包的 S3 儲存桶。為此,首先轉到 S3 儀表板,然後按一下「建立儲存桶」按鈕以建立新儲存桶。填寫新儲存桶的名稱,選擇要求的區域,然後向下捲動並點擊「建立儲存桶」按鈕。我們暫時保留所有預設選項,但如果您想更改任何內容,可以稍後使用它們。

IAM 執行角色

我們的下一個挑戰是將 DocumentDB 與 Kafka 連結起來,以便將新文件插入 DocumentDB 時自動將包含完整文件資料的通知放入給定的 Kafka 主題中。為此,我們將使用 Kafka 連接器,該連接器將為我們的集合註冊 Mongo 更改流,然後將新文件發佈到所選的 Kafka 主題。 

我們將首先為新連接器建立 IAM 執行角色。請注意,在建立連接器時,AWS 將允許您建立執行角色。然而,事實證明,由於 AWS 對執行角色的工作方式進行了一些更改,使用此選項會導致 MSK Connect 無法使用服務連結角色。 AWS 已意識到此問題,但截至撰寫本文時尚未修復。所以,我們需要手動創建自己的角色。    

請注意,此策略授予 許多權限。我們這樣做是為了簡單起見,但您可能想嘗試並限制您授予的權限以獲得更好的安全性。最後,點擊底部的“建立策略”按鈕。現在您可以看到您的角色中列出的新政策:

IAM — 新內聯策略

接下來,我們需要新增正確的信任策略,因此按一下「信任關係」選項卡,然後按一下「編輯信任策略」。在開啟的編輯器中,將所有文字替換為以下內容:

雲觀察

為了追蹤連接器和 Lambda 函數的運作情況,我們需要一個地方來保存日誌。我們將使用 CloudWatch,因此需要建立一個日誌組。前往 CloudWatch 儀表板,選擇左側的“日誌”,然後選擇“日誌組”。點擊右側的“建立日誌組”,為日誌組指定名稱和保留設置,然後點擊底部的“建立”即可完成。很簡單!

MSK 安全集團

在創建 Kafka 連接器之前要解決的另一件事是配置我們的安全群組以允許內部通訊。前往 MSK 叢集的配置,按一下「屬性」選項卡,然後按一下套用的安全性群組。如果您遵循本文,您應該只應用一個安全群組。 

進入 EC2 儀表板和安全性群組設定後,您需要點擊「編輯入站規則」按鈕來新增所需的規則。現在,點擊左下角的“新增規則”按鈕,選擇規則類型“所有流量”,然後在自訂“來源”搜尋框中找到您的安全性群組。確保選擇與您目前正在編輯的安全群組相同的安全群組。請注意,安全群組的名稱顯示在頁面左上角的導覽列中。最後,點擊“儲存規則”,您就應該完成設定了。

 

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

返回頂端