CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)

一、安装jdk(已安装可以不用管)

Kafka需要依赖jdk,这里先安装jdk

【1】卸载jdk

有时候会因为各种原因我们需要卸载jdk,这里也记录一下

# 查看安装的jdk
rpm -qa|grep -i jdk
image-20220718093407015
# 卸载jdk(根据自己的包名来进行卸载)
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.332.b09-1.el7_9.x86_64
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64
rpm -e --nodeps java-1.8.0-openjdk-devel-1.8.0.332.b09-1.el7_9.x86_64
# 卸载后查看jdk版本,发现找不到,卸载成功
java -version

【2】安装jdk

官网下载:Java Downloads | Oracle 找到 tar.gz 结尾的进行下载,上传到服务器进行安装配置

# 解压(我的解压目录为:/software/jdk/jdk1.8.0_333)
tar -zxvf jdk-8u333-linux-x64.tar.gz

# 打开配置文件进行配置
vim /etc/profile

# 添加如下配置(根据自己的路径来)
export JAVA_HOME=/software/jdk/jdk1.8.0_333
export JRE_HOME=/$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

# 使配置文件生效
source /etc/profile

# 查看是否安装成功
java -version

二、下载Kafka

官网:Apache Downloads

image-20220715155557393

根据对应的版本,使用wget下载,如果下载速度太慢,可以在Windows上通过下载器下载,然后上传到服务器上

wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz --no-check-certificate

上传后进行解压

tar -axvf kafka_2.13-3.2.0.tgz

注,解压路径不宜过长,否则后面启动的时候可能会报错,所以这里我将kafka_2.13-3.2.0重命名为kafka

mv kafka_2.13-3.2.0 kafka

解压后目录结构如下:

image-20220720134330084

三、配置Kafka

这里只是单机kafka的配置,并不是集群的配置

【1】配置kafka的log路径

打开config目录下的server.properties文件修改配置

# 打开配置文件
vim config/server.properties

# 配置日志存放位置(启动的时候会自己在相应的目录下创建kafka-logs文件夹)
log.dirs=/software/kafka/kafka-logs

【2】配置外网访问

还是修改server.properties文件,找到下面配置进行修改,位置大概在34行左右

# 放开注解
listeners=PLAINTEXT://:9092

# 修改注解,改成自己服务器ip
advertised.listeners=PLAINTEXT://42.104.249.49:9092
image-20220720150648166

【3】配置zookeeper数据路径

先在解压目录下创建zookeeper文件夹用来存放数据,再打开config目录下的zookeeper.properties文件修改配置

# 打开配置文件
vim config/zookeeper.properties

# 配置数据存放位置(启动的时候会自己在相应的目录下创建zookeeper文件夹)
dataDir=/software/kafka/zookeeper
image-20220720171618278

【4】放通端口

# 放通9092端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent

# 重启防火墙
firewall-cmd --reload

注意这里除了放通centos的端口,还要将安全组的端口也放通,比如我这里用的是腾讯云,就要上腾讯云的控制台开放相应的安全组端口

四、启动Kafka

# 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动kafka
bin/kafka-server-start.sh config/server.properties

# 生产消息(创建名为testTopic的主题)
bin/kafka-console-producer.sh --topic testTopic --bootstrap-server localhost:9092

# 监听消息(重开一个终端监听testTopic主题的消息,在生产消息的终端发消息,此终端收消息)
bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server localhost:9092

在监听消息的终端能够收到消息,则说明安装配置成功

五、SpringBoot连接测试

目录结构如下:

image-20220720165641782

【1】引入pom依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

【2】配置application.yml

bootstrap-servers配置为自己服务器的IP

spring:
  kafka:
    # 消费者
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      bootstrap-servers: 42.104.249.49:9092
    # 生产者
    producer:
      bootstrap-servers: 42.104.249.49:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

【3】生产消息

生产消息通过controller访问接口来进行测试

@RestController
public class KafkaProducerController {
    
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @GetMapping("/send")
    public void send() {
        // testTopic为主题,和上面终端测试的保持一致,方便测试,onestar为要发送的消息
        kafkaTemplate.send("testTopic","onestar");
    }
}

【4】监听消费消息

@Component
@Slf4j
public class KafkaConsumer {
    /**
     * kafka的监听器,topic为"testTopic",消费者组默认为配置文件里面的
     */
    @KafkaListener(topics = {"testTopic"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            log.info("message:{}", msg);
        }
    }
}

【5】测试

为了方便查看,我们可以将上面的监听消息终端开着,然后运行代码,通过访问 http://localhost:8080/send 进行测试

image-20220720165435959

从打印的日志可以看到消息已经消费了,并且在监听消息的终端也打印出消息

end
  • 作者:ONESTAR(联系作者)
  • 更新时间:2022-07-20 18:09
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论

    新增邮件回复功能,回复将会通过邮件形式提醒,请填写有效的邮件!