@ -282,6 +282,14 @@ const startWorker = (workerId) => {
next ( ) ;
} ;
/ * *
* @ param { any } req
* @ param { string [ ] } necessaryScopes
* @ return { boolean }
* /
const isInScope = ( req , necessaryScopes ) =>
req . scopes . some ( scope => necessaryScopes . includes ( scope ) ) ;
/ * *
* @ param { string } token
* @ param { any } req
@ -314,7 +322,6 @@ const startWorker = (workerId) => {
req . scopes = result . rows [ 0 ] . scopes . split ( ' ' ) ;
req . accountId = result . rows [ 0 ] . account _id ;
req . chosenLanguages = result . rows [ 0 ] . chosen _languages ;
req . allowNotifications = req . scopes . some ( scope => [ 'read' , 'read:notifications' ] . includes ( scope ) ) ;
req . deviceId = result . rows [ 0 ] . device _id ;
resolve ( ) ;
@ -581,15 +588,13 @@ const startWorker = (workerId) => {
* @ param { function ( string , string ) : void } output
* @ param { function ( string [ ] , function ( string ) : void ) : void } attachCloseHandler
* @ param { boolean = } needsFiltering
* @ param { boolean = } notificationOnly
* @ param { boolean = } allowLocalOnly
* @ return { function ( string ) : void }
* /
const streamFrom = ( ids , req , output , attachCloseHandler , needsFiltering = false , notificationOnly = false , allowLocalOnly = false ) => {
const streamFrom = ( ids , req , output , attachCloseHandler , needsFiltering = false , allowLocalOnly = false ) => {
const accountId = req . accountId || req . remoteAddress ;
const streamType = notificationOnly ? ' (notification)' : '' ;
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ${ streamType } ` ) ;
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ` ) ;
const listener = message => {
const json = parseJSON ( message ) ;
@ -607,14 +612,6 @@ const startWorker = (workerId) => {
output ( event , encodedPayload ) ;
} ;
if ( notificationOnly && event !== 'notification' ) {
return ;
}
if ( event === 'notification' && ! req . allowNotifications ) {
return ;
}
// Only send local-only statuses to logged-in users
if ( event === 'update' && payload . local _only && ! ( req . accountId && allowLocalOnly ) ) {
log . silly ( req . requestId , ` Message ${ payload . id } filtered because it was local-only ` ) ;
@ -767,7 +764,7 @@ const startWorker = (workerId) => {
const onSend = streamToHttp ( req , res ) ;
const onEnd = streamHttpEnd ( req , subscriptionHeartbeat ( channelIds ) ) ;
streamFrom ( channelIds , req , onSend , onEnd , options . needsFiltering , options . notificationOnly, options . allowLocalOnly) ;
streamFrom ( channelIds , req , onSend , onEnd , options . needsFiltering , options . allowLocalOnly) ;
} ) . catch ( err => {
log . verbose ( req . requestId , 'Subscription error:' , err . toString ( ) ) ;
httpNotFound ( res ) ;
@ -783,88 +780,106 @@ const startWorker = (workerId) => {
* @ property { string } [ only _media ]
* /
/ * *
* @ param { any } req
* @ return { string [ ] }
* /
const channelsForUserStream = req => {
const arr = [ ` timeline: ${ req . accountId } ` ] ;
if ( isInScope ( req , [ 'crypto' ] ) && req . deviceId ) {
arr . push ( ` timeline: ${ req . accountId } : ${ req . deviceId } ` ) ;
}
if ( isInScope ( req , [ 'read' , 'read:notifications' ] ) ) {
arr . push ( ` timeline: ${ req . accountId } :notifications ` ) ;
}
return arr ;
} ;
/ * *
* @ param { any } req
* @ param { string } name
* @ param { StreamParams } params
* @ return { Promise . < { channelIds : string [ ] , options : { needsFiltering : boolean , notificationOnly : boolean } } > }
* @ return { Promise . < { channelIds : string [ ] , options : { needsFiltering : boolean } } > }
* /
const channelNameToIds = ( req , name , params ) => new Promise ( ( resolve , reject ) => {
switch ( name ) {
case 'user' :
resolve ( {
channelIds : req . deviceId ? [ ` timeline: ${ req . accountId } ` , ` timeline: ${ req . accountId } : ${ req . deviceId } ` ] : [ ` timeline: ${ req . accountId } ` ] ,
options : { needsFiltering : false , notificationOnly : false , allowLocalOnly : true } ,
channelIds : channelsForUserStream( req ) ,
options : { needsFiltering : false , allowLocalOnly: true } ,
} ) ;
break ;
case 'user:notification' :
resolve ( {
channelIds : [ ` timeline: ${ req . accountId } ` ] ,
options : { needsFiltering : false , notificationOnly: true , allowLocalOnly: true } ,
channelIds : [ ` timeline: ${ req . accountId } :notifications `] ,
options : { needsFiltering : false , allowLocalOnly: true } ,
} ) ;
break ;
case 'public' :
resolve ( {
channelIds : [ 'timeline:public' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: isTruthy ( params . allow _local _only ) } ,
options : { needsFiltering : true , allowLocalOnly: isTruthy ( params . allow _local _only ) } ,
} ) ;
break ;
case 'public:allow_local_only' :
resolve ( {
channelIds : [ 'timeline:public' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
break ;
case 'public:local' :
resolve ( {
channelIds : [ 'timeline:public:local' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
break ;
case 'public:remote' :
resolve ( {
channelIds : [ 'timeline:public:remote' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: false } ,
options : { needsFiltering : true , allowLocalOnly: false } ,
} ) ;
break ;
case 'public:media' :
resolve ( {
channelIds : [ 'timeline:public:media' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: isTruthy ( query . allow _local _only ) } ,
options : { needsFiltering : true , allowLocalOnly: isTruthy ( query . allow _local _only ) } ,
} ) ;
break ;
case 'public:allow_local_only:media' :
resolve ( {
channelIds : [ 'timeline:public:media' ] ,
options : { needsFiltering : true , notificationsOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
break ;
case 'public:local:media' :
resolve ( {
channelIds : [ 'timeline:public:local:media' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
break ;
case 'public:remote:media' :
resolve ( {
channelIds : [ 'timeline:public:remote:media' ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: false } ,
options : { needsFiltering : true , allowLocalOnly: false } ,
} ) ;
break ;
case 'direct' :
resolve ( {
channelIds : [ ` timeline:direct: ${ req . accountId } ` ] ,
options : { needsFiltering : false , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : false , allowLocalOnly: true } ,
} ) ;
break ;
@ -874,7 +889,7 @@ const startWorker = (workerId) => {
} else {
resolve ( {
channelIds : [ ` timeline:hashtag: ${ params . tag . toLowerCase ( ) } ` ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
}
@ -885,7 +900,7 @@ const startWorker = (workerId) => {
} else {
resolve ( {
channelIds : [ ` timeline:hashtag: ${ params . tag . toLowerCase ( ) } :local ` ] ,
options : { needsFiltering : true , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : true , allowLocalOnly: true } ,
} ) ;
}
@ -894,7 +909,7 @@ const startWorker = (workerId) => {
authorizeListAccess ( params . list , req ) . then ( ( ) => {
resolve ( {
channelIds : [ ` timeline:list: ${ params . list } ` ] ,
options : { needsFiltering : false , notificationOnly: false , allowLocalOnly: true } ,
options : { needsFiltering : false , allowLocalOnly: true } ,
} ) ;
} ) . catch ( ( ) => {
reject ( 'Not authorized to stream this list' ) ;
@ -941,7 +956,8 @@ const startWorker = (workerId) => {
const onSend = streamToWs ( request , socket , streamNameFromChannelName ( channelName , params ) ) ;
const stopHeartbeat = subscriptionHeartbeat ( channelIds ) ;
const listener = streamFrom ( channelIds , request , onSend , undefined , options . needsFiltering , options . notificationOnly , options . allowLocalOnly ) ;
const listener = streamFrom ( channelIds , request , onSend , undefined , options . needsFiltering , options . allowLocalOnly ) ;
subscriptions [ channelIds . join ( ';' ) ] = {
listener ,