package rizhichuli.sparkstreaming
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.util._
//import org.apache.spark.examples.streaming._
import org.apache.kafka.serializer.StringDecoder
import org.apache.spark.streaming.util._
import kafka.utils.VerifiableProperties //红色的为后加上的,并且有错,无法找到kafka下的utils等包
/**
* @author ${穆金星}
*/
object App {
def main(args : Array[String]) {
//if(args.length<4){
// System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
// System.exit(1)
// }
System.setProperty("hadoop.home.dir", "D:\\appbase\\hadoop-2.7.4\\hadoop-2.7.4")
val topic=Set("linuxSysteminfos")
val brokers="10.218.7.232:9092"
val sparkConf=new SparkConf().setMaster("local[2]").setAppName("App")
val sc=new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
//ssc.sparkContext.setLogLevel("ERROR")
val kafkaParams=Map[String,String]("metadata.broker.list"->brokers,"serializer.class"->"kafka.serializer.StringEnvoder")
val kafkaStream=KafkaUtils.createDirectStream(ssc,kafkaParams,topic)
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}}
运行后出现的错误是17/09/20 22:35:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/20 22:35:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)添加kafka.utils.VerifiableProperties类后发现object utils is not a member of package org.apache.kafka但是确定kafka包已经包含在了maven中且其中有此类,忘各位大神能够帮忙解答
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.util._
//import org.apache.spark.examples.streaming._
import org.apache.kafka.serializer.StringDecoder
import org.apache.spark.streaming.util._
import kafka.utils.VerifiableProperties //红色的为后加上的,并且有错,无法找到kafka下的utils等包
/**
* @author ${穆金星}
*/
object App {
def main(args : Array[String]) {
//if(args.length<4){
// System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
// System.exit(1)
// }
System.setProperty("hadoop.home.dir", "D:\\appbase\\hadoop-2.7.4\\hadoop-2.7.4")
val topic=Set("linuxSysteminfos")
val brokers="10.218.7.232:9092"
val sparkConf=new SparkConf().setMaster("local[2]").setAppName("App")
val sc=new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
//ssc.sparkContext.setLogLevel("ERROR")
val kafkaParams=Map[String,String]("metadata.broker.list"->brokers,"serializer.class"->"kafka.serializer.StringEnvoder")
val kafkaStream=KafkaUtils.createDirectStream(ssc,kafkaParams,topic)
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}}
运行后出现的错误是17/09/20 22:35:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/20 22:35:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)添加kafka.utils.VerifiableProperties类后发现object utils is not a member of package org.apache.kafka但是确定kafka包已经包含在了maven中且其中有此类,忘各位大神能够帮忙解答
解决方案 »
- 关于VMware ESX/ESXi Server Support Openstack
- 问一下,/var/lib/nova/instances/_base 目录有啥用
- 求助----安装glance时出错
- 请教关于keystone认证的问题:“'NoneType' object has no attribute 'has_service_catalog'”
- 使用neutron创建了一个router,可是网关端口总是down
- Spark中如何用Java构造Graph
- RDD的map过程中调用sqlContext,输出为空
- 服务器
- 新人求教一个生成镜像的问题
- 关于dockerfile在build时遇到mysql无法启动的问题
- 想购买云服务器做镜像(备份)网站,希望能方便同步原主机的数据
- 视频存储是否能再进一步优化
修改为下面试试,指定函数上的类型参数
val kafkaStream=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topic)