I came across a post online that analyzes some data, so I downloaded it and ran it, but it threw a whole pile of errors. I'm a newbie here; could someone please take a look and help me figure out how to fix this? Here is the source code:
# -*- coding:utf-8 -*-
from pyspark import SparkConf, SparkContext
import re

conf = SparkConf().setMaster("local").setAppName("MY First App")
sc = SparkContext(conf=conf)
csdnRDD = sc.textFile("data/test.txt")
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
tmprdd2 = tmprdd1.map(lambda x: x.split("___csdn_1quot")[0])
tmprdd4 = tmprdd2.filter(lambda x: re.match("\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*", str(x)))
tmprdd5 = tmprdd4.map(lambda x: str(x).split("@")[1])
tmprdd6 = tmprdd5.map(lambda x: x.lower())
tmprdd7 = tmprdd6.map(lambda x: (x, 1)).cache()
num = tmprdd7.count()
tmprdd8 = tmprdd7.reduceByKey(lambda x, y: x + y).cache()
tmprdd9 = tmprdd8.map(lambda x: [x[1], x[0]])
tmprdd10 = tmprdd9.sortBy(ascending=False, numPartitions=None, keyfunc=lambda x: x)
res = tmprdd10.take(10)
for each in res:
    print(each, str((each[0] / num) * 100) + "%")

Here is the error output:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
    tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/15 22:13:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
    tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/15 22:13:23 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\workspace\Python_Spark\Spark_Study\test.py", line 15, in <module>
    num = tmprdd7.count()
  File "C:\Python27\lib\pyspark\rdd.py", line 1008, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\Python27\lib\pyspark\rdd.py", line 999, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "C:\Python27\lib\pyspark\rdd.py", line 873, in fold
    vals = self.mapPartitions(func).collect()
  File "C:\Python27\lib\pyspark\rdd.py", line 776, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
    tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Solutions »

  1.   

    The error output above was not pasted in full; here is the rest:
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
      File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
      File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
        tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
    IndexError: list index out of range
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
      

  2.   

    I set up Python + Spark by following this post, and the sample program at the end of it also ran successfully for me:
    http://blog.csdn.net/hjxinkkl/article/details/57083549?winzoom=1
      

  3.   

    Your environment is entirely Windows.
    The code itself does not have any major problems. Check your Spark setup and test whether pyspark works at all, e.g. with the smoke test below,
    and then submit your Spark job again the way you have been.
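
    A minimal smoke test along these lines can confirm the installation before debugging the job itself; the local-mode setup mirrors the original script, and the app name and numbers are just placeholders:

    # -*- coding:utf-8 -*-
    from pyspark import SparkConf, SparkContext

    # Tiny job that never reads a file: if this prints 45, the Spark
    # install and the Python worker bridge are fine, and the failure in
    # the original script is a data problem rather than an environment one.
    conf = SparkConf().setMaster("local").setAppName("SmokeTest")
    sc = SparkContext(conf=conf)
    print(sc.parallelize(range(10)).sum())  # expected output: 45
    sc.stop()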
      

  4.   

    tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
    x.split("\t") produces a list, but some of your records are malformed, so the list does not always contain three elements; indexing [2] into a short list is what raises the IndexError and aborts the job.
    You can run csdnRDD.map(lambda x: x.split("\t")).filter(lambda x: len(x) < 3) to see which records are malformed, and then decide how to filter them out; see the sketch below.
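
    A sketch of that inspect-then-guard approach, assuming the goal is simply to drop rows with fewer than three tab-separated fields (the variable names beyond csdnRDD are only illustrative):

    # Look at a few malformed rows first: any row with fewer than
    # three fields would have crashed the original x.split("\t")[2].
    rows = csdnRDD.map(lambda x: x.split("\t"))
    bad_rows = rows.filter(lambda fields: len(fields) < 3)
    print(bad_rows.take(5))

    # Then guard the real pipeline by dropping the short rows before
    # indexing, so tmprdd1 is built without hitting the IndexError.
    tmprdd1 = (rows.filter(lambda fields: len(fields) >= 3)
                   .map(lambda fields: fields[2]))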