1.查询es上的文档内容
from pandasticsearch import DataFrame

# Endpoint and index that the query runs against.
es_url = 'http://192.168.12.133:9200'
index_name = 'recommand_test'

df = DataFrame.from_es(url=es_url, index=index_name)
df.print_schema()  # show the data structure (schema) of the index
df.collect()  # fetch the data as Row structures
df.to_pandas()  # convert the data to a pandas DataFrame structure
2.es上建立索引
from elasticsearch import Elasticsearch

es = Elasticsearch('192.168.12.133:9200')

# Index body: a single mapping type, `type_doc_test`, with three text fields.
# The `index` flag is a boolean in the mapping, so Python booleans are used
# on every field (no "false" string); `score` is the only field that is
# excluded from the search index.
mappings = {
    "mappings": {
        "type_doc_test": {
            "properties": {
                "vipid": {"type": "text", "index": True},
                "recommand": {"type": "text", "index": True},
                "score": {"type": "text", "index": False},
            }
        }
    }
}

# Create the `recommand_test` index; the server response is kept in `res`.
res = es.indices.create(index='recommand_test', body=mappings)
3.数据写入es
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch('192.168.12.133:9200')

# Bulk actions, one per document. Each action carries its own routing
# metadata (`_index`, `_type`, `_id`) next to the document body (`_source`).
# Only fully-formed actions are listed here: bare field dicts without the
# metadata keys are never sent, so none are defined.
ACTIONS = [
    {
        "_index": "recommand_test",
        "_type": "type_doc_test",
        "_id": "A0",
        "_source": {"vipid": 'A', "recommand": 'a', "score": '2.4'},
    },
    {
        "_index": "recommand_test",
        "_type": "type_doc_test",
        "_id": "A1",
        "_source": {"vipid": 'A', "recommand": 'b', "score": '1.7'},
    },
]

helpers.bulk(es, ACTIONS)  # insert the explicitly listed documents into es
# pandas is imported here because this snippet uses `pd` and nothing else
# in the notes brings it into scope.
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Sample recommendation table with the final column names set directly.
recommand = pd.DataFrame({
    'vipid': ['A', 'A'],
    'recommand': ['a', 'b'],
    'score': ['2.4', '1.7'],
})

es = Elasticsearch('192.168.12.133:9200')

# One bulk action per DataFrame row.
# - `_id` is the row's vipid followed by its position; str() keeps this
#   working when vipid is not already a string.
# - `_source` is the row serialised to a JSON string.
#   NOTE(review): this relies on the client accepting a pre-serialised
#   `_source` string -- confirm against the installed elasticsearch-py.
action = [
    {
        "_index": "recommand_test",
        "_type": "type_doc_test",
        "_id": str(row['vipid']) + str(i),
        "_source": row.to_json(),
    }
    for i, (_, row) in enumerate(recommand.iterrows())
]

helpers.bulk(es, action)  # insert the DataFrame rows into es
4.删除es文档部分数据
from elasticsearch import Elasticsearch

es = Elasticsearch('192.168.12.133:9200')

# Query body: a `match` clause on the `vipid` field with the value 'A'.
match_clause = {"vipid": 'A'}
doc = {"query": {"match": match_clause}}

# Delete only the documents of `recommand_test` selected by the query.
es.delete_by_query(body=doc, index="recommand_test")
5.删除es的索引
from elasticsearch import Elasticsearch

es = Elasticsearch('192.168.12.133:9200')

# Drop the whole `recommand_test` index, not just individual documents.
index_name = "recommand_test"
es.indices.delete(index=index_name)