Code from the book Agile Data Science 2.0.
By: Roy.Liu  Last updated: 2019-08-27
Noting this down since it may come up at work: PySpark + Elasticsearch. GitHub repository: https://github.com/rjurney/Agile_Data_Code_2
To write data from PySpark into Elasticsearch (or to read data from Elasticsearch into PySpark), we use Elasticsearch for Hadoop (https://www.elastic.co/products/hadoop). In the image I have prepared, PySpark is already configured for this project, so you can load the library without any further setup. If you installed by hand, the install script provides a similar configuration.
Making PySpark data searchable. We use ch02/pyspark_elasticsearch.py (https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch02/pyspark_elasticsearch.py) to save data from PySpark to Elasticsearch:
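If you are wiring this up yourself rather than using the prepared image, the connector can be pulled in when PySpark is launched. A minimal sketch, assuming a Maven-resolvable elasticsearch-hadoop artifact; the version shown is an assumption and should match your Elasticsearch cluster:

```shell
# Launch PySpark with the elasticsearch-hadoop connector on the classpath.
# The version number here is an assumption; match it to your cluster.
pyspark --packages org.elasticsearch:elasticsearch-hadoop:6.8.23

# Alternatively, point --jars at a downloaded connector jar:
# pyspark --jars /path/to/elasticsearch-hadoop-6.8.23.jar
```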
# Load and parse the example CSV file
csv_lines = sc.textFile("data/example.csv")
data = csv_lines.map(lambda line: line.split(","))

# Shape each row into a (key, document) pair; EsOutputFormat ignores the key
schema_data = data.map(
    lambda x: ('ignored_key', {'name': x[0], 'company': x[1], 'title': x[2]})
)

# Write the documents to the agile_data_science/executives index/type
schema_data.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "agile_data_science/executives"}
)
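The mapper above is just a plain Python transformation applied row by row. A minimal sketch outside Spark, using sample rows taken from the search results below, showing the (ignored key, document) shape that EsOutputFormat expects:

```python
# Sample rows standing in for lines of data/example.csv
csv_lines = [
    "Russell Jurney,Relato,CEO",
    "Russell Jurney,Data Syndrome,Principal Consultant",
]

def to_es_record(line):
    # Same shaping as the Spark lambda: split the CSV line, build the
    # document dict, and pair it with a key that EsOutputFormat ignores.
    name, company, title = line.split(",")
    return ('ignored_key', {'name': name, 'company': company, 'title': title})

records = [to_es_record(line) for line in csv_lines]
print(records[0])
# → ('ignored_key', {'name': 'Russell Jurney', 'company': 'Relato', 'title': 'CEO'})
```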
Searching the data:
curl 'localhost:9200/agile_data_science/executives/_search?q=name:Russell*&pretty'
The search results:
{
  "took": 19,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "agile_data_science",
        "_type": "executives",
        "_id": "AVrfrAbdfdS5Z0IiIt78",
        "_score": 1.0,
        "_source": {
          "company": "Relato",
          "name": "Russell Jurney",
          "title": "CEO"
        }
      },
      {
        "_index": "agile_data_science",
        "_type": "executives",
        "_id": "AVrfrAbdfdS5Z0IiIt79",
        "_score": 1.0,
        "_source": {
          "company": "Data Syndrome",
          "name": "Russell Jurney",
          "title": "Principal Consultant"
        }
      }
    ]
  }
}
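The response body is ordinary JSON and can be post-processed with the standard json module. A minimal sketch, assuming the response shown above has been captured in a string (here it is pasted inline), that pulls out each hit's _source:

```python
import json

# The _search response from above, captured as a string.
response_body = '''
{"took": 19, "timed_out": false,
 "hits": {"total": 2, "max_score": 1.0,
   "hits": [
     {"_index": "agile_data_science", "_type": "executives",
      "_id": "AVrfrAbdfdS5Z0IiIt78", "_score": 1.0,
      "_source": {"company": "Relato", "name": "Russell Jurney", "title": "CEO"}},
     {"_index": "agile_data_science", "_type": "executives",
      "_id": "AVrfrAbdfdS5Z0IiIt79", "_score": 1.0,
      "_source": {"company": "Data Syndrome", "name": "Russell Jurney",
                  "title": "Principal Consultant"}}]}}
'''

# Parse the response and collect the documents themselves
response = json.loads(response_body)
sources = [hit["_source"] for hit in response["hits"]["hits"]]
for doc in sources:
    print(doc["name"], "-", doc["title"])
```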
From: 一号门
