java api如何获取kafka所有Topic列表(TopicCommand,只打印),想放到list里。有大神 可以提供下思路吗

解决方案 »

  1.   

    http://www.lai18.com/content/5058470.html
      

  2.   


    public static List<String> getTopicList(String prefix, String postfix) {
    ZkUtils zkUtils = ZkUtils.apply(zookeeperStr, 30000, 30000, JaasUtils.isZkSecurityEnabled());

    List<String> allTopicList = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
    List<String> topicList = allTopicList.stream()
    .filter(topic -> topic.startsWith(prefix) && topic.endsWith(postfix))
    .collect(Collectors.toList());

    return topicList;
    }
      

  3.   


    import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;/**
     * 消息消费者
     * 
     * @author:涂有
     * @date 2017年6月1日 上午11:27:10
     */
    public class MsgConsumer2 { public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// key反序列化方式
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// value反系列化方式
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交方式
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    "192.168.100.115:2092,192.168.100.115:2093,192.168.100.115:2094");// 指定broker地址,来找到group的coordinator
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tuyou");// 指定用户组 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    System.out.println(consumer.listTopics().keySet());
    }
    }