MCPcopy Index your code
hub / github.com/diffgram/diffgram / Notification

Class Notification

shared/database/notifications/notification.py:19–289  ·  view source on GitHub ↗

Base data table for notification.

Source from the content-addressed store, hash-verified

17
18
19class Notification(Base):
20 """
21 Base data table for notification.
22 """
23
24 __tablename__ = 'notification'
25 id = Column(Integer, primary_key=True)
26
27 status = Column(String(), default="sent")
28
29 channel_type = Column(String(), default='email')
30
31 type = Column(String(), default='default')
32
33 title = Column(String(), default='')
34
35 description = Column(String(), default='')
36
37 member_created_id = Column(Integer, ForeignKey('member.id'))
38 member_created = relationship("Member", foreign_keys=[member_created_id])
39
40 member_updated_id = Column(Integer, ForeignKey('member.id'))
41 member_updated = relationship("Member", foreign_keys=[member_updated_id])
42
43 # Default
44 notification_relation_id = Column(Integer, ForeignKey('notification_relation.id'))
45 notification_relation = relationship("NotificationRelation", foreign_keys=[notification_relation_id])
46
47 time_created = Column(DateTime, default=datetime.datetime.utcnow)
48 time_updated = Column(DateTime, onupdate=datetime.datetime.utcnow)
49
def serialize_for_list_view(self, session):
    """
    Serialize this notification into the dict used for list views.

    :param session: not used by this method.
    :return: dict with the keys ``id``, ``status``, ``title``,
        ``description``, ``time_created``, ``time_completed`` and
        ``member_created_id``.
    """
    # NOTE(review): no ``time_completed`` attribute is declared in the part
    # of the model visible here (only ``time_created`` / ``time_updated``).
    # Read it defensively so the serializer yields None instead of raising
    # AttributeError; confirm whether this key should map to another column.
    time_completed = getattr(self, 'time_completed', None)

    return {
        'id': self.id,
        'status': self.status,
        'title': self.title,
        'description': self.description,
        'time_created': self.time_created,
        'time_completed': time_completed,
        'member_created_id': self.member_created_id,
    }
61
def __generate_payload_for_task(self, session, start_time=None):
    """
    Build the serialized task payload for this notification.

    Without ``start_time`` the payload holds only the task looked up via
    ``self.notification_relation.task_id`` (or nothing when that lookup
    returns a falsy value). With ``start_time`` the payload holds every
    task of the related task's project whose timestamp lies between
    ``start_time`` and the current UTC time. The timestamp compared is
    ``Task.time_completed`` when ``self.type == 'task_completed'`` and
    ``Task.time_created`` otherwise.

    :param session: session used for the task queries and serialization.
    :param start_time: optional lower bound of the time window.
    :return: list with one ``serialize_builder_view_by_id`` result per task.
    """
    relation = self.notification_relation

    if start_time:
        # The window query fully replaces the single-task result, so the
        # by-id lookup is skipped instead of being executed and discarded.
        time_column = Task.time_created
        if self.type == 'task_completed':
            time_column = Task.time_completed
        tasks = session.query(Task).filter(
            Task.project_id == relation.task.project_id,
            time_column <= datetime.datetime.utcnow(),
            time_column >= start_time,
        ).all()
    else:
        single_task = Task.get_by_id(session=session, task_id=relation.task_id)
        tasks = [single_task] if single_task else []

    return [item.serialize_builder_view_by_id(session=session) for item in tasks]
76

Callers 1

newMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected