@Override
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据
props.put("zookeeper.connect", "Master:2181");
props.put("zk.connectiontimeout.ms", "10000");
props.put("group.id", "test-consumer-group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1); // 一次从主题中获取一个数据 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get("test").get(0);// 获取每次接收到的这个数据
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
System.out.println("consummer...");
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}
}
producer正常运行,但是consumer没有取到数据,"group.id", "test-consumer-group"是从conf中的consumer来的,消费者还需要其他配置?程序卡在while(iterator.hasNext()){ 这一行了,怎么解决能让它继续运行?
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货