@ -6,12 +6,12 @@ const url = require('url');
const dotenv = require ( 'dotenv' ) ;
const express = require ( 'express' ) ;
const Redis = require ( 'ioredis' ) ;
const { JSDOM } = require ( 'jsdom' ) ;
const log = require ( 'npmlog' ) ;
const pg = require ( 'pg' ) ;
const dbUrlToConfig = require ( 'pg-connection-string' ) . parse ;
const metrics = require ( 'prom-client' ) ;
const redis = require ( 'redis' ) ;
const uuid = require ( 'uuid' ) ;
const WebSocket = require ( 'ws' ) ;
@ -24,30 +24,12 @@ dotenv.config({
log . level = process . env . LOG _LEVEL || 'verbose' ;
/ * *
* @ param { Object . < string , any > } defaultConfig
* @ param { string } redisUrl
* @ param { Object . < string , any > } config
* /
const redisUrlToClient = async ( defaultConfig , redisUrl ) => {
const config = defaultConfig ;
let client ;
if ( ! redisUrl ) {
client = redis . createClient ( config ) ;
} else if ( redisUrl . startsWith ( 'unix://' ) ) {
client = redis . createClient ( Object . assign ( config , {
socket : {
path : redisUrl . slice ( 7 ) ,
} ,
} ) ) ;
} else {
client = redis . createClient ( Object . assign ( config , {
url : redisUrl ,
} ) ) ;
}
const createRedisClient = async ( config ) => {
const { redisParams , redisUrl } = config ;
const client = new Redis ( redisUrl , redisParams ) ;
client . on ( 'error' , ( err ) => log . error ( 'Redis Client Error!' , err ) ) ;
await client . connect ( ) ;
return client ;
} ;
@ -147,23 +129,22 @@ const pgConfigFromEnv = (env) => {
* @ returns { Object . < string , any > } configuration for the Redis connection
* /
const redisConfigFromEnv = ( env ) => {
const redisNamespace = env . REDIS _NAMESPACE || null ;
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
// which means we can't use it. But this is something that should be looked into.
const redisPrefix = env . REDIS _NAMESPACE ? ` ${ env . REDIS _NAMESPACE } : ` : '' ;
const redisParams = {
socket : {
host : env . REDIS _HOST || '127.0.0.1' ,
port : env . REDIS _PORT || 6379 ,
} ,
database : env . REDIS _DB || 0 ,
host : env . REDIS _HOST || '127.0.0.1' ,
port : env . REDIS _PORT || 6379 ,
db : env . REDIS _DB || 0 ,
password : env . REDIS _PASSWORD || undefined ,
} ;
if ( redisNamespace ) {
redisParams . namespace = redisNamespace ;
// redisParams.path takes precedence over host and port.
if ( env . REDIS _URL && env . REDIS _URL . startsWith ( 'unix://' ) ) {
redisParams . path = env . REDIS _URL . slice ( 7 ) ;
}
const redisPrefix = redisNamespace ? ` ${ redisNamespace } : ` : '' ;
return {
redisParams ,
redisPrefix ,
@ -179,15 +160,15 @@ const startServer = async () => {
const pgPool = new pg . Pool ( pgConfigFromEnv ( process . env ) ) ;
const server = http . createServer ( app ) ;
const { redisParams , redisUrl , redisPrefix } = redisConfigFromEnv ( process . env ) ;
/ * *
* @ type { Object . < string , Array . < function ( Object < string , any > ) : void >> }
* /
const subs = { } ;
const redisSubscribeClient = await redisUrlToClient ( redisParams , redisUrl ) ;
const redisClient = await redisUrlToClient ( redisParams , redisUrl ) ;
const redisConfig = redisConfigFromEnv ( process . env ) ;
const redisSubscribeClient = await createRedisClient ( redisConfig ) ;
const redisClient = await createRedisClient ( redisConfig ) ;
const { redisPrefix } = redisConfig ;
// Collect metrics from Node.js
metrics . collectDefaultMetrics ( ) ;
@ -277,13 +258,13 @@ const startServer = async () => {
} ;
/ * *
* @ param { string } message
* @ param { string } channel
* @ param { string } message
* /
const onRedisMessage = ( message, channel ) => {
const onRedisMessage = ( channel, message ) => {
const callbacks = subs [ channel ] ;
log . silly ( ` New message on channel ${ channel} ` ) ;
log . silly ( ` New message on channel ${ redisPrefix} ${ channel} ` ) ;
if ( ! callbacks ) {
return ;
@ -294,6 +275,7 @@ const startServer = async () => {
callbacks . forEach ( callback => callback ( json ) ) ;
} ;
redisSubscribeClient . on ( "message" , onRedisMessage ) ;
/ * *
* @ callback SubscriptionListener
@ -312,8 +294,14 @@ const startServer = async () => {
if ( subs [ channel ] . length === 0 ) {
log . verbose ( ` Subscribe ${ channel } ` ) ;
redisSubscribeClient . subscribe ( channel , onRedisMessage ) ;
redisSubscriptions . inc ( ) ;
redisSubscribeClient . subscribe ( channel , ( err , count ) => {
if ( err ) {
log . error ( ` Error subscribing to ${ channel } ` ) ;
}
else {
redisSubscriptions . set ( count ) ;
}
} ) ;
}
subs [ channel ] . push ( callback ) ;
@ -334,8 +322,14 @@ const startServer = async () => {
if ( subs [ channel ] . length === 0 ) {
log . verbose ( ` Unsubscribe ${ channel } ` ) ;
redisSubscribeClient . unsubscribe ( channel ) ;
redisSubscriptions . dec ( ) ;
redisSubscribeClient . unsubscribe ( channel , ( err , count ) => {
if ( err ) {
log . error ( ` Error unsubscribing to ${ channel } ` ) ;
}
else {
redisSubscriptions . set ( count ) ;
}
} ) ;
delete subs [ channel ] ;
}
} ;