多进程2.0

在使用多线程并没有实质性的提速后,果断放弃了多线程;最后还是使用多进程的方式处理这个50G的大文件

多线程和多进程区别

  1. 进程是分配资源的基本单位;
  2. 线程是系统调度和分派的基本单位。
  3. 属于同一进程的线程,堆是共享的,栈是私有的。
  4. 属于同一进程的所有线程都具有相同的地址空间。

多进程的优点:

①编程相对容易;通常不需要考虑锁和同步资源的问题。
②更强的容错性:比起多线程的一个好处是一个进程崩溃了不会影响其他进程。
③有内核保证的隔离:数据和错误隔离。 对于使用如C/C++这些语言编写的本地代码,错误隔离是非常有用的:采用多进程架构的程序一般可以做到一定程度的自恢复;(master守护进程监控所有worker进程,发现进程挂掉后将其重启)。

多线程的优点:

①创建速度快,方便高效的数据共享
共享数据:多线程间可以共享同一虚拟地址空间;多进程间的数据共享就需要用到共享内存、信号量等IPC技术。
②较轻的上下文切换开销 - 不用切换地址空间,不用更改寄存器,不用刷新TLB。
③提供非均质的服务。如果全都是计算任务,但每个任务的耗时不都为1s,而是1ms-1s之间波动;这样,多线程相比多进程的优势就体现出来,它能有效降低“简单任务被复杂任务压住”的概率。

1.文件分块类

通过初始化一个文件分块类,并调用partion函数,获得每个进程所需要读取的区间

pos_list = PartitionFile(fileName, ProcessNum).partion() # *存放所有文件指针坐标*

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class PartitionFile(object):
def __init__(self, fileName, jobsNum):
self.fileName = fileName
self.blockNum = jobsNum

def partion(self):
fd = open(self.fileName, 'r')
fd.seek(0, 2) # 移动文件指针到文件尾,用于获取文件大小
fileSize = fd.tell() # 获取文件字符数
Pos_list = [] # 指针坐标,数组
blockSize = int(fileSize/self.blockNum)
start_Pos = 0 # 文件初始指针
for i in range(self.blockNum):
if i == self.blockNum-1:
end_Pos = fileSize-1 # 最后一个文件区块为文件结尾
Pos_list.append((start_Pos, end_Pos))
break
end_Pos = start_Pos+blockSize-1 # 均匀分配每个区块
Pos_list.append((start_Pos, end_Pos))
start_Pos = end_Pos+1 # 下一个区块的开始坐标
fd.close()
return Pos_list

2.读取进程

因为python中字典和列表都是引用类型数据,所以使用函数对其直接操作,可以改变原字典;这里只有文件的读取,不涉及子进程间的通信,所以不使用Queue

  • 进程类中定义一个self.outData用于存储每行结果
  • processFunction函数会对每行结果进行处理后存进self.outData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def reader(self):
fd = open(self.fileName, 'r')
if self.start_Pos != 0:
fd.seek(self.start_Pos-1)
if fd.read(1) != '\n': # 当前初始位置不是行首,移动到下一行行首
fd.readline()
self.start_Pos = fd.tell()
fd.seek(self.start_Pos) # 将文件指针定位到区块的行首
while self.start_Pos <= self.end_Pos: # 开始按行读取文件并且进行操作
tmp = fd.readline()
processFunction(tmp, self.outData) # 将每行结果存进字典内
self.start_Pos = fd.tell() # 读完一行后,自动调整开始位置
fd.close()
return

子进程写入文件

这里没有直接使用生产者-消费者模型,是因为基本没有子进程间的通信;只有主进程和子进程的通信;我当时还直接将所有的子进程读到的文件数据写入Queue,然后再从主进程中读取,结果最后子进程一直被挂起了。原因可能是,队列容量达到系统上限,生成者太多了,消费者只有主进程一个

参考:https://www.zhihu.com/question/63265466

  • 遍历self.outData数据,将结果写入文件
  • 由于涉及到多个进程对文件的写,这里使用了文件锁
  • 将写入的文件名通过Queue传递给主进程
