## ############################################################################## # # @file example.py # @author The LibrePilot Project, http://www.librepilot.org Copyright (C) 2017. # The OpenPilot Team, http://www.openpilot.org Copyright (C) 2011. # @brief Base classes for python UAVObject # # @see The GNU Public License (GPL) Version 3 # #############################################################################/ # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # import logging import serial import traceback import sys from librepilot.uavtalk.uavobject import * from librepilot.uavtalk.uavtalk import * from librepilot.uavtalk.objectManager import * from librepilot.uavtalk.connectionManager import * def _hex02(value): return "%02X" % value class UavtalkDemo(): def __init__(self): self.nbUpdates = 0 self.lastRateCalc = time.time() self.updateRate = 0 self.objMan = None self.connMan = None def setup(self, port): print "Opening Port \"%s\"" % port if port[:3].upper() == "COM": _port = int(port[3:]) - 1 else: _port = port serPort = serial.Serial(_port, 57600, timeout=.5) if not serPort.isOpen(): raise IOError("Failed to open serial port") print "Creating UavTalk" self.uavTalk = UavTalk(serPort, None) print "Starting ObjectManager" self.objMan = ObjManager(self.uavTalk) self.objMan.importDefinitions() print "Starting UavTalk" self.uavTalk.start() print "Starting ConnectionManager" self.connMan = ConnectionManager(self.uavTalk, self.objMan) print "Connecting...", self.connMan.connect() print "Connected" print "Getting all Data" self.objMan.requestAllObjUpdate() # print "SN:", # sn = self.objMan.FirmwareIAPObj.CPUSerial.value # print "".join(map(_hex02, sn)) def stop(self): if self.uavTalk: print "Stopping UavTalk" self.uavTalk.stop() def showAttitudeViaObserver(self): print "Request fast periodic updates for AttitudeState" self.objMan.AttitudeState.metadata.telemetryUpdateMode = UAVMetaDataObject.UpdateMode.PERIODIC self.objMan.AttitudeState.metadata.telemetryUpdatePeriod.value = 50 self.objMan.AttitudeState.metadata.updated() print "Install Observer for AttitudeState updates\n" self.objMan.regObjectObserver(self.objMan.AttitudeState, self, "_onAttitudeUpdate") # Spin until we get interrupted while True: time.sleep(1) def showAttitudeViaWait(self): print "Request fast periodic updates for AttitudeState" self.objMan.AttitudeState.metadata.telemetryUpdateMode = UAVMetaDataObject.UpdateMode.PERIODIC self.objMan.AttitudeState.metadata.telemetryUpdatePeriod.value = 50 self.objMan.AttitudeState.metadata.updated() while True: self.objMan.AttitudeState.waitUpdate() self._onAttitudeUpdate(self.objMan.AttitudeState) def showAttitudeViaGet(self): while True: self.objMan.AttitudeState.getUpdate() self._onAttitudeUpdate(self.objMan.AttitudeState) def _onAttitudeUpdate(self, args): self.nbUpdates += 1 now = time.time() if now - self.lastRateCalc > 1: self.updateRate = self.nbUpdates / (now - self.lastRateCalc) self.lastRateCalc = now self.nbUpdates = 0 if self.nbUpdates & 1: dot = "." else: dot = " " print " %s Rate: %02.1f Hz " % (dot, self.updateRate), roll = self.objMan.AttitudeState.Roll.value print "RPY: %f %f %f " % (self.objMan.AttitudeState.Roll.value, self.objMan.AttitudeState.Pitch.value, self.objMan.AttitudeState.Yaw.value) + " " # return # print "Roll: %f " % roll, # i = roll/90 # if i<-1: i=-1 # if i>1: i= 1 # i = int((i+1)*15) # print "-"*i+"*"+"-"*(30-i)+" \r", def driveServo(self): print "Taking control of self.actuatorCmd" self.objMan.ActuatorCommand.metadata.access = UAVMetaDataObject.Access.READONLY self.objMan.ActuatorCommand.metadata.updated() while True: self.objMan.ActuatorCommand.Channel.value[0] = 1000 self.objMan.ActuatorCommand.updated() time.sleep(1) self.objMan.ActuatorCommand.Channel.value[0] = 2000 self.objMan.ActuatorCommand.updated() time.sleep(1) def printUsage(): appName = os.path.basename(sys.argv[0]) print print "usage:" print " %s port o|w|g|s" % appName print " o: Show Attitude using an \"observer\"" print " w: Show Attitude waiting for updates from flight" print " g: Show Attitude performing get operations" print " s: Drive Servo" print print " for example: %s COM30 o" % appName print if __name__ == '__main__': if len(sys.argv) != 3: print "ERROR: Incorrect number of arguments" printUsage() sys.exit(2) port, option = sys.argv[1:] if option not in ["o", "w", "g", "s"]: print "ERROR: Invalid option" printUsage() sys.exit(2) # Log everything, and send it to stderr. logging.basicConfig(level=logging.INFO) try: demo = UavtalkDemo() demo.setup(port) if option == "o": demo.showAttitudeViaObserver() # will not return elif option == "w": demo.showAttitudeViaWait() # will not return if option == "g": demo.showAttitudeViaGet() # will not return if option == "s": demo.driveServo() # will not return except KeyboardInterrupt: pass except Exception, e: print print "An error occured: ", e print traceback.print_exc() print try: demo.stop() except Exception: pass