Fix streaming server sometimes silently dropping subscriptions (#17841)

main
Claire 3 years ago committed by GitHub
parent 7eb2e791ee
commit f29458da1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -167,6 +167,11 @@ const startWorker = async (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
/**
* @type {Object.<string, Array.<function(string): void>>}
*/
const subs = {};
const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
@ -190,6 +195,22 @@ const startWorker = async (workerId) => {
}; };
}; };
/**
* @param {string} message
* @param {string} channel
*/
const onRedisMessage = (message, channel) => {
const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`);
if (!callbacks) {
return;
}
callbacks.forEach(callback => callback(message));
};
/** /**
* @param {string} channel * @param {string} channel
* @param {function(string): void} callback * @param {function(string): void} callback
@ -197,17 +218,33 @@ const startWorker = async (workerId) => {
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
redisSubscribeClient.subscribe(channel, callback); subs[channel] = subs[channel] || [];
if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, onRedisMessage);
}
subs[channel].push(callback);
}; };
/** /**
* @param {string} channel * @param {string} channel
* @param {function(string): void} callback
*/ */
const unsubscribe = (channel, callback) => { const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`); log.silly(`Removing listener for ${channel}`);
redisSubscribeClient.unsubscribe(channel, callback); if (!subs[channel]) {
return;
}
subs[channel] = subs[channel].filter(item => item !== callback);
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
delete subs[channel];
}
}; };
const FALSE_VALUES = [ const FALSE_VALUES = [

Loading…
Cancel
Save