Source code for pymzml.file_classes.standardMzml

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interface for uncompressed mzML files.

@author: Manuel Koesters
"""
from __future__ import print_function
import codecs
import pymzml.spec as spec
import pymzml.regex_patterns as regex_patterns
from xml.etree.ElementTree import XML, iterparse
import re
import bisect


class StandardMzml(object):
    """
    Interface for uncompressed (standard) mzML files.
    """

    def __init__(self, path, encoding):
        """
        Initialize wrapper object for standard mzML files.

        Arguments:
            path (str)     : path to the file
            encoding (str) : encoding of the file
        """
        self.path = path
        self.file_handler = codecs.open(
            path,
            mode='r',
            encoding=encoding
        )
        self.offset_dict = dict()
        self.spec_open = regex_patterns.SPECTRUM_OPEN_PATTERN
        self.spec_close = regex_patterns.SPECTRUM_CLOSE_PATTERN
        self._build_index(False, False)
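    # Usage sketch (illustrative only; 'some_file.mzML' is a placeholder path,
    # not part of the original module):
    #
    #     reader = StandardMzml('some_file.mzML', 'utf-8')
    #     reader.offset_dict  # native ids mapped to byte offsets after indexing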
    # def __del__(self):
    #     """
    #     """
    #     pass
    #     # self.file_handler.close()
    def __getitem__(self, identifier):
        """
        Access the item with id 'identifier'.

        Uses linear, binary, or interpolated search.

        Arguments:
            identifier (str): native id of the item to access

        Returns:
            spectrum (Spectrum or Chromatogram): the item associated with
                the given identifier
        """
        # TODO/FIXME: the interpolated search assumes that native ids start
        # at one and increase by one; this does not hold for every file.
        self.file_handler.seek(0)
        spectrum = None
        if str(identifier).upper() == 'TIC':
            found = False
            mzmliter = iter(iterparse(self.file_handler, events=['end']))
            while found is False:
                event, element = next(mzmliter, ('STOP', 'STOP'))
                if event == 'end':
                    if element.tag.endswith('}chromatogram'):
                        if element.get('id') == 'TIC':
                            found = True
                            spectrum = spec.Chromatogram(
                                element,
                                measured_precision=5e-6
                            )
                elif event == 'STOP':
                    raise StopIteration
        elif identifier in self.offset_dict:
            start = self.offset_dict[identifier]
            with open(self.path, 'rb') as seeker:
                seeker.seek(start[0])
                start, end = self._read_to_spec_end(seeker)
            self.file_handler.seek(start, 0)
            data = self.file_handler.read(end - start)
            spectrum = spec.Spectrum(
                XML(data),
                measured_precision=5e-6
            )
        elif type(identifier) == str:
            return self._search_string_identifier(identifier)
        else:
            spectrum = self._interpol_search(identifier)
        return spectrum
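    # Lookup sketch (hedged example; the identifiers below are placeholders
    # and depend on the actual file content):
    #
    #     reader = StandardMzml('some_file.mzML', 'utf-8')
    #     spectrum = reader[5]         # indexed or interpolated search by number
    #     tic = reader['TIC']          # total ion chromatogram, if present
    #     spectrum = reader['scan=5']  # string ids trigger a linear text search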
    def _build_index(self, from_scratch, use_spectra_sanity_check):
        """
        Build an index.

        The index is a list of offsets to which a file pointer can seek
        directly in order to access a particular spectrum or chromatogram
        without parsing the entire file.

        Args:
            from_scratch (bool): Whether or not to force building the index
                from scratch, by parsing the file, if no existing index can
                be found.
            use_spectra_sanity_check (bool): Whether or not to assume that
                all data are spectra and follow the (scan=|nativeID=")
                pattern. Disable this if the file contains chromatograms or
                spectra with different ids.

        Returns:
            None. The collected offsets are stored in ``self.offset_dict``.
        """
        # Declare the pre-seeker
        seeker = open(self.path, 'rb')
        # Read the last kilobytes of the file to find the chromatogram
        # offset and the spectrum index list offset
        index_list_offset_pattern = re.compile(
            b'<indexListOffset>(?P<indexListOffset>[0-9]*)</indexListOffset>'
        )
        chromatogram_offset_pattern = re.compile(
            b'(?P<ref_type>nativeID|idRef)="TIC">(?P<offset>[0-9]*)</offset'
        )
        self.offset_dict['TIC'] = None
        seeker.seek(0, 2)
        index_found = False
        spectrum_index_pattern = regex_patterns.SPECTRUM_INDEX_PATTERN
        for _ in range(1, 10):  # search at most the last ~10 kilobytes
            # Some converters fail to write a correct index, e.g. every
            # offset is the same (a silent failure); collect the offsets
            # in a set as a sanity check.
            sanity_check_set = set()
            try:
                seeker.seek(-1024 * _, 1)
            except OSError:
                break  # File is smaller than 10 kilobytes ...
            for line in seeker:
                match = chromatogram_offset_pattern.search(line)
                if match:
                    self.offset_dict['TIC'] = int(
                        bytes.decode(match.group('offset'))
                    )
                match_spec = spectrum_index_pattern.search(line)
                if match_spec is not None:
                    spec_byte_offset = int(
                        bytes.decode(match_spec.group('offset'))
                    )
                    sanity_check_set.add(spec_byte_offset)
                match = index_list_offset_pattern.search(line)
                if match:
                    index_found = True
                    index_list_offset = int(
                        match.group('indexListOffset').decode('utf-8')
                    )
            if index_found is True and self.offset_dict['TIC'] is not None:
                break

        if index_found is True:
            # Jump to the index list and collect all spectrum offsets
            seeker.seek(index_list_offset, 0)
            spectrum_index_pattern = regex_patterns.SPECTRUM_INDEX_PATTERN
            sim_index_pattern = regex_patterns.SIM_INDEX_PATTERN
            for line in seeker:
                match_spec = spectrum_index_pattern.search(line)
                if match_spec and match_spec.group('nativeID') == b'':
                    match_spec = None
                match_sim = sim_index_pattern.search(line)
                if match_spec:
                    offset = int(bytes.decode(match_spec.group('offset')))
                    native_id = int(bytes.decode(match_spec.group('nativeID')))
                    self.offset_dict[native_id] = (offset,)
                elif match_sim:
                    offset = int(bytes.decode(match_sim.group('offset')))
                    native_id = bytes.decode(match_sim.group('nativeID'))
                    native_id = regex_patterns.SPECTRUM_ID_PATTERN.search(
                        native_id
                    ).group(0)
                    try:
                        native_id = int(native_id)
                    except ValueError:
                        pass
                    self.offset_dict[native_id] = (offset,)
        seeker.close()
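    # After _build_index() has run, self.offset_dict looks roughly like the
    # sketch below (values are illustrative, not taken from a real file):
    #
    #     {
    #         'TIC': 1051234,   # byte offset of the TIC chromatogram, or None
    #         1: (4567,),       # native id -> (start offset,) from the index list
    #         2: (9876,),
    #     }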
    def _read_to_spec_end(self, seeker):
        """
        Read from the current seeker position to the end of the next
        spectrum tag and return the start and end position.

        Args:
            seeker (_io.BufferedReader): reader instance used by the
                calling function

        Returns:
            positions (tuple): tuple with the start and end position of
                the spectrum
        """
        chunk_size = 512
        end_found = False
        start_pos = seeker.tell()
        while end_found is False:
            chunk_offset = seeker.tell()
            data_chunk = seeker.read(chunk_size)
            tag_end, seeker = self._read_until_tag_end(seeker)
            data_chunk += tag_end
            match = regex_patterns.SPECTRUM_CLOSE_PATTERN.search(data_chunk)
            if match:
                relative_pos_in_chunk = match.end()
                end_pos = chunk_offset + relative_pos_in_chunk
                end_found = True
        return (start_pos, end_pos)
    def _search_linear(self, seeker, index, chunk_size=8):
        """
        Fall back to linear search if the interpolated search fails.
        """
        data = None
        total_chunk_size = chunk_size * 512
        spec_start = None
        spec_end = None
        while True:
            file_pointer = seeker.tell()
            data = seeker.read(total_chunk_size)
            string, seeker = self._read_until_tag_end(seeker)
            data += string
            spec_start = self.spec_open.search(data)
            if spec_start:
                spec_start_offset = file_pointer + spec_start.start()
                seeker.seek(spec_start_offset)
                current_index = int(
                    re.search(
                        b'[0-9]*$',
                        spec_start.group('id')
                    ).group()
                )
                spec_end = self.spec_close.search(data[spec_start.start():])
                if spec_end:
                    spec_end_offset = \
                        file_pointer + spec_end.end() + spec_start.start()
                    seeker.seek(spec_end_offset)
                while spec_end is None:
                    file_pointer = seeker.tell()
                    data = seeker.read(total_chunk_size)
                    string, seeker = self._read_until_tag_end(seeker)
                    data += string
                    spec_end = self.spec_close.search(data)
                    if spec_end:
                        spec_end_offset = file_pointer + spec_end.end()
                        self.offset_dict[current_index] = (
                            spec_start_offset, spec_end_offset
                        )
                        seeker.seek(spec_end_offset)
                        break
                if current_index == index:
                    seeker.seek(spec_start_offset)
                    spec_string = seeker.read(
                        spec_end_offset - spec_start_offset
                    )
                    self.offset_dict[current_index] = (
                        spec_start_offset, spec_end_offset
                    )
                    xml_string = XML(spec_string)
                    seeker.close()
                    return spec.Spectrum(
                        xml_string,
                        measured_precision=5e-6
                    )
    def _search_string_identifier(self, search_string, chunk_size=8):
        """
        Scan the file linearly for a spectrum whose native id contains
        `search_string` and return it as a Spectrum object.
        """
        with open(self.path, 'rb') as seeker:
            data = None
            total_chunk_size = chunk_size * 512
            spec_start = None
            # NOTE: this pattern should be moved into regex_patterns.py
            regex_string = re.compile(
                r'<\s*spectrum[^>]*index="[0-9]+"\sid="({0})"\sdefaultArrayLength="[0-9]+">'.format(
                    ''.join(['.*', search_string, '.*'])
                ).encode()
            )
            search_string = search_string.encode()
            while True:
                file_pointer = seeker.tell()
                data = seeker.read(total_chunk_size)
                string, seeker = self._read_until_tag_end(seeker, byte_mode=True)
                data += string
                spec_start = regex_string.search(data)
                if spec_start:
                    spec_start_offset = file_pointer + spec_start.start()
                    current_index = spec_start.group(1)
                    if search_string in current_index:
                        seeker.seek(spec_start_offset)
                        start, end = self._read_to_spec_end(seeker)
                        seeker.seek(start)
                        spec_string = seeker.read(end - start)
                        xml_string = XML(spec_string)
                        return spec.Spectrum(
                            xml_string,
                            measured_precision=5e-6
                        )
                elif len(data) == 0:
                    raise Exception('Cannot find the specified string')

    def _read_until_tag_end(self, seeker, max_search_len=12, byte_mode=False):
        """
        Read ahead until the current tag is closed, so that chunked reads
        do not split <spectrum ...> or </spectrum> tags and the regexes
        can always match them.
        """
        count = 0
        string = b''
        curr_byte = b''
        while count < max_search_len and \
                curr_byte not in (b'>', b'<', b' '):
            curr_byte = seeker.read(1)
            string += curr_byte
            count += 1
        return string, seeker
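    # Behaviour sketch for _read_until_tag_end (illustrative; io.BytesIO is a
    # stand-in for the real file handle and is not used by this module):
    #
    #     import io
    #     tail, handle = reader._read_until_tag_end(io.BytesIO(b'trum> <spec'))
    #     # tail == b'trum>'; reading stops after '>', '<' or ' ',
    #     # or after max_search_len bytes, whichever comes first.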
    def read(self, size=-1):
        """
        Read data from the file handler.

        Keyword Arguments:
            size (int): number of bytes to read from the file,
                -1 to read to the end of the file

        Returns:
            data (str): decoded text read from the file
        """
        return self.file_handler.read(size)
    def close(self):
        """
        Close the internal file handler.
        """
        self.file_handler.close()

if __name__ == '__main__':
    print(__doc__)
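    # Minimal usage sketch (an illustrative addition, not part of the original
    # module): pass the path of an uncompressed mzML file as the first
    # command-line argument to index it and print the first few characters.
    import sys
    if len(sys.argv) > 1:
        example_reader = StandardMzml(sys.argv[1], 'utf-8')
        print('Indexed offsets:', len(example_reader.offset_dict))
        print(example_reader.read(100))
        example_reader.close()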