In [1]:
spark
Out[1]:

SparkSession

SparkContext

Spark UI

Version
v3.0.1
AppName
spark-shell
In [2]:
import pandas as pd

# Load the Titanic CSV with pandas, then hand it off to Spark.
# Fill Age's NaN with 0 *before* the blanket fillna('') — the original
# order turned Age into a mixed ''/float object column and then patched
# it with replace({'': 0}); filling first keeps the column numeric so
# convert_dtypes() and Spark schema inference see a clean nullable type.
titanic_pdf = pd.read_csv('./titanic.csv')
titanic_pdf['Age'] = titanic_pdf['Age'].fillna(0)
titanic_pdf = titanic_pdf.fillna('').convert_dtypes()
df = spark.createDataFrame(titanic_pdf)
In [3]:
# Cast every Decimal-typed column to Double
from pyspark import sql
from pyspark.sql import types as T, functions as F

def _cast_decimal_to_double(table: sql.DataFrame) -> sql.DataFrame:
    """Return ``table`` with every Decimal column cast to Double.

    Iterates over the DataFrame's schema and widens each DecimalType
    column to DoubleType; all other columns are selected unchanged.
    (Presumably needed because the downstream Elasticsearch writer
    does not handle Spark decimals — confirm against the connector.)
    """
    projected = []
    for field in table.schema:
        column = F.col(field.name)
        if isinstance(field.dataType, T.DecimalType):
            column = column.cast(T.DoubleType())
        projected.append(column)
    return table.select(*projected)

df = _cast_decimal_to_double(df)
In [4]:
import os

# Write the frame to Elasticsearch via the elasticsearch-hadoop connector.
# SECURITY: credentials were hardcoded ('elastic'/'elastic'); they now come
# from environment variables, with the old values kept only as local-dev
# defaults so existing runs still work. Never commit real credentials.
es_options = {
    'es.resource': 'test_index',
    'es.nodes.wan.only': 'true',
    'es.nodes': os.environ.get('ES_NODES', 'localhost'),
    # es-hadoop option values are strings; the original int 9200 relied
    # on implicit str() conversion — pass it explicitly as a string.
    'es.port': os.environ.get('ES_PORT', '9200'),
    'es.net.http.auth.user': os.environ.get('ES_USER', 'elastic'),
    'es.net.http.auth.pass': os.environ.get('ES_PASSWORD', 'elastic'),
}

df.write.format('es') \
    .mode('overwrite') \
    .options(**es_options) \
    .save()