diff --git a/app/javascript/flavours/glitch/util/stream.js b/app/javascript/flavours/glitch/util/stream.js index 50f90d44cb..fe965bcb0e 100644 --- a/app/javascript/flavours/glitch/util/stream.js +++ b/app/javascript/flavours/glitch/util/stream.js @@ -2,6 +2,14 @@ import WebSocketClient from '@gamestdio/websocket'; const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); +const knownEventTypes = [ + 'update', + 'delete', + 'notification', + 'conversation', + 'filters_changed', +]; + export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { return (dispatch, getState) => { const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); @@ -69,14 +77,42 @@ export function connectStream(path, pollingRefresh = null, callbacks = () => ({ export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { - const params = [ `stream=${stream}` ]; - - const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); - - ws.onopen = connected; - ws.onmessage = e => received(JSON.parse(e.data)); - ws.onclose = disconnected; - ws.onreconnect = reconnected; + const params = stream.split('&'); + stream = params.shift(); + + if (streamingAPIBaseURL.startsWith('ws')) { + params.unshift(`stream=${stream}`); + const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); + + ws.onopen = connected; + ws.onmessage = e => received(JSON.parse(e.data)); + ws.onclose = disconnected; + ws.onreconnect = reconnected; + + return ws; + } + + params.push(`access_token=${accessToken}`); + const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`); + + let firstConnect = true; + es.onopen = () => { + if (firstConnect) { + firstConnect = false; + connected(); + } else { + reconnected(); + } + }; + for (let type of knownEventTypes) { + es.addEventListener(type, (e) => { + received({ + event: e.type, + payload: e.data, + }); + }); + } + es.onerror = disconnected; - return ws; + return es; };