域名读取方式
方式一:直接在脚本中的HOSTS列表中添加域名。
方式二:读取文件中的域名,需在当前目录下创建hosts.txt文件(默认使用此方式)。
文件中每行一条记录,脚本只取该行最后一个"="之后的内容,并将其解析为(域名, 端口)元组,例如:baidu=('www.baidu.com', 443)
python脚本
建议使用python3执行脚本
# -*- encoding: utf-8 -*-
# requires a recent enough python with idna support in socket
# pyopenssl, cryptography and idna
# 优化人:wanhebin
import datetime
# Capture the local wall-clock time at script start as "YYYY-MM-DD HH:MM:SS".
Startime = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
from OpenSSL import SSL
from cryptography import x509
from cryptography.x509.oid import NameOID
import idna
import time
import json
import urllib
import requests
from socket import socket
from collections import namedtuple
# Send an alert through a DingTalk robot webhook.
def send_to_ding(access_token, content, at_mobiles=None, timeout=10):
    """Post a plain-text message to a DingTalk robot webhook.

    The raw response body is printed to stdout.

    Args:
        access_token: Full webhook URL of the robot. Despite the name, this
            is the URL that is requested, not just the token part.
        content: Message text to send.
        at_mobiles: Phone numbers to @-mention. Defaults to the number that
            used to be hard-coded in this function.
        timeout: Seconds to wait on the webhook before giving up, so a
            stalled connection cannot hang the whole check.

    Raises:
        ValueError: If ``access_token`` is not a usable URL.
        OSError: If the request fails or times out (this includes
            ``urllib.error.URLError``).
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.request`` has been loaded.
    import urllib.request

    if at_mobiles is None:
        at_mobiles = ['18856489168']

    # The charset belongs inside Content-Type; a separate "charset" header
    # is not a real HTTP header.
    header = {"Content-Type": "application/json; charset=utf-8"}
    data = {
        "msgtype": "text",
        "text": {
            "content": content,
        },
        "at": {
            "atMobiles": list(at_mobiles),
            # DingTalk spells this field "isAtAll".
            "isAtAll": False,
        },
    }
    payload = json.dumps(data).encode('utf-8')
    request = urllib.request.Request(access_token, data=payload, headers=header)
    # The context manager closes the HTTP response even if read() raises.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        print(response.read())
import os

# DingTalk robot webhook URL.
# NOTE(security): this URL embeds the robot's secret token. Set the
# DINGTALK_WEBHOOK environment variable to supply the real webhook without
# committing it; the literal below is kept only as a backward-compatible
# default and should be rotated if this file has been shared.
access_token = os.environ.get(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=fa8df9eea17bf6342a99c02334bfa96978bead4bce761ee252a82b949e5e298f",
)
# Record describing one probed host: its certificate, the hostname that was
# requested, and the peer address of the connection.
HostInfo = namedtuple(field_names='cert hostname peername', typename='HostInfo')
# Hosts to monitor, as (hostname, port) tuples; any number may be listed.
#
# Option 1: define the list inline instead of reading hosts.txt, e.g.
# HOSTS = [
#     ('www.baidu.com', 443),
#     ('www.taobao.com', 443),
#     ('www.jd.com', 443)
# ]
#
# Option 2 (default): read ./hosts.txt, one entry per line. Only the text
# after the last "=" on a line is evaluated, so both "('host', 443)" and
# "name=('host', 443)" are accepted.
HOSTS = []
with open('./hosts.txt', 'r') as f:
    for line in f:
        stripped = line.strip()
        # Skip blank lines and "#" comments; eval() raises SyntaxError on them.
        if not stripped or stripped.startswith('#'):
            continue
        # NOTE(security): eval() runs arbitrary code found in hosts.txt. Keep
        # that file trusted, or switch to ast.literal_eval once it is
        # confirmed that every entry is a plain tuple literal.
        HOSTS.append(eval(stripped.split("=")[-1]))
# Certificate checks.
def verify_cert(cert, hostname):
    """Report whether ``cert`` has not yet expired.

    The expiry result used to be computed and thrown away, so this function
    could never report anything; it now returns the outcome.

    Args:
        cert: Certificate object exposing ``has_expired()`` (presumably a
            pyOpenSSL ``X509`` -- confirm against callers).
        hostname: Currently unused; reserved for the hostname check that is
            not implemented yet.

    Returns:
        True if ``cert.has_expired()`` is false, otherwise False.
    """
    # TODO: also verify notBefore, CA trust and servername/SNI/hostname, e.g.
    # service_identity.pyopenssl.verify_hostname(client_ssl, hostname),
    # and the issuer.
    return not cert.has_expired()
def get_certificate(hostname, port):
    """Connect to ``hostname:port`` and return the certificate it presents.

    Args:
        hostname: Host to connect to; its IDNA-encoded form is sent as the
            TLS server name (SNI).
        port: TCP port to connect to.

    Returns:
        A ``HostInfo`` holding the certificate converted with
        ``to_cryptography()``, the socket's peer name, and ``hostname``.

    Raises:
        Whatever the socket connect or the TLS handshake raises; the socket
        is closed before the exception propagates.
    """
    hostname_idna = idna.encode(hostname)
    sock = socket()
    try:
        sock.connect((hostname, port))
        peername = sock.getpeername()
        ctx = SSL.Context(SSL.SSLv23_METHOD)  # most compatible
        # NOTE(review): these two attribute names match the stdlib
        # ssl.SSLContext API; on a pyOpenSSL Context they are probably plain
        # attribute assignments with no effect -- confirm, and use
        # ctx.set_verify(...) if the verification mode must be set explicitly.
        ctx.check_hostname = False
        ctx.verify_mode = SSL.VERIFY_NONE
        sock_ssl = SSL.Connection(ctx, sock)
        sock_ssl.set_connect_state()
        sock_ssl.set_tlsext_host_name(hostname_idna)
        sock_ssl.do_handshake()
        cert = sock_ssl.get_peer_certificate()
        crypto_cert = cert.to_cryptography()
        sock_ssl.close()
    finally:
        # Always release the socket, including when connect or the handshake
        # fails part-way through.
        sock.close()
    return HostInfo(cert=crypto_cert, peername=peername, hostname=hostname)
def _cert_validity_utc(cert):
    """Return ``(not_before, not_after)`` of ``cert`` as UTC-aware datetimes."""
    # Prefer the timezone-aware accessors when the certificate object offers
    # them; otherwise treat the naive values as UTC.
    not_before = getattr(cert, 'not_valid_before_utc', None)
    if not_before is None:
        not_before = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
    not_after = getattr(cert, 'not_valid_after_utc', None)
    if not_after is None:
        not_after = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
    return not_before, not_after


def print_basic_info(hostinfo, warn_days=30):
    """Print a host's certificate validity and alert when it is close to expiry.

    The remaining time is measured between the current UTC time and the
    certificate's expiry. The earlier version compared a local-time timestamp
    with the certificate's expiry through ``time.mktime``, which skewed the
    result by the local UTC offset.

    Args:
        hostinfo: Object with ``hostname`` and ``cert`` attributes; ``cert``
            must expose ``not_valid_before`` / ``not_valid_after`` (or their
            ``*_utc`` variants).
        warn_days: Send a DingTalk alert when fewer than this many whole days
            remain.

    Returns:
        Whole days remaining until expiry, truncated toward zero.
    """
    not_before, not_after = _cert_validity_utc(hostinfo.cert)
    now = datetime.datetime.now(datetime.timezone.utc)
    # Days left before the certificate expires.
    count_days = int((not_after - now).total_seconds() / (24 * 60 * 60))

    s = (
        '{hostname}\n'
        '\t起始时间: {startime}\n'
        '\t到期时间: {endtime}\n'
        '\t剩余过期天数:{count}\n'
    ).format(
        hostname=hostinfo.hostname,
        startime=not_before.strftime('%Y-%m-%d %H:%M:%S'),
        endtime=not_after.strftime('%Y-%m-%d %H:%M:%S'),
        count=count_days,
    )
    print(s)

    # Check the remaining days and raise the DingTalk alert if needed.
    if count_days < warn_days:
        text = '提醒:域名证书即将到期!\n以下域名SSL证书有效期天数小于{}天,请及时续签:{}'.format(warn_days, s)
        send_to_ding(access_token, text)
    return count_days
def check_it_out(hostname, port):
    """Fetch the certificate served at ``hostname:port`` and print its summary."""
    print_basic_info(get_certificate(hostname, port))
import concurrent.futures

if __name__ == '__main__':
    # Fetch the certificates concurrently, then report from the main thread
    # in HOSTS order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
        pending = [
            (entry, e.submit(get_certificate, entry[0], entry[1]))
            for entry in HOSTS
        ]
        for entry, future in pending:
            try:
                hostinfo = future.result()
            except Exception as exc:
                # One unreachable or misconfigured host must not abort the
                # checks for the remaining hosts.
                print('{}:{} 证书获取失败: {!r}'.format(entry[0], entry[1], exc))
                continue
            print_basic_info(hostinfo)