@ -92,13 +92,18 @@ const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development'
/ * *
/ * *
* @ param { string } json
* @ param { string } json
* @ param { any } req
* @ return { Object . < string , any > | null }
* @ return { Object . < string , any > | null }
* /
* /
const parseJSON = ( json ) => {
const parseJSON = ( json , req ) => {
try {
try {
return JSON . parse ( json ) ;
return JSON . parse ( json ) ;
} catch ( err ) {
} catch ( err ) {
log . error ( err ) ;
if ( req . accountId ) {
log . warn ( req . requestId , ` Error parsing message from user ${ req . accountId } : ${ err } ` ) ;
} else {
log . silly ( req . requestId , ` Error parsing message from ${ req . remoteAddress } : ${ err } ` ) ;
}
return null ;
return null ;
}
}
} ;
} ;
@ -450,7 +455,7 @@ const startWorker = async (workerId) => {
* /
* /
const createSystemMessageListener = ( req , eventHandlers ) => {
const createSystemMessageListener = ( req , eventHandlers ) => {
return message => {
return message => {
const json = parseJSON ( message );
const json = parseJSON ( message , req );
if ( ! json ) return ;
if ( ! json ) return ;
@ -573,7 +578,7 @@ const startWorker = async (workerId) => {
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ` ) ;
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ` ) ;
const listener = message => {
const listener = message => {
const json = parseJSON ( message );
const json = parseJSON ( message , req );
if ( ! json ) return ;
if ( ! json ) return ;
@ -1037,7 +1042,7 @@ const startWorker = async (workerId) => {
ws . on ( 'error' , onEnd ) ;
ws . on ( 'error' , onEnd ) ;
ws . on ( 'message' , data => {
ws . on ( 'message' , data => {
const json = parseJSON ( data );
const json = parseJSON ( data , session . request );
if ( ! json ) return ;
if ( ! json ) return ;