java api如何获取kafka所有Topic列表(TopicCommand,只打印),想放到list里。有大神 可以提供下思路吗
解决方案 »
- 关于CriteriaBuilder的疑问!
- struts 跳转问题
- hibernate3.5 中的slf4j报错求高手解决
- Java可以调用WCF提供的NetTcpBing协议的服务吗?
- 时间紧迫,公司逼得紧,osworkflow整合spring 的问题
- 先前我搜索了关于该问题的疑答,好象效果还不是很满意的。
- 要作毕业论文了,是关于j2ee的,希望大家推荐一套开发J2EE软件
- spring中的init-method的不解
- Tomcat 的admin怎么不能登陆 ?
- 新手求助一个构造器的问题
- 项目本来用TOMCAT5.5跑然后换到了6.0抛错
- 我想把我的电脑做为我网站的服务器,如何把域名解析到我的电脑上
public static List<String> getTopicList(String prefix, String postfix) {
ZkUtils zkUtils = ZkUtils.apply(zookeeperStr, 30000, 30000, JaasUtils.isZkSecurityEnabled());
List<String> allTopicList = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
List<String> topicList = allTopicList.stream()
.filter(topic -> topic.startsWith(prefix) && topic.endsWith(postfix))
.collect(Collectors.toList());
return topicList;
}
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;/**
* 消息消费者
*
* @author:涂有
* @date 2017年6月1日 上午11:27:10
*/
public class MsgConsumer2 { public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// key反序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// value反系列化方式
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交方式
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.100.115:2092,192.168.100.115:2093,192.168.100.115:2094");// 指定broker地址,来找到group的coordinator
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tuyou");// 指定用户组 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
System.out.println(consumer.listTopics().keySet());
}
}