@Override
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Properties props = new Properties();
 props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据
props.put("zookeeper.connect", "Master:2181");
props.put("zk.connectiontimeout.ms", "10000");
props.put("group.id", "test-consumer-group");

// Create the connection to the cluster
  ConsumerConfig consumerConfig = new ConsumerConfig(props);
  ConsumerConnector consumerConnector =  Consumer.createJavaConsumerConnector(consumerConfig);
 
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put("test", 1); // 一次从主题中获取一个数据           Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumerConnector.createMessageStreams(topicCountMap);  
         KafkaStream<byte[], byte[]> stream = messageStreams.get("test").get(0);// 获取每次接收到的这个数据  
         ConsumerIterator<byte[], byte[]> iterator =  stream.iterator(); 
         System.out.println("consummer...");
         while(iterator.hasNext()){  
             String message = new String(iterator.next().message());  
             System.out.println("接收到: " + message);  
         }  

}
producer正常运行,但是consumer没有取到数据,"group.id", "test-consumer-group"是从conf中的consumer来的,消费者还需要其他配置?程序卡在while(iterator.hasNext()){  这一行了,怎么解决能让它继续运行?