import pandas as pd
from pyspark.sql import SparkSession

# Assumes the ES-Hadoop connector jar is on the classpath; in the pyspark shell `spark` already exists.
spark = SparkSession.builder.getOrCreate()

# Read the CSV with pandas, fill NaNs with '', and let pandas infer nullable dtypes.
df = pd.read_csv('./titanic.csv').fillna('').convert_dtypes()
df.Age = df.Age.replace({'': 0})  # Age was filled with '' above; put 0 back so it stays numeric
df = spark.createDataFrame(df)
# Cast all DecimalType columns to DoubleType (the ES-Hadoop connector does not accept decimals).
from pyspark import sql
from pyspark.sql import types as T, functions as F

def _cast_decimal_to_double(table: sql.DataFrame) -> sql.DataFrame:
    """Return `table` with every DecimalType column cast to DoubleType; other columns are untouched."""
    return table.select(
        *(
            F.col(field.name).cast(T.DoubleType())
            if isinstance(field.dataType, T.DecimalType)
            else F.col(field.name)
            for field in table.schema
        )
    )
df = _cast_decimal_to_double(df)
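# Optional sanity check (a minimal sketch, not part of the original flow): confirm the cast
# removed every DecimalType column before writing to Elasticsearch.
assert not any(isinstance(f.dataType, T.DecimalType) for f in df.schema)
df.printSchema()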
# Write the DataFrame to Elasticsearch through the ES-Hadoop connector ('es' data source).
df.write.format('es') \
    .mode('overwrite') \
    .options(**{
        'es.resource': 'test_index',
        'es.nodes.wan.only': 'true',
        'es.nodes': 'localhost',
        'es.port': '9200',
        'es.net.http.auth.user': 'elastic',
        'es.net.http.auth.pass': 'elastic',
    }) \
    .save()
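# Optional read-back check (a sketch, assuming the same cluster and credentials as above):
# the connector also exposes a read path, so we can load the index we just wrote and
# eyeball a few rows.
es_df = spark.read.format('es') \
    .options(**{
        'es.nodes.wan.only': 'true',
        'es.nodes': 'localhost',
        'es.port': '9200',
        'es.net.http.auth.user': 'elastic',
        'es.net.http.auth.pass': 'elastic',
    }) \
    .load('test_index')
es_df.show(5)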