( client, functionName: string, commandName: string, args: ArgumentType[], callback )
| 132 | } |
| 133 | |
| 134 | export function executeWithAutoPipelining( |
| 135 | client, |
| 136 | functionName: string, |
| 137 | commandName: string, |
| 138 | args: ArgumentType[], |
| 139 | callback |
| 140 | ) { |
| 141 | // On cluster mode let's wait for slots to be available |
| 142 | if (client.isCluster && !client.slots.length) { |
| 143 | if (client.status === "wait") client.connect().catch(noop); |
| 144 | return asCallback( |
| 145 | new Promise(function (resolve, reject) { |
| 146 | client.delayUntilReady((err) => { |
| 147 | if (err) { |
| 148 | reject(err); |
| 149 | return; |
| 150 | } |
| 151 | |
| 152 | executeWithAutoPipelining( |
| 153 | client, |
| 154 | functionName, |
| 155 | commandName, |
| 156 | args, |
| 157 | null |
| 158 | ).then(resolve, reject); |
| 159 | }); |
| 160 | }), |
| 161 | callback |
| 162 | ); |
| 163 | } |
| 164 | |
| 165 | // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes |
| 166 | const prefix = client.options.keyPrefix || ""; |
| 167 | let slotKey = client.isCluster |
| 168 | ? client.slots[ |
| 169 | calculateSlot(`${prefix}${getFirstKeyForCommand(commandName, args)}`) |
| 170 | ].join(",") |
| 171 | : "main"; |
| 172 | |
| 173 | // When scaleReads is enabled, separate read and write commands into different pipelines |
| 174 | // so they can be routed to replicas and masters respectively |
| 175 | if (client.isCluster && client.options.scaleReads !== "master") { |
| 176 | const isReadOnly = exists(commandName) && hasFlag(commandName, "readonly"); |
| 177 | slotKey += isReadOnly ? ":read" : ":write"; |
| 178 | } |
| 179 | |
| 180 | if (!client._autoPipelines.has(slotKey)) { |
| 181 | const pipeline = client.pipeline(); |
| 182 | pipeline[kExec] = false; |
| 183 | pipeline[kCallbacks] = []; |
| 184 | client._autoPipelines.set(slotKey, pipeline); |
| 185 | } |
| 186 | |
| 187 | const pipeline = client._autoPipelines.get(slotKey); |
| 188 | |
| 189 | /* |
| 190 | Mark the pipeline as scheduled. |
| 191 | The symbol will make sure that the pipeline is only scheduled once per tick. |
no test coverage detected
searching dependent graphs…