1:拉取管理kafka界面UI镜像
# Pull the provectuslabs/kafka-ui image (no tag given, so the default tag is used).
docker pull provectuslabs/kafka-ui
2:拉取kafka镜像
# Pull the bitnami/kafka image (no tag given, so the default tag is used).
docker pull bitnami/kafka
3:docker-compose.yml
# Compose stack: one ZooKeeper node, three Kafka brokers and the kafka-ui
# web console.
#
# Every broker registers with ZooKeeper at 192.168.11.50:2181 and advertises
# itself to clients on 192.168.11.50:<port>.
# NOTE(review): 192.168.11.50 is presumably the Docker host's LAN address;
# replace it with your own host address before use.
version: '3.8'

services:
  zookeeper-1:
    container_name: zookeeper1
    image: bitnami/zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka-1:
    container_name: kafka1
    image: bitnami/kafka
    ports:
      - "19092:19092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=192.168.11.50:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.11.50:19092
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:19092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_BROKER_ID=1
    # Start ZooKeeper first; kafka-2 and kafka-3 chain off this broker, so
    # the whole cluster is ordered after ZooKeeper.
    depends_on:
      - zookeeper-1

  kafka-2:
    container_name: kafka2
    image: bitnami/kafka
    ports:
      - "29092:29092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=192.168.11.50:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.11.50:29092
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_BROKER_ID=2
    depends_on:
      - kafka-1

  kafka-3:
    container_name: kafka3
    image: bitnami/kafka
    ports:
      - "39092:39092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=192.168.11.50:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.11.50:39092
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:39092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_BROKER_ID=3
    depends_on:
      - kafka-1
      - kafka-2

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      # Comma-separated list of all three brokers, no spaces.
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.11.50:19092,192.168.11.50:29092,192.168.11.50:39092
4:springboot项目发布和消费kafka
4-1:application.yml
# Spring Boot configuration for the Kafka demo application.
server:
  port: 9088

spring:
  kafka:
    consumer:
      # Same three broker ports the producer uses (19092, 29092, 39092).
      # The second entry previously read 29093, which matches no broker
      # port published by the docker-compose file in this document.
      bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
      group-id: test-group
      # Applies when the consumer group has no committed offset yet.
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
4-2:消费者
package com. example. kafkademo. config ; import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. kafka. annotation. KafkaListener ;
import org. springframework. stereotype. Service ;
/**
 * Spring service that listens on the Kafka topic named {@code "topic"} and
 * logs each message it receives.
 */
@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    /**
     * Listener callback for the topic {@code "topic"}; writes the received
     * message to the log at INFO level.
     *
     * @param message the message handed to the listener
     */
    @KafkaListener(topics = "topic")
    public void receiveMessage(String message) {
        log.info("received message='{}'", message);
    }
}
4-3:生产者
package com. example. kafkademo. config ; import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. stereotype. Service ;
/**
 * Spring service that publishes string messages through the injected
 * {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Logs the outgoing message at INFO level and hands it to the template.
     * The value returned by {@code send} is not inspected here.
     *
     * @param topic   name of the destination topic
     * @param message message to publish
     */
    public void sendMessage(String topic, String message) {
        log.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
4-4:controller
package com. example. kafkademo. controller ; import com. example. kafkademo. config. KafkaProducerService ;
import org. springframework. stereotype. Controller ;
import org. springframework. web. bind. annotation. PostMapping ;
import org. springframework. web. bind. annotation. RestController ; import javax. annotation. Resource ;
/**
 * REST endpoint that publishes a message to Kafka via
 * {@link KafkaProducerService}.
 */
@RestController
public class KafkaController {

    /**
     * Topic used when the request does not supply one. This is the name the
     * endpoint previously always sent to.
     */
    private static final String DEFAULT_TOPIC = "topic";

    @Resource
    KafkaProducerService kafkaProducerService;

    /**
     * Publishes {@code content} to the requested topic.
     *
     * <p>The {@code topic} argument used to be ignored (the literal
     * {@code "topic"} was always used); it is now honoured, falling back to
     * {@code "topic"} when it is missing or empty so existing callers that
     * send only {@code content} keep working.
     *
     * @param topic   destination topic; may be {@code null} or empty
     * @param content message to publish
     * @return the {@code content} value that was passed in
     */
    @PostMapping("/publish")
    public String publish(String topic, String content) {
        String target = (topic == null || topic.isEmpty()) ? DEFAULT_TOPIC : topic;
        kafkaProducerService.sendMessage(target, content);
        // Print the actual message; the old code printed the literal word "content".
        System.out.println(content);
        return content;
    }
}
4-5:pom
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the kafkaDemo Spring Boot application (Java 1.8). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.16</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>kafkaDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaDemo</name>
    <description>kafkaDemo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- No explicit versions below: none were declared in the original. -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
5:命令行方式启动kafka
# Start one Kafka broker in the background, publishing port 19092 and
# pointing it at ZooKeeper on 192.168.11.50:2181.
# NOTE(review): the compose file in this document pairs port 19092 with
# container kafka1 / broker id 1, while this command uses the name kafka2 and
# broker id 2 - confirm that is intended.
docker run -d --name kafka2 \
  -p 19092:19092 \
  -e KAFKA_CFG_ZOOKEEPER_CONNECT=192.168.11.50:2181 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.11.50:19092 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:19092 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_BROKER_ID=2 \
  bitnami/kafka
6:命令行方式启动kafka-ui
# Start kafka-ui in the background on port 8080, bootstrapping from the
# broker at 192.168.11.50:19092.
docker run -d --name kafka-ui \
  -p 8080:8080 \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.11.50:19092 \
  provectuslabs/kafka-ui
7:命令行方式启动zookeeper
# Start ZooKeeper in the background on port 2181 with anonymous login allowed.
docker run -d --name zookeeper \
  -p 2181:2181 \
  -e ALLOW_ANONYMOUS_LOGIN=yes \
  bitnami/zookeeper