From 1ee4a17f3792669d3f03ddcb060aa48b622eca61 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 2 Feb 2017 13:56:14 +0100 Subject: [PATCH] Add logging and filtering to the node.js streaming API --- .../components/features/ui/index.jsx | 1 - package.json | 2 + streaming/index.js | 55 ++++++++++++++++--- yarn.lock | 54 ++++++++++++++++++ 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/app/assets/javascripts/components/features/ui/index.jsx b/app/assets/javascripts/components/features/ui/index.jsx index 003d061add..da44434b12 100644 --- a/app/assets/javascripts/components/features/ui/index.jsx +++ b/app/assets/javascripts/components/features/ui/index.jsx @@ -3,7 +3,6 @@ import NotificationsContainer from './containers/notifications_container'; import PureRenderMixin from 'react-addons-pure-render-mixin'; import LoadingBarContainer from './containers/loading_bar_container'; import HomeTimeline from '../home_timeline'; -import MentionsTimeline from '../mentions_timeline'; import Compose from '../compose'; import TabsBar from './components/tabs_bar'; import ModalContainer from './containers/modal_container'; diff --git a/package.json b/package.json index ea6b2f57f7..9685f07a46 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "emojione": "latest", "enzyme": "^2.4.1", "es6-promise": "^3.2.1", + "eventsource": "^0.2.1", "express": "^4.14.1", "http-link-header": "^0.5.0", "immutable": "^3.8.1", @@ -32,6 +33,7 @@ "jsdom": "^9.6.0", "mocha": "^3.1.1", "node-sass": "^4.0.0", + "npmlog": "^4.0.2", "pg": "^6.1.2", "react": "^15.3.2", "react-addons-perf": "^15.3.2", diff --git a/streaming/index.js b/streaming/index.js index 70067d2f63..945e287f5e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -2,6 +2,7 @@ import dotenv from 'dotenv' import express from 'express' import redis from 'redis' import pg from 'pg' +import log from 'npmlog' dotenv.config() @@ -40,6 +41,7 @@ const authenticationMiddleware = (req, res, next) => { pgPool.connect((err, client, done) => { if (err) { + log.error(err) return next(err) } @@ -47,6 +49,7 @@ const authenticationMiddleware = (req, res, next) => { done() if (err) { + log.error(err) return next(err) } @@ -66,10 +69,12 @@ const authenticationMiddleware = (req, res, next) => { const errorMiddleware = (err, req, res, next) => { res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: `${err}` })) + res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' })) } -const streamFrom = (id, res) => { +const streamFrom = (id, req, res, needsFiltering = false) => { + log.verbose(`Starting stream from ${id} for ${req.accountId}`) + res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Transfer-Encoding', 'chunked') @@ -78,11 +83,40 @@ const streamFrom = (id, res) => { redisClient.on('message', (channel, message) => { const { event, payload } = JSON.parse(message) - res.write(`event: ${event}\n`) - res.write(`data: ${payload}\n\n`) + if (needsFiltering) { + pgPool.connect((err, client, done) => { + if (err) { + log.error(err) + return + } + + const unpackedPayload = JSON.parse(payload) + const targetAccountIds = [unpackedPayload.account.id] + unpackedPayload.mentions.map(item => item.id) + (unpackedPayload.reblog ? unpackedPayload.reblog.account.id : []) + + client.query('SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ($2)', [req.accountId, targetAccountIds], (err, result) => { + done() + + if (err) { + log.error(err) + return + } + + if (result.rows.length > 0) { + return + } + + res.write(`event: ${event}\n`) + res.write(`data: ${payload}\n\n`) + }) + }) + } else { + res.write(`event: ${event}\n`) + res.write(`data: ${payload}\n\n`) + } }) - setInterval(() => res.write('\n'), 15000) + // Heartbeat to keep connection alive + setInterval(() => res.write(':thump\n'), 15000) redisClient.subscribe(id) } @@ -90,8 +124,11 @@ const streamFrom = (id, res) => { app.use(authenticationMiddleware) app.use(errorMiddleware) -app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, res)) -app.get('/api/v1/streaming/public', (_, res) => streamFrom('timeline:public', res)) -app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, res)) +app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, req, res)) +app.get('/api/v1/streaming/public', (req, res) => streamFrom('timeline:public', req, res, true)) +app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true)) + +log.level = 'verbose' +log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`) -app.listen(4000) +app.listen(process.env.PORT || 4000) diff --git a/yarn.lock b/yarn.lock index 33b17a9e2b..bd17479294 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2313,6 +2313,12 @@ events@^1.0.0, events@^1.1.1, events@~1.1.0: version "1.1.1" resolved "https://registry.yarnpkg.com/events/-/events-1.1.1.tgz#9ebdb7635ad099c70dcc4c2a1f5004288e8bd924" +eventsource@^0.2.1: + version "0.2.1" + resolved "https://registry.yarnpkg.com/eventsource/-/eventsource-0.2.1.tgz#662bf85f376e73b5c34c2ee17db566b8419a6232" + dependencies: + original "^1.0.0" + evp_bytestokey@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/evp_bytestokey/-/evp_bytestokey-1.0.0.tgz#497b66ad9fef65cd7c08a6180824ba1476b66e53" @@ -2554,6 +2560,20 @@ gauge@~2.6.0: strip-ansi "^3.0.1" wide-align "^1.1.0" +gauge@~2.7.1: + version "2.7.2" + resolved "https://registry.yarnpkg.com/gauge/-/gauge-2.7.2.tgz#15cecc31b02d05345a5d6b0e171cdb3ad2307774" + dependencies: + aproba "^1.0.3" + console-control-strings "^1.0.0" + has-unicode "^2.0.0" + object-assign "^4.1.0" + signal-exit "^3.0.0" + string-width "^1.0.1" + strip-ansi "^3.0.1" + supports-color "^0.2.0" + wide-align "^1.1.0" + gaze@^1.0.0: version "1.1.2" resolved "https://registry.yarnpkg.com/gaze/-/gaze-1.1.2.tgz#847224677adb8870d679257ed3388fdb61e40105" @@ -3797,6 +3817,15 @@ npmlog@4.x, npmlog@^4.0.0: gauge "~2.6.0" set-blocking "~2.0.0" +npmlog@^4.0.2: + version "4.0.2" + resolved "https://registry.yarnpkg.com/npmlog/-/npmlog-4.0.2.tgz#d03950e0e78ce1527ba26d2a7592e9348ac3e75f" + dependencies: + are-we-there-yet "~1.1.2" + console-control-strings "~1.1.0" + gauge "~2.7.1" + set-blocking "~2.0.0" + nth-check@~1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/nth-check/-/nth-check-1.0.1.tgz#9929acdf628fc2c41098deab82ac580cf149aae4" @@ -3911,6 +3940,12 @@ optionator@^0.8.1: type-check "~0.3.2" wordwrap "~1.0.0" +original@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/original/-/original-1.0.0.tgz#9147f93fa1696d04be61e01bd50baeaca656bd3b" + dependencies: + url-parse "1.0.x" + os-browserify@^0.2.0: version "0.2.1" resolved "https://registry.yarnpkg.com/os-browserify/-/os-browserify-0.2.1.tgz#63fc4ccee5d2d7763d26bbf8601078e6c2e0044f" @@ -4462,6 +4497,10 @@ querystring@0.2.0, querystring@^0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/querystring/-/querystring-0.2.0.tgz#b209849203bb25df820da756e747005878521620" +querystringify@0.0.x: + version "0.0.4" + resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-0.0.4.tgz#0cf7f84f9463ff0ae51c4c4b142d95be37724d9c" + raf@^3.1.0: version "3.3.0" resolved "https://registry.yarnpkg.com/raf/-/raf-3.3.0.tgz#93845eeffc773f8129039f677f80a36044eee2c3" @@ -4937,6 +4976,10 @@ require-main-filename@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/require-main-filename/-/require-main-filename-1.0.1.tgz#97f717b69d48784f5f526a6c5aa8ffdda055a4d1" +requires-port@1.0.x: + version "1.0.0" + resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff" + reselect@^2.5.4: version "2.5.4" resolved "https://registry.yarnpkg.com/reselect/-/reselect-2.5.4.tgz#b7d23fdf00b83fa7ad0279546f8dbbbd765c7047" @@ -5349,6 +5392,10 @@ supports-color@3.1.2, supports-color@^3.1.0, supports-color@^3.1.2: dependencies: has-flag "^1.0.0" +supports-color@^0.2.0: + version "0.2.0" + resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-0.2.0.tgz#d92de2694eb3f67323973d7ae3d8b55b4c22190a" + supports-color@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-2.0.0.tgz#535d045ce6b6363fa40117084629995e9df324c7" @@ -5538,6 +5585,13 @@ url-loader@^0.5.7: loader-utils "0.2.x" mime "1.2.x" +url-parse@1.0.x: + version "1.0.5" + resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.0.5.tgz#0854860422afdcfefeb6c965c662d4800169927b" + dependencies: + querystringify "0.0.x" + requires-port "1.0.x" + url@^0.11.0, url@~0.11.0: version "0.11.0" resolved "https://registry.yarnpkg.com/url/-/url-0.11.0.tgz#3838e97cfc60521eb73c525a8e55bfdd9e2e28f1"