Add transparent support for EventSource streaming. (#12887)
This activates if the streaming base URL does not start with "ws". All currently-live streaming base URLs start with "wss://".
This commit is contained in:
parent
7d03ceb9ef
commit
ad7f9fe438
1 changed files with 43 additions and 7 deletions
|
@ -2,6 +2,14 @@ import WebSocketClient from '@gamestdio/websocket';
|
||||||
|
|
||||||
const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max));
|
const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max));
|
||||||
|
|
||||||
|
const knownEventTypes = [
|
||||||
|
'update',
|
||||||
|
'delete',
|
||||||
|
'notification',
|
||||||
|
'conversation',
|
||||||
|
'filters_changed',
|
||||||
|
];
|
||||||
|
|
||||||
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
|
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
|
||||||
return (dispatch, getState) => {
|
return (dispatch, getState) => {
|
||||||
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
|
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
|
||||||
|
@ -69,8 +77,11 @@ export function connectStream(path, pollingRefresh = null, callbacks = () => ({
|
||||||
|
|
||||||
|
|
||||||
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
|
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
|
||||||
const params = [ `stream=${stream}` ];
|
const params = stream.split('&');
|
||||||
|
stream = params.shift();
|
||||||
|
|
||||||
|
if (streamingAPIBaseURL.startsWith('ws')) {
|
||||||
|
params.unshift(`stream=${stream}`);
|
||||||
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
|
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
|
||||||
|
|
||||||
ws.onopen = connected;
|
ws.onopen = connected;
|
||||||
|
@ -79,4 +90,29 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co
|
||||||
ws.onreconnect = reconnected;
|
ws.onreconnect = reconnected;
|
||||||
|
|
||||||
return ws;
|
return ws;
|
||||||
|
}
|
||||||
|
|
||||||
|
params.push(`access_token=${accessToken}`);
|
||||||
|
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`);
|
||||||
|
|
||||||
|
let firstConnect = true;
|
||||||
|
es.onopen = () => {
|
||||||
|
if (firstConnect) {
|
||||||
|
firstConnect = false;
|
||||||
|
connected();
|
||||||
|
} else {
|
||||||
|
reconnected();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (let type of knownEventTypes) {
|
||||||
|
es.addEventListener(type, (e) => {
|
||||||
|
received({
|
||||||
|
event: e.type,
|
||||||
|
payload: e.data,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
es.onerror = disconnected;
|
||||||
|
|
||||||
|
return es;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue