MCPcopy
hub / github.com/InstaPy/InstaPy / share_with_pods_restriction

Function share_with_pods_restriction

instapy/pods_util.py:84–151  ·  view source on GitHub ↗

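Keeps track of posts already shared with pods: a "write" operation records a share by inserting or incrementing a per-profile, per-post counter, and a "read" operation reports whether that counter has reached the given limit.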

(operation, postid, limit, logger)

Source from the content-addressed store, hash-verified

82
83
84def share_with_pods_restriction(operation, postid, limit, logger):
85 """Keep track of already shared posts"""
86 conn = None
87
88 try:
89 # get a DB and start a connection
90 db, id = get_database()
91 conn = sqlite3.connect(db)
92
93 with conn:
94 conn.row_factory = sqlite3.Row
95 cur = conn.cursor()
96
97 cur.execute(
98 "SELECT * FROM shareWithPodsRestriction WHERE profile_id=:id_var "
99 "AND postid=:name_var",
100 {"id_var": id, "name_var": postid},
101 )
102 data = cur.fetchone()
103 share_data = dict(data) if data else None
104
105 if operation == "write":
106 if share_data is None:
107 # write a new record
108 cur.execute(
109 "INSERT INTO shareWithPodsRestriction (profile_id, "
110 "postid, times) VALUES (?, ?, ?)",
111 (id, postid, 1),
112 )
113 else:
114 # update the existing record
115 share_data["times"] += 1
116 sql = (
117 "UPDATE shareWithPodsRestriction set times = ? WHERE "
118 "profile_id=? AND postid = ?"
119 )
120 cur.execute(sql, (share_data["times"], id, postid))
121
122 # commit the latest changes
123 conn.commit()
124
125 elif operation == "read":
126 if share_data is None:
127 return False
128
129 elif share_data["times"] < limit:
130 return False
131
132 else:
133 exceed_msg = "" if share_data["times"] == limit else "more than "
134 logger.info(
135 "--> {} has already been shared with pods {}{} times".format(
136 postid, exceed_msg, str(limit)
137 )
138 )
139 return True
140
141 except Exception as exc:

Callers 1

join_pods · Method · 0.85

Calls 1

get_database · Function · 0.85

Tested by

no test coverage detected