1. Avro 介绍
Avro 是一种数据序列化系统,由 Apache Foundation 开发并维护。它提供了丰富的数据结构类型,并且可以用于编码结构化数据。Avro 主要被用于 Apache Hadoop 这样的大数据处理系统中,它可以提供高效而且紧凑的序列化以及反序列化操作。
Avro 的特点包括:
- 基于 JSON 的模式定义:Avro 使用 JSON 格式定义数据结构,这使得模式易于阅读和编写。
- 编码与解码时模式独立:Avro 的一大特点是模式用于编码和解码,这允许对数据结构进行演化而无需修改所有读取数据的应用。这个特性对于长期存储大量数据的系统来说非常有用。
- 高效:Avro 的二进制编码非常紧凑,这使得 Avro 特别适合于大数据场景。它没有其他一些序列化系统的大量开销,比如标记字段名。
- 支持多种语言:Avro 支持许多编程语言,包括 Java, C, C++, C#, Python 和 Ruby 等。
总的来说,Avro 是一个强大的工具,可以有效地处理复杂的数据结构,并且它的设计理念适应于大数据处理,能有效地处理大规模数据的序列化和反序列化操作。
2. Avro 结合 kafka
在分布式数据流处理系统中,Avro 和 Kafka 的结合带来了很多重要的好处:
- 模式演化: Avro 支持模式演化,这就意味着你可以在不中断服务的情况下修改你的数据模式。这对于实时数据流处理系统来说非常重要,因为它允许系统在不停机的情况下进行修改和升级。
- 高效的序列化: Avro 提供了一种高效且紧凑的二进制数据格式,这对于 Kafka 这样的数据流处理系统来说非常重要。它可以帮助减少网络传输的开销和存储的需求。
- 跨语言支持: Avro 支持多种编程语言,这对于在多种语言编写的服务中使用 Kafka 的系统来说非常有利。这就意味着,无论服务是用 Java、Python 还是其他语言编写的,都可以通过 Avro 和 Kafka 进行有效的数据交换。
- 明确的数据契约: 由于 Avro 使用明确定义的模式,它提供了一种明确的数据契约,可以在生产者和消费者之间提供清晰的数据期望。这有助于提高系统的健壮性,并减少因数据不一致而产生的问题。
因此,结合 Avro 和 Kafka 可以帮助构建一个健壮、高效且易于维护和演化的实时数据流处理系统。
3. Avro json schema
UserBehavior.avsc, 注意 : 创建的文件后缀名一定要是 avsc
这里是一个 avro 的json描述, 请你进行解读
{
"namespace": "com.hnbian", // 要生成 类的路径
"type": "record", // 类型 avro 使用 record
"name": "UserBehavior", // 会自动生成对应的类名 UserBehavior.java
"fields": [ // 要指定的字段
{"name": "userId", "type": "long"},
{"name": "itemId", "type": "long"},
{"name": "categoryId", "type": "int"},
{"name": "behavior", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
添加 avro json schema 以后,编译 (mvn compile) 项目后将会自动在 com.hnbian
包下生成 UserBehavior.java 类
类型说明:
- 除了上面写的类型, avro 还支持其他多种类型
- 基本类型(Primitive Types):
null
: 表示空值或无值boolean
: 表示布尔值,true 或 falseint
: 表示32位整数long
: 表示64位整数float
: 表示单精度浮点数double
: 表示双精度浮点数bytes
: 表示字节序列string
: 表示 Unicode 字符序列
- 复合类型(Complex Types):
record
: 表示一组带名字(名称必须唯一)的字段,每个字段都有自己的类型。这与大多数编程语言中的类或结构体类似。enum
: 表示一组有名字的值,类似于 Java 的枚举。array
: 表示一种类型的连续值。map
: 表示一组键值对,其中键是 string 类型,值可以是任意类型。union
: 表示可以是多种类型的值,例如,一个字段的类型可以是 int 或 null。fixed
: 表示一个具有固定数目的字节的值。
以下是一个增加了其他类型的简单示例:
{
"namespace": "com.hnbian",
"type": "record",
"name": "UserBehavior",
"fields": [
{"name": "userId", "type": "long"},
{"name": "itemId", "type": "long"},
{"name": "categoryId", "type": "int"},
{"name": "behavior", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "isPurchased", "type": "boolean"}, // 新添加的 boolean 类型字段
{"name": "itemPrice", "type": "double"}, // 新添加的 double 类型字段
{
"name": "comments",
"type": { // 新添加的 array 类型字段
"type": "array",
"items": "string"
}
},
{
"name": "attributes",
"type": { // 新添加的 map 类型字段
"type": "map",
"values": "string"
}
},
{
"name": "rating",
"type": ["null", "int"] // 新添加的 union 类型字段
}
]
}
4. maven 依赖与编译
4.1 maven 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Kafka_avro_consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
</dependencies>
<!--使用插件可以在 Maven 构建生命周期中生成 Avro schemas 的 Java 类-->
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.2 进行项目编译
编译后的 UserBehavior 类
5. 测试数据
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
6. 自定义序列化类
包含定义序列化与反序列化两个方法
package com.hnbian;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
/**
* 自定义序列化和反序列化
*/
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, UserBehavior userBehavior) {
// 创建序列化执行器
SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<>(userBehavior.getSchema());
// 创建一个流 用存储序列化后的二进制文件
ByteArrayOutputStream out = new ByteArrayOutputStream();
// 创建二进制编码器
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
try {
// 数据入都流中
writer.write(userBehavior, encoder);
} catch (IOException e) {
e.printStackTrace();
}
return out.toByteArray();
}
@Override
public void close() {}
@Override
public UserBehavior deserialize(String s, byte[] bytes) {
// 用来保存结果数据
UserBehavior userBehavior = new UserBehavior();
// 创建输入流用来读取二进制文件
ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
// 创建输入序列化执行器
SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<>(userBehavior.getSchema());
// 创建二进制解码器
BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
try {
// 数据读取
userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
} catch (IOException e) {
e.printStackTrace();
}
// 结果返回
return userBehavior;
}
}
7. 编写 kafka 生产者类
package com.hnbian;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* 生产测试数据
**/
public class UserBehaviorProducerKafka {
public static void main(String[] args) throws InterruptedException {
// 从 CSV 文件中获取数据
List<UserBehavior> data = getData();
// 创建 Kafka 配置文件
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:6667,node:6667,node3:6667");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value.serializer 需要指定自定义的序列化类,否则将无效
props.setProperty("value.serializer", SimpleAvroSchemaJava.class.getName());
// 创建 Kafka 生产者
KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<>(props);
// 遍历数据
for (UserBehavior userBehavior : data) {
// 创建 Kafka 消息
ProducerRecord<String, UserBehavior> producerRecord
= new ProducerRecord<>("test", userBehavior);
// 将消息发送到 Kafka
userBehaviorProducer.send(producerRecord);
System.out.println("数据写入成功" + data);
// 每次发送数据后,线程休眠1秒
Thread.sleep(1000);
}
}
// 从 CSV 文件中读取数据的函数
public static List<UserBehavior> getData() {
// 初始化一个存放 UserBehavior 对象的列表
ArrayList<UserBehavior> userBehaviors = new ArrayList<>();
try {
// 创建一个从 CSV 文件读取数据的 BufferedReader 对象
BufferedReader br = new BufferedReader(new FileReader("UserBehavior.csv"));
String line;
// 读取文件中的每一行数据
while ((line = br.readLine()) != null) {
// 使用逗号分隔每一行数据,得到一个字符串数组
String[] split = line.split(",");
// 使用字符串数组中的数据创建一个 UserBehavior 对象,并添加到列表中
userBehaviors.add(
new UserBehavior(
Long.parseLong(split[0])
, Long.parseLong(split[1])
, Integer.parseInt(split[2])
, split[3]
, Long.parseLong(split[4]))
);
}
} catch (Exception e) {
// 打印出任何异常信息
e.printStackTrace();
}
// 返回包含所有 UserBehavior 对象的列表
return userBehaviors;
}
}
8. 编写kafka 消费者类
package com.hnbian;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/** 消费数据 测试数据 **/
public class UserBehaviorConsumer {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers", "node1:6667,node2:6667,node3:6667");
prop.put("group.id", "UserBehavior");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置反序列化类为自定义的avro反序列化类
prop.put("value.deserializer", SimpleAvroSchemaJava.class.getName());
KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
System.out.println(stringStockConsumerRecord.value());
}
}
}
}
9. 异常
在反序列化时可能遇见一些异常信息如下:
java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -53
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
请检查你的序列化与反序列化时使用的 schema 是否一致, 包括字段名称, 字段类型, 字段顺序等等。
10. 其他
在进行反序列化时还有一种情况, 如果Kafka中的数据字段比较多, 但是我们不需要那么多的数据, 只需要其中的某些字段, avro 在进行反序列化的定义schema 的时候可以按字段顺序反序列化部分字段, 但是中间不能有空出来的字段
如序列化 schema 如下:
{
"namespace": "com.hnbian",
"type": "record",
"name": "UserBehavior",
"fields": [
{"name": "userId", "type": "long"},
{"name": "itemId", "type": "long"},
{"name": "categoryId", "type": "int"},
{"name": "behavior", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
在反序列化时, 可以定义
{
"namespace": "com.hnbian",
"type": "record",
"name": "UserBehavior",
"fields": [
{"name": "userId", "type": "long"},
{"name": "itemId", "type": "long"},
{"name": "categoryId", "type": "int"}
]
}
# 减掉了最后两个字段
但是不能如下定义:
{
"namespace": "com.hnbian",
"type": "record",
"name": "UserBehavior",
"fields": [
{"name": "userId", "type": "long"},
{"name": "itemId", "type": "long"},
{"name": "behavior", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
# 减去了中间的 categoryId 字段
# 这时也会报反序列化异常 Failed to deserialize Avro record.