需求:
公司需要改变Es所有索引的名字
使用 Elasticsearch 自带的 _reindex API 可以达到需求,但是在转换的时候会把 nested 类型改变为 text,因此淘汰该方案。
github上有许多迁移数据的,但多是基于集群,目前还未搭建集群,所以,手写了以下脚本,供以后参考,欢迎指错。
elasticsearch 版本 5.5.3
import time
import math
import requests
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
from utils.db_field_util import get_es_dbs_name
# Source ES host (documents are copied FROM this node)
FROM_ES = '192.168.0.140'
# Destination ES host (documents are copied TO this node)
TO_ES = '192.168.0.157'
# Module-level clients shared by every function below
from_es = Elasticsearch(hosts=FROM_ES)
to_es = Elasticsearch(hosts=TO_ES)
def get_url(index, port=9200):
    """Build the ``_mapping`` endpoint URL for *index* on the source ES host.

    Args:
        index: name of the source index.
        port: HTTP port of the ES node (defaults to the standard 9200;
            previously hard-coded).

    Returns:
        The full URL string, e.g. ``http://192.168.0.140:9200/foo/_mapping``.
    """
    return f"http://{FROM_ES}:{port}/{index}/_mapping"
# Fetch the index's mapping from the source node over plain HTTP.
def get_body(index):
    """Return the mapping body of *index* as fetched from the source ES.

    The _mapping response is keyed by index name; the value under that key
    is directly usable as the body for creating the destination index.
    """
    mapping_resp = requests.get(get_url(index)).json()
    return mapping_resp[index]
# Create the destination index
def create_index(index_name, body):
    """Create *index_name* on the destination ES with mapping *body*.

    Args:
        index_name: name of the index to create on ``to_es``.
        body: mapping/settings body (as returned by ``get_body``).

    Returns:
        True if the index was created, False if creation failed
        (most commonly because the index already exists).
    """
    try:
        to_es.indices.create(index_name, body)
        return True
    except Exception as exc:
        # Narrowed from a bare ``except:`` which swallowed every error
        # (including KeyboardInterrupt) and always claimed the index
        # already existed; surface the real failure reason as well.
        print('索引已经存在', exc)
        return False
# Copy all documents of one index from the source ES to the destination ES.
def async_es_data(index):
    """Create the renamed index on the destination node and bulk-copy all docs.

    The new index name is the last '_'-separated segment of *index*.
    Returns early (None) if the destination index could not be created.
    """
    # Create the destination index using the source index's mapping
    create_flag = create_index(index_name=index.split('_').pop(), body=get_body(index))
    if create_flag:
        new_index = index.split('_').pop()
        print('索引创建成功,开始插入数据!!!!!!!!!!!!')
    else:
        return
    # Get the total document count (ES 5.x returns hits.total as an int)
    resp = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp['hits']['total']
    actions = []
    start_time = time.time()  # moment the copy started, for progress timing
    # NOTE(review): from/size paging is capped by index.max_result_window
    # (default 10000) — presumably fine for these indices; confirm for large ones.
    for num, i in enumerate(get_page_data(total, index, 100), 1):
        # print(i['hits']['hits'], total, num)
        for h in i['hits']['hits']:
            # Retarget each hit at the new index; bulk() routes per-action '_index'
            h['_index'] = new_index
        actions.extend(i['hits']['hits'])
        # Flush to the destination every 10 pages (10 * 100 docs)
        if not num % 10:
            # NOTE(review): the third positional argument of helpers.bulk is not
            # an index name in all client versions — verify it is not binding to
            # stats_only here; the per-action '_index' already routes documents.
            result = bulk(to_es, actions, new_index, raise_on_error=True)
            print(actions[0])
            print(f'{num - 10}00~~{num}00条数据插入成功,已耗时{time.time() - start_time}s', result)
            actions = []
    # Flush whatever is left over (fewer than 10 full pages)
    if actions:
        print(actions[0])
        result = bulk(to_es, actions, new_index, raise_on_error=True)
        print(f'最后{len(actions)}条数据插入成功,最终耗时{time.time() - start_time}s', result)
def get_page_data(total, index, size):
    """Yield raw ES search responses, one page of *size* docs at a time.

    Args:
        total: total number of documents to page through.
        index: source index name on ``from_es``.
        size: number of documents per page.

    Yields:
        The raw response dict of each ``from_es.search`` call.
    """
    # BUG FIX: the page count previously used a hard-coded 100 regardless of
    # *size* (math.ceil(total / 100)), so callers using another page size
    # (check_data uses 1000) fetched the wrong number of pages.
    total_page = math.ceil(total / size)
    page = 0
    while page < total_page:
        yield from_es.search(
            index=index,
            body={
                "query": {
                    "match_all": {}
                },
                "from": page * size,
                "size": size
            }
        )
        page += 1
# Uncomment the lines below to inspect candidate indices / start the sync.
# print(get_es_dbs_name())
# for i, j in enumerate(get_es_dbs_name(), 1):
# if '_' in j:
# print(i, j)
# print(len(get_es_dbs_name()))
# Sync every index except those whose name ends with 'iw'
for i in get_es_dbs_name():
    if not i.endswith('iw'):
        async_es_data(i)
# async_es_data('eai_cmsm')
# print('aa')
# Data validation: spot-check copied documents against the source.
def check_data(index):
    """Compare each copied document's ``_source`` between source and destination.

    Pages through *index* on the source in 1000-doc pages and, for pages
    after the 9th, fetches each document from both hosts and records the
    ids whose ``_source`` differs.

    Args:
        index: source index name (new index name is its last '_'-segment).
    """
    resp_total = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp_total['hits']['total']
    # Validate in 1000-doc pages
    print('总数据', total)
    # ids of documents whose _source differs between the two hosts
    ids = []
    new_index = index.split('_').pop()
    for i_num, i in enumerate(get_page_data(total, index, 1000)):
        # NOTE(review): only pages after the 9th are checked — presumably a
        # resume point from an earlier partial run; confirm before relying on it.
        if i_num > 8:
            for h in i['hits']['hits']:
                # Fetch the original document from the source host
                old_data = requests.get(f'http://{FROM_ES}:9200/{index}/table/{h["_id"]}')
                old_data = old_data.json()
                old_data['_index'] = new_index
                # BUG FIX: the copied document lives on the destination host
                # (TO_ES); this previously queried the source host
                # (192.168.0.140) for the new index, comparing against the
                # wrong cluster.
                new_data = requests.get(f'http://{TO_ES}:9200/{new_index}/table/{h["_id"]}')
                if not old_data['_source'] == new_data.json()['_source']:
                    ids.append(h['_id'])
            print(f'{i_num}000校验完毕,数据不同的id:', ids, '---------------------', len(ids))
    print('最终不同的id', ids)
# check_data('msd_ahad')
网友评论