@@ -191,15 +191,68 @@ def convert_allc_to_bigwig(input_allc_file,
191191 subprocess .check_call (shlex .split ("rm " + output_file + ".wig " + output_file + ".chrom_size" ))
192192
193193
def filter_allc_file(allc_files,
                     output_files,
                     num_procs=1,
                     mc_type=None,
                     chroms=None,
                     compress_output=False,
                     min_cov=0,
                     max_mismatch=None,
                     max_mismatch_frac=None,
                     buffer_line_number=100000):
    """Filter one or more allc files, writing one output file per input.

    Each (allc file, output file) pair is handed to
    ``filter_allc_file_worker``; all filtering options are passed through
    to it unchanged.

    Args:
        allc_files: list of input allc file paths.
        output_files: list of output paths, paired by position with
            ``allc_files``.
        num_procs: when greater than 1, pairs are processed in a
            ``multiprocessing.Pool`` of ``min(num_procs, len(allc_files))``
            processes; otherwise they are processed sequentially in this
            process.
        mc_type: nucleotide-context code(s) to keep, or None to disable
            context filtering.
        chroms: collection of chromosome names to keep, or None to keep all.
        compress_output: forwarded to the worker, which selects how the
            output file is opened.
        min_cov: minimum value of the coverage column for a line to be kept.
        max_mismatch: per-position mismatch count cutoffs, or None.
        max_mismatch_frac: per-position mismatch fraction cutoffs, or None.
        buffer_line_number: number of lines the worker buffers before
            writing to the output file.

    Raises:
        SystemExit: if ``allc_files`` or ``output_files`` is not a list, or
            if their lengths differ.
        Exception: any exception raised inside a worker process is
            re-raised here instead of being silently dropped.
    """
    # User input checks
    if not isinstance(allc_files, list):
        exit("allc files must be a list of string(s)")
    if not isinstance(output_files, list):
        exit("output files must be a list of string(s)")
    if len(allc_files) != len(output_files):
        exit("Number of allc files does not match number of output files")

    # Options shared by every worker call. buffer_line_number is included
    # so that the caller's value is honoured rather than ignored.
    worker_options = {
        "mc_type": mc_type,
        "chroms": chroms,
        "compress_output": compress_output,
        "min_cov": min_cov,
        "max_mismatch": max_mismatch,
        "max_mismatch_frac": max_mismatch_frac,
        "buffer_line_number": buffer_line_number,
    }
    file_pairs = list(zip(allc_files, output_files))
    num_workers = min(num_procs, len(file_pairs))

    # An empty input list falls through to the sequential branch (a no-op);
    # multiprocessing.Pool(0) would raise ValueError.
    if num_procs > 1 and num_workers > 0:
        pool = multiprocessing.Pool(num_workers)
        try:
            print_checkpoint("Filtering allc files using "
                             + str(num_workers)
                             + " node(s).")
            pending = []
            for allc_file, output_file in file_pairs:
                kwargs = dict(worker_options,
                              allc_file=allc_file,
                              output_file=output_file)
                pending.append(pool.apply_async(filter_allc_file_worker,
                                                (),
                                                kwargs))
            pool.close()
            # get() re-raises any exception raised in the worker process;
            # without it a failed job would go unnoticed.
            for job in pending:
                job.get()
        except BaseException:
            # Stop remaining jobs so join() below cannot hang on them.
            pool.terminate()
            raise
        finally:
            pool.join()
    else:
        print_checkpoint("Filtering allc files using single node.")
        for allc_file, output_file in file_pairs:
            filter_allc_file_worker(allc_file=allc_file,
                                    output_file=output_file,
                                    **worker_options)
243+
244+ def filter_allc_file_worker (allc_file ,
245+ output_file ,
246+ mc_type = None ,
247+ chroms = None ,
248+ compress_output = False ,
249+ min_cov = 0 ,
250+ max_mismatch = None ,
251+ max_mismatch_frac = None ,
252+ buffer_line_number = 100000 ):
253+
254+ if mc_type is not None :
255+ mc_class = expand_nucleotide_code (mc_type )
203256 # input & output
204257 f = open_allc_file (allc_file )
205258 if compress_output :
@@ -214,14 +267,48 @@ def filter_allc_file(allc_file,
214267 for line in f :
215268 fields = line .split ("\t " )
216269 if (chroms is None or fields [0 ] in chroms ) \
217- and fields [3 ] in mc_class \
270+ and ( mc_type is None or fields [3 ] in mc_class ) \
218271 and int (fields [5 ]) >= min_cov :
219- line_counts += 1
220- out += line
272+ pass
273+ else :
274+ continue
275+
276+ if max_mismatch is not None or max_mismatch_frac is not None :
277+ try :
278+ matches = map (int ,fields [7 ].split ("," ))
279+ mismatches = map (int ,fields [8 ].split ("," ))
280+ except :
281+ print_error ("allc file " + allc_file + "does not contain SNP information used "
282+ + "for applying mismatch-based filtering!\n " )
283+ pass_snp_filter = True
284+ if max_mismatch is not None :
285+ try :
286+ for index ,cutoff in enumerate (max_mismatch ):
287+ if mismatches [index ] > cutoff :
288+ pass_snp_filter = False
289+ break
290+ except :
291+ pass
292+ if max_mismatch_frac is not None :
293+ try :
294+ for index ,cutoff in enumerate (max_mismatch_frac ):
295+ cov = mismatches [index ] + matches [index ]
296+ if cov > 0 and float (mismatches [index ])/ float (cov ) > cutoff :
297+ pass_snp_filter = False
298+ break
299+ except :
300+ pass
301+ if not pass_snp_filter :
302+ continue
303+
304+ line_counts += 1
305+ out += line
306+ # print out once reach buffer limit
221307 if line_counts > buffer_line_number :
222308 output_fhandler .write (out )
223309 line_counts = 0
224310 out = ""
311+ # clear buffer
225312 if line_counts > 0 :
226313 output_fhandler .write (out )
227314 out = ""
@@ -382,16 +469,17 @@ def merge_allc_files_worker(allc_files,
382469 fhandles = []
383470 chrom_pointer = []
384471 chroms = set ([])
385- try :
472+ #try:
473+ if True :
386474 for index ,allc_file in enumerate (allc_files ):
387475 fhandles .append (open_allc_file (allc_file ))
388476 chrom_pointer .append (read_allc_index (allc_file ))
389477 for chrom in chrom_pointer [index ].keys ():
390478 chroms .add (chrom )
391- except :
392- for f in fhandles :
393- f .close ()
394- exit () # exit due to failure of openning all allc files at once
479+ # except:
480+ # for f in fhandles:
481+ # f.close()
482+ # exit() # exit due to failure of openning all allc files at once
395483 if query_chroms is not None :
396484 if isinstance (query_chroms ,list ):
397485 chroms = query_chroms
0 commit comments