PySpark DataFrame 写入 Elasticsearch

首先想到的是通过 JDBC 加驱动的方式写入 ES,因为 ES 本身有提供 JDBC 驱动。然而实际尝试后发现 ES SQL 并不支持更改或者写入,只支持有限的查询操作。不过 ES 已经通过 Hadoop 原生支持了 Spark,即 elasticsearch-hadoop 库。官方文档提供了 scala 的详细使用说明,并没有详细说明 pyspark 的使用,但通过拼接 java class 的配置参数,pyspark 其实也能一样调用 elasticsearch-hadoop

首先去下载 jar 包,可以点这里找到最近版本手动下载,或者添加 maven 配置

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-30_2.12</artifactId> 
  <version>7.16.3</version>
</dependency>

接下来在 pyspark 启动参数 --jars 中加入 elasticsearch-hadoop-xxx.jar 或者把 jar 包放入 classpath,这样就载入了对应 class。

spark 由于版本原因,2.x 中可用的多合一支持大包 elasticsearch-hadoop-7.16.2.jar 到 3.x 及以上就不兼容了,需要使用单独支持 spark 的 elasticsearch-spark-30.2.12.jar

启动 pyspark,假设有一个 DataFrame df 需写入本地一个 elasticsearch 服务。下边用一个 notebook 示例。

首先,elasticsearch 原生不支持 spark 中的 DecimalType 数据类型,所以需要把 DecimalType 类型的列都转换成浮点型 DoubleType,如上文函数 _cast_decimal_to_double 所示。

在上边的例子中,最后写入 dataframe 时各选项含义:

  • .format('es') 使用 elasticsearch-hadoop 写入 rdd 到 elasticsearch 服务中。实际上这里 'es' 只是 'org.elasticsearch.spark.sql' 注册的别名,官方文档中写全了是 .format('org.elasticsearch.spark.sql')
  • .mode('overwrite') 采用 spark 覆盖写入的方式,在 elasticsearch-hadoop 中体现的行为就是每次写入会先清空指定的 index,然后再写入新的记录;
  • 'es.resource': 'test_index'。这里指定 df 要被写入到名为 test_index 的 index 中。此处也可指定文档类型,如 'test_index/titanic' 就指定写入到 test_index 中,并且文档类型为 titanic
  • 'es.nodes.wan.only': 'true'。这里表示连接到 elasticsearch 集群时禁用地址嗅探,只通过配置项 es.nodes 提供的地址去连接。注意,如果要连接在内网环境中部署的 elasticsearch 而不打开本条设置,spark 客户端很可能会嗅探到错误的内网地址,从而一直卡住而无法连接。关于地址嗅探和其他有关的详细内容参看官方文档
  • es.nodeses.port 分别指定要连接 elasticsearch 服务的地址端口,其中 es.nodes 可以指定一个集群的多个地址;
  • es.net.http.auth.useres.net.http.auth.pass 指定用户名密码;

实际上除了 spark 中定义的 .mode('overwrite').mode('append'),elasticsearch-hadoop 本身也定义四种不同的写入行为(参见文档中选项 es.write.operation):

  • index,这是默认行为,新增文档记录,并由 elasticsearch 生成每条数据的 id,重复 id 的老数据会被替换;
  • create,同样是新增文档记录,但遇到重复 id 的老数据会抛异常;
  • update,根据 id 更新对应文档;
  • upsert,根据 id 合并多条记录,即新增不存在的记录,并更新已有的记录。注意这种方式还需配合使用 spark 的写入模式 .mode('append'),同时还必须指定 dataframe 中哪一列要作为 id 去更新旧的记录(选项 es.mapping.id)。比如 .option('es.mapping.id', 'UserID') 这里指定 UserID 列作为关键字段去更新 es 对应 index 中已有的文档记录;