kafka消费者能正常消费,能从kafka集群中取到数据,消费者的逻辑是这样的。 KafkaStream<byte[], byte[]> streamt = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = streamt.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message())); // now launch all the threads
executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}但是全部数据消费完之后,再执行到 while (it.hasNext())时,就会出现这个问题:2016-10-23 16:36:04 [main-SendThread(localhost:2181):1414357] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 0ms
2016-10-23 16:36:07 [main-SendThread(localhost:2181):1417025] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms
2016-10-23 16:36:10 [main-SendThread(localhost:2181):1419692] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms
2016-10-23 16:36:12 [main-SendThread(localhost:2181):1422359] - [DEBUG] Got ping response for sessionid: 0x157f072a16d000f after 1ms而且每隔1秒打印一次这个,程序不往下进行了。 是不是kafka连不上zookeeper呢 ?大神们帮忙看看啊,,。。这个不知道怎么解决了。。
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货