热搜:前端 nest neovim nvim

SpringBoot3 配置文件整合 Apache Avro

lxf2023-06-23 03:26:53

1、介绍

Apache Avro 是一种高效的数据序列化系统,用于在不同的应用程序和平台之间传输和存储数据。它提供了一种紧凑且高效的二进制数据编码格式,相比其他常见的序列化方式,Avro能够实现更快的序列化和更小的数据存储。

而Confluent Schema Registry是由Confluent公司提供的一个开源组件,旨在解决分布式系统中的数据模式演化和兼容性的问题。它是建立在Apache Avro之上的一个服务,可以用于集中管理和存储Avro数据的模式(Schema),确保分布式系统中的数据一致性和兼容性。它广泛应用于事件流处理平台(如Kafka),为数据流的可靠性和互操作性提供了支持。

本文将介绍如何在Spring Boot应用程序中整合Apache Avro和Confluent Schema Registry,以实现高效的数据序列化和管理。

本文代码示例: GitHub仓库地址


2、Confluent Schema

1、下载

软件下载地址:Previous Versions - Confluent

本次使用:confluent-community-7.3.3 社区版,下载上传至Linux解压。

2、修改配置

修改配置文件:在 confluent-7.3.3/etc 文件夹下

confluent-7.3.3/etc/schema-registry/schema-registry.properties

# 配置Confluent Schema Registry 服务的访问IP和端口
listeners=http://0.0.0.0:8081

# 修改 Kafka集群指定引导服务器
kafkastore.bootstrap.servers=PLAINTEXT://xx.xx.xx.xx:9092,xx.xx.xx.xx:9192
# kafkastore.connection.url 配置zookeeper地址方式已弃用 

# 存储 schema 的 topic
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

3、启动

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4、验证

curl -X POST 'http://localhost:8081/subjects/topic-test/versions' \ 
	 -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     -d '{"schema": "{\"type\": \"string\"}"}'

 返回结果:{"id":1}

2、Springboot整合

1、引入xml

    <dependencies>    
		<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.7</version>
        </dependency>

        <!--avro-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-schema-registry-client -->
        <!--schema-registry-client-->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>7.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
    </dependencies>

    <!--schema-registry-client 远程仓库-->
    <repositories>
        <!-- other maven repositories the project -->
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

2、导入Avro构建插件

  <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
          <execution>
              <phase>generate-sources</phase>
              <goals>
                  <goal>schema</goal>
              </goals>
              <configuration>
                  <!--schema 文件所在目录-->
                  <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                  <!--根据schema 文件生成的类文件目录-->
                  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
              </configuration>
          </execution>
      </executions>
  </plugin>

3、生成实体类

  • 在 ${project.basedir}/src/main/resources/avro/ 下创建 user.avsc文件。

  • 通过 Maven mvn compile 命令生产实体类

{
  "namespace": "com.jinunn.kraft.avro",  // 实体类存放路径
  "type": "record",
  "name": "User",     // 实体类文件名
  "fields": [       // 实体类属性
    {
      "name": "id",  // 属性名
      "type": "int"  // 类型
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}

SpringBoot3 配置文件整合 Apache Avro

4、SpringBoot配置文件

server:
  port: 8086

spring:
  application:
    name: kafka

  kafka:
  	# 集群地址
    bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9192,xx.xx.xx.xx:9292
    producer:
      # 设置key的序列化类
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      # 设置value的序列化类
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      # ack策略
      # 0:生产者发送消息就不管了,效率高,但是容易丢数据,且没有重试机制
      # 1:消息发送到Leader并落盘后就返回,如果Leader挂了并且Follower还没有同步数据就会丢失数据
      #-1:消息要所有副本都拷贝才返回,保证数据不丢失(但是有可能重复消费)
      acks: 1
      # 失败重试次数
      retries: 3
      # 批量提交的数据大小
      batch-size: 16384
      # 生产者暂存数据的缓冲区大小
      buffer-memory: 33554432
    consumer:
      # key的反序列化类
      key-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      # 是否自动提交偏移量,如果要手动确认消息,就要设置为false
      enable-auto-commit: false
      # 消费消息后间隔多长时间提交偏移量(ms)
      auto-commit-interval: 100
      # 默认的消费者组,如果不指定就会用这个
      group-id: groupId
      # kafka意外宕机时的消息消费策略
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
    listener:
      # 手动确认消息
      ack-mode: manual_immediate
      # 消费者运行的线程数
      concurrency: 2
    properties:
      # confluent schema 地址
      schema:
        registry:
          url: http://xx.xx.xx.xx:8081

5、消息生产者

@RestController
@RequestMapping("/send")
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class Producer {

    private final KafkaTemplate<String, User> kafkaTemplate;

    @GetMapping("/test")
    public void sendMsg() {
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setId(i);
            user.setName("Jin" + i);
            user.setAge(35 + i);
            kafkaTemplate.send(AvroConsumer.TOPIC_NAME, user);
        }
    }
}

6、消息消费者

@Slf4j
@Component
public class AvroConsumer {

    public static final String TOPIC_NAME = "test";

    @KafkaListener(topics = TOPIC_NAME, groupId = "test-group")
    public void consume(ConsumerRecord<String, User> record, Acknowledgment ack) {
        log.info("value #=>:{}", record.value());
        // 手动提交ack
        ack.acknowledge();
    }
}

7、结果

SpringBoot3 配置文件整合 Apache Avro

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!