Young's blog Young's blog
首页
Spring
  • 前端文章1

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • HTML
  • CSS
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Young

首页
Spring
  • 前端文章1

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • HTML
  • CSS
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Hadoop

  • kafka

    • kafka3.0入门
    • kafka3.0 生产者
    • kafka3.0 broker
    • Kafka 消费者
    • Kafka-Kraft 模式
    • Kafka 整体流程
    • Kafka 幂等性和事务
    • Kafka 外部系统集成 Springboot
      • 1.1 Flume 生产者
      • 1.2 Flume 消费者
      • 2.1 SpringBoot 生产者
      • 2.2 SpringBoot 消费者
    • Kafka生产调优手册
  • Flume

  • hive

  • scala

  • spark

  • 大数据
  • kafka
andanyang
2023-09-24
目录

Kafka 外部系统集成 Springboot

# 第 1 章 集成 Flume

Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 Flume 的消费者。

 集成 Flume

# 1.1 Flume 生产者

Flume生产者

(1)启动 kafka 集群

(2)启动 kafka 消费者

[andanyoung@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
1

(4)配置 Flume

在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf

[andanyoung@hadoop102 flume]$ mkdir jobs
[andanyoung@hadoop102 flume]$ vim jobs/file_to_kafka.conf
1
2

配置文件内容如下

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

(5)启动 Flume

[andanyoung@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &
1

(6)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况

(7)观察 kafka 消费者,能够看到消费的 hello 数据

# 1.2 Flume 消费者

Flume消费者

(1)配置 Flume

在 hadoop102 节点的 Flume 的/opt/module/flume/jobs 目录下创建 kafka_to_file.conf

[andanyoung@hadoop102 jobs]$ vim kafka_to_file.conf
1

配置文件内容如下

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 配置 sink
a1.sinks.k1.type = logger

# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

(2)启动 Flume

[andanyoung@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
1

(3)启动 kafka 生产者

[andanyoung@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
1

并输入数据,例如:hello world

(4)观察控制台输出的日志

# 第 2 章 集成 SpringBoot

<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
</dependency
1
2
3
4

# 2.1 SpringBoot 生产者

(1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息

# 应用名称
spring.application.name=andanyoung_springboot_kafka
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyse-rializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
1
2
3
4
5
6
7

(2)创建 controller 从浏览器接收数据, 并写入指定的 topic

@RestController
public class ProducerController {

    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/andanyoung")
    public String data(String msg){
        // 通过kafka发送出去
        kafka.send("first", msg);

        return "ok";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

(3)在浏览器中给/andanyoung 接口发送数据

http://localhost:8080/andanyoung?msg=hello

# 2.2 SpringBoot 消费者

# =========消费者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserial
izer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserial
izer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=andanyoung
# =========消费者配置结束=========
1
2
3
4
5
6
7
8
9
10
11

(2)创建类消费 Kafka 中指定 topic 的数据

@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = "first")
    public void consumerTopic(String msg){
        System.out.println("收到消息:" + msg);
    }
}

1
2
3
4
5
6
7
8
9
编辑 (opens new window)
上次更新: 2024/04/19, 08:52:45
Kafka 幂等性和事务
Kafka生产调优手册

← Kafka 幂等性和事务 Kafka生产调优手册→

最近更新
01
idea 热部署插件 JRebel 安装及破解,不生效问题解决
04-10
02
spark中代码的执行位置(Driver or Executer)
12-12
03
大数据技术之 SparkStreaming
12-12
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Young | MIT License
浙ICP备20002744号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式