不具有通用性,留作纪念。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""insert_pv.py: store yesterday's per-tenant visit counts from Elasticsearch in MySQL.

The script queries the ``client-*`` indices for documents whose ``@timestamp``
falls within yesterday (local time), groups them by ``visit_tenant_id`` and
then by ``client_type``, and writes one ``pv_stat`` row per tenant and client
type.  Rows already stored for yesterday are deleted first, so the script can
be run more than once a day without creating duplicates.
"""
import json
import time
from contextlib import closing
from datetime import date, timedelta

try:
    import urllib2  # Python 2
except ImportError:
    # Python 3 exposes the same Request/urlopen API under urllib.request.
    import urllib.request as urllib2

ES_SERVER = 'http://elk.xkops.com:9200/'
ES_INDEX = 'client-*'
DB_NAME = 'logstash'

# Only the first two client-type buckets of each tenant are stored.  The
# query orders those buckets by the summed ``count`` field, descending.
MAX_CLIENT_TYPES = 2


def day_window_ms(day):
    """Return ``(start, end)`` for *day* as epoch milliseconds in local time.

    ``start`` is local midnight at the beginning of *day* and ``end`` is local
    midnight at the beginning of the following day.
    """
    next_day = day + timedelta(days=1)
    start = int(time.mktime(day.timetuple())) * 1000
    end = int(time.mktime(next_day.timetuple())) * 1000
    return start, end


def build_query(start_ms, end_ms):
    """Build the search body for the half-open window ``[start_ms, end_ms)``.

    The upper bound is exclusive (``lt``): with an inclusive bound, a document
    stamped exactly at midnight would be counted by two consecutive daily runs.
    """
    sum_of_count = {"sum": {"field": "count"}}
    return {
        "query": {
            "filtered": {
                "query": {
                    "query_string": {
                        "query": "*",
                        "analyze_wildcard": True,
                    },
                },
                "filter": {
                    "bool": {
                        "must": [
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": start_ms,
                                        "lt": end_ms,
                                        "format": "epoch_millis",
                                    },
                                },
                            },
                        ],
                        "must_not": [],
                    },
                },
            },
        },
        # No hits are needed, only the aggregations.
        "size": 0,
        "aggs": {
            "2": {
                "terms": {
                    "field": "visit_tenant_id",
                    "size": 1000,
                    "order": {"1": "desc"},
                },
                "aggs": {
                    "1": sum_of_count,
                    "3": {
                        "terms": {
                            "field": "client_type",
                            "size": 10000000,
                            "order": {"1": "desc"},
                        },
                        "aggs": {"1": sum_of_count},
                    },
                },
            },
        },
    }


def search(url, query):
    """POST *query* as JSON to *url* and return the decoded JSON response."""
    payload = json.dumps(query).encode('utf-8')
    request = urllib2.Request(url, payload)
    # closing() releases the connection on both Python 2 and Python 3.
    with closing(urllib2.urlopen(request)) as response:
        page = response.read()
    return json.loads(page.decode('utf-8'))


def extract_rows(result):
    """Return ``(tenant_id, client_type, pv_count)`` tuples from *result*.

    At most ``MAX_CLIENT_TYPES`` client types are taken per tenant; a tenant
    whose client-type bucket list is empty contributes no rows.
    """
    # NOTE(review): pv_count is the bucket's doc_count, although the query
    # also computes a sum of the ``count`` field (sub-aggregation "1").
    # Confirm which of the two is the intended page-view figure.
    return [
        (tenant['key'], client['key'], client['doc_count'])
        for tenant in result['aggregations']['2']['buckets']
        for client in tenant['3']['buckets'][:MAX_CLIENT_TYPES]
    ]


def store_rows(day, rows):
    """Replace the ``pv_stat`` rows for *day* with *rows*.

    The delete and all inserts share a single connection.
    """
    # Imported here so the pure helpers above remain importable on machines
    # that do not have the project's database helper installed.
    from with_conn_to_db import conn_to_mysql

    # NOTE(review): the statements are built by string formatting, exactly as
    # before.  If conn_to_mysql yields a DB-API cursor, pass the values as
    # execute() parameters instead so quotes in a tenant id cannot break the
    # SQL -- confirm against with_conn_to_db before changing.
    with conn_to_mysql(DB_NAME) as db:
        # Avoid duplicate rows when the script runs several times a day:
        # delete before inserting.
        db.execute("delete from pv_stat where create_time = '%s'" % (day,))
        for tenant_id, client_type, pv_count in rows:
            sql = "insert into pv_stat(tenant_id,create_time,pv_count,client_type) values('%s','%s','%s','%s')" % (tenant_id, day, pv_count, client_type)
            db.execute(sql)


def main():
    """Collect yesterday's counts from Elasticsearch and store them in MySQL."""
    yesterday = date.today() - timedelta(days=1)
    start_ms, end_ms = day_window_ms(yesterday)
    url = ES_SERVER + ES_INDEX + "/_search?pretty=true"
    result = search(url, build_query(start_ms, end_ms))
    # All rows are extracted before the first database write, so a malformed
    # response cannot leave the table with yesterday's rows deleted.
    store_rows(yesterday, extract_rows(result))


if __name__ == '__main__':
    main()