#!/usr/bin/env python # $Id$ """ A module for reading and writing FITS files and manipulating their contents. A module for reading and writing Flexible Image Transport System (FITS) files. This file format was endorsed by the International Astronomical Union in 1999 and mandated by NASA as the standard format for storing high energy astrophysics data. For details of the FITS standard, see the NASA/Science Office of Standards and Technology publication, NOST 100-2.0. License: http://www.stsci.edu/resources/software_hardware/pyraf/LICENSE For detailed examples of usage, see the I{PyFITS User's Manual} available from U{http://www.stsci.edu/resources/software_hardware/pyfits/Users_Manual1.pdf} Epydoc markup used for all docstrings in this module. @group Header-related Classes: Card, CardList, _Card_with_continue, Header, _Hierarch @group HDU Classes: _AllHDU, BinTableHDU, _CorruptedHDU, _ExtensionHDU, GroupsHDU, ImageHDU, _ImageBaseHDU, PrimaryHDU, TableHDU, _TableBaseHDU, _TempHDU, _ValidHDU @group Table-related Classes: ColDefs, Column, FITS_rec, _FormatP, _FormatX, _VLF """ """ Do you mean: "Profits"? - Google Search, when asked for "PyFITS" """ import re, os, tempfile, exceptions import operator import __builtin__ import urllib import tempfile import gzip import zipfile import numpy as np from numpy import char as chararray import rec from numpy import memmap as Memmap from string import maketrans import types import signal import threading import sys import warnings # Module variables _blockLen = 2880 # the FITS block size _python_mode = {'readonly':'rb', 'copyonwrite':'rb', 'update':'rb+', 'append':'ab+'} # open modes _memmap_mode = {'readonly':'r', 'copyonwrite':'c', 'update':'r+'} TRUE = True # deprecated FALSE = False # deprecated _INDENT = " " DELAYED = "delayed" # used for lazy instantiation of data ASCIITNULL = 0 # value for ASCII table cell with value = TNULL # this can be reset by user. _isInt = "isinstance(val, (int, long, np.integer))" # Warnings routines _showwarning = warnings.showwarning def showwarning(message, category, filename, lineno, file=None): if file is None: file = sys.stdout _showwarning(message, category, filename, lineno, file) def formatwarning(message, category, filename, lineno): return str(message)+'\n' warnings.showwarning = showwarning warnings.formatwarning = formatwarning warnings.filterwarnings('always',category=UserWarning,append=True) # Functions def _padLength(stringLen): """Bytes needed to pad the input stringLen to the next FITS block.""" return (_blockLen - stringLen%_blockLen) % _blockLen def _tmpName(input): """Create a temporary file name which should not already exist. Use the directory of the input file and the base name of the mktemp() output. """ dirName = os.path.dirname(input) if dirName != '': dirName += '/' _name = dirName + os.path.basename(tempfile.mktemp()) if not os.path.exists(_name): return _name else: raise _name, "exists" class VerifyError(exceptions.Exception): """Verify exception class.""" pass class _ErrList(list): """Verification errors list class. It has a nested list structure constructed by error messages generated by verifications at different class levels. """ def __init__(self, val, unit="Element"): list.__init__(self, val) self.unit = unit def __str__(self, tab=0): """Print out nested structure with corresponding indentations. A tricky use of __str__, since normally __str__ has only one argument. """ result = "" element = 0 # go through the list twice, first time print out all top level messages for item in self: if not isinstance(item, _ErrList): result += _INDENT*tab+"%s\n" % item # second time go through the next level items, each of the next level # must present, even it has nothing. for item in self: if isinstance(item, _ErrList): _dummy = item.__str__(tab=tab+1) # print out a message only if there is something if _dummy.strip(): if self.unit: result += _INDENT*tab+"%s %s:\n" % (self.unit, element) result += _dummy element += 1 return result class _Verify: """Shared methods for verification.""" def run_option(self, option="warn", err_text="", fix_text="Fixed.", fix = "pass", fixable=1): """Execute the verification with selected option.""" _text = err_text if not fixable: option = 'unfixable' if option in ['warn', 'exception']: #raise VerifyError, _text #elif option == 'warn': pass # fix the value elif option == 'unfixable': _text = "Unfixable error: %s" % _text else: exec(fix) #if option != 'silentfix': _text += ' ' + fix_text return _text def verify (self, option='warn'): """Wrapper for _verify.""" _option = option.lower() if _option not in ['fix', 'silentfix', 'ignore', 'warn', 'exception']: raise ValueError, 'Option %s not recognized.' % option if (_option == "ignore"): return x = str(self._verify(_option)).rstrip() if _option in ['fix', 'silentfix'] and x.find('Unfixable') != -1: raise VerifyError, '\n'+x if (_option != "silentfix"and _option != 'exception') and x: warnings.warn('Output verification result:') warnings.warn(x) if _option == 'exception' and x: raise VerifyError, '\n'+x def _pad(input): """Pad balnk space to the input string to be multiple of 80.""" _len = len(input) if _len == Card.length: return input elif _len > Card.length: strlen = _len % Card.length if strlen == 0: return input else: return input + ' ' * (Card.length-strlen) # minimum length is 80 else: strlen = _len % Card.length return input + ' ' * (Card.length-strlen) def _floatFormat(value): """Format the floating number to make sure it gets the decimal point.""" valueStr = "%.16G" % value if "." not in valueStr and "E" not in valueStr: valueStr += ".0" return valueStr class Undefined: """Undefined value.""" pass class Delayed: """Delayed file-reading data.""" def __init__(self, hdu=None, field=None): self.hdu = hdu self.field = field # translation table for floating value string _fix_table = maketrans('de', 'DE') _fix_table2 = maketrans('dD', 'eE') class Card(_Verify): # string length of a card length = 80 # String for a FITS standard compliant (FSC) keyword. _keywd_FSC = r'[A-Z0-9_-]* *$' _keywd_FSC_RE = re.compile(_keywd_FSC) # A number sub-string, either an integer or a float in fixed or # scientific notation. One for FSC and one for non-FSC (NFSC) format: # NFSC allows lower case of DE for exponent, allows space between sign, # digits, exponent sign, and exponents _digits_FSC = r'(\.\d+|\d+(\.\d*)?)([DE][+-]?\d+)?' _digits_NFSC = r'(\.\d+|\d+(\.\d*)?) *([deDE] *[+-]? *\d+)?' _numr_FSC = r'[+-]?' + _digits_FSC _numr_NFSC = r'[+-]? *' + _digits_NFSC # This regex helps delete leading zeros from numbers, otherwise # Python might evaluate them as octal values. _number_FSC_RE = re.compile(r'(?P[+-])?0*(?P' + _digits_FSC+')') _number_NFSC_RE = re.compile(r'(?P[+-])? *0*(?P' + _digits_NFSC + ')') # FSC commentary card string which must contain printable ASCII characters. _ASCII_text = r'[ -~]*$' _comment_FSC_RE = re.compile(_ASCII_text) # Checks for a valid value/comment string. It returns a match object # for a valid value/comment string. # The valu group will return a match if a FITS string, boolean, # number, or complex value is found, otherwise it will return # None, meaning the keyword is undefined. The comment field will # return a match if the comment separator is found, though the # comment maybe an empty string. _value_FSC_RE = re.compile( r'(?P *' r'(?P' # The regex is not correct for all cases, but # it comes pretty darn close. It appears to find the # end of a string rather well, but will accept # strings with an odd number of single quotes, # instead of issuing an error. The FITS standard # appears vague on this issue and only states that a # string should not end with two single quotes, # whereas it should not end with an even number of # quotes to be precise. # # Note that a non-greedy match is done for a string, # since a greedy match will find a single-quote after # the comment separator resulting in an incorrect # match. r'\'(?P([ -~]+?|\'\'|)) *?\'(?=$|/| )|' r'(?P[FT])|' r'(?P' + _numr_FSC + ')|' r'(?P\( *' r'(?P' + _numr_FSC + ') *, *(?P' + _numr_FSC + ') *\))' r')? *)' r'(?P' r'(?P/ *)' r'(?P[!-~][ -~]*)?' r')?$') _value_NFSC_RE = re.compile( r'(?P *' r'(?P' r'\'(?P([ -~]+?|\'\'|)) *?\'(?=$|/| )|' r'(?P[FT])|' r'(?P' + _numr_NFSC + ')|' r'(?P\( *' r'(?P' + _numr_NFSC + ') *, *(?P' + _numr_NFSC + ') *\))' r')? *)' r'(?P' r'(?P/ *)' r'(?P.*)' r')?$') # keys of commentary cards _commentaryKeys = ['', 'COMMENT', 'HISTORY'] def __init__(self, key='', value='', comment=''): """Construct a card from key, value, and (optionally) comment. Any specifed arguments, except defaults, must be compliant to FITS standard. key: keyword name, default=''. value: keyword value, default=''. comment: comment, default=''. """ if key != '' or value != '' or comment != '': self._setkey(key) self._setvalue(value) self._setcomment(comment) # for commentary cards, value can only be strings and there # is no comment if self.key in Card._commentaryKeys: if not isinstance(self.value, str): raise ValueError, 'Value in a commentary card must be a string' else: self.__dict__['_cardimage'] = ' '*80 def __repr__(self): return self._cardimage def __getattr__(self, name): """ instanciate specified attribute object.""" if name == '_cardimage': self.ascardimage() elif name == 'key': self._extractKey() elif name in ['value', 'comment']: self._extractValueComment(name) else: raise AttributeError, name return getattr(self, name) def _setkey(self, val): """Set the key attribute, surrogate for the __setattr__ key case.""" if isinstance(val, str): val = val.strip() if len(val) <= 8: val = val.upper() if val == 'END': raise ValueError, "keyword 'END' not allowed" self._checkKey(val) else: if val[:8].upper() == 'HIERARCH': val = val[8:].strip() self.__class__ = _Hierarch else: raise ValueError, 'keyword name %s is too long (> 8), use HIERARCH.' % val else: raise ValueError, 'keyword name %s is not a string' % val self.__dict__['key'] = val def _setvalue(self, val): """Set the value attribute.""" if isinstance(val, (str, int, long, float, complex, bool, Undefined, np.floating, np.integer, np.complexfloating)): if isinstance(val, str): self._checkText(val) self.__dict__['_valueModified'] = 1 else: raise ValueError, 'Illegal value %s' % str(val) self.__dict__['value'] = val def _setcomment(self, val): """Set the comment attribute.""" if isinstance(val,str): self._checkText(val) else: if val is not None: raise ValueError, 'comment %s is not a string' % val self.__dict__['comment'] = val def __setattr__(self, name, val): if name == 'key': raise SyntaxError, 'keyword name cannot be reset.' elif name == 'value': self._setvalue(val) elif name == 'comment': self._setcomment(val) else: raise AttributeError, name # When an attribute (value or comment) is changed, will reconstructe # the card image. self._ascardimage() def ascardimage(self, option='silentfix'): """Generate a (new) card image from the attributes: key, value, and comment, or from raw string. option: verification option, default=silentfix. """ # Only if the card image already exist (to avoid infinite loop), # fix it first. if self.__dict__.has_key('_cardimage'): self._check(option) self._ascardimage() return self.__dict__['_cardimage'] def _ascardimage(self): """Generate a (new) card image from the attributes: key, value, and comment. Core code for ascardimage. """ # keyword string if self.__dict__.has_key('key') or self.__dict__.has_key('_cardimage'): if isinstance(self, _Hierarch): keyStr = 'HIERARCH %s ' % self.key else: keyStr = '%-8s' % self.key else: keyStr = ' '*8 # value string # check if both value and _cardimage attributes are missing, # to avoid infinite loops if not (self.__dict__.has_key('value') or self.__dict__.has_key('_cardimage')): valStr = '' # string value should occupies at least 8 columns, unless it is # a null string elif isinstance(self.value, str): if self.value == '': valStr = "''" else: _expValStr = self.value.replace("'","''") valStr = "'%-8s'" % _expValStr valStr = '%-20s' % valStr # must be before int checking since bool is also int elif isinstance(self.value ,(bool,np.bool_)): valStr = '%20s' % `self.value`[0] elif isinstance(self.value , (int, long, np.integer)): valStr = '%20d' % self.value # XXX need to consider platform dependence of the format (e.g. E-009 vs. E-09) elif isinstance(self.value, (float, np.floating)): if self._valueModified: valStr = '%20s' % _floatFormat(self.value) else: valStr = '%20s' % self._valuestring elif isinstance(self.value, (complex,np.complexfloating)): if self._valueModified: _tmp = '(' + _floatFormat(self.value.real) + ', ' + _floatFormat(self.value.imag) + ')' valStr = '%20s' % _tmp else: valStr = '%20s' % self._valuestring elif isinstance(self.value, Undefined): valStr = '' # conserve space for HIERARCH cards if isinstance(self, _Hierarch): valStr = valStr.strip() # comment string if keyStr.strip() in Card._commentaryKeys: # do NOT use self.key commentStr = '' elif self.__dict__.has_key('comment') or self.__dict__.has_key('_cardimage'): if self.comment in [None, '']: commentStr = '' else: commentStr = ' / ' + self.comment else: commentStr = '' # equal sign string eqStr = '= ' if keyStr.strip() in Card._commentaryKeys: # not using self.key eqStr = '' if self.__dict__.has_key('value'): valStr = str(self.value) # put all parts together output = keyStr + eqStr + valStr + commentStr # need this in case card-with-continue's value is shortened if not isinstance(self, _Hierarch): self.__class__ = Card else: # does not support CONTINUE for HIERARCH if len(keyStr + eqStr + valStr) > Card.length: raise ValueError, "The keyword %s with its value is too long." % self.key if len(output) <= Card.length: output = "%-80s" % output # longstring case (CONTINUE card) else: # try not to use CONTINUE if the string value can fit in one line. # Instead, just truncate the comment if isinstance(self.value, str) and len(valStr) > (Card.length-10): self.__class__ = _Card_with_continue output = self._breakup_strings() else: warnings.warn('card is too long, comment is truncated.') output = output[:Card.length] self.__dict__['_cardimage'] = output def _checkText(self, val): """Verify val to be printable ASCII text.""" if Card._comment_FSC_RE.match(val) is None: self.__dict__['_err_text'] = 'Unprintable string %s' % repr(val) self.__dict__['_fixable'] = 0 raise ValueError, self._err_text def _checkKey(self, val): """Verify the keyword to be FITS standard.""" # use repr (not str) in case of control character if Card._keywd_FSC_RE.match(val) is None: self.__dict__['_err_text'] = 'Illegal keyword name %s' % repr(val) self.__dict__['_fixable'] = 0 raise ValueError, self._err_text def _extractKey(self): """Returns the keyword name parsed from the card image.""" head = self._getKeyString() if isinstance(self, _Hierarch): self.__dict__['key'] = head.strip() else: self.__dict__['key'] = head.strip().upper() def _extractValueComment(self, name): """Exatrct the keyword value or comment from the card image.""" # for commentary cards, no need to parse further if self.key in Card._commentaryKeys: self.__dict__['value'] = self._cardimage[8:].rstrip() self.__dict__['comment'] = '' return valu = self._check(option='parse') if name == 'value': if valu is None: raise ValueError, "Unparsable card, fix it first with .verify('fix')." if valu.group('bool') != None: _val = valu.group('bool')=='T' elif valu.group('strg') != None: _val = re.sub("''", "'", valu.group('strg')) elif valu.group('numr') != None: # Check for numbers with leading 0s. numr = Card._number_NFSC_RE.match(valu.group('numr')) _digt = numr.group('digt').translate(_fix_table2, ' ') if numr.group('sign') == None: _val = eval(_digt) else: _val = eval(numr.group('sign')+_digt) elif valu.group('cplx') != None: # Check for numbers with leading 0s. real = Card._number_NFSC_RE.match(valu.group('real')) _rdigt = real.group('digt').translate(_fix_table2, ' ') if real.group('sign') == None: _val = eval(_rdigt) else: _val = eval(real.group('sign')+_rdigt) imag = Card._number_NFSC_RE.match(valu.group('imag')) _idigt = imag.group('digt').translate(_fix_table2, ' ') if imag.group('sign') == None: _val += eval(_idigt)*1j else: _val += eval(imag.group('sign') + _idigt)*1j else: _val = UNDEFINED self.__dict__['value'] = _val if '_valuestring' not in self.__dict__: self.__dict__['_valuestring'] = valu.group('valu') if '_valueModified' not in self.__dict__: self.__dict__['_valueModified'] = 0 elif name == 'comment': self.__dict__['comment'] = '' if valu is not None: _comm = valu.group('comm') if isinstance(_comm, str): self.__dict__['comment'] = _comm.rstrip() def _fixValue(self, input): """Fix the card image for fixable non-standard compliance.""" _valStr = None # for the unparsable case if input is None: _tmp = self._getValueCommentString() try: slashLoc = _tmp.index("/") self.__dict__['value'] = _tmp[:slashLoc].strip() self.__dict__['comment'] = _tmp[slashLoc+1:].strip() except: self.__dict__['value'] = _tmp.strip() elif input.group('numr') != None: numr = Card._number_NFSC_RE.match(input.group('numr')) _valStr = numr.group('digt').translate(_fix_table, ' ') if numr.group('sign') is not None: _valStr = numr.group('sign')+_valStr elif input.group('cplx') != None: real = Card._number_NFSC_RE.match(input.group('real')) _realStr = real.group('digt').translate(_fix_table, ' ') if real.group('sign') is not None: _realStr = real.group('sign')+_realStr imag = Card._number_NFSC_RE.match(input.group('imag')) _imagStr = imag.group('digt').translate(_fix_table, ' ') if imag.group('sign') is not None: _imagStr = imag.group('sign') + _imagStr _valStr = '(' + _realStr + ', ' + _imagStr + ')' self.__dict__['_valuestring'] = _valStr self._ascardimage() def _locateEq(self): """Locate the equal sign in the card image before column 10 and return its location. It returns None if equal sign is not present, or it is a commentary card. """ # no equal sign for commentary cards (i.e. part of the string value) _key = self._cardimage[:8].strip().upper() if _key in Card._commentaryKeys: eqLoc = None else: if _key == 'HIERARCH': _limit = Card.length else: _limit = 10 try: eqLoc = self._cardimage[:_limit].index("=") except: eqLoc = None return eqLoc def _getKeyString(self): """Locate the equal sign in the card image and return the string before the equal sign. If there is no equal sign, return the string before column 9. """ eqLoc = self._locateEq() if eqLoc is None: eqLoc = 8 _start = 0 if self._cardimage[:8].upper() == 'HIERARCH': _start = 8 self.__class__ = _Hierarch return self._cardimage[_start:eqLoc] def _getValueCommentString(self): """Locate the equal sign in the card image and return the string after the equal sign. If there is no equal sign, return the string after column 8. """ eqLoc = self._locateEq() if eqLoc is None: eqLoc = 7 return self._cardimage[eqLoc+1:] def _check(self, option='ignore'): """Verify the card image with the specified option. """ self.__dict__['_err_text'] = '' self.__dict__['_fix_text'] = '' self.__dict__['_fixable'] = 1 if option == 'ignore': return elif option == 'parse': # check the value only, no need to check key and comment for 'parse' result = Card._value_NFSC_RE.match(self._getValueCommentString()) # if not parsable (i.e. everything else) result = None return result else: # verify the equal sign position if self.key not in Card._commentaryKeys and self._cardimage.find('=') != 8: if option in ['exception', 'warn']: self.__dict__['_err_text'] = 'Card image is not FITS standard (equal sign not at column 8).' raise ValueError, self._err_text + '\n%s' % self._cardimage elif option in ['fix', 'silentfix']: result = self._check('parse') self._fixValue(result) if option == 'fix': self.__dict__['_fix_text'] = 'Fixed card to be FITS standard.: %s' % self.key # verify the key, it is never fixable # always fix silently the case where "=" is before column 9, # since there is no way to communicate back to the _keylist. self._checkKey(self.key) # verify the value, it may be fixable result = Card._value_FSC_RE.match(self._getValueCommentString()) if result is not None or self.key in Card._commentaryKeys: return result else: if option in ['fix', 'silentfix']: result = self._check('parse') self._fixValue(result) if option == 'fix': self.__dict__['_fix_text'] = 'Fixed card to be FITS standard.: %s' % self.key else: self.__dict__['_err_text'] = 'Card image is not FITS standard (unparsable value string).' raise ValueError, self._err_text + '\n%s' % self._cardimage # verify the comment (string), it is never fixable if result is not None: _str = result.group('comm') if _str is not None: self._checkText(_str) def fromstring(self, input): """Construct a Card object from a (raw) string. It will pad the string if it is not the length of a card image (80 columns). If the card image is longer than 80, assume it contains CONTINUE card(s). """ self.__dict__['_cardimage'] = _pad(input) if self._cardimage[:8].upper() == 'HIERARCH': self.__class__ = _Hierarch # for card image longer than 80, assume it contains CONTINUE card(s). elif len(self._cardimage) > Card.length: self.__class__ = _Card_with_continue # remove the key/value/comment attributes, some of them may not exist for name in ['key', 'value', 'comment', '_valueModified']: if self.__dict__.has_key(name): delattr(self, name) return self def _ncards(self): return len(self._cardimage) / Card.length def _verify(self, option='warn'): """Card class verification method.""" _err = _ErrList([]) try: self._check(option) except ValueError: # Trapping the ValueError raised by _check method. Want execution to continue while printing # exception message. pass _err.append(self.run_option(option, err_text=self._err_text, fix_text=self._fix_text, fixable=self._fixable)) return _err class _Hierarch(Card): """Cards begins with HIERARCH which allows keyword name longer than 8 characters. """ def _verify(self, option='warn'): """No verification (for now).""" return _ErrList([]) class _Card_with_continue(Card): """Cards having more than one 80-char "physical" cards, the cards after the first one must start with CONTINUE and the whole card must have string value. """ def __str__(self): """Format a list of cards into a printable string.""" kard = self._cardimage output = '' for i in range(len(kard)/80): output += kard[i*80:(i+1)*80] + '\n' return output[:-1] def _extractValueComment(self, name): """Exatrct the keyword value or comment from the card image.""" longstring = '' ncards = self._ncards() for i in range(ncards): # take each 80-char card as a regular card and use its methods. _card = Card().fromstring(self._cardimage[i*80:(i+1)*80]) if i > 0 and _card.key != 'CONTINUE': raise ValueError, 'Long card image must have CONTINUE cards after the first card.' if not isinstance(_card.value, str): raise ValueError, 'Cards with CONTINUE must have string value.' if name == 'value': _val = re.sub("''", "'", _card.value).rstrip() # drop the ending "&" if _val[-1] == '&': _val = _val[:-1] longstring = longstring + _val elif name == 'comment': _comm = _card.comment if isinstance(_comm, str) and _comm != '': longstring = longstring + _comm.rstrip() + ' ' self.__dict__[name] = longstring.rstrip() def _breakup_strings(self): """Break up long string value/comment into CONTINUE cards. This is a primitive implementation, it will put the value string in one block and the comment string in another. Also, it does not break at the blank space between words. So it may not look pretty. """ val_len = 67 comm_len = 64 output = '' # do the value string valfmt = "'%-s&'" val = self.value.replace("'", "''") val_list = self._words_group(val, val_len) for i in range(len(val_list)): if i == 0: headstr = "%-8s= " % self.key else: headstr = "CONTINUE " valstr = valfmt % val_list[i] output = output + '%-80s' % (headstr + valstr) # do the comment string if self.comment is None: comm = '' else: comm = self.comment commfmt = "%-s" if not comm == '': nlines = len(comm) / comm_len + 1 comm_list = self._words_group(comm, comm_len) for i in comm_list: commstr = "CONTINUE '&' / " + commfmt % i output = output + '%-80s' % commstr return output def _words_group(self, input, strlen): """Split a long string into parts where each part is no longer than strlen and no word is cut into two pieces. But if there is one single word which is longer than strlen, then it will be split in the middle of the word. """ list = [] _nblanks = input.count(' ') nmax = max(_nblanks, len(input)/strlen+1) arr = chararray.array(input+' ', itemsize=1) # locations of the blanks blank_loc = np.nonzero(arr == ' ')[0] offset = 0 xoffset = 0 for i in range(nmax): try: loc = np.nonzero(blank_loc >= strlen+offset)[0][0] offset = blank_loc[loc-1] + 1 if loc == 0: offset = -1 except: offset = len(input) # check for one word longer than strlen, break in the middle if offset <= xoffset: offset = xoffset + strlen # collect the pieces in a list tmp = input[xoffset:offset] list.append(tmp) if len(input) == offset: break xoffset = offset return list class Header: """FITS header class.""" def __init__(self, cards=[]): """Construct a Header from a CardList. cards: A list of Cards, default=[]. """ # decide which kind of header it belongs to try: if cards[0].key == 'SIMPLE': if 'GROUPS' in cards._keylist and cards['GROUPS'].value == True: self._hdutype = GroupsHDU elif cards[0].value == True: self._hdutype = PrimaryHDU else: self._hdutype = _ValidHDU elif cards[0].key == 'XTENSION': xtension = cards[0].value.rstrip() if xtension == 'TABLE': self._hdutype = TableHDU elif xtension == 'IMAGE': self._hdutype = ImageHDU elif xtension in ('BINTABLE', 'A3DTABLE'): self._hdutype = BinTableHDU else: self._hdutype = _ExtensionHDU else: self._hdutype = _ValidHDU except: self._hdutype = _CorruptedHDU # populate the cardlist self.ascard = CardList(cards) def __getitem__ (self, key): """Get a header keyword value.""" return self.ascard[key].value def __setitem__ (self, key, value): """Set a header keyword value.""" self.ascard[key].value = value self._mod = 1 def __delitem__(self, key): """Delete card(s) with the name 'key'.""" # delete ALL cards with the same keyword name if isinstance(key, str): while 1: try: del self.ascard[key] self._mod = 1 except: return # for integer key only delete once else: del self.ascard[key] self._mod = 1 def __str__(self): return self.ascard.__str__() def ascardlist(self): """Returns a CardList.""" return self.ascard def items(self): """Return a list of all keyword-value pairs from the CardList.""" pairs = [] for card in self.ascard: pairs.append((card.key, card.value)) return pairs def has_key(self, key): """Check for existence of a keyword. Returns 1 if found, otherwise, 0. key: keyword name. If given an index, always returns 0. """ try: key = key.strip().upper() if key[:8] == 'HIERARCH': key = key[8:].strip() _index = self.ascard._keylist.index(key) return 1 except: return 0 def rename_key(self, oldkey, newkey, force=0): """Rename a card's keyword in the header. oldkey: old keyword, can be a name or index. newkey: new keyword, must be a string. force: if new key name already exist, force to have duplicate name. """ oldkey = oldkey.strip().upper() newkey = newkey.strip().upper() if newkey == 'CONTINUE': raise ValueError, 'Can not rename to CONTINUE' if newkey in Card._commentaryKeys or oldkey in Card._commentaryKeys: if not (newkey in Card._commentaryKeys and oldkey in Card._commentaryKeys): raise ValueError, 'Regular and commentary keys can not be renamed to each other.' elif (force == 0) and (newkey in self.ascard._keylist): raise ValueError, 'Intended keyword %s already exists in header.' % newkey _index = self.ascard.index_of(oldkey) _comment = self.ascard[_index].comment _value = self.ascard[_index].value self.ascard[_index] = Card(newkey, _value, _comment) # self.ascard[_index].__dict__['key']=newkey # self.ascard[_index].ascardimage() # self.ascard._keylist[_index] = newkey def get(self, key, default=None): """Get a keyword value from the CardList. If no keyword is found, return the default value. key: keyword name or index default: if no keyword is found, the value to be returned. """ try: return self[key] except KeyError: return default def update(self, key, value, comment=None, before=None, after=None): """Update one header card.""" """ If the keyword already exists, it's value/comment will be updated. If it does not exist, a new card will be created and it will be placed before or after the specified location. If no "before" or "after" is specified, it will be appended at the end. key: keyword name value: keyword value (to be used for updating) comment: keyword comment (to be used for updating), default=None. before: name of the keyword, or index of the Card before which the new card will be placed. The argument `before' takes precedence over `after' if both specified. default=None. after: name of the keyword, or index of the Card after which the new card will be placed. default=None. """ if self.has_key(key): j = self.ascard.index_of(key) if comment is not None: _comment = comment else: _comment = self.ascard[j].comment self.ascard[j] = Card(key, value, _comment) elif before != None or after != None: _card = Card(key, value, comment) self.ascard._pos_insert(_card, before=before, after=after) else: self.ascard.append(Card(key, value, comment)) self._mod = 1 def add_history(self, value, before=None, after=None): """Add a HISTORY card. value: History text to be added. before: [same as in update()] after: [same as in update()] """ self._add_commentary('history', value, before=before, after=after) def add_comment(self, value, before=None, after=None): """Add a COMMENT card. value: Comment text to be added. before: [same as in update()] after: [same as in update()] """ self._add_commentary('comment', value, before=before, after=after) def add_blank(self, value='', before=None, after=None): """Add a blank card. value: Text to be added. before: [same as in update()] after: [same as in update()] """ self._add_commentary(' ', value, before=before, after=after) def get_history(self): """Get all histories as a list of string texts.""" output = [] for _card in self.ascardlist(): if _card.key == 'HISTORY': output.append(_card.value) return output def get_comment(self): """Get all comments as a list of string texts.""" output = [] for _card in self.ascardlist(): if _card.key == 'COMMENT': output.append(_card.value) return output def _add_commentary(self, key, value, before=None, after=None): """Add a commentary card. If before and after are None, add to the last occurrence of cards of the same name (except blank card). If there is no card (or blank card), append at the end. """ new_card = Card(key, value) if before != None or after != None: self.ascard._pos_insert(new_card, before=before, after=after) else: if key[0] == ' ': useblanks = new_card._cardimage != ' '*80 self.ascard.append(new_card, useblanks=useblanks, bottom=1) else: try: _last = self.ascard.index_of(key, backward=1) self.ascard.insert(_last+1, new_card) except: self.ascard.append(new_card, bottom=1) self._mod = 1 def copy(self): """Make a copy of the Header.""" tmp = Header(self.ascard.copy()) # also copy the class tmp._hdutype = self._hdutype return tmp def _strip(self): """Strip cards specific to a certain kind of header. Strip cards like SIMPLE, BITPIX, etc. so the rest of the header can be used to reconstruct another kind of header. """ try: # have both SIMPLE and XTENSION to accomodate Extension # and Corrupted cases del self['SIMPLE'] del self['XTENSION'] del self['BITPIX'] _naxis = self['NAXIS'] if issubclass(self._hdutype, _TableBaseHDU): _tfields = self['TFIELDS'] del self['NAXIS'] for i in range(_naxis): del self['NAXIS'+`i+1`] if issubclass(self._hdutype, PrimaryHDU): del self['EXTEND'] del self['PCOUNT'] del self['GCOUNT'] if issubclass(self._hdutype, PrimaryHDU): del self['GROUPS'] if issubclass(self._hdutype, _ImageBaseHDU): del self['BSCALE'] del self['BZERO'] if issubclass(self._hdutype, _TableBaseHDU): del self['TFIELDS'] for name in ['TFORM', 'TSCAL', 'TZERO', 'TNULL', 'TTYPE', 'TUNIT']: for i in range(_tfields): del self[name+`i+1`] if issubclass(self._hdutype, BinTableHDU): for name in ['TDISP', 'TDIM', 'THEAP']: for i in range(_tfields): del self[name+`i+1`] if issubclass(self._hdutype, TableHDU): for i in range(_tfields): del self['TBCOL'+`i+1`] except KeyError: pass class CardList(list): """FITS header card list class.""" def __init__(self, cards=[], keylist=None): """Construct the CardList object from a list of Cards. cards: A list of Cards, default=[]. """ list.__init__(self, cards) self._cards = cards # if the key list is not supplied (as in reading in the FITS file), # it will be constructed from the card list. if keylist is None: self._keylist = [k.upper() for k in self.keys()] else: self._keylist = keylist # find out how many blank cards are *directly* before the END card self._blanks = 0 self.count_blanks() def __getitem__(self, key): """Get a Card by indexing or by the keyword name.""" _key = self.index_of(key) return super(CardList, self).__getitem__(_key) def __getslice__(self, start, end): _cards = super(CardList, self).__getslice__(start,end) result = CardList(_cards, self._keylist[start:end]) return result def __setitem__(self, key, value): """Set a Card by indexing or by the keyword name.""" if isinstance (value, Card): _key = self.index_of(key) # only set if the value is different from the old one if str(self[_key]) != str(value): super(CardList, self).__setitem__(_key, value) self._keylist[_key] = value.key.upper() self.count_blanks() self._mod = 1 else: raise SyntaxError, "%s is not a Card" % str(value) def __delitem__(self, key): """Delete a Card from the CardList.""" _key = self.index_of(key) super(CardList, self).__delitem__(_key) del self._keylist[_key] # update the keylist self.count_blanks() self._mod = 1 def count_blanks(self): """Find out how many blank cards are *directly* before the END card.""" for i in range(1, len(self)): if str(self[-i]) != ' '*Card.length: self._blanks = i - 1 break def append(self, card, useblanks=1, bottom=0): """Append a Card to the CardList. card: The Card to be appended. useblanks: Use any *extra* blank cards? default=1. If useblanks != 0, and if there are blank cards directly before END, it will use this space first, instead of appending after these blank cards, so the total space will not increase (default). When useblanks == 0, the card will be appended at the end, even if there are blank cards in front of END. bottom: If =0 (default) the card will be appended after the last non-commentary card. If =1, the card will be appended after the last non-blank card. """ if isinstance (card, Card): nc = len(self) - self._blanks i = nc - 1 if not bottom: for i in range(nc-1, -1, -1): # locate last non-commentary card if self[i].key not in Card._commentaryKeys: break super(CardList, self).insert(i+1, card) self._keylist.insert(i+1, card.key.upper()) if useblanks: self._use_blanks(card._ncards()) self.count_blanks() self._mod = 1 else: raise SyntaxError, "%s is not a Card" % str(card) def _pos_insert(self, card, before, after, useblanks=1): """Insert a Card to the location specified by before or after. The argument `before' takes precedence over `after' if both specified. They can be either a keyword name or index. """ if before != None: loc = self.index_of(before) self.insert(loc, card, useblanks=useblanks) elif after != None: loc = self.index_of(after) self.insert(loc+1, card, useblanks=useblanks) def insert(self, pos, card, useblanks=1): """Insert a Card to the CardList. pos: The position (index, keyword name will not be allowed) to insert. The new card will be inserted before it. card: The Card to be inserted. useblanks: Use any *extra* blank cards? default=1. If useblanks != 0, and if there are blank cards directly before END, it will use this space first, instead of appending after these blank cards, so the total space will not increase (default). When useblanks == 0, the card will be appended at the end, even if there are blank cards in front of END. """ if isinstance (card, Card): super(CardList, self).insert(pos, card) self._keylist.insert(pos, card.key) # update the keylist self.count_blanks() if useblanks: self._use_blanks(card._ncards()) self.count_blanks() self._mod = 1 else: raise SyntaxError, "%s is not a Card" % str(card) def _use_blanks(self, how_many): if self._blanks > 0: for i in range(min(self._blanks, how_many)): del self[-1] # it also delete the keylist item def keys(self): """Return a list of all keywords from the CardList.""" return map(lambda x: getattr(x,'key'), self) def index_of(self, key, backward=0): """Get the index of a keyword in the CardList. key: the keyword name (a string) or the index (an integer). backward: search the index from the END, i.e. backward? default=0. If backward = 1, search from the end. """ if isinstance(key, (int, long,np.integer)): return key elif isinstance(key, str): _key = key.strip().upper() if _key[:8] == 'HIERARCH': _key = _key[8:].strip() _keylist = self._keylist if backward: _keylist = self._keylist[:] # make a copy _keylist.reverse() try: _indx = _keylist.index(_key) if backward: _indx = len(_keylist) - _indx - 1 return _indx except ValueError: raise KeyError, 'Keyword %s not found.' % `key` else: raise KeyError, 'Illegal key data type %s' % type(key) def copy(self): """Make a (deep)copy of the CardList.""" cards = [None]*len(self) for i in range(len(self)): cards[i]=Card('').fromstring(repr(self[i])) return CardList(cards) def __repr__(self): """Format a list of cards into a string.""" block = '' for card in self: block = block + repr(card) return block def __str__(self): """Format a list of cards into a printable string.""" output = '' for card in self: output += str(card) + '\n' return output[:-1] # ----------------------------- HDU classes ------------------------------------ class _AllHDU(object): """Base class for all HDU (header data unit) classes.""" pass class _CorruptedHDU(_AllHDU): """A Corrupted HDU class.""" """ This class is used when one or more mandatory Cards are corrupted (unparsable), such as the 'BITPIX', 'NAXIS', or 'END' cards. A corrupted HDU usually means that the data size cannot be calculated or the 'END' card is not found. In the case of a missing 'END' card, the Header may also contain the binary data(*). (*) In future it may be possible to decipher where the last block of the Header ends, but this task may be difficult when the extension is a TableHDU containing ASCII data. """ def __init__(self, data=None, header=None): self._file, self._offset, self._datLoc = None, None, None self.header = header self.data = data self.name = None def size(self): """Returns the size (in bytes) of the HDU's data part.""" self._file.seek(0, 2) return self._file.tell() - self._datLoc def _summary(self): return "%-10s %-11s" % (self.name, "CorruptedHDU") def verify(self): pass class _ValidHDU(_AllHDU, _Verify): """Base class for all HDUs which are not corrupted.""" # 0.6.5.5 def size(self): """Size (in bytes) of the data portion of the HDU.""" size = 0 naxis = self.header.get('NAXIS', 0) if naxis > 0: size = 1 for j in range(naxis): size = size * self.header['NAXIS'+`j+1`] bitpix = self.header['BITPIX'] gcount = self.header.get('GCOUNT', 1) pcount = self.header.get('PCOUNT', 0) size = abs(bitpix) * gcount * (pcount + size) / 8 return size def copy(self): """Make a copy of the HDU, both header and data are copied.""" if self.data is not None: _data = self.data.copy() else: _data = None return self.__class__(data=_data, header=self.header.copy()) def writeto(self, name, output_verify='exception', clobber=False, classExtensions={}): """Write the HDU to a new file. This is a convenience method to provide a user easier output interface if only one HDU needs to be written to a file. name: output FITS file name to be written to. output_verify: output verification option, default='exception'. clobber: Overwrite the output file if exists, default = False. classExtensions: A dictionary that maps pyfits classes to extensions of those classes. When present in the dictionary, the extension class will be constructed in place of the pyfits class. """ if isinstance(self, _ExtensionHDU): if classExtensions.has_key(HDUList): hdulist = classExtensions[HDUList]([PrimaryHDU(),self]) else: hdulist = HDUList([PrimaryHDU(), self]) elif isinstance(self, PrimaryHDU): if classExtensions.has_key(HDUList): hdulist = classExtensions[HDUList]([self]) else: hdulist = HDUList([self]) hdulist.writeto(name, output_verify, clobber=clobber, classExtensions=classExtensions) def _verify(self, option='warn'): _err = _ErrList([], unit='Card') isValid = "val in [8, 16, 32, 64, -32, -64]" # Verify location and value of mandatory keywords. # Do the first card here, instead of in the respective HDU classes, # so the checking is in order, in case of required cards in wrong order. if isinstance(self, _ExtensionHDU): firstkey = 'XTENSION' firstval = self._xtn else: firstkey = 'SIMPLE' firstval = True self.req_cards(firstkey, '== 0', '', firstval, option, _err) self.req_cards('BITPIX', '== 1', _isInt+" and "+isValid, 8, option, _err) self.req_cards('NAXIS', '== 2', _isInt+" and val >= 0 and val <= 999", 0, option, _err) naxis = self.header.get('NAXIS', 0) if naxis < 1000: for j in range(3, naxis+3): self.req_cards('NAXIS'+`j-2`, '== '+`j`, _isInt+" and val>= 0", 1, option, _err) # verify each card for _card in self.header.ascard: _err.append(_card._verify(option)) return _err def req_cards(self, keywd, pos, test, fix_value, option, errlist): """Check the existence, location, and value of a required Card.""" """If pos = None, it can be anywhere. If the card does not exist, the new card will have the fix_value as its value when created. Also check the card's value by using the "test" argument. """ _err = errlist fix = '' cards = self.header.ascard try: _index = cards.index_of(keywd) except: _index = None fixable = fix_value is not None # if pos is a string, it must be of the syntax of "> n", # where n is an int if isinstance(pos, str): _parse = pos.split() if _parse[0] in ['>=', '==']: insert_pos = eval(_parse[1]) # if the card does not exist if _index is None: err_text = "'%s' card does not exist." % keywd fix_text = "Fixed by inserting a new '%s' card." % keywd if fixable: # use repr to accomodate both string and non-string types # Boolean is also OK in this constructor _card = "Card('%s', %s)" % (keywd, `fix_value`) fix = "self.header.ascard.insert(%d, %s)" % (insert_pos, _card) _err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix, fixable=fixable)) else: # if the supposed location is specified if pos is not None: test_pos = '_index '+ pos if not eval(test_pos): err_text = "'%s' card at the wrong place (card %d)." % (keywd, _index) fix_text = "Fixed by moving it to the right place (card %d)." % insert_pos fix = "_cards=self.header.ascard; dummy=_cards[%d]; del _cards[%d];_cards.insert(%d, dummy)" % (_index, _index, insert_pos) _err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix)) # if value checking is specified if test: val = self.header[keywd] if not eval(test): err_text = "'%s' card has invalid value '%s'." % (keywd, val) fix_text = "Fixed by setting a new value '%s'." % fix_value if fixable: fix = "self.header['%s'] = %s" % (keywd, `fix_value`) _err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix, fixable=fixable)) return _err class _TempHDU(_ValidHDU): """Temporary HDU, used when the file is first opened. This is to speed up the open. Any header will not be initialized till the HDU is accessed. """ def _getname(self): """Get the extname and extver from the header.""" re_extname = re.compile(r"EXTNAME\s*=\s*'([ -&(-~]*)'") re_extver = re.compile(r"EXTVER\s*=\s*(\d+)") mo = re_extname.search(self._raw) if mo: name = mo.group(1).rstrip() else: name = '' mo = re_extver.search(self._raw) if mo: extver = int(mo.group(1)) else: extver = 1 return name, extver def _getsize(self, block): """Get the size from the first block of the HDU.""" re_simple = re.compile(r'SIMPLE =\s*') re_bitpix = re.compile(r'BITPIX =\s*(-?\d+)') re_naxis = re.compile(r'NAXIS =\s*(\d+)') re_naxisn = re.compile(r'NAXIS(\d) =\s*(\d+)') re_gcount = re.compile(r'GCOUNT =\s*(-?\d+)') re_pcount = re.compile(r'PCOUNT =\s*(-?\d+)') re_groups = re.compile(r'GROUPS =\s*(T)') simple = re_simple.search(block[:80]) mo = re_bitpix.search(block) if mo is not None: bitpix = int(mo.group(1)) else: raise ValueError("BITPIX not found where expected") mo = re_gcount.search(block) if mo is not None: gcount = int(mo.group(1)) else: gcount = 1 mo = re_pcount.search(block) if mo is not None: pcount = int(mo.group(1)) else: pcount = 0 mo = re_groups.search(block) if mo and simple: groups = 1 else: groups = 0 mo = re_naxis.search(block) if mo is not None: naxis = int(mo.group(1)) pos = mo.end(0) else: raise ValueError("NAXIS not found where expected") if naxis == 0: datasize = 0 else: dims = [0]*naxis for i in range(naxis): mo = re_naxisn.search(block, pos) pos = mo.end(0) dims[int(mo.group(1))-1] = int(mo.group(2)) datasize = reduce(operator.mul, dims[groups:]) size = abs(bitpix) * gcount * (pcount + datasize) / 8 if simple and not groups: name = 'PRIMARY' else: name = '' return size, name def setupHDU(self, classExtensions={}): """Read one FITS HDU, data portions are not actually read here, but the beginning locations are computed. """ _cardList = [] _keyList = [] blocks = self._raw if (len(blocks) % _blockLen) != 0: raise IOError, 'Header size is not multiple of %d: %d' % (_blockLen, len(blocks)) elif (blocks[:8] not in ['SIMPLE ', 'XTENSION']): raise IOError, 'Block does not begin with SIMPLE or XTENSION' for i in range(0, len(blocks), Card.length): _card = Card('').fromstring(blocks[i:i+Card.length]) _key = _card.key if _key == 'END': break else: _cardList.append(_card) _keyList.append(_key) # Deal with CONTINUE cards # if a long string has CONTINUE cards, the "Card" is considered # to be more than one 80-char "physical" cards. _max = _keyList.count('CONTINUE') _start = 0 for i in range(_max): _where = _keyList[_start:].index('CONTINUE') + _start for nc in range(1, _max+1): if _where+nc >= len(_keyList): break if _cardList[_where+nc]._cardimage[:10].upper() != 'CONTINUE ': break # combine contiguous CONTINUE cards with its parent card if nc > 0: _longstring = _cardList[_where-1]._cardimage for c in _cardList[_where:_where+nc]: _longstring += c._cardimage _cardList[_where-1] = _Card_with_continue().fromstring(_longstring) del _cardList[_where:_where+nc] del _keyList[_where:_where+nc] _start = _where # if not the real CONTINUE card, skip to the next card to search # to avoid starting at the same CONTINUE card else: _start = _where + 1 if _keyList[_start:].count('CONTINUE') == 0: break # construct the Header object, using the cards. try: header = Header(CardList(_cardList, keylist=_keyList)) if classExtensions.has_key(header._hdutype): header._hdutype = classExtensions[header._hdutype] hdu = header._hdutype(data=DELAYED, header=header) # pass these attributes hdu._file = self._file hdu._hdrLoc = self._hdrLoc hdu._datLoc = self._datLoc hdu._datSpan = self._datSpan hdu._ffile = self._ffile hdu.name = self.name hdu._extver = self._extver hdu._new = 0 hdu.header._mod = 0 hdu.header.ascard._mod = 0 except: pass return hdu class _ExtensionHDU(_ValidHDU): """An extension HDU class. This class is the base class for the TableHDU, ImageHDU, and BinTableHDU classes. """ def __init__(self, data=None, header=None): self._file, self._offset, self._datLoc = None, None, None self.header = header self.data = data self._xtn = ' ' def __setattr__(self, attr, value): """Set an HDU attribute.""" if attr == 'name' and value: if not isinstance(value, str): raise TypeError, 'bad value type' value = value.upper() if self.header.has_key('EXTNAME'): self.header['EXTNAME'] = value else: self.header.ascard.append(Card('EXTNAME', value, 'extension name')) _ValidHDU.__setattr__(self,attr,value) def _verify(self, option='warn'): _err = _ValidHDU._verify(self, option=option) # Verify location and value of mandatory keywords. naxis = self.header.get('NAXIS', 0) self.req_cards('PCOUNT', '== '+`naxis+3`, _isInt+" and val >= 0", 0, option, _err) self.req_cards('GCOUNT', '== '+`naxis+4`, _isInt+" and val == 1", 1, option, _err) return _err # 0.8.8 def _iswholeline(indx, naxis): if isinstance(indx, (int, long,np.integer)): if indx >= 0 and indx < naxis: if naxis > 1: return _SinglePoint(1, indx) elif naxis == 1: return _OnePointAxis(1, 0) else: raise IndexError, 'Index %s out of range.' % indx elif isinstance(indx, slice): indx = _normalize_slice(indx, naxis) if (indx.start == 0) and (indx.stop == naxis) and (indx.step == 1): return _WholeLine(naxis, 0) else: if indx.step == 1: return _LineSlice(indx.stop-indx.start, indx.start) else: return _SteppedSlice((indx.stop-indx.start)/indx.step, indx.start) else: raise IndexError, 'Illegal index %s' % indx def _normalize_slice(input, naxis): """Set the slice's start/stop in the regular range.""" def _normalize(indx, npts): if indx < -npts: indx = 0 elif indx < 0: indx += npts elif indx > npts: indx = npts return indx _start = input.start if _start is None: _start = 0 elif isinstance(_start, (int, long,np.integer)): _start = _normalize(_start, naxis) else: raise IndexError, 'Illegal slice %s, start must be integer.' % input _stop = input.stop if _stop is None: _stop = naxis elif isinstance(_stop, (int, long,np.integer)): _stop = _normalize(_stop, naxis) else: raise IndexError, 'Illegal slice %s, stop must be integer.' % input if _stop < _start: raise IndexError, 'Illegal slice %s, stop < start.' % input _step = input.step if _step is None: _step = 1 elif isinstance(_step, (int, long, np.integer)): if _step <= 0: raise IndexError, 'Illegal slice %s, step must be positive.' % input else: raise IndexError, 'Illegal slice %s, step must be integer.' % input return slice(_start, _stop, _step) class _KeyType: def __init__(self, npts, offset): self.npts = npts self.offset = offset class _WholeLine(_KeyType): pass class _SinglePoint(_KeyType): pass class _OnePointAxis(_KeyType): pass class _LineSlice(_KeyType): pass class _SteppedSlice(_KeyType): pass class Section: """Image section.""" def __init__(self, hdu): self.hdu = hdu def __getitem__(self, key): dims = [] if not isinstance(key, tuple): key = (key,) naxis = self.hdu.header['NAXIS'] if naxis < len(key): raise IndexError, 'too many indices.' elif naxis > len(key): key = key + (slice(None),) * (naxis-len(key)) offset = 0 for i in range(naxis): _naxis = self.hdu.header['NAXIS'+`naxis-i`] indx = _iswholeline(key[i], _naxis) offset = offset * _naxis + indx.offset # all elements after the first WholeLine must be WholeLine or # OnePointAxis if isinstance(indx, (_WholeLine, _LineSlice)): dims.append(indx.npts) break elif isinstance(indx, _SteppedSlice): raise IndexError, 'Subsection data must be contiguous.' for j in range(i+1,naxis): _naxis = self.hdu.header['NAXIS'+`naxis-j`] indx = _iswholeline(key[j], _naxis) dims.append(indx.npts) if not isinstance(indx, _WholeLine): raise IndexError, 'Subsection data is not contiguous.' # the offset needs to multiply the length of all remaining axes else: offset *= _naxis if dims == []: dims = [1] npt = 1 for n in dims: npt *= n # Now, get the data (does not include bscale/bzero for now XXX) _bitpix = self.hdu.header['BITPIX'] code = _ImageBaseHDU.NumCode[_bitpix] self.hdu._file.seek(self.hdu._datLoc+offset*abs(_bitpix)/8) nelements = 1 for dim in dims: nelements = nelements*dim raw_data = np.fromfile(self.hdu._file, dtype=code, count=nelements, sep="") raw_data.shape = dims # raw_data._byteorder = 'big' raw_data.dtype = raw_data.dtype.newbyteorder(">") return raw_data class _ImageBaseHDU(_ValidHDU): """FITS image HDU base class.""" """Attributes: header: image header data: image data _file: file associated with array (None) _datLoc: starting byte location of data block in file (None) """ # mappings between FITS and numpy typecodes # NumCode = {8:'int8', 16:'int16', 32:'int32', 64:'int64', -32:'float32', -64:'float64'} # ImgCode = {' 0: _bitpix = self.header['BITPIX'] self._file.seek(self._datLoc) if isinstance(self, GroupsHDU): dims = self.size()*8/abs(_bitpix) else: dims = self._dimShape() code = _ImageBaseHDU.NumCode[self.header['BITPIX']] if self._ffile.memmap: self._ffile.code = code self._ffile.dims = dims self._ffile.offset = self._datLoc raw_data = self._ffile._mm else: nelements = 1 for x in range(len(dims)): nelements = nelements * dims[x] raw_data = np.fromfile(self._file, dtype=code, count=nelements,sep="") raw_data.shape=dims # print "raw_data.shape: ",raw_data.shape # raw_data._byteorder = 'big' raw_data.dtype = raw_data.dtype.newbyteorder('>') if (self._bzero != 0 or self._bscale != 1): if _bitpix > 16: # scale integers to Float64 self.data = np.array(raw_data, dtype=np.float64) elif _bitpix > 0: # scale integers to Float32 self.data = np.array(raw_data, dtype=np.float32) else: # floating point cases if self._ffile.memmap: self.data = raw_data.copy() # if not memmap, use the space already in memory else: self.data = raw_data if self._bscale != 1: np.multiply(self.data, self._bscale, self.data) if self._bzero != 0: self.data += self._bzero # delete the keywords BSCALE and BZERO after scaling del self.header['BSCALE'] del self.header['BZERO'] self.header['BITPIX'] = _ImageBaseHDU.ImgCode[self.data.dtype.name] else: self.data = raw_data try: return self.__dict__[attr] except KeyError: raise AttributeError(attr) def _dimShape(self): """Returns a tuple of image dimensions, reverse the order of NAXIS.""" naxis = self.header['NAXIS'] axes = naxis*[0] for j in range(naxis): axes[j] = self.header['NAXIS'+`j+1`] axes.reverse() # print "axes in _dimShape line 2081:",axes return tuple(axes) def _summary(self): """Summarize the HDU: name, dimensions, and formats.""" class_name = str(self.__class__) type = class_name[class_name.rfind('.')+1:-2] if type.find('_') != -1: type = type[type.find('_')+1:] # if data is touched, use data info. if 'data' in dir(self): if self.data is None: _shape, _format = (), '' else: # the shape will be in the order of NAXIS's which is the # reverse of the numarray shape if isinstance(self, GroupsHDU): _shape = list(self.data.data.shape)[1:] _format = `self.data._parent.field(0).dtype.name` else: _shape = list(self.data.shape) _format = `self.data.dtype.name` _shape.reverse() _shape = tuple(_shape) _format = _format[_format.rfind('.')+1:] # if data is not touched yet, use header info. else: _shape = () for j in range(self.header['NAXIS']): if isinstance(self, GroupsHDU) and j == 0: continue _shape += (self.header['NAXIS'+`j+1`],) _format = self.NumCode[self.header['BITPIX']] if isinstance(self, GroupsHDU): _gcount = ' %d Groups %d Parameters' % (self.header['GCOUNT'], self.header['PCOUNT']) else: _gcount = '' return "%-10s %-11s %5d %-12s %s%s" % \ (self.name, type, len(self.header.ascard), _shape, _format, _gcount) def scale(self, type=None, option="old", bscale=1, bzero=0): """Scale image data by using BSCALE/BZERO. Call to this method will scale self.data and update the keywords of BSCALE and BZERO in self.header. This method should only be used right before writing to the output file, as the data will be scaled and is therefore not very usable after the call. type (string): destination data type, use numpy attribute format, (e.g. 'uint8', 'int16', 'float32' etc.). If is None, use the current data type. option: how to scale the data: if "old", use the original BSCALE and BZERO values when the data was read/created. If "minmax", use the minimum and maximum of the data to scale. The option will be overwritten by any user specified bscale/bzero values. bscale/bzero: user specified BSCALE and BZERO values. """ if self.data is None: return # Determine the destination (numpy) data type if type is None: type = self.NumCode[self._bitpix] _type = getattr(np, type) # Determine how to scale the data # bscale and bzero takes priority if (bscale != 1 or bzero !=0): _scale = bscale _zero = bzero else: if option == 'old': _scale = self._bscale _zero = self._bzero elif option == 'minmax': if isinstance(_type, np.floating): _scale = 1 _zero = 0 else: # flat the shape temporarily to save memory dims = self.data.shape self.data.shape = self.data.size min = np.minimum.reduce(self.data) max = np.maximum.reduce(self.data) self.data.shape = dims if _type == np.uint8: # uint8 case _zero = min _scale = (max - min) / (2.**8 - 1) else: _zero = (max + min) / 2. # throw away -2^N _scale = (max - min) / (2.**(8*_type.bytes) - 2) # Do the scaling if _zero != 0: self.data += -_zero # 0.9.6.3 to avoid out of range error for BZERO = +32768 self.header.update('BZERO', _zero) else: del self.header['BZERO'] if _scale != 1: self.data /= _scale self.header.update('BSCALE', _scale) else: del self.header['BSCALE'] if self.data.dtype.type != _type: self.data = np.array(np.around(self.data), dtype=_type) #0.7.7.1 class PrimaryHDU(_ImageBaseHDU): """FITS primary HDU class.""" def __init__(self, data=None, header=None): """Construct a primary HDU. data: the data in the HDU, default=None. header: the header to be used (as a template), default=None. If header=None, a minimal Header will be provided. """ _ImageBaseHDU.__init__(self, data=data, header=header) self.name = 'PRIMARY' # insert the keywords EXTEND if header is None: dim = `self.header['NAXIS']` if dim == '0': dim = '' self.header.update('EXTEND', True, after='NAXIS'+dim) class ImageHDU(_ExtensionHDU, _ImageBaseHDU): """FITS image extension HDU class.""" def __init__(self, data=None, header=None, name=None): """Construct an image HDU. data: the data in the HDU, default=None. header: the header to be used (as a template), default=None. If header=None, a minimal Header will be provided. name: The name of the HDU, will be the value of the keywod EXTNAME, default=None. """ # no need to run _ExtensionHDU.__init__ since it is not doing anything. _ImageBaseHDU.__init__(self, data=data, header=header) self._xtn = 'IMAGE' self.header._hdutype = ImageHDU # insert the require keywords PCOUNT and GCOUNT dim = `self.header['NAXIS']` if dim == '0': dim = '' # set extension name if (name is None) and self.header.has_key('EXTNAME'): name = self.header['EXTNAME'] self.name = name def _verify(self, option='warn'): """ImageHDU verify method.""" _err = _ExtensionHDU._verify(self, option=option) self.req_cards('PCOUNT', None, _isInt+" and val == 0", 0, option, _err) return _err class GroupsHDU(PrimaryHDU): """FITS Random Groups HDU class.""" _dict = {8:'B', 16:'I', 32:'J', 64:'K', -32:'E', -64:'D'} def __init__(self, data=None, header=None, name=None): PrimaryHDU.__init__(self, data=data, header=header) self.header._hdutype = GroupsHDU self.name = name if self.header['NAXIS'] <= 0: self.header['NAXIS'] = 1 self.header.update('NAXIS1', 0, after='NAXIS') def __getattr__(self, attr): """Get the 'data' or 'columns' attribute. The data of random group FITS file will be like a binary table's data. """ if attr == 'data': # same code as in _TableBaseHDU size = self.size() if size: self._file.seek(self._datLoc) data = GroupData(_get_tbdata(self)) data._coldefs = self.columns data.formats = self.columns.formats data.parnames = self.columns._pnames else: data = None self.__dict__[attr] = data elif attr == 'columns': _cols = [] _pnames = [] _pcount = self.header['PCOUNT'] _format = GroupsHDU._dict[self.header['BITPIX']] for i in range(self.header['PCOUNT']): _bscale = self.header.get('PSCAL'+`i+1`, 1) _bzero = self.header.get('PZERO'+`i+1`, 0) _pnames.append(self.header['PTYPE'+`i+1`].lower()) _cols.append(Column(name='c'+`i+1`, format = _format, bscale = _bscale, bzero = _bzero)) data_shape = self._dimShape()[:-1] dat_format = `int(np.array(data_shape).sum())` + _format _bscale = self.header.get('BSCALE', 1) _bzero = self.header.get('BZERO', 0) _cols.append(Column(name='data', format = dat_format, bscale = _bscale, bzero = _bzero)) _coldefs = ColDefs(_cols) _coldefs._shape = self.header['GCOUNT'] _coldefs._dat_format = _fits2rec[_format] _coldefs._pnames = _pnames self.__dict__[attr] = _coldefs elif attr == '_theap': self.__dict__[attr] = 0 try: return self.__dict__[attr] except KeyError: raise AttributeError(attr) # 0.6.5.5 def size(self): """Returns the size (in bytes) of the HDU's data part.""" size = 0 naxis = self.header.get('NAXIS', 0) # for random group image, NAXIS1 should be 0, so we skip NAXIS1. if naxis > 1: size = 1 for j in range(1, naxis): size = size * self.header['NAXIS'+`j+1`] bitpix = self.header['BITPIX'] gcount = self.header.get('GCOUNT', 1) pcount = self.header.get('PCOUNT', 0) size = abs(bitpix) * gcount * (pcount + size) / 8 return size def _verify(self, option='warn'): _err = PrimaryHDU._verify(self, option=option) # Verify locations and values of mandatory keywords. self.req_cards('NAXIS', '== 2', _isInt+" and val >= 1 and val <= 999", 1, option, _err) self.req_cards('NAXIS1', '== 3', _isInt+" and val == 0", 0, option, _err) _after = self.header['NAXIS'] + 3 # if the card EXTEND exists, must be after it. try: _dum = self.header['EXTEND'] #_after += 1 except KeyError: pass _pos = '>= '+`_after` self.req_cards('GCOUNT', _pos, _isInt, 1, option, _err) self.req_cards('PCOUNT', _pos, _isInt, 0, option, _err) self.req_cards('GROUPS', _pos, 'val == True', True, option, _err) return _err # --------------------------Table related code---------------------------------- # lists of column/field definition common names and keyword names, make # sure to preserve the one-to-one correspondence when updating the list(s). # Use lists, instead of dictionaries so the names can be displayed in a # preferred order. _commonNames = ['name', 'format', 'unit', 'null', 'bscale', 'bzero', 'disp', 'start', 'dim'] _keyNames = ['TTYPE', 'TFORM', 'TUNIT', 'TNULL', 'TSCAL', 'TZERO', 'TDISP', 'TBCOL', 'TDIM'] # mapping from TFORM data type to numpy data type (code) _booltype = 'i1' _fits2rec = {'L':_booltype, 'B':'u1', 'I':'i2', 'E':'f4', 'D':'f8', 'J':'i4', 'A':'a', 'C':'c8', 'M':'c16', 'K':'i8'} # the reverse dictionary of the above _rec2fits = {} for key in _fits2rec.keys(): _rec2fits[_fits2rec[key]]=key class _FormatX(str): """For X format in binary tables.""" pass class _FormatP(str): """For P format in variable length table.""" pass # TFORM regular expression _tformat_re = re.compile(r'(?P^[0-9]*)(?P[A-Za-z])(?P