MCPcopy
hub / github.com/dronekit/dronekit-python / MAVConnection

Class MAVConnection

dronekit/mavlink.py:114–334  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

112
113
114class MAVConnection(object):
115
def stop_threads(self):
    """Join the MAVLink worker threads and drop the references to them.

    The inbound thread is handled first, then the outbound one. For each
    attribute that currently holds a thread, this blocks on ``join()``
    and afterwards resets the attribute to ``None``. Attributes that are
    already ``None`` are left untouched, so calling this repeatedly is
    harmless.
    """
    # Order matters: the outbound attribute is only inspected after the
    # inbound thread has been joined and cleared.
    for attr_name in ("mavlink_thread_in", "mavlink_thread_out"):
        worker = getattr(self, attr_name)
        if worker is None:
            continue
        worker.join()
        setattr(self, attr_name, None)
123
124 def __init__(self, ip, baud=115200, target_system=0, source_system=255, use_native=False):
125 if ip.startswith("udpin:"):
126 self.master = mavudpin_multi(ip[6:], input=True, baud=baud, source_system=source_system)
127 else:
128 self.master = mavutil.mavlink_connection(ip, baud=baud, source_system=source_system)
129
130 # TODO get rid of "master" object as exposed,
131 # keep it private, expose something smaller for dronekit
132 self.out_queue = Queue()
133 self.master.mav = mavutil.mavlink.MAVLink(
134 MAVWriter(self.out_queue),
135 srcSystem=self.master.source_system,
136 use_native=use_native)
137
138 # Monkey-patch MAVLink object for fix_targets.
139 sendfn = self.master.mav.send
140
141 def newsendfn(mavmsg, *args, **kwargs):
142 self.fix_targets(mavmsg)
143 return sendfn(mavmsg, *args, **kwargs)
144
145 self.master.mav.send = newsendfn
146
147 # Targets
148 self.target_system = target_system
149
150 # Listeners.
151 self.loop_listeners = []
152 self.message_listeners = []
153
154 # Debug flag.
155 self._accept_input = True
156 self._alive = True
157 self._death_error = None
158
159 import atexit
160
161 def onexit():
162 self._alive = False
163 self.stop_threads()
164
165 atexit.register(onexit)
166
167 def mavlink_thread_out():
168 # Huge try catch in case we see http://bugs.python.org/issue1856
169 try:
170 while self._alive:
171 try:

Callers 2

connect (Function) · 0.90
test_mavlink (Function) · 0.90

Calls

no outgoing calls

Tested by 1

test_mavlink (Function) · 0.72