项目中需要使用spark来实现之前通过纯java代码写的一个ETL功能,这个功能主要是从kafka读取数据,然后进行ETL,最后保存到elasticsearch。其中保存到elasticsearch的时候,有一些额外的操作,比如要查看由日期拼接的索引是否存在,如果不存在那么要使用指定的模板创建一个索引,最后再往elasticsearch插入数据。
在网上查了一下spark集成elasticsearch的一些文章,基本上是要使用elasticsearch-hadoop这个组件来操作es,这个组件貌似只能对指定索引进行查询和插入,使用模板创建索引,查询索引是否存在这一类的操作好像没有。
所以我的疑问就是在spark中能否使用原生的elasticsearch java client来操作elasticsearch(比如Java High Level REST Client,TransportClient)?如果不能用是什么原因?如果可以用有什么需要注意的地方?
同样的,还有其他一些第三方的调用,比如数据库的连接访问、restful api的访问、是否都可以脱离spark,使用原生的java api来调用?
在网上查了一下spark集成elasticsearch的一些文章,基本上是要使用elasticsearch-hadoop这个组件来操作es,这个组件貌似只能对指定索引进行查询和插入,使用模板创建索引,查询索引是否存在这一类的操作好像没有。
所以我的疑问就是在spark中能否使用原生的elasticsearch java client来操作elasticsearch(比如Java High Level REST Client,TransportClient)?如果不能用是什么原因?如果可以用有什么需要注意的地方?
同样的,还有其他一些第三方的调用,比如数据库的连接访问、restful api的访问、是否都可以脱离spark,使用原生的java api来调用?
解决方案 »
- openstack配置时提示需要两张网卡,但电脑只有一张网卡怎么办啊?
- config里面的问题,帮忙解决下
- 新手关于云计算技术的几个疑问 OpenStack & Hadoop
- openstack创建实例失败,scheduler.log报错误如下,怎么解决啊?
- pause frame的问题
- hadoop的各个守护节点都”正常“启动 但50030和50070打不开 还发现每个节点日志都有错误
- 虚拟机动态迁移,有没有采用后拷贝算法实现的?
- spark streaming, kafka导入数据到es性能调优
- AWS-CPP-SDK 上传文件失败
- 想进入财富2000强?至少你要有它来帮忙
- SparkSQL读取Hbase表,操作DataFrame时报错
- centos7 不小心把网卡的id修改了怎么办,不想重装系统。求好心人给解决一下。
我用的是structured Streaming,foreachPartition貌似不能用,mappartition可以用,structured Streaming 的writeStream的foreach也是按partition来处理的,效果和foreachPartition应该一样吧?
我用的是structured Streaming,foreachPartition貌似不能用,mappartition可以用,structured Streaming 的writeStream的foreach也是按partition来处理的,效果和foreachPartition应该一样吧?不能用foreachPartition,那你就要通过单例工厂去创建JavaClient,并做后续的操作了。