1
2
3
4
5
6
7
8
9
10
11
def run(self):
print("Process: " + self.name+": reading file...")
self.reader()
print("Process: " + self.name+": begin to write temporary data to file...")
for key in self.outData.keys():
self.queue.put(key, block=True, timeout=None)
with open(path+"/"+key, 'a') as File:
fcntl.flock(File.fileno(), fcntl.LOCK_EX)
for item in self.outData[key]:
File.write(item[0]+" "+key+" "+" ".join(item[1:])+"\n")
print("Process: " + self.name+": completed write...")

3.排序进程

  • 排序进程的文件名是,主进程从队列中获取得到的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def sortFile(self):
with open(path+"/"+self.fileName, 'r') as File:
for line in File.readlines():
line = line.split(" ")
try:
# [tmpDict[line[3]], line[1],line[2], '0', tmpDict[line[6]], line[4], line[5], '0']
# 都是同一条染色体对应的Chr1-Chr2 Chr1-Chr3
self.outData[line[5]].append(
[line[0], line[2], line[3], line[4], line[6], line[7]])
except KeyError:
self.outData[line[5]] = [
[line[0], line[2], line[3], line[4], line[6], line[7]]]
with open(path+"/"+self.fileName+"_sorted", 'w') as File:
sortKey = sorted(self.outData)
for key in sortKey:
for item in self.outData[key]:
File.write(item[0]+" "+self.fileName+" " +
" ".join(item[1:4])+" "+key+" "+item[-2]+" "+item[-1])

4.封装后的函数

  • 主进程通过队列的方式从子进程中获取染色体编号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
path = 'tmp'+str(int(time.time()))
mkdir(path)
workQueue = Queue() # 用于存放子进程文件数据
read_jobs = []
sort_jobs = []
chrosomes = []
pos_list = PartitionFile(fileName, ProcessNum).partion() # 存放所有文件指针坐标
for i in range(ProcessNum):
position = pos_list[i]
myprocess = readProcess(
str(i), fileName, workQueue, position[0], position[1], processFunction)
myprocess.start()
read_jobs.append(myprocess)
for i in read_jobs:
i.join()
while True:
try:
chrosomes.append(workQueue.get(block=True, timeout=1)) # 获取子进程数据
except:
break
for i in list(set(chrosomes)):
myprocess = sortProcess(str(i), i) # 排序进程
myprocess.start()
sort_jobs.append(myprocess)
for i in sort_jobs:
i.join()

5.性能测试

  • 单进程
1
2
3
4
5
6
7
8
9
10
Process: 0: reading file...
Process: 0: begin to write temporary data to file...
Process: 0: completed write...
sorting chrosome: Gbar_A01...
chrosome: Gbar_A01ok...
merge chrosomes to a single file...
completed!
there are some temporary file in directory: <./tmp1593916264>
if you can remove it by yourself!
Cost Time is 46.96
  • 四个进程·
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Process: 1: reading file...
Process: 0: reading file...
Process: 3: reading file...
Process: 2: reading file...
Process: 3: begin to write temporary data to file...
Process: 1: begin to write temporary data to file...
Process: 3: completed write...
Process: 2: begin to write temporary data to file...
Process: 0: begin to write temporary data to file...
Process: 1: completed write...
Process: 2: completed write...
Process: 0: completed write...
sorting chrosome: Gbar_A01...
chrosome: Gbar_A01 ok...
merge chrosomes to a single file...
completed!
there are some temporary file in directory: <./tmp1593916319>
if you can remove it by yourself!
Cost Time is 18.70

6.源代码

https://github.com/BiocottonHub/zpliuCode/blob/master/Hi-c/HiCProTojuicer.py

7.参考

  1. 子进程挂起
  2. 并发队列
  3. 文件分块
  4. 进程通信
  5. 多进程
  6. OS模块
  7. 文件锁
  8. 线程与进程区别
------ 本文结束 thankyou 感谢阅读 ------

欢迎扫一扫上面的微信二维码,订阅 codeHub 公众号