广播变量里面的值是从redis 里面读取出来的,当redis 里面的数据变化之后,我也要更新spark 的广播变量,不知道有没有大佬们有没有处理过类似的问题呢?能否提供一些相关的处理方式?

解决方案 »

  1.   

    我自己先顶一下,目前这个问题百度了很多 broadcast.doUnpersist(true);方法 这个方法是再driver 端执行的,也就是说整个spark 程序只会执行一边,当所有任务执行器再计算rdd 的时候都不会取执行这个broadcast.doUnpersist(true);  这个方法;所以我在想spark 又上面机制能让执行器强制中断,然后回到driver 端 执行我想执行的操作,然后执行器再恢复执行任务!  这句话可能表述的会违背sprak 的某些原则,但是我想要的类似的效果,不知道各位大牛们有没有相关的处理经验分享一下!
      

  2.   

    经过实验 sparkstreaming 使用广播变量 在yarn模式下是回报空异常的,所以还是使用别的方案
      

  3.   

    可以在spark的driver端创建一个线程,定时调用sc.broadcast,测试过,可用
      

  4.   

    在yarn 模式上会报空异常
      

  5.   

    在yarn模式上可以操作吗?我试过会报空异常的哦!可不可以贴你成功的代码瞧瞧
      

  6.   

    反正都是从Redis上读取的,用的时候直接从Redis读不就完了?为什么还要广播呢?
    我们都是通过Redis代替广播变量的你这反过来没有啥意义啊
      

  7.   

    因为不想反复从redis取数据,而且还想定时更新,所以想用广播变量并且能够动态修改
      

  8.   

    楼主有解决吗?我尝试在读取本地文件,序列化然后通过广播变量广播出去,并且定时更新。同样是spark on yarn模式,现在是无法读取更新的文件值,更奇怪的是spark-submit再次提交作业,读取出的文件中的值还是更新前的值。不知道哪里写的有问题,代码如下:object BroadcastWrapper {
      @volatile private var broadcast:Broadcast[List[String]] = null
      private var lastUpdatedTime:Date = Calendar.getInstance.getTime()  //解析配置文件中的Columns配置
      def getProperties(filePath:String="/home/xxx/test.properties"):List[String]={
        val fileStream = new FileInputStream(filePath)
        val prop = new Properties()
        prop.load(fileStream)
        val value = prop.getProperty("columns").split(",").toList
        println("value is *******************"+value)
        value
      }  def getInstance(sc:SparkContext,filePath:String="/home/z672898/lzw/test.properties"):Broadcast[List[String]] ={
        if(broadcast==null){
          synchronized{
            if(broadcast==null)
              broadcast = sc.broadcast(getProperties(filePath))
          }
        }
        broadcast
      }  def updateAndGet(sc:SparkContext,block:Boolean=false,filePath:String="/home/z672898/lzw/test.properties"): Broadcast[List[String]] ={
        val currentTime = Calendar.getInstance().getTime
        //1min = 60s = 60000ms
        val date_diff = currentTime.getTime -lastUpdatedTime.getTime
        //3min update
        if(broadcast==null||date_diff>60000){
          if(broadcast != null){
            /**
              * unpersist(blocking):把广播变量从集群中所有保存该广播变量的工作节点的内存中移除
              * 布尔类型的blocking参数指定该操作是堵塞直至变量已经从所有节点删除,还是作为异步非堵塞操作执行.
              * 如果希望立刻释放内存,应该把这个参数设置为True
              */
            broadcast.unpersist(block)
          }
          val columns = getProperties(filePath)
          println("other broadcast:****************"+columns)
          broadcast = sc.broadcast(columns)
          //更新时间
          lastUpdatedTime = Calendar.getInstance().getTime
        }
        broadcast
      }
      // 读写序列化
      def writeObject(out:ObjectOutputStream): Unit ={
        out.writeObject(broadcast)
      }  def readObject(in:ObjectInputStream): Unit ={
        in.readObject().asInstanceOf[Broadcast[List[String]]]
      }
    }
      

  9.   

    可以使用广播变量,但是好像只能在dstream.foreachRdd算子里面才能用,否则报空指针异常
      

  10.   

    可以实现的,从hdfs上更新文件,每天定时更新广播变量