消息中间件-Kafka3-kafkaJavaClient小例
- Kafak Java Client
private static final String KAFKA_TOPIC = "kafak-test";private static String bootstrapServers = "localhost:9092";private static AdminClient client = null;static {Properties config = new Properties();config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);client = AdminClient.create(config);}
在pom.xml 添加kafka client依赖
- 客户端创建主题
@Test
public void createTopic() {try {NewTopic topic = new NewTopic(KAFKA_TOPIC, 1, (short) 1);// 提交创建topic请求client.createTopics(Collections.singleton(topic)).all().get();System.out.println("Topic created successfully");}catch (Exception e) {e.printStackTrace();}finally {if (client != null) client.close();}
}
- 客户端获取主题
@Test
public void fetchTopics() {try {ListTopicsResult result = client.listTopics();KafkaFuture<Set<String>> set = result.names();System.out.println(set.get());}catch (Exception e) {e.printStackTrace();}finally {if (client != null) client.close();}}
- 客户端生产者发送消息
@Test
public void produceMsg() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {// 创建生产者实例producer = new KafkaProducer<>(props);// 发送消息producer.send(new ProducerRecord<>(KAFKA_TOPIC, "MSG-1005","Hello, 1005!"), (metadata, exception) ->{if (exception == null) {System.out.println("消息发送成功,主题:" + metadata.topic() + ", 分区:" + metadata.partition());}else {exception.printStackTrace();}});}catch (Exception e) {e.printStackTrace();}finally {// 关闭生成者if (producer != null) producer.close();}
}
- 客户端消费者消费消息
@Test
public void consumeMsg() {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "kafka-consumer-group-001");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = null;try {// 创建消费者实例consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Arrays.asList(KAFKA_TOPIC));// 轮询消费消息while(true) {ConsumerRecords<String, String> records = consumer.poll(100); // 每100ms执行一次for (ConsumerRecord record : records) {System.out.printf("Offset: %d, Key: %s, Value: %s\n", record.offset(), record.key(), record.value());}}}catch (Exception e) {e.printStackTrace();}finally {if (consumer != null) consumer.close();}
}
后续将使用这些测试小例来调试Kafka源码,当然也可以执行Kafka自带的可执行脚本与kafka交互,进行源码分析,只是通过java代码的方式更加直观。