Fix streaming server. Redis connection subscribe for each channel. (#3828)

th-downstream
猫吸血鬼ディフリス / 猫ロキP 7 years ago committed by Eugen Rochko
parent 77dcf442e7
commit 8f202bc639

@ -115,7 +115,7 @@ const startWorker = (workerId) => {
const subs = {}; const subs = {};
redisSubscribeClient.on('pmessage', (_, channel, message) => { redisSubscribeClient.on('message', (channel, message) => {
const callbacks = subs[channel]; const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`); log.silly(`New message on channel ${channel}`);
@ -127,8 +127,6 @@ const startWorker = (workerId) => {
callbacks.forEach(callback => callback(message)); callbacks.forEach(callback => callback(message));
}); });
redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`);
const subscriptionHeartbeat = (channel) => { const subscriptionHeartbeat = (channel) => {
const interval = 6*60; const interval = 6*60;
const tellSubscribed = () => { const tellSubscribed = () => {
@ -144,12 +142,20 @@ const startWorker = (workerId) => {
const subscribe = (channel, callback) => { const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`); log.silly(`Adding listener for ${channel}`);
subs[channel] = subs[channel] || []; subs[channel] = subs[channel] || [];
if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel);
}
subs[channel].push(callback); subs[channel].push(callback);
}; };
const unsubscribe = (channel, callback) => { const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`); log.silly(`Removing listener for ${channel}`);
subs[channel] = subs[channel].filter(item => item !== callback); subs[channel] = subs[channel].filter(item => item !== callback);
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
}
}; };
const allowCrossDomain = (req, res, next) => { const allowCrossDomain = (req, res, next) => {

Loading…
Cancel
Save