首先想到的是通过 JDBC 加驱动的方式写入 ES,因为 ES 本身有提供 JDBC 驱动。然而实际尝试后发现 ES SQL 并不支持更改或者写入,只支持有限的查询操作。不过 ES 已经通过 Hadoop 原生支持了 Spark,即 elasticsearch-hadoop
库。官方文档提供了 scala 的详细使用说明,并没有详细说明 pyspark 的使用,但通过拼接 java class 的配置参数,pyspark 其实也能一样调用 elasticsearch-hadoop
。
首先去下载 jar 包,可以点这里找到最近版本手动下载,或者添加 maven 配置
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-30_2.12</artifactId>
<version>7.16.3</version>
</dependency>
接下来在 pyspark 启动参数 --jars
中加入 elasticsearch-hadoop-xxx.jar
或者把 jar 包放入 classpath,这样就载入了对应 class。
spark 由于版本原因,2.x 中可用的多合一支持大包 elasticsearch-hadoop-7.16.2.jar
到 3.x 及以上就不兼容了,需要使用单独支持 spark 的 elasticsearch-spark-30.2.12.jar
。
启动 pyspark,假设有一个 DataFrame df
需写入本地一个 elasticsearch 服务。下边用一个 notebook 示例。
首先,elasticsearch 原生不支持 spark 中的 DecimalType
数据类型,所以需要把 DecimalType
类型的列都转换成浮点型 DoubleType
,如上文函数 _cast_decimal_to_double
所示。
在上边的例子中,最后写入 dataframe 时各选项含义:
.format('es')
使用 elasticsearch-hadoop 写入 rdd 到 elasticsearch 服务中。实际上这里'es'
只是'org.elasticsearch.spark.sql'
注册的别名,官方文档中写全了是.format('org.elasticsearch.spark.sql')
;.mode('overwrite')
采用 spark 覆盖写入的方式,在 elasticsearch-hadoop 中体现的行为就是每次写入会先清空指定的 index,然后再写入新的记录;'es.resource': 'test_index'
。这里指定df
要被写入到名为test_index
的 index 中。此处也可指定文档类型,如'test_index/titanic'
就指定写入到test_index
中,并且文档类型为titanic
;'es.nodes.wan.only': 'true'
。这里表示连接到 elasticsearch 集群时禁用地址嗅探,只通过配置项es.nodes
提供的地址去连接。注意,如果要连接在内网环境中部署的 elasticsearch 而不打开本条设置,spark 客户端很可能会嗅探到错误的内网地址,从而一直卡住而无法连接。关于地址嗅探和其他有关的详细内容参看官方文档;es.nodes
、es.port
分别指定要连接 elasticsearch 服务的地址端口,其中es.nodes
可以指定一个集群的多个地址;es.net.http.auth.user
、es.net.http.auth.pass
指定用户名密码;
实际上除了 spark 中定义的 .mode('overwrite')
、.mode('append')
,elasticsearch-hadoop 本身也定义四种不同的写入行为(参见文档中选项 es.write.operation
):
index
,这是默认行为,新增文档记录,并由 elasticsearch 生成每条数据的 id,重复 id 的老数据会被替换;create
,同样是新增文档记录,但遇到重复 id 的老数据会抛异常;update
,根据 id 更新对应文档;upsert
,根据 id 合并多条记录,即新增不存在的记录,并更新已有的记录。注意这种方式还需配合使用 spark 的写入模式.mode('append')
,同时还必须指定 dataframe 中哪一列要作为 id 去更新旧的记录(选项es.mapping.id
)。比如.option('es.mapping.id', 'UserID')
这里指定UserID
列作为关键字段去更新 es 对应 index 中已有的文档记录;