<kbd id="m0mea"><tt id="m0mea"></tt></kbd>
  • <s id="m0mea"><code id="m0mea"></code></s>
    <tbody id="m0mea"><u id="m0mea"></u></tbody><rt id="m0mea"><menu id="m0mea"></menu></rt>
  • <legend id="m0mea"><input id="m0mea"></input></legend>

    KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)
    2022-09-06 22:42:16


    文章目錄

    一、基礎集成
    1. 技術選型

    軟件/框架

    版本

    jdk

    1.8.0_202

    springboot

    2.5.4

    kafka server

    kafka_2.12-2.8.0

    kafka client

    2.7.1

    zookeeper

    3.7.0

    2. 導入依賴
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
    3. kafka配置

    properties版本

    spring.application.name=springboot-kafka
    server.port=8080
    # kafka 配置
    spring.kafka.bootstrap-servers=node1:9092

    # producer 配置
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 生產者每個批次最多方多少條記錄
    spring.kafka.producer.batch-size=16384
    # 生產者一端總的可用緩沖區大小,此處設置為32M * 1024 * 1024
    spring.kafka.producer.buffer-memory=33544432

    # consumer 配置
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.group-id=springboot-consumer-02
    # earliest - 如果找不到當前消費者的有效偏移量,則自動重置向到最開始
    spring.kafka.consumer.auto-offset-reset=earliest
    # 消費者的偏移量是自動提交還是手動提交,此處自動提交偏移量
    spring.kafka.consumer.enable-auto-commit=true
    # 消費者偏移量自動提交時間間隔
    spring.kafka.consumer.auto-commit-interval=1000

    yml版本項目內部配置

    server:
    port: 8002
    spring:
    application:
    # 應用名稱
    name: ly-kafka
    profiles:
    # 環境配置
    active: dev
    cloud:
    nacos:
    discovery:
    # 服務注冊地址
    server-addr: nacos.server.com:8848
    config:
    # 配置中心地址
    server-addr: nacos.server.com:8848
    # 配置文件格式
    file-extension: yml
    # 共享配置
    shared-configs:
    - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

    nacos-config 服務端配置

    在這里插入代碼片
    4. auto-offset-reset 簡述

    關于
    auto.offset.reset 配置有3個值可以設置,分別如下:

    earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
    latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
    none: topic 各分區都存在已提交的 offset 時,從 offset 后開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;
    默認建議用 earliest, 設置該參數后 kafka出錯后重啟,找到未消費的offset可以繼續消費。

    而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啟kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就不管了。

    none 這個設置沒有用過,兼容性太差,經常出問題。

    5. 新增一個訂單類

    模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其他服務消費

    package com.gblfy.kafka.entity;

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import java.time.LocalDateTime;

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class Order {
    /**
    * 訂單id
    */
    private long orderId;
    /**
    * 訂單號
    */
    private String orderNum;
    /**
    * 訂單創建時間
    */
    private LocalDateTime createTime;
    }
    6. 生產者(異步)
    package com.gblfy.lykafka.provider;

    import com.alibaba.fastjson.JSONObject;
    import com.gblfy.common.constant.KafkaTopicConstants;
    import com.gblfy.common.entity.Order;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    import java.time.LocalDateTime;

    /**
    * Kafka生產者
    *
    * @author gblfy
    * @date 2021-09-28
    */
    @Service
    public class KafkaProvider {
    private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
    // 構建一個訂單類
    Order order = Order.builder()
    .orderId(orderId)
    .orderNum(orderNum)
    .createTime(createTime)
    .build();
    // 發送消息,訂單類的 json 作為消息體
    ListenableFuture<SendResult<String, String>> future =
    kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));

    // 監聽回調
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onFailure(Throwable e) {
    log.info("發送消息失敗: {}", e.getMessage());
    }

    @Override
    public void onSuccess(SendResult<String, String> result) {
    RecordMetadata metadata = result.getRecordMetadata();
    log.info("發送的主題:{} ,發送的分區:{} ,發送的偏移量:{} ",
    metadata.topic(), metadata.partition(), metadata.offset());
    }
    });
    }
    }
    7. 消費者
    package com.gblfy.lykafka.controller;

    import com.gblfy.lykafka.provider.KafkaProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.time.LocalDateTime;

    @RestController
    @RequestMapping("/kafka")
    public class KafkaProviderController {

    @Autowired
    private KafkaProvider kafkaProvider;

    @GetMapping("/sendMQ")
    public String sendMQContent() {
    kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());
    return "OK";
    }
    }

    通過 @KafkaListener注解,我們可以指定需要監聽的 topic 以及 groupId, 注意,這里的 topics 是個數組,意味著我們可以指定多個 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

    注意:消息發布者的 TOPIC 需要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

    8. kafka配置類
    package com.gblfy.common.constant;

    public class KafkaTopicConstants {
    //kafka發送消息主題
    public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";

    // kafka消費者組需要和yml文件中的 kafka.consumer.group-id的值保持一致
    public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
    }
    9.單元測試

    新建單元測試,功能測試消息發布,以及消費。

    package com.gblfy.kafka;

    import com.gblfy.kafka.controller.KafkaProvider;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    import java.time.LocalDateTime;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;

    @SpringBootTest
    class KafkaSpringbootApplicationTests {

    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
    // 發送 1000 個消息
    for (int i = 0; i < 1000; i++) {
    long orderId = i+1;
    String orderNum = UUID.randomUUID().toString();
    kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
    }
    TimeUnit.MINUTES.sleep(1);
    }
    }
    9. 效果圖

    KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)_java


    KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)_ide_02

    10. 源碼地址

    ??https://gitee.com/gb_90/kafka-parent??

    11.微服務專欄

    ??https://gitee.com/gb_90/micro-service-parent??


    本文摘自 :https://blog.51cto.com/g


    更多科技新聞 ......

    日本成人三级A片
    <kbd id="m0mea"><tt id="m0mea"></tt></kbd>
  • <s id="m0mea"><code id="m0mea"></code></s>
    <tbody id="m0mea"><u id="m0mea"></u></tbody><rt id="m0mea"><menu id="m0mea"></menu></rt>
  • <legend id="m0mea"><input id="m0mea"></input></legend>