When trying to consume data from a Kafka topic with Spark Streaming, the program hangs and never makes progress when built and run in the production environment, while everything works fine in the VM environment. The code is as follows:
package kafka

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

object consumer_test {

  private val logger = LoggerFactory.getLogger(consumer_test.getClass)

  // Read the last committed offset for each partition from ZooKeeper,
  // stored under /consumers/<groupId>/offsets/<topic>/<partition>.
  def readOffsets(topics: Seq[String], group: String, zkUtils: ZkUtils): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(group, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
              offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception =>
            // No offset stored yet for this partition: start from 0
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
        }
      })
    })
    topicPartOffsetMap.toMap
  }

  // Persist the processed offsets back to ZooKeeper after each batch.
  def persistOffsets(offsets: Seq[OffsetRange], group: String, storeEndOffset: Boolean, zkUtils: ZkUtils): Unit = {
    offsets.foreach(or => {
      val zKGroupTopicDirs = new ZKGroupTopicDirs(group, or.topic)
      val acls = new ListBuffer[ACL]()
      val acl = new ACL()
      acl.setId(ZooDefs.Ids.ANYONE_ID_UNSAFE)
      acl.setPerms(ZooDefs.Perms.ALL)
      acls += acl
      val offsetPath = zKGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(offsetPath, offsetVal + "", acls.toList)
    })
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("example").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.11.23:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "console-consumer-71817",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("cloudalarm2012")

    val zkUrl = "192.168.11.23:2181,192.168.11.24:2181,192.168.11.26:2181"
    val sessionTimeOut = 9999
    val connectionTimeOut = 9999
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeOut, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,
        readOffsets(topics, kafkaParams("group.id").toString, zkUtils))
    )

    inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => {
        logger.warn(s"topic: ${offset.topic}---partition: ${offset.partition}---fromOffset: ${offset.fromOffset}---untilOffset: ${offset.untilOffset}")
      })
      val count = rdd.map(message => message.value()).count()
      logger.warn(s"count: $count")
      rdd.coalesce(1).foreach(println)
      persistOffsets(offsetRanges.toSeq, kafkaParams("group.id").toString, true, zkUtils)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The run state in the production environment is shown in the screenshot: the job just stays stuck there and never proceeds.
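Note that this job talks to ZooKeeper on the driver, inside readOffsets, before ssc.start() is ever reached, so a blocked ZooKeeper or broker connection would make it hang exactly like this. One thing worth ruling out (my assumption, not confirmed anywhere in this thread) is whether the production host can reach the ZooKeeper ensemble and the Kafka broker at all. A minimal standalone connectivity sketch, reusing the addresses from the code above:

import java.net.{InetSocketAddress, Socket}

// Hypothetical pre-flight check, run from the production host:
// attempts a plain TCP connect to each endpoint with a 3 s timeout.
object ConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val endpoints = Seq(
      ("192.168.11.23", 9092), // Kafka broker from bootstrap.servers
      ("192.168.11.23", 2181), // ZooKeeper ensemble from zkUrl
      ("192.168.11.24", 2181),
      ("192.168.11.26", 2181)
    )
    endpoints.foreach { case (host, port) =>
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), 3000)
        println(s"$host:$port reachable")
      } catch {
        case e: Exception => println(s"$host:$port NOT reachable: ${e.getMessage}")
      } finally {
        socket.close()
      }
    }
  }
}

If any endpoint fails here, the hang is a networking problem rather than a Spark one.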
Solutions »
In this part, use inputDStream.foreachRDD(rdd => print(rdd)) to inspect the RDD and its related attributes.
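A cleaned-up sketch of that suggestion, assuming the same inputDStream as in the question: logging each batch's RDD id, record count, and Kafka offset ranges is usually enough to see whether batches are being scheduled at all.

// Minimal diagnostic sketch, assuming `inputDStream` from the question.
inputDStream.foreachRDD { rdd =>
  println(s"RDD ${rdd.id}: ${rdd.count()} records")
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach(println)
}

If nothing is printed at all after ssc.start(), the stream is stuck before the first batch, which points at the offset-reading or connection setup rather than at the batch processing itself.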