广播变量里面的值是从redis 里面读取出来的,当redis 里面的数据变化之后,我也要更新spark 的广播变量,不知道有没有大佬们有没有处理过类似的问题呢?能否提供一些相关的处理方式?
解决方案 »
- keystone 同步数据库keystone-manage db_sync出错
- 戳破云计算真相:网络带宽有限 让“雾计算”来帮忙
- Tizen首次到访中国,上海的开发者小伙伴儿们,准备好接驾了吗?
- spark1.02怎么实现读取hbase的数据
- 使用docker搭建gitlab,发现项目不能导入,求指导!
- Docker安装问题
- 一个spark的ClassNotFound的问题
- 求教:docker 镜像制作:由平时安装操作系统的镜像转换成docker镜像的方法
- Spark分析Log文件的方式
- docker run 运行jar包,无法生成文件,代码如下
- 配置docker镜像源好像没有生效,请大神指导下
- 0基础想学习服务器的知识
我们都是通过Redis代替广播变量的你这反过来没有啥意义啊
@volatile private var broadcast:Broadcast[List[String]] = null
private var lastUpdatedTime:Date = Calendar.getInstance.getTime() //解析配置文件中的Columns配置
def getProperties(filePath:String="/home/xxx/test.properties"):List[String]={
val fileStream = new FileInputStream(filePath)
val prop = new Properties()
prop.load(fileStream)
val value = prop.getProperty("columns").split(",").toList
println("value is *******************"+value)
value
} def getInstance(sc:SparkContext,filePath:String="/home/z672898/lzw/test.properties"):Broadcast[List[String]] ={
if(broadcast==null){
synchronized{
if(broadcast==null)
broadcast = sc.broadcast(getProperties(filePath))
}
}
broadcast
} def updateAndGet(sc:SparkContext,block:Boolean=false,filePath:String="/home/z672898/lzw/test.properties"): Broadcast[List[String]] ={
val currentTime = Calendar.getInstance().getTime
//1min = 60s = 60000ms
val date_diff = currentTime.getTime -lastUpdatedTime.getTime
//3min update
if(broadcast==null||date_diff>60000){
if(broadcast != null){
/**
* unpersist(blocking):把广播变量从集群中所有保存该广播变量的工作节点的内存中移除
* 布尔类型的blocking参数指定该操作是堵塞直至变量已经从所有节点删除,还是作为异步非堵塞操作执行.
* 如果希望立刻释放内存,应该把这个参数设置为True
*/
broadcast.unpersist(block)
}
val columns = getProperties(filePath)
println("other broadcast:****************"+columns)
broadcast = sc.broadcast(columns)
//更新时间
lastUpdatedTime = Calendar.getInstance().getTime
}
broadcast
}
// 读写序列化
def writeObject(out:ObjectOutputStream): Unit ={
out.writeObject(broadcast)
} def readObject(in:ObjectInputStream): Unit ={
in.readObject().asInstanceOf[Broadcast[List[String]]]
}
}