透過Kafka JDBC Sink Connector將串流資料匯入RDBMS,無需依賴Schema Registry

在當今的M2M(機器對機器)通信環境中,大量需求將來自異構IoT設備的數位數據流式傳輸到各種RDBMS,以便透過儀表板進行進一步分析,觸發不同事件以執行多種操作。為支援上述場景,Apache Kafka扮演著中央神經系統的角色,數據可從各種IoT設備輸入並持久化到各種類型的儲存庫,如RDBMS、雲端儲存等。此外,各種類型的數據管道可在數據到達Kafka主題之前或之後執行。透過使用Kafka JDBC sink連接器,我們可以持續將數據從Kafka主題流式傳輸到相應的RDBMS。

JDBC Sink Connector的最大難題

JDBC sink連接器的最大困難在於,它需要已降落在Kafka主題上的數據的schema知識。因此,Schema Registry必須作為單獨的組件與現有的Kafka叢集整合,以便將數據傳輸到RDBMS。因此,要將數據從Kafka主題下沉到RDBMS,生產者必須發布包含schema的消息/數據。schema定義了數據格式的結構。如果未提供schema,JDBC sink連接器將無法在從主題消耗消息後將消息映射到數據庫表的列。

透過利用Schema Registry,我們可以避免每次從生產者發送消息/載荷時都附帶schema,因為Schema Registry會在_schemas主題中儲存(或註冊)schemas,並根據在JDBC sink連接器的屬性文件中配置/提及的主題名稱進行綁定。

對於希望利用Oracle或Confluent的Schema Registry搭配開源Apache Kafka收集物聯網設備數據的中小型企業而言,授權費用可能是一大障礙。

本文將透過Java代碼片段展示如何不依賴Schema Registry,僅使用JDBC Sink連接器將數據持續從Apache Kafka主題流式傳輸至MySQL數據庫。

Apache Kafka與JDBC連接器

Apache Kafka並未隨附與特定供應商的RDBMS相對應的JDBC連接器,如同文件源和接收連接器。我們需自行實現或開發特定RDBMS的代碼,通過實施Apache Kafka的Connect API。但Confluent已開發、測試並支持JDBC Sink連接器,並最終在Confluent Community License下開源,因此我們已將JDBC Sink連接器與Apache Kafka整合。

即便我們發送錯誤的架構或根本不發送架構,Kafka主題也不會拋出任何異常,因為它接受所有消息或記錄作為鍵值對中的字節數組。在將整個消息傳輸至主題之前,生產者必須使用序列化器將消息轉換為字節數組。

以下是與從Apache Kafka消息生產者發布的有效負載或實際數據綁定的示例架構。

此外,這裡是消息生產者的Java代碼片段:

 

public class ProducerWithSchema {

private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";

public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}

public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);

producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();

}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}

return status;

}

public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}

}

當然,採用上述方法存在幾個瓶頸,例如:

  • 消息與模式之間的緊密耦合。
  • 每次都需將模式與實際數據結合。
  • 模式演進相關問題。
  • 程式碼維護性等。

為緩解或解決上述問題,引入了Schema Registry作為獨立組件,所有模式將在此部署/維護。在模式演進過程中進行兼容性檢查是必要的,以確保生產者-消費者契約得到維護,Schema Registry可利用於實現這一點。

您可以觀看以下視頻,了解如何使用JDBC sink connector在單節點Apache Kafka集群上,將數據持續從主題流轉到MySQL的特定表。

結論

至此,您應對JDBC連接器及將Apache Kafka與JDBC連接器捆綁的最大難點有更深入的理解。希望您喜歡這次閱讀。若您覺得此文有價值,請點讚與分享。

Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink