HTTPS代理明文截获之证书伪造(Python) - Go语言中文社区

HTTPS代理明文截获之证书伪造(Python)


为了防止网络中的明文信息的在传输过程中被截获,SSL/TLS为各种应用协议加密封装提供了一个不错的解决方案,作为一种公开的加密协议,采用非对称加密的方式来传输密钥无疑为数据通信提供了不错的安全保障。由于现今的计算机还未达到在有限时间破解出私钥(RSA),所以想成功的截获明文信息,只能通过中间人伪造证书这种方式了,但是SSL/TLS协议在设计时就考虑到这一点,采用了CA机构证书签名的方式来防止这个问题,因此要想做到真正的做到证书伪造,除非你有可信任的CA机构的私钥(或者应用本身设计存在缺陷),这样可以为自己签发证书,正常情况下我们是不可能弄到这些的。说了这么多其实想说明SSL/TLS协议至少目前来说还是安全的,所以想截获HTTPS的明文,你先得导入事先自己制作好的CA根证书(怎么制作根证书以及签发证书等等,可以查看下Openssl的文档),废话不多说具体看代码,只是通过一个简单https代理来体现,代码比较简单,作用只是实现HTTPS的证书伪造的具体过程。

 

 

import os
import socket  
import threading  
import re  
import time  
import random
import ssl  
from M2Crypto import X509, EVP, RSA, ASN1  


CACerFile='ca.cer'  
  
CAKeyFile='ca.key'  

StoreFolder='certs/'

mutex = threading.Lock()

def gen_rand_serial(len):
    num=''
    nlist= random.sample(['1','2','3','4','5','6','7','9','0','a', 'b', 'c', 'd','e','f'], len)
    for n in nlist :
        num+=str(n) 
	return int(num.encode('hex'),16)
	
def mk_cert():  
    
    serial = gen_rand_serial(4)
    cert = X509.X509()   
    cert.set_serial_number(serial)  
    cert.set_version(2)  
    mk_cert_valid(cert)  
    cert.add_ext(X509.new_extension('nsComment', 'SSL sever'))  
    return cert  
  
def mk_cert_valid(cert, days=180):  
  
    t = long(time.time())  
    now = ASN1.ASN1_UTCTIME()  
    now.set_time(t - 24*60*60)  
    expire = ASN1.ASN1_UTCTIME()  
    expire.set_time(t + days * 24 * 60 * 60)  
    cert.set_not_before(now)  
    cert.set_not_after(expire)  
  
  
def mk_request(bits, cn='localhost'):  
  
    pk = EVP.PKey()  
    x = X509.Request()  
    rsa = RSA.gen_key(bits, 65537, lambda: None)  
    pk.assign_rsa(rsa)  
    x.set_pubkey(pk)  
    name = x.get_subject()  
    name.C = "CN"  
    name.CN = cn  
    name.ST = 'TS'  
    name.O = 'TS'  
    name.OU = 'TS'  
    x.sign(pk,'sha1')  
    return x, pk  
  
def mk_self_cert(cacert_file, ca_key_file, cn):  
  
    cert_req, pk2 = mk_request(2048, cn=cn)  
      
    if cacert_file and ca_key_file:  
        cacert = X509.load_cert(cacert_file)  
        pk1 = EVP.load_key(ca_key_file)  
    else:  
        cacert = None  
        pk1 = None  
      
    cert = mk_cert()  
    cert.set_subject(cert_req.get_subject())  
    cert.set_pubkey(cert_req.get_pubkey())  
      
    if cacert and pk1:  
        cert.set_issuer(cacert.get_issuer())  
        cert.sign(pk1, 'sha256')  
    else:  
        cert.set_issuer(cert.get_subject())  
        cert.sign(pk2, 'sha256')  
          
    with open(StoreFolder+cn+'.cer', 'w') as f:  
        f.write(cert.as_pem())  
    with open(StoreFolder+cn+'.key', 'w') as f:  
        f.write(pk2.as_pem(None))  
  
  
def RecviceMessage(ss):  
    head=''  
    method=''  
    isFrist=True  
    while (True):  
        try:  
            buf= ss.recv(2048)        
            if(len(buf)>0):   
                head+=buf  
            else:  
                break  
            if isFrist:  
                i=head.find(' ')  
                method=head[0:i]  
                isFrist=False  
        except Exception,e:    
                print "Recvice Browser Data Fail"  
                break  
                            
        if("rnrn" in head):   
            patten=method+'+( http| https)(://)+([^/])+(/)'              
            reobj = re.compile(patten)    
            result, number = reobj.subn(method+' /', head)  
            req=result.replace("Proxy-Connection:","Connection:")  
            print req  
  
            if method=="CONNECT":  
				#t = threading.Thread(target=FakeHttps,args=(result,ss))  
				#t.start()  
                FakeHttps(result,ss)  
            else:  
                #t = threading.Thread(target=ForWardHttp,args=(result,ss))  
                #t.start()
                ForWardHttp(result,ss)
                  
def ForWardHttp(msg,ss):  
    host=''  
    port=80  
    patten2=r'(Host: )+(S+)'  
    searchObj2 = re.search( patten2, msg, re.M|re.I)  
    if searchObj2:    
         host=searchObj2.group(2)  
         #print host  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   
    if(isinstance(ss,ssl.SSLSocket)):   
       sock=ssl.wrap_socket(sock)  
       port=443  
          
    try:  
        ip=socket.gethostbyname(host)   
        sock.connect((ip, port))  
    except Exception,e:  
        print e,"Connect Fail"  
        return   
          
    sock.send(msg)  
    while (True):  
        try:  
            rec=sock.recv(2048)  
            if(len(rec)>0):  
                ss.send(rec)  
            else:  
                break  
                 
        except Exception,e:  
            print e,'Recvice Data Fail'  
            sock.close()  
            ss.close()  
            break  
  
def FakeHttps(result,ss):  
    index=result.find(':')  
      
    Host=result[len('CONNECT '):index]  
      
    ss.send('HTTP/1.1 200 Connection Establishedrnrn')  
    
    mutex.acquire()
    mk_self_cert(CACerFile,CAKeyFile,Host)                    #Make a self signed certificate  
    mutex.release()
    #os.system('FakeSSL.exe '+Host)
    conn = ssl.wrap_socket(ss,keyfile=StoreFolder+Host+".key",certfile=StoreFolder+Host+".cer",server_side=True)  
  
    RecviceMessage(conn)  
  
  
if __name__ == "__main__":  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    sock.bind(('0.0.0.0',8080))  
    sock.listen(50)
    while(True):  
        clientSock, address = sock.accept()   
        t = threading.Thread(target=RecviceMessage,args=(clientSock,))  
        t.start()  

 

 

 

 

 

 

 

设置代理,打开google,发现网站证书已经被成功替换

截获到的明文请求信息

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/terry1201/article/details/47975207
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-08 17:09:55
  • 阅读 ( 1855 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