#!/usr/bin/env python import ephem import time import os import sys import urllib from MmdDb import Db from MmdLocation import Location def dms2degdec (lonlat): ''' convert a position argument from Deg:Min:Sec.Frac to Deg.Frac and round the result to 5 decimal points ''' parts = lonlat.split (':') if len (parts) != 3: return 0.0 degrees = float (parts[0]) fraction = (float (parts[1]) * 60.0 + float (parts[2])) / 3600 if parts[0][0] == '-': return round (degrees - fraction, 5) return round (degrees + fraction, 5) class Satellite: def __init__ (self, observer = False): self.name = 'RADIOSCAF-B' self.nickname = 'ARISSAT-1' self.tle_filename = '/usr/local/mmd/tles/current' self.db = Db () self._initSatellite (observer) def getCurrentSSP (self): ssp = self._loadCurrentSSP () return ssp['timestamp'], ssp['longitude'], ssp['latitude'] def getTrajectoryAsJavaArray (self, name, minutes = 180): astring = 'var {0} = new Array (\n'.format (name) astring = '{0}\tnew Array ('.format (astring) astring = '{0}\n\t\tnew OpenLayers.LonLat ({1}, {2}).transform (from, to),'.format ( astring, self.longitude, self.latitude ) last = 'east' if str (self.longitude)[0] == '-': last = 'west' for row in self._loadTrajectory (minutes): if str (row['longitude'])[0] == '-': if last == 'east': astring = astring[:-1] astring = '{0}\n\t\t),\n\tnew Array ('.format (astring) last = 'west' else: last = 'east' astring = '{0}\n\t\tnew OpenLayers.LonLat ({1}, {2}).transform (from, to),'.format ( astring, row['longitude'], row['latitude']) if astring[-1] == ',': astring = astring[:-1] astring = '{0}\n\t\t)\n\t);'.format (astring) return astring def cronJob (self): if self._updateTLEFile (): self._updateTrajectory () return self._computeTrajectory () def getLatestSSP (self): return self.db.satelliteGetLatestSSP () def getNextPass (self): ''' return 6-tuple with info for next pass ''' sat = ephem.readtle (self._tle[0], self._tle[1], self._tle[2]) observer = ephem.Observer () observer.date = time.strftime ('%Y/%m/%d %H:%M', time.gmtime ()) observer.long, observer.lat, observer.elevation = str (self.observer.longitude), str (self.observer.latitude), self.observer.altitude observer.pressure = 0 observer.horizon = '-0:34' return observer.next_pass (sat) def _loadTrajectory (self, minutes): return self.db.satelliteLoadTrajectory (time.strftime ('%s'), minutes) def _loadCurrentSSP (self): return self.db.satelliteLoadCurrentSSP (time.strftime ('%s')) def _initSatellite (self, observer = False): if not self._loadTLE (): return False if observer: self.observer = observer else: self.observer = Location () sat = ephem.readtle (self._tle[0], self._tle[1], self._tle[2]) observer = ephem.Observer () observer.long, observer.lat, observer.elevation = str (self.observer.longitude), str (self.observer.latitude), self.observer.altitude sat.compute (observer) self.longitude = dms2degdec (sat.sublong.__str__ ()) self.latitude = dms2degdec (sat.sublat.__str__ ()) self.altitude = round (float (sat.elevation.__str__ ()), 1) def _computeTrajectory (self): ''' computes 5 hours of future trajectory data, starting at the latest timestamp found in data base ''' if not self._loadTLE (): return False sat = ephem.readtle (self._tle[0], self._tle[1], self._tle[2]) graz = ephem.Observer () graz.long, graz.lat, graz.elevation = '15.44226', '47.06576', 376 latest = self.db.satelliteGetLatestSSP () if latest: timestamp = int (latest['timestamp']) + 60 else: timestamp = int (time.strftime ('%s')) + 60 while timestamp % 60 != 0: timestamp += 1 for i in range (60 * 5): graz.date = time.strftime ('%Y/%m/%d %H:%M:%S', time.gmtime (timestamp)) sat.compute (graz) self.db.satelliteInsertNewSSP (timestamp, dms2degdec (sat.sublong.__str__ ()), dms2degdec (sat.sublat.__str__ ())) timestamp += 60 return True def _loadTLE (self): try: tle = open (self.tle_filename, 'r') except IOError: return False self._tle = [] count = 0 for line in tle.readlines (): self._tle.append (line) count += 1 if count != 3: return False return True def _updateTLEFile (self): ''' store new TLE data in file with format YYYYMMDDhhmmss (Year Month Day hour minute second) and make symlink to self.tle_filename ''' if not self._getTLEFileFromNetwork (): return False filename = '/usr/local/mmd/tles/{0}'.format (time.strftime ('%Y%m%d%H%M%S')) for line in self.tle_data: if line.startswith (self.name): tle = open (filename, 'w') tle.write (line) tle.write (self.tle_data.next ()) tle.write (self.tle_data.next ()) tle.close () return self._compareAndLinkNewTLEFile (filename) return False def _createOrbtrackTLEFile (self): tle_file = open (self.tle_filename) orbtrack_file = open ("/var/www/hofos.at/mmd/static/tle.js", "w") orbtrack_file.write ("//Created by MmdSatellite.py\n//{0}\nPLib.tleData = \n[\n".format (time.strftime ("%Y-%m-%d - %H:%M:%S"))) count = 0 for line in tle_file.readlines (): l = line[:-2] if count == 2: orbtrack_file.write (' "{0}"'.format (l)) else: orbtrack_file.write (' "{0}",\n'.format (l)) count += 1 orbtrack_file.write ("\n];\n// EOF\n") tle_file.close () orbtrack_file.close () def _compareAndLinkNewTLEFile (self, filename): ''' we should use field 21-32 from first line of TLE to compare data; then use this date to generate a filename and update the database; ''' old_data = open (self.tle_filename).read () new_data = open (filename).read () if old_data == new_data: os.unlink (filename) return False os.unlink (self.tle_filename) os.symlink (filename, self.tle_filename) self._createOrbtrackTLEFile () return True def _updateTrajectory (self): self.db.satelliteDeleteObsoleteSSPs (time.strftime ('%s')) def _getTLEFileFromNetwork (self): ''' retreive TLE update from NORAD/celestrak ''' networkfile = 'http://www.celestrak.com/NORAD/elements/amateur.txt' try: self.tle_data = urllib.urlopen (networkfile) except IOError: return False return True if __name__ == "__main__": satellite = Satellite () # print satellite.longitude, satellite.latitude # print satellite.getTrajectoryAsJavaArray ('test', 100) try: assert satellite.cronJob (), 'executing cronjob failed' # satellite._createOrbtrackTLEFile () # next_pass = satellite.getNextPass () # assert next_pass, 'getting next pass failed' # print next_pass except AssertionError as e: print 'Error: {0}'.format (e) sys.exit (1) sys.exit (0) # vim: tw=0 ts=2 # EOF