@ -2,6 +2,7 @@ import dotenv from 'dotenv'
import express from 'express'
import redis from 'redis'
import pg from 'pg'
import log from 'npmlog'
dotenv . config ( )
@ -40,6 +41,7 @@ const authenticationMiddleware = (req, res, next) => {
pgPool . connect ( ( err , client , done ) => {
if ( err ) {
log . error ( err )
return next ( err )
}
@ -47,6 +49,7 @@ const authenticationMiddleware = (req, res, next) => {
done ( )
if ( err ) {
log . error ( err )
return next ( err )
}
@ -66,10 +69,12 @@ const authenticationMiddleware = (req, res, next) => {
const errorMiddleware = ( err , req , res , next ) => {
res . writeHead ( err . statusCode || 500 , { 'Content-Type' : 'application/json' } )
res . end ( JSON . stringify ( { error : ` ${ err } ` } ) )
res . end ( JSON . stringify ( { error : err . statusCode ? ` ${ err } ` : 'An unexpected error occured' } ) )
}
const streamFrom = ( id , res ) => {
const streamFrom = ( id , req , res , needsFiltering = false ) => {
log . verbose ( ` Starting stream from ${ id } for ${ req . accountId } ` )
res . setHeader ( 'Content-Type' , 'text/event-stream' )
res . setHeader ( 'Transfer-Encoding' , 'chunked' )
@ -78,11 +83,40 @@ const streamFrom = (id, res) => {
redisClient . on ( 'message' , ( channel , message ) => {
const { event , payload } = JSON . parse ( message )
res . write ( ` event: ${ event } \n ` )
res . write ( ` data: ${ payload } \n \n ` )
if ( needsFiltering ) {
pgPool . connect ( ( err , client , done ) => {
if ( err ) {
log . error ( err )
return
}
const unpackedPayload = JSON . parse ( payload )
const targetAccountIds = [ unpackedPayload . account . id ] + unpackedPayload . mentions . map ( item => item . id ) + ( unpackedPayload . reblog ? unpackedPayload . reblog . account . id : [ ] )
client . query ( 'SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ($2)' , [ req . accountId , targetAccountIds ] , ( err , result ) => {
done ( )
if ( err ) {
log . error ( err )
return
}
if ( result . rows . length > 0 ) {
return
}
res . write ( ` event: ${ event } \n ` )
res . write ( ` data: ${ payload } \n \n ` )
} )
} )
} else {
res . write ( ` event: ${ event } \n ` )
res . write ( ` data: ${ payload } \n \n ` )
}
} )
setInterval ( ( ) => res . write ( '\n' ) , 15000 )
// Heartbeat to keep connection alive
setInterval ( ( ) => res . write ( ':thump\n' ) , 15000 )
redisClient . subscribe ( id )
}
@ -90,8 +124,11 @@ const streamFrom = (id, res) => {
app . use ( authenticationMiddleware )
app . use ( errorMiddleware )
app . get ( '/api/v1/streaming/user' , ( req , res ) => streamFrom ( ` timeline: ${ req . accountId } ` , res ) )
app . get ( '/api/v1/streaming/public' , ( _ , res ) => streamFrom ( 'timeline:public' , res ) )
app . get ( '/api/v1/streaming/hashtag' , ( req , res ) => streamFrom ( ` timeline:hashtag: ${ req . params . tag } ` , res ) )
app . get ( '/api/v1/streaming/user' , ( req , res ) => streamFrom ( ` timeline: ${ req . accountId } ` , req , res ) )
app . get ( '/api/v1/streaming/public' , ( req , res ) => streamFrom ( 'timeline:public' , req , res , true ) )
app . get ( '/api/v1/streaming/hashtag' , ( req , res ) => streamFrom ( ` timeline:hashtag: ${ req . params . tag } ` , req , res , true ) )
log . level = 'verbose'
log . info ( ` Starting HTTP server on port ${ process . env . PORT || 4000 } ` )
app . listen ( 4000 )
app . listen ( process . env . PORT || 4000 )