项目中需要使用spark来实现之前通过纯java代码写的一个ETL功能,这个功能主要是从kafka读取数据,然后进行ETL,最后保存到elasticsearch。其中保存到elasticsearch的时候,有一些额外的操作,比如要查看由日期拼接的索引是否存在,如果不存在那么要使用指定的模板创建一个索引,最后再往elasticsearch插入数据。
     在网上查了一下spark集成elasticsearch的一些文章,基本上是要使用elasticsearch-hadoop这个组件来操作es,这个组件貌似只能对指定索引进行查询和插入,使用模板创建索引,查询索引是否存在这一类的操作好像没有。
     所以我的疑问就是在spark中能否使用原生的elasticsearch java client来操作elasticsearch(比如Java High Level REST Client,TransportClient)?如果不能用是什么原因?如果可以用有什么需要注意的地方? 
     同样的,还有其他一些第三方的调用,比如数据库的连接访问、restful api的访问、是否都可以脱离spark,使用原生的java api来调用?

解决方案 »

  1.   

    可以,你foreachPartition类的算子,然后在算子内,使用javaClient遍历数据进行查询。建议是用mapPartition实现过滤,然后再用elasticsearch-hadoop提供的方法进行写入。性能会好一点
      

  2.   


    我用的是structured Streaming,foreachPartition貌似不能用,mappartition可以用,structured Streaming 的writeStream的foreach也是按partition来处理的,效果和foreachPartition应该一样吧?
      

  3.   


    我用的是structured Streaming,foreachPartition貌似不能用,mappartition可以用,structured Streaming 的writeStream的foreach也是按partition来处理的,效果和foreachPartition应该一样吧?不能用foreachPartition,那你就要通过单例工厂去创建JavaClient,并做后续的操作了。