在当今的M2M(机器对机器)通信环境中,从异构的物联网设备向各种RDBMS实时传输数字数据的需求巨大,以便通过仪表板进行进一步分析,并触发不同事件执行多种操作。为支持这些场景,Apache Kafka扮演着中枢神经系统的角色,能够从众多物联网设备摄取数据,并将其持久化到各种类型的存储库,如RDBMS、云存储等。此外,在数据到达Kafka主题前后,可以执行多种类型的数据管道。通过使用Kafka JDBC Sink连接器,我们可以持续地将数据从Kafka主题流式传输到相应的RDBMS中。
JDBC Sink连接器最大的难点
JDBC Sink连接器面临的最大难题在于,它需要预先了解已落入Kafka主题的数据架构。因此,必须将Schema Registry作为独立组件与现有的Kafka集群集成,以便将数据传输至RDBMS。为此,要将数据从Kafka主题下沉到RDBMS,生产者必须发布包含架构的消息/数据。架构定义了数据格式的结构。若未提供架构,JDBC Sink连接器将无法在消费主题消息后,将消息映射到数据库表的列上。
借助Schema Registry,我们可以避免每次都从生产者发送带有消息/载荷的架构,因为Schema Registry会在_schemas
主题中存储(或注册)架构,并根据在JDBC Sink连接器的属性文件中配置/提及的主题名称进行绑定。
对于希望利用Oracle或Confluent的Schema Registry结合开源Apache Kafka收集物联网设备数据的中小型企业而言,许可成本可能是一大障碍。
本文将通过Java代码示例,展示如何在不使用Schema Registry的情况下,利用JDBC Sink连接器将数据持续流式传输到Apache Kafka主题至MySQL数据库。
Apache Kafka与JDBC连接器
Apache Kafka并未自带针对特定供应商的关系型数据库管理系统(RDBMS)的JDBC连接器,类似于文件源和接收连接器。我们需要自行实现或开发特定RDBMS的代码,通过实现Apache Kafka的Connect API来完成。但Confluent已经开发、测试并支持了JDBC Sink连接器,并最终在Confluent社区许可下开源,因此我们已将JDBC Sink连接器与Apache Kafka集成。
即使我们发送了错误的模式或根本没有模式,Kafka主题也不会抛出异常,因为Kafka主题接受所有消息或记录作为键值对中的字节数组。在将整个消息传输到主题之前,生产者必须使用序列化器将消息转换为字节数组。
以下是与有效负载或实际数据绑定的示例模式,这些数据需由Apache Kafka消息生产者发布。
同时,以下是消息生产者的Java代码片段:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO 自动生成的方法存根
new ProducerWithSchema().sendMsgToTopic();
}
}
当然,上述方法存在一些瓶颈,例如:
- 消息与模式之间的紧密耦合。
- 每次都需要将模式与实际数据结合。
- 模式演化的问题。
- 代码维护性等。
为了缓解或解决上述问题,引入了Schema Registry作为一个独立组件,所有模式将在此部署/维护。在模式演化过程中进行兼容性检查是必要的,以确保生产者-消费者合约得到遵守,Schema Registry可用于实现这一点。
您可以观看以下视频,了解数据如何通过JDBC sink连接器在单节点Apache Kafka集群上持续从主题流式传输到MySQL的特定表。
结论
至此,您应该对JDBC连接器以及将Apache Kafka与JDBC连接器捆绑在一起的最大困难有了更好的理解。希望您喜欢这篇文章。如果您觉得这篇内容有价值,请点赞和分享。
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink