ES日志收集定期清理与备份

## 按天清理索引

$ crontab -e

## 每日凌晨1点定时删除30天之前的`logstash-YYYY.MM.DD`索引
0 1 * * * /home/kibana/indexClean.sh es.zyouwei.com logstash -30

indexClean.sh:

#!/bin/sh

before $1 day

index=”$2-“date -d "$3 day " +%Y.%m.%d

beginDate=date "+%Y-%m-%d %H:%M:%S" echo “$beginDate: begin to deleting index: $index” >> /tmp/indexClean.log

result=curl --connect-timeout 10 -m 20 -XDELETE $1/$index

endDate=date "+%Y-%m-%d %H:%M:%S" echo “$endDate: exec result is : $result” >> /tmp/indexClean.log


## 按天关闭索引

```bash
$ crontab -e

## 每日凌晨1点05分定时关闭7天之前的`logstash-YYYY.MM.DD`索引
5 1 * * * /home/kibana/indexClose.sh es.zyouwei.com logstash -7

indexClose.sh:

#!/bin/sh

before 1 week

index=”$2-“date -d "$3 day " +%Y.%m.%d

beginDate=date "+%Y-%m-%d %H:%M:%S" echo “$beginDate: begin to closing index: $index” >> /tmp/indexClose.log

result=curl --connect-timeout 10 -m 20 -XPOST $1/$index/_close

endDate=date "+%Y-%m-%d %H:%M:%S" echo “$endDate: exec result is : $result” >> /tmp/indexClose.log


## 按月清理索引

```bash
$ crontab -e

## 每月1号凌晨2点定时删除3月之前的`logstash-srv-YYYY.MM`索引
0 2 1 * * /home/kibana/indexCleanByMonth.sh es.zyouwei.com logstash-srv -3

indexCleanByMonth.sh:

#!/bin/sh

before $3 month

index=”$2-“date -d "$3 month " +%Y.%m

beginDate=date "+%Y-%m-%d %H:%M:%S" echo “$beginDate: begin to deleting index: $index” >> /tmp/indexCleanByMonth.log

result=curl --connect-timeout 10 -m 20 -XDELETE $1/$index

endDate=date "+%Y-%m-%d %H:%M:%S" echo “$endDate: exec result is : $result” >> /tmp/indexCleanByMonth.log


## 备份索引到其它ES集群

```bash
$ crontab -e
0 3 * * * /home/kibana/indexBackup.sh

indexBackup.sh:

#!/usr/bin/env python

-- coding:utf-8 --

author = ‘aqlu’

import urllib2 import json

import traceback

import logging import os from datetime import timedelta, datetime

logging.basicConfig(filename=os.path.join(os.getcwd(), ‘indexBackup.log’), level=logging.DEBUG, filemode=’a’, format=’%(asctime)s - %(levelname)s: %(message)s’) log = logging.getLogger(‘root’)

noinspection PyBroadException

def backup_index(index, src_es_host, dest_es_host, close_index_name): snapshot_name = ‘snapshot-‘ + index + ‘_’ + datetime.now().strftime(‘%Y%m%d-%H%M%S’)

# step.0 validate index
try:
    validate_url = 'http://' + src_es_host + '/' + index
    validate_request = urllib2.Request(validate_url)
    validate_request.get_method = lambda: 'HEAD'
    validate_response = urllib2.urlopen(validate_request)
except urllib2.HTTPError, e:
    if e.code == 404:
        log.info("index [%s] is not exists", index)
else:
    if validate_response.getcode() == 200:
        # step.1 backup index
        # print datetime.now().strftime('%Y-%m-%d %H:%M:%S'), ' begin to backup index [', indexName, ']'
        log.debug('begin to backup index [' + index + ']')
        backup_url = 'http://' + src_es_host + "/_snapshot/log_backup/" \
                     + snapshot_name + '?wait_for_completion=true'
        backup_param = {"indices": index}

        try:
            backup_request = urllib2.Request(backup_url, json.dumps(backup_param))
            backup_request.get_method = lambda: 'PUT'  # 设置HTTP的访问方式
            backup_response = urllib2.urlopen(backup_request)
            backup_result = json.loads(backup_response.read(), 'utf-8')
        except:
            log.exception('exception')
            send_alarm("告警:备份索引" + index + "异常【xxx】")
        else:
            log.debug("backup index end. result: %s", backup_result)

            if backup_result['snapshot']['state'] == 'SUCCESS':
                # if snapshot success then
                # step.2 delete index
                log.debug('begin to delete index [' + index + ']')
                try:
                    delete_url = 'http://' + src_es_host + "/" + index
                    delete_request = urllib2.Request(delete_url)
                    delete_request.get_method = lambda: 'DELETE'
                    delete_response = urllib2.urlopen(delete_request)
                    delete_result = json.loads(delete_response.read())
                    log.debug("delete index end. result: %s", delete_result)
                except:
                    log.exception('exception')
                    send_alarm("告警:删除索引" + index + "异常【xxx】")

                # step.3 restore index
                log.debug('begin to restore snapshot [' + snapshot_name + ']')
                try:
                    restore_url = 'http://' + dest_es_host + '/_snapshot/log_backup/' + snapshot_name \
                                  + '/_restore?wait_for_completion=true'
                    restore_request = urllib2.Request(restore_url)
                    restore_request.get_method = lambda: 'POST'
                    restore_response = urllib2.urlopen(restore_request)
                    restore_result = json.loads(restore_response.read())
                except:
                    log.exception('exception')
                    send_alarm("告警:还原快照" + snapshot_name + "异常【xxx】")
                else:
                    log.debug("restore snapshot end. result: %s", restore_result)

                    if restore_result['snapshot']['shards']['failed'] == 0:
                        # if no failed shards then
                        # step.4 delete snapshot
                        log.debug('begin to delete snapshot [' + snapshot_name + ']')
                        try:
                            del_snapshot_url = 'http://' + dest_es_host + '/_snapshot/log_backup/' + snapshot_name
                            del_snapshot_request = urllib2.Request(del_snapshot_url)
                            del_snapshot_request.get_method = lambda: 'DELETE'
                            del_snapshot_response = urllib2.urlopen(del_snapshot_request)
                            del_snapshot_result = json.loads(del_snapshot_response.read())
                            log.debug("delete snapshot end. result: %s", del_snapshot_result)

                            if close_index_name
                                # close index for before 1 month on backup cluster
                                close_index(close_index_name, dest_es_host)
                        except:
                            log.exception('exception')
                            send_alarm("告警:删除快照" + snapshot_name + "异常【xxx】")

noinspection PyBroadException

def close_index(index, esHost): log.debug(“begin to close index [%s]”, index) try: url = ‘http://‘ + esHost + ‘/‘ + index + ‘/_close’ request = urllib2.Request(url) request.get_method = lambda: ‘POST’ response = urllib2.urlopen(request) result = json.loads(response.read()) log.debug(“close index [%s] end. result: %s”, index, result) except: log.exception(‘exception’) send_alarm(“告警:关闭索引” + index + “异常【xxx】”)

noinspection PyBroadException

def send_alarm(content): log.debug(“send sms, content: [%s]”, content) try:

    ## TODO implements alarm as sms、call、dingTalk ......
    log.debug("send sms end. result: %s", result)
except:
    log.exception('exception')

before1Week = datetime.now() + timedelta(days=-7) before1month = datetime.now() + timedelta(days=-30) indexName = ‘logstash-‘ + before1Week.strftime(‘%Y.%m.%d’) closeIndexName = ‘logstash-‘ + before1month.strftime(‘%Y.%m.%d’) srcEsHost = ‘es.zyouwei.com’ destEsHost = ‘es-bak.zyouwei.com’

backup_index(indexName, srcEsHost, destEsHost, closeIndexName) `


   转载规则


《ES日志收集定期清理与备份》 Angus_Lu 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
GraphQL入门介绍(一) GraphQL入门介绍(一)
什么是GraphQLGraphQL是由FaceBook提出的一种基于API的查询语言(尽管它也支持修改数据)。它能够根据描述按需获取字段数据,不会有任何冗余信息。也能够通过一个请求一次获取多个资源。GraphQL最早的实现是由FaceBoo
2018-01-31 19:39:21
下一篇 
高可用HBASE搭建 高可用HBASE搭建
HDFS的高可用搭建不在本篇中描述,请参考《Hadoop HDFS与YARN高可用安装》。节点规划hostnameip安装服务zk1192.168.1.1zookeeperzk2192.168.1.2zookeeperzk3192.168.
2018-01-30 19:14:12
  目录