
[ElasticSearch7] Bulk Import

```python
import random
from elasticsearch import Elasticsearch
from elasticsearch import helpers
 
es = Elasticsearch(hosts='http://localhost', port=9200)
levels=['info','debug','warn','error']
actions=[]

for i in range(100):
    level = random.choice(levels)
    action = {'_op_type': 'index',    # operation: index / update / create / delete
              '_index': 'log_level',  # target index
              '_type': '_doc',        # ES7 supports only the single type _doc
              '_source': {'level': level}}
    actions.append(action)
    
# 1. Using helpers.bulk
helpers.bulk(client=es, actions=actions)

# 2. Using helpers.parallel_bulk
for success, info in helpers.parallel_bulk(es, actions):
    if not success:
        print('A document failed:', info)

# 3. Using helpers.streaming_bulk
# streaming_bulk is similar to parallel_bulk: both are lazy generators,
# so nothing runs until they are iterated. Both accept a per-batch
# chunk_size, and parallel_bulk additionally accepts a thread_count.
for ok, response in helpers.streaming_bulk(es, actions):
    if not ok:
        print(response)
```
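
As the comments above note, both helpers let you tune the batch size, and `parallel_bulk` also lets you tune the thread count. Below is a minimal sketch making those parameters explicit; `chunk_size=500` and `thread_count=4` are simply the library defaults shown for illustration, not tuned recommendations:

```python
import random
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(hosts='http://localhost', port=9200)
levels = ['info', 'debug', 'warn', 'error']
actions = [{'_op_type': 'index',
            '_index': 'log_level',
            '_source': {'level': random.choice(levels)}}
           for _ in range(100)]

# parallel_bulk: chunk_size is the number of documents per bulk request,
# thread_count is the number of worker threads.
for success, info in helpers.parallel_bulk(es, actions,
                                           thread_count=4, chunk_size=500):
    if not success:
        print('A document failed:', info)

# streaming_bulk is single-threaded but accepts chunk_size the same way.
for ok, response in helpers.streaming_bulk(es, actions, chunk_size=500):
    if not ok:
        print(response)
```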
```python
import csv
from elasticsearch import Elasticsearch
from elasticsearch import helpers


es = Elasticsearch(hosts='http://localhost', port=9200)

def gendata():
    # Stream rows from the CSV and yield one bulk action per row.
    # The IP address (column 0) doubles as the document _id, so
    # re-running the import overwrites instead of duplicating.
    with open('scripts/assets.csv', 'r', encoding='utf-8') as f:
        file = csv.reader(f)
        for line in file:
            yield {
                "_index": "assets",
                "_id":  line[0],
                "_source": {
                    "ip": line[0],
                    "ipv": line[1],
                    "ipPosition": line[2],
                    "level": line[3],
                    "status": line[4],
                    "domain": line[5].split(','),
                    "os": line[6],
                    "submitTime": line[7],
                    "hard": [],
                    "soft": []
                }
            }


def input_es():
    # helpers.bulk returns a (success_count, errors) tuple
    print(helpers.bulk(es, gendata()))


if __name__ == '__main__':
    input_es()
```
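
The script assumes the `assets` index already exists; otherwise Elasticsearch will create it with dynamic mappings on first write. Here is a hedged sketch of creating it explicitly beforehand. The mapping is inferred from the `_source` fields in `gendata()`, and the field types are assumptions to adjust for the real data:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts='http://localhost', port=9200)

# Hypothetical mapping inferred from gendata(); field types are guesses.
if not es.indices.exists(index='assets'):
    es.indices.create(index='assets', body={
        'mappings': {
            'properties': {
                'ip':         {'type': 'ip'},
                'ipv':        {'type': 'keyword'},
                'ipPosition': {'type': 'keyword'},
                'level':      {'type': 'keyword'},
                'status':     {'type': 'keyword'},
                'domain':     {'type': 'keyword'},  # arrays of keywords are fine
                'os':         {'type': 'keyword'},
                'submitTime': {'type': 'date'},
                'hard':       {'type': 'object'},
                'soft':       {'type': 'object'}
            }
        }
    })
```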