美文网首页
es 单主机,复制数据,迁移数据

es 单主机,复制数据,迁移数据

作者: 领带衬有黄金 | 来源:发表于2019-11-04 18:17 被阅读0次

需求:
公司需要改变Es所有索引的名字
使用re_index可以达到需求,但是会在转换的时候改变nested的类型为text,因此淘汰该方案。
github上有许多迁移数据的,但多是基于集群,目前还未搭建集群,所以,手写了以下脚本,供以后参考,欢迎指错。

elasticsearch   版本       5.5.3
import time

import math
import requests
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch

from utils.db_field_util import get_es_dbs_name

# 数据源
FROM_ES = '192.168.0.140'
# 需要传入的ES主机
TO_ES = '192.168.0.157'
from_es = Elasticsearch(hosts=FROM_ES)
to_es = Elasticsearch(hosts=TO_ES)


def get_url(index):
    return f"http://{FROM_ES}:9200/{index}/_mapping"


def get_body(index):
    resp = requests.get(get_url(index))
    return resp.json()[index]


# 建立索引
def create_index(index_name, body):
    try:
        to_es.indices.create(index_name, body)
        return True
    except:
        print('索引已经存在')
        return False


# 读取es数据
def async_es_data(index):
    # 创建新的索引
    create_flag = create_index(index_name=index.split('_').pop(), body=get_body(index))
    if create_flag:
        new_index = index.split('_').pop()
        print('索引创建成功,开始插入数据!!!!!!!!!!!!')
    else:
        return
        # 获取到总数
    resp = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp['hits']['total']
    actions = []
    start_time = time.time()  # 初始存入时间
    for num, i in enumerate(get_page_data(total, index, 100), 1):
        # print(i['hits']['hits'], total, num)
        for h in i['hits']['hits']:
            h['_index'] = new_index
        actions.extend(i['hits']['hits'])
        # 添加十次,存一次ES
        if not num % 10:
            result = bulk(to_es, actions, new_index, raise_on_error=True)
            print(actions[0])
            print(f'{num - 10}00~~{num}00条数据插入成功,已耗时{time.time() - start_time}s', result)
            actions = []
    if actions:
        print(actions[0])
        result = bulk(to_es, actions, new_index, raise_on_error=True)
        print(f'最后{len(actions)}条数据插入成功,最终耗时{time.time() - start_time}s', result)


def get_page_data(total, index, size):
    # 使用生成器,返回每页的数据
    total_page = math.ceil(total / 100)
    page = 0
    while page < total_page:
        yield from_es.search(
            index=index,
            body={
                "query": {
                    "match_all": {}
                },
                "from": page * size,
                "size": size
            }
        )
        page += 1


# 打开注释,开始同步
# print(get_es_dbs_name())
# for i, j in enumerate(get_es_dbs_name(), 1):
#     if '_' in j:
#         print(i, j)
# print(len(get_es_dbs_name()))

for i in get_es_dbs_name():
    if not i.endswith('iw'):
        async_es_data(i)


# async_es_data('eai_cmsm')


# print('aa')
# 数据校验
def check_data(index):
    resp_total = from_es.search(
        index=index,
        body={
            "query": {
                "match_all": {}
            }
        }
    )
    total = resp_total['hits']['total']
    # 1000 条数据校验
    print('总数据', total)
    # 记录不同的id值
    ids = []
    new_index = index.split('_').pop()
    for i_num, i in enumerate(get_page_data(total, index, 1000)):
        if i_num > 8:
            for h in i['hits']['hits']:
                # 获取到老库的单个数据,与新数据库单条数据比较
                # requests.get('http://192.168.0.140:9200/ahad/table/5d6c93a2a6632fa41c22e9d3')
                old_data = requests.get(f'http://192.168.0.140:9200/{index}/table/{h["_id"]}')
                old_data = old_data.json()
                old_data['_index'] = new_index
                new_data = requests.get(f'http://192.168.0.140:9200/{new_index}/table/{h["_id"]}')
                if not old_data['_source'] == new_data.json()['_source']:
                    ids.append(h['_id'])
        print(f'{i_num}000校验完毕,数据不同的id:', ids, '---------------------', len(ids))
    print('最终不同的id', ids)

# check_data('msd_ahad')

相关文章

  • es 单主机,复制数据,迁移数据

    需求:公司需要改变Es所有索引的名字使用re_index可以达到需求,但是会在转换的时候改变nested的类型为t...

  • 06 VMware vMotion和Storage vMotio

    1. 虚拟机迁移 迁移:将虚拟机从一台主机或数据存储移到另一台主机或数据存储 2. 迁移主机:vMotion 虚拟...

  • es

    数据迁移 es,es是一个开源的搜索引擎,es包括数据存储和数据检索,搜索数据会比 mysql 快很多,搜索接口是...

  • mysql数据批量同步导入elasticsearch

    背景: 对于数据的列表拉取响应变慢,数据需要从mysql迁移到es,从而数据从es进行读取; 解决思路: 由于数据...

  • ES索引查询匹配,简单记录

    存入数据代码,以及es删除索引数据 从es里查询数据------单匹配查询

  • Elasticsearch数据迁移与集群容灾

    本文讨论如何跨集群迁移ES数据以及如何实现ES的同城跨机房容灾和异地容灾。 跨集群数据迁移 在ES的生产实践中,往...

  • ES:reindex中的坑

    吐槽一个ES-reindex迁移的大坑 在做ES跨集群迁移的时候,用到了ES的reindex进行数据迁移,查了很多...

  • docker volume迁移

    应用容器化后,将数据路径持久化到宿主机中。当应用做迁移后,数据部分也需要进行迁移。 迁移思路:使用 --volum...

  • MySQL配置主从复制,主主复制

    为了保障数据的安全与稳定性,我们常用数据库的主从复制与主主复制来实现。主从复制为从机实时拷贝一份主机的数据,当主机...

  • Tair数据迁移三步走

    迁移的三个阶段 在多机房数据迁移中,整个过程分为三个阶段:历史数据迁移阶段、redolog迁移阶段、实时复制阶段。...

网友评论

      本文标题:es 单主机,复制数据,迁移数据

      本文链接:https://www.haomeiwen.com/subject/dsndbctx.html