|
|
|
@ -6,12 +6,12 @@ const url = require('url');
|
|
|
|
|
|
|
|
|
|
const dotenv = require('dotenv');
|
|
|
|
|
const express = require('express');
|
|
|
|
|
const Redis = require('ioredis');
|
|
|
|
|
const { JSDOM } = require('jsdom');
|
|
|
|
|
const log = require('npmlog');
|
|
|
|
|
const pg = require('pg');
|
|
|
|
|
const dbUrlToConfig = require('pg-connection-string').parse;
|
|
|
|
|
const metrics = require('prom-client');
|
|
|
|
|
const redis = require('redis');
|
|
|
|
|
const uuid = require('uuid');
|
|
|
|
|
const WebSocket = require('ws');
|
|
|
|
|
|
|
|
|
@ -24,30 +24,12 @@ dotenv.config({
|
|
|
|
|
log.level = process.env.LOG_LEVEL || 'verbose';
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param {Object.<string, any>} defaultConfig
|
|
|
|
|
* @param {string} redisUrl
|
|
|
|
|
* @param {Object.<string, any>} config
|
|
|
|
|
*/
|
|
|
|
|
const redisUrlToClient = async (defaultConfig, redisUrl) => {
|
|
|
|
|
const config = defaultConfig;
|
|
|
|
|
|
|
|
|
|
let client;
|
|
|
|
|
|
|
|
|
|
if (!redisUrl) {
|
|
|
|
|
client = redis.createClient(config);
|
|
|
|
|
} else if (redisUrl.startsWith('unix://')) {
|
|
|
|
|
client = redis.createClient(Object.assign(config, {
|
|
|
|
|
socket: {
|
|
|
|
|
path: redisUrl.slice(7),
|
|
|
|
|
},
|
|
|
|
|
}));
|
|
|
|
|
} else {
|
|
|
|
|
client = redis.createClient(Object.assign(config, {
|
|
|
|
|
url: redisUrl,
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const createRedisClient = async (config) => {
|
|
|
|
|
const { redisParams, redisUrl } = config;
|
|
|
|
|
const client = new Redis(redisUrl, redisParams);
|
|
|
|
|
client.on('error', (err) => log.error('Redis Client Error!', err));
|
|
|
|
|
await client.connect();
|
|
|
|
|
|
|
|
|
|
return client;
|
|
|
|
|
};
|
|
|
|
@ -147,23 +129,22 @@ const pgConfigFromEnv = (env) => {
|
|
|
|
|
* @returns {Object.<string, any>} configuration for the Redis connection
|
|
|
|
|
*/
|
|
|
|
|
const redisConfigFromEnv = (env) => {
|
|
|
|
|
const redisNamespace = env.REDIS_NAMESPACE || null;
|
|
|
|
|
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
|
|
|
|
|
// which means we can't use it. But this is something that should be looked into.
|
|
|
|
|
const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
|
|
|
|
|
|
|
|
|
|
const redisParams = {
|
|
|
|
|
socket: {
|
|
|
|
|
host: env.REDIS_HOST || '127.0.0.1',
|
|
|
|
|
port: env.REDIS_PORT || 6379,
|
|
|
|
|
},
|
|
|
|
|
database: env.REDIS_DB || 0,
|
|
|
|
|
host: env.REDIS_HOST || '127.0.0.1',
|
|
|
|
|
port: env.REDIS_PORT || 6379,
|
|
|
|
|
db: env.REDIS_DB || 0,
|
|
|
|
|
password: env.REDIS_PASSWORD || undefined,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if (redisNamespace) {
|
|
|
|
|
redisParams.namespace = redisNamespace;
|
|
|
|
|
// redisParams.path takes precedence over host and port.
|
|
|
|
|
if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
|
|
|
|
|
redisParams.path = env.REDIS_URL.slice(7);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
redisParams,
|
|
|
|
|
redisPrefix,
|
|
|
|
@ -179,15 +160,15 @@ const startServer = async () => {
|
|
|
|
|
const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
|
|
|
|
|
const server = http.createServer(app);
|
|
|
|
|
|
|
|
|
|
const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
|
|
|
|
|
*/
|
|
|
|
|
const subs = {};
|
|
|
|
|
|
|
|
|
|
const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
|
|
|
|
|
const redisClient = await redisUrlToClient(redisParams, redisUrl);
|
|
|
|
|
const redisConfig = redisConfigFromEnv(process.env);
|
|
|
|
|
const redisSubscribeClient = await createRedisClient(redisConfig);
|
|
|
|
|
const redisClient = await createRedisClient(redisConfig);
|
|
|
|
|
const { redisPrefix } = redisConfig;
|
|
|
|
|
|
|
|
|
|
// Collect metrics from Node.js
|
|
|
|
|
metrics.collectDefaultMetrics();
|
|
|
|
@ -277,13 +258,13 @@ const startServer = async () => {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param {string} message
|
|
|
|
|
* @param {string} channel
|
|
|
|
|
* @param {string} message
|
|
|
|
|
*/
|
|
|
|
|
const onRedisMessage = (message, channel) => {
|
|
|
|
|
const onRedisMessage = (channel, message) => {
|
|
|
|
|
const callbacks = subs[channel];
|
|
|
|
|
|
|
|
|
|
log.silly(`New message on channel ${channel}`);
|
|
|
|
|
log.silly(`New message on channel ${redisPrefix}${channel}`);
|
|
|
|
|
|
|
|
|
|
if (!callbacks) {
|
|
|
|
|
return;
|
|
|
|
@ -294,6 +275,7 @@ const startServer = async () => {
|
|
|
|
|
|
|
|
|
|
callbacks.forEach(callback => callback(json));
|
|
|
|
|
};
|
|
|
|
|
redisSubscribeClient.on("message", onRedisMessage);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @callback SubscriptionListener
|
|
|
|
@ -312,8 +294,14 @@ const startServer = async () => {
|
|
|
|
|
|
|
|
|
|
if (subs[channel].length === 0) {
|
|
|
|
|
log.verbose(`Subscribe ${channel}`);
|
|
|
|
|
redisSubscribeClient.subscribe(channel, onRedisMessage);
|
|
|
|
|
redisSubscriptions.inc();
|
|
|
|
|
redisSubscribeClient.subscribe(channel, (err, count) => {
|
|
|
|
|
if (err) {
|
|
|
|
|
log.error(`Error subscribing to ${channel}`);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
redisSubscriptions.set(count);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
subs[channel].push(callback);
|
|
|
|
@ -334,8 +322,14 @@ const startServer = async () => {
|
|
|
|
|
|
|
|
|
|
if (subs[channel].length === 0) {
|
|
|
|
|
log.verbose(`Unsubscribe ${channel}`);
|
|
|
|
|
redisSubscribeClient.unsubscribe(channel);
|
|
|
|
|
redisSubscriptions.dec();
|
|
|
|
|
redisSubscribeClient.unsubscribe(channel, (err, count) => {
|
|
|
|
|
if (err) {
|
|
|
|
|
log.error(`Error unsubscribing to ${channel}`);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
redisSubscriptions.set(count);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
delete subs[channel];
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|