血崩!! 今天写完一个多线程读取文件,发现多线程反而比但线程慢多了;最后还是改成了多进程版本。多线程向多进程的转换也非常的方便,
为什么Python多线程反而更慢了?
原因就在于 GIL ,在 Cpython 解释器(Python语言的主流解释器)中,有一把全局解释锁(Global Interpreter Lock),在解释器解释执行 Python 代码时,先要得到这把锁,意味着,任何时候只可能有一个线程在执行代码,其它线程要想获得 CPU 执行代码指令,就必须先获得这把锁,如果锁被其它线程占用了,那么该线程就只能等待,直到占有该锁的线程释放锁才有执行代码指令的可能。
因此,这也就是为什么两个线程一起执行反而更加慢的原因,因为同一时刻,只有一个线程在运行,其它线程只能等待,即使是多核CPU,也没办法让多个线程「并行」地同时执行代码,只能是交替执行,因为多线程涉及到上下文切换、锁机制处理(获取锁,释放锁等),所以,多线程执行不快反慢。
1.文件分块类
- 定义一个分块类
- 根据线程数对文件进行分块
- 获取每个线程需要读取的坐标
定义类的时候class PartitionFile(object)
这种方式表示PartitionFile
继承自Object
类
1 2 3 4 5
| class PartitionFile(object): def __init__(self, fileName, threadNum): self.fileName = fileName self.blockNum = threadNum
|
定义对应的文件分块方法partion
,最后返回每个区块文件的指针数组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| def partion(self): fd = open(self.fileName, 'r') fd.seek(0, 2) fileSize = fd.tell() Pos_list = [] blockSize = int(fileSize/self.blockNum) start_Pos = 0 for i in range(self.blockNum): if i == self.blockNum-1: end_Pos = fileSize-1 Pos_list.append((start_Pos, end_Pos)) break end_Pos = start_Pos+blockSize-1 Pos_list.append((start_Pos, end_Pos)) start_Pos = end_Pos+1 fd.close() return Pos_list
|
2.读取文件线程类
类初始化需要传递6个参数
- 线程编号
- 线程所属队列
- 文件名
- 文件区块其实指针
- 文件区块结束指针
- 自定义处理函数
读取文件线程也可以称作生产者,继承自Thread类,在初始化是调用,初始化一个线程
1 2 3 4 5 6 7 8 9
| class readThread(Thread): def __init__(self, thread_name, thread_queue, fileName, start_Pos, end_Pos, processFunction): super(readThread, self).__init__() self.name = thread_name self.queue = thread_queue self.start_Pos = start_Pos self.end_Pos = end_Pos self.fileName = fileName self.processFunction = processFunction
|
类中读取数据的函数reader
- 根据文件区块指针,进行一行一行读取
- 将读取的数据交给
processFunction
自定义函数处理,过滤掉一些行或者转化一下格式
- 将过滤后的结果存进队列中
self.queue.put(tmp, block=True)
主要是当队列中数据已经满了,还来不及取出时,阻塞当前线程,等待队列闲置空间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| def reader(self): fd = open(self.fileName, 'r') if self.start_Pos != 0: fd.seek(self.start_Pos-1) if fd.read(1) != '\n': fd.readline() self.start_Pos = fd.tell() fd.seek(self.start_Pos) while self.start_Pos <= self.end_Pos: line = fd.readline() tmp = self.processFunction(line) if tmp: self.queue.put(tmp, block=True) self.start_Pos = fd.tell() fd.close() return
|
启动线程函数
当线程启动是调用读取函数进行文件读取
1 2 3 4 5
| def run(self): print("线程" + self.name+": 开始读取文件...") self.reader() print("线程" + self.name+": 读取完成...") return
|
3.获取队列数据线程类
也被称为消费者线程,
初始化
- 包含线程名
- 所属队列
- 以及初始化一个用于存放队列数据的数组
1 2 3 4 5 6
| class getThread(threading.Thread): def __init__(self, name, queue): super(getThread, self).__init__() self.name = name self.queue = queue self.out = []
|
获取队列数据run
方法
- 当线程启动时,就开始向队列取数据
- 如果在向队列取数据时,等待时间过长,之间关闭当前进程
Queue.get
函数默认有一个block=True
,的参数;当线程取不到数据时,就一直会进入等待
1 2 3 4 5 6
| def run(self): while True: try: self.out.append(self.queue.get(timeout=2)) except: break
|
4.函数封装
为了实现代码的可重复利用,可以将这几个模块一起封装成一个函数
defaultProcessFunction
默认自定义函数,不会对文件中行进行处理
- 定义生成者线程池
- 定义消费者生成池
- 等待所有线程池结束后,调用消费者线程的
getData
方法,获取所有数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import time import threading from queue import Queue import re
def defaultProcessFunction(line): return line
def readFileByThread(fileName, ThreadNum, processFunction=defaultProcessFunction): out = [] workQueue = Queue() readThreads = [] getThreads = [] pos_list = PartitionFile(fileName, ThreadNum).partion() for i in range(len(pos_list)): postion = pos_list[i] mythread = readThread(str(i), workQueue, fileName, postion[0], postion[1], processFunction) mythread.start() getdataThread = getThread(str(i), workQueue) getdataThread.start() readThreads.append(mythread) getThreads.append(getdataThread) for i in readThreads: i.join() for i in getThreads: i.join() for i in getThreads: out += i.getData() return out
|
5.性能测试
由于测试文件只有20万行,进程间的调度也会消耗时间,所有有可能出现多进程比单进程慢一丢丢的情况
1 2 3 4 5
| if __name__ == "__main__": start_time = time.clock() readFileByThread(sys.argv[1], int(sys.argv[2])) end_time = time.clock() print('Cost Time is {:.2f}'.format(end_time-start_time))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 线程0: 开始读取文件... 线程0: 读取完成... Cost Time is 2.61
线程0: 开始读取文件... 线程1: 开始读取文件... 线程2: 开始读取文件... 线程3: 开始读取文件... 线程2: 读取完成... 线程1: 读取完成... 线程0: 读取完成... 线程3: 读取完成... Gbar_A01 Cost Time is 17.77
|
6.源代码
https://github.com/BiocottonHub/zpliuCode/blob/master/Hi-c/readFileByThread.py
7.参考
- Queue队列
- 多线程爬取数据
- super父类
- 文件指针
- 文件分块
- python继承类
- Queue
- Queue取数据
- 子进程不结束