MCPcopy
hub / github.com/apache/caldera / Link

Class Link

app/objects/secondclass/c_link.py:91–308  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

89
90
91class Link(BaseObject):
92
# Schema instances shared by every Link, built once at class-definition time.
schema = LinkSchema()
# Same schema with the 'jitter' field left out.
display_schema = LinkSchema(exclude=['jitter'])
# Schema with the listed fields excluded.
# NOTE(review): presumably these are computed/runtime-only fields that should
# not be accepted when loading a link -- confirm against LinkSchema.
load_schema = LinkSchema(
    exclude=[
        'decide',
        'pid',
        'facts',
        'unique',
        'collect',
        'finish',
        'visibility',
        'output',
        'used.unique',
    ]
)

# Reserved variable name mapped to its '#{...}' placeholder token.
RESERVED = {'origin_link_id': '#{origin_link_id}'}

# Event identifiers for link status changes.
# NOTE(review): consumers of these constants are outside this excerpt.
EVENT_EXCHANGE = 'link'
EVENT_QUEUE_STATUS_CHANGED = 'status_changed'
102
@property
def raw_command(self):
    """Return ``self.command`` passed through ``self.decode_bytes``.

    An empty string is returned when ``self.command`` is falsy
    (e.g. ``None`` or ``''``), in which case no decoding is attempted.
    """
    if not self.command:
        return ''
    return self.decode_bytes(self.command)
106
@property
def unique(self):
    """Return ``self.hash`` applied to the string form of ``self.id``."""
    # '%s' formatting is kept as-is so the string handed to hash() is unchanged.
    identifier = '%s' % self.id
    return self.hash(identifier)
110
@property
def pin(self):
    """Return the value stored in the private ``_pin`` attribute."""
    return self._pin


@pin.setter
def pin(self, p):
    """Store ``p`` in the private ``_pin`` attribute; no validation is done."""
    self._pin = p
118
@property
def states(self):
    """Return a mapping of status names to their integer codes.

    A new dict is built on every access, so callers may mutate the result
    without affecting other callers.
    """
    return {
        'HIGH_VIZ': -5,
        'UNTRUSTED': -4,
        'EXECUTE': -3,
        'DISCARD': -2,
        'PAUSE': -1,
        'SUCCESS': 0,
        'ERROR': 1,
        'TIMEOUT': 124,
    }
129
@property
def status(self):
    """Return the value stored in the private ``_status`` attribute."""
    return self._status
133
@property
def display(self):
    """Return this link serialized for display, with command fields decoded.

    The link is dumped through the class-level ``display_schema`` (the link
    schema with 'jitter' excluded), and the 'command' and
    'plaintext_command' entries are then replaced by the result of passing
    each through ``self.decode_bytes``.

    :return: the dumped mapping with both command fields decoded
    """
    # Reuse the shared schema instead of constructing an identical
    # LinkSchema(exclude=['jitter']) on every property access.
    dump = self.display_schema.dump(self)
    for field in ('command', 'plaintext_command'):
        dump[field] = self.decode_bytes(dump[field])
    return dump
140
141 @status.setter
142 def status(self, value):
143 previous_status = getattr(self, '_status', NO_STATUS_SET)
144
145 self._status = value
146
147 if previous_status is NO_STATUS_SET:
148 return

Calls 1

LinkSchemaClass · 0.85