Merge branch 'master' into glitch-soc/master
Conflicts: - `streaming/index.js`: Upstream entirely refactored it. Ported our changes to upstream's refactor. Hopefuly.
This commit is contained in:
		
						commit
						e9e6453eff
					
				
					 6 changed files with 835 additions and 339 deletions
				
			
		|  | @ -1,3 +1,5 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| import { connectStream } from '../stream'; | ||||
| import { | ||||
|   updateTimeline, | ||||
|  | @ -19,24 +21,59 @@ import { getLocale } from '../locales'; | |||
| 
 | ||||
| const { messages } = getLocale(); | ||||
| 
 | ||||
| export function connectTimelineStream (timelineId, path, pollingRefresh = null, accept = null) { | ||||
| /** | ||||
|  * @param {number} max | ||||
|  * @return {number} | ||||
|  */ | ||||
| const randomUpTo = max => | ||||
|   Math.floor(Math.random() * Math.floor(max)); | ||||
| 
 | ||||
|   return connectStream (path, pollingRefresh, (dispatch, getState) => { | ||||
| /** | ||||
|  * @param {string} timelineId | ||||
|  * @param {string} channelName | ||||
|  * @param {Object.<string, string>} params | ||||
|  * @param {Object} options | ||||
|  * @param {function(Function, Function): void} [options.fallback] | ||||
|  * @param {function(object): boolean} [options.accept] | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectTimelineStream = (timelineId, channelName, params = {}, options = {}) => | ||||
|   connectStream(channelName, params, (dispatch, getState) => { | ||||
|     const locale = getState().getIn(['meta', 'locale']); | ||||
| 
 | ||||
|     let pollingId; | ||||
| 
 | ||||
|     /** | ||||
|      * @param {function(Function, Function): void} fallback | ||||
|      */ | ||||
|     const useFallback = fallback => { | ||||
|       fallback(dispatch, () => { | ||||
|         pollingId = setTimeout(() => useFallback(fallback), 20000 + randomUpTo(20000)); | ||||
|       }); | ||||
|     }; | ||||
| 
 | ||||
|     return { | ||||
|       onConnect() { | ||||
|         dispatch(connectTimeline(timelineId)); | ||||
| 
 | ||||
|         if (pollingId) { | ||||
|           clearTimeout(pollingId); | ||||
|           pollingId = null; | ||||
|         } | ||||
|       }, | ||||
| 
 | ||||
|       onDisconnect() { | ||||
|         dispatch(disconnectTimeline(timelineId)); | ||||
| 
 | ||||
|         if (options.fallback) { | ||||
|           pollingId = setTimeout(() => useFallback(options.fallback), randomUpTo(40000)); | ||||
|         } | ||||
|       }, | ||||
| 
 | ||||
|       onReceive (data) { | ||||
|         switch(data.event) { | ||||
|         case 'update': | ||||
|           dispatch(updateTimeline(timelineId, JSON.parse(data.payload), accept)); | ||||
|           dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept)); | ||||
|           break; | ||||
|         case 'delete': | ||||
|           dispatch(deleteFromTimelines(data.payload)); | ||||
|  | @ -63,17 +100,59 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null, | |||
|       }, | ||||
|     }; | ||||
|   }); | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * @param {Function} dispatch | ||||
|  * @param {function(): void} done | ||||
|  */ | ||||
| const refreshHomeTimelineAndNotification = (dispatch, done) => { | ||||
|   dispatch(expandHomeTimeline({}, () => | ||||
|     dispatch(expandNotifications({}, () => | ||||
|       dispatch(fetchAnnouncements(done)))))); | ||||
| }; | ||||
| 
 | ||||
| export const connectUserStream      = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification); | ||||
| export const connectCommunityStream = ({ onlyMedia } = {}) => connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); | ||||
| export const connectPublicStream    = ({ onlyMedia, onlyRemote } = {}) => connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`); | ||||
| export const connectHashtagStream   = (id, tag, local, accept) => connectTimelineStream(`hashtag:${id}${local ? ':local' : ''}`, `hashtag${local ? ':local' : ''}&tag=${tag}`, null, accept); | ||||
| export const connectDirectStream    = () => connectTimelineStream('direct', 'direct'); | ||||
| export const connectListStream      = id => connectTimelineStream(`list:${id}`, `list&list=${id}`); | ||||
| /** | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectUserStream = () => | ||||
|   connectTimelineStream('home', 'user', {}, { fallback: refreshHomeTimelineAndNotification }); | ||||
| 
 | ||||
| /** | ||||
|  * @param {Object} options | ||||
|  * @param {boolean} [options.onlyMedia] | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectCommunityStream = ({ onlyMedia } = {}) => | ||||
|   connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); | ||||
| 
 | ||||
| /** | ||||
|  * @param {Object} options | ||||
|  * @param {boolean} [options.onlyMedia] | ||||
|  * @param {boolean} [options.onlyRemote] | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectPublicStream = ({ onlyMedia, onlyRemote } = {}) => | ||||
|   connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`); | ||||
| 
 | ||||
| /** | ||||
|  * @param {string} columnId | ||||
|  * @param {string} tagName | ||||
|  * @param {boolean} onlyLocal | ||||
|  * @param {function(object): boolean} accept | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectHashtagStream = (columnId, tagName, onlyLocal, accept) => | ||||
|   connectTimelineStream(`hashtag:${columnId}${onlyLocal ? ':local' : ''}`, `hashtag${onlyLocal ? ':local' : ''}`, { tag: tagName }, { accept }); | ||||
| 
 | ||||
| /** | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectDirectStream = () => | ||||
|   connectTimelineStream('direct', 'direct'); | ||||
| 
 | ||||
| /** | ||||
|  * @param {string} listId | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectListStream = listId => | ||||
|   connectTimelineStream(`list:${listId}`, 'list', { list: listId }); | ||||
|  |  | |||
|  | @ -1,87 +1,236 @@ | |||
| // @ts-check
 | ||||
| 
 | ||||
| import WebSocketClient from '@gamestdio/websocket'; | ||||
| 
 | ||||
| const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); | ||||
| /** | ||||
|  * @type {WebSocketClient | undefined} | ||||
|  */ | ||||
| let sharedConnection; | ||||
| 
 | ||||
| const knownEventTypes = [ | ||||
|   'update', | ||||
|   'delete', | ||||
|   'notification', | ||||
|   'conversation', | ||||
|   'filters_changed', | ||||
| ]; | ||||
| /** | ||||
|  * @typedef Subscription | ||||
|  * @property {string} channelName | ||||
|  * @property {Object.<string, string>} params | ||||
|  * @property {function(): void} onConnect | ||||
|  * @property {function(StreamEvent): void} onReceive | ||||
|  * @property {function(): void} onDisconnect | ||||
|  */ | ||||
| 
 | ||||
| export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { | ||||
|   return (dispatch, getState) => { | ||||
|     const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); | ||||
|     const accessToken = getState().getIn(['meta', 'access_token']); | ||||
|     const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); | ||||
|  /** | ||||
|   * @typedef StreamEvent | ||||
|   * @property {string} event | ||||
|   * @property {object} payload | ||||
|   */ | ||||
| 
 | ||||
|     let polling = null; | ||||
| /** | ||||
|  * @type {Array.<Subscription>} | ||||
|  */ | ||||
| const subscriptions = []; | ||||
| 
 | ||||
|     const setupPolling = () => { | ||||
|       pollingRefresh(dispatch, () => { | ||||
|         polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); | ||||
|       }); | ||||
|     }; | ||||
| /** | ||||
|  * @type {Object.<string, number>} | ||||
|  */ | ||||
| const subscriptionCounters = {}; | ||||
| 
 | ||||
|     const clearPolling = () => { | ||||
|       if (polling) { | ||||
|         clearTimeout(polling); | ||||
|         polling = null; | ||||
| /** | ||||
|  * @param {Subscription} subscription | ||||
|  */ | ||||
| const addSubscription = subscription => { | ||||
|   subscriptions.push(subscription); | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * @param {Subscription} subscription | ||||
|  */ | ||||
| const removeSubscription = subscription => { | ||||
|   const index = subscriptions.indexOf(subscription); | ||||
| 
 | ||||
|   if (index !== -1) { | ||||
|     subscriptions.splice(index, 1); | ||||
|   } | ||||
|     }; | ||||
| }; | ||||
| 
 | ||||
|     const subscription = getStream(streamingAPIBaseURL, accessToken, path, { | ||||
|       connected () { | ||||
|         if (pollingRefresh) { | ||||
|           clearPolling(); | ||||
| /** | ||||
|  * @param {Subscription} subscription | ||||
|  */ | ||||
| const subscribe = ({ channelName, params, onConnect }) => { | ||||
|   const key = channelNameWithInlineParams(channelName, params); | ||||
| 
 | ||||
|   subscriptionCounters[key] = subscriptionCounters[key] || 0; | ||||
| 
 | ||||
|   if (subscriptionCounters[key] === 0) { | ||||
|     sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params })); | ||||
|   } | ||||
| 
 | ||||
|   subscriptionCounters[key] += 1; | ||||
|   onConnect(); | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * @param {Subscription} subscription | ||||
|  */ | ||||
| const unsubscribe = ({ channelName, params, onDisconnect }) => { | ||||
|   const key = channelNameWithInlineParams(channelName, params); | ||||
| 
 | ||||
|   subscriptionCounters[key] = subscriptionCounters[key] || 1; | ||||
| 
 | ||||
|   if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) { | ||||
|     sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params })); | ||||
|   } | ||||
| 
 | ||||
|   subscriptionCounters[key] -= 1; | ||||
|   onDisconnect(); | ||||
| }; | ||||
| 
 | ||||
| const sharedCallbacks = { | ||||
|   connected () { | ||||
|     subscriptions.forEach(subscription => subscribe(subscription)); | ||||
|   }, | ||||
| 
 | ||||
|   received (data) { | ||||
|     const { stream } = data; | ||||
| 
 | ||||
|     subscriptions.filter(({ channelName, params }) => { | ||||
|       const streamChannelName = stream[0]; | ||||
| 
 | ||||
|       if (stream.length === 1) { | ||||
|         return channelName === streamChannelName; | ||||
|       } | ||||
| 
 | ||||
|       const streamIdentifier = stream[1]; | ||||
| 
 | ||||
|       if (['hashtag', 'hashtag:local'].includes(channelName)) { | ||||
|         return channelName === streamChannelName && params.tag === streamIdentifier; | ||||
|       } else if (channelName === 'list') { | ||||
|         return channelName === streamChannelName && params.list === streamIdentifier; | ||||
|       } | ||||
| 
 | ||||
|       return false; | ||||
|     }).forEach(subscription => { | ||||
|       subscription.onReceive(data); | ||||
|     }); | ||||
|   }, | ||||
| 
 | ||||
|   disconnected () { | ||||
|         if (pollingRefresh) { | ||||
|           polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); | ||||
|     subscriptions.forEach(({ onDisconnect }) => onDisconnect()); | ||||
|   }, | ||||
| 
 | ||||
|   reconnected () { | ||||
|     subscriptions.forEach(subscription => subscribe(subscription)); | ||||
|   }, | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * @param {string} channelName | ||||
|  * @param {Object.<string, string>} params | ||||
|  * @return {string} | ||||
|  */ | ||||
| const channelNameWithInlineParams = (channelName, params) => { | ||||
|   if (Object.keys(params).length === 0) { | ||||
|     return channelName; | ||||
|   } | ||||
| 
 | ||||
|         onDisconnect(); | ||||
|   return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`; | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * @param {string} channelName | ||||
|  * @param {Object.<string, string>} params | ||||
|  * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks | ||||
|  * @return {function(): void} | ||||
|  */ | ||||
| export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => { | ||||
|   const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); | ||||
|   const accessToken = getState().getIn(['meta', 'access_token']); | ||||
|   const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState); | ||||
| 
 | ||||
|   // If we cannot use a websockets connection, we must fall back
 | ||||
|   // to using individual connections for each channel
 | ||||
|   if (!streamingAPIBaseURL.startsWith('ws')) { | ||||
|     const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), { | ||||
|       connected () { | ||||
|         onConnect(); | ||||
|       }, | ||||
| 
 | ||||
|       received (data) { | ||||
|         onReceive(data); | ||||
|       }, | ||||
| 
 | ||||
|       reconnected () { | ||||
|         if (pollingRefresh) { | ||||
|           clearPolling(); | ||||
|           pollingRefresh(dispatch); | ||||
|         } | ||||
| 
 | ||||
|         onConnect(); | ||||
|       disconnected () { | ||||
|         onDisconnect(); | ||||
|       }, | ||||
| 
 | ||||
|       reconnected () { | ||||
|         onConnect(); | ||||
|       }, | ||||
|     }); | ||||
| 
 | ||||
|     const disconnect = () => { | ||||
|       if (subscription) { | ||||
|         subscription.close(); | ||||
|     return () => { | ||||
|       connection.close(); | ||||
|     }; | ||||
|   } | ||||
| 
 | ||||
|       clearPolling(); | ||||
|   const subscription = { | ||||
|     channelName, | ||||
|     params, | ||||
|     onConnect, | ||||
|     onReceive, | ||||
|     onDisconnect, | ||||
|   }; | ||||
| 
 | ||||
|     return disconnect; | ||||
|   addSubscription(subscription); | ||||
| 
 | ||||
|   // If a connection is open, we can execute the subscription right now. Otherwise,
 | ||||
|   // because we have already registered it, it will be executed on connect
 | ||||
| 
 | ||||
|   if (!sharedConnection) { | ||||
|     sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks)); | ||||
|   } else if (sharedConnection.readyState === WebSocketClient.OPEN) { | ||||
|     subscribe(subscription); | ||||
|   } | ||||
| 
 | ||||
|   return () => { | ||||
|     removeSubscription(subscription); | ||||
|     unsubscribe(subscription); | ||||
|   }; | ||||
| } | ||||
| }; | ||||
| 
 | ||||
| const KNOWN_EVENT_TYPES = [ | ||||
|   'update', | ||||
|   'delete', | ||||
|   'notification', | ||||
|   'conversation', | ||||
|   'filters_changed', | ||||
|   'encrypted_message', | ||||
|   'announcement', | ||||
|   'announcement.delete', | ||||
|   'announcement.reaction', | ||||
| ]; | ||||
| 
 | ||||
| export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { | ||||
|   const params = stream.split('&'); | ||||
|   stream = params.shift(); | ||||
| /** | ||||
|  * @param {MessageEvent} e | ||||
|  * @param {function(StreamEvent): void} received | ||||
|  */ | ||||
| const handleEventSourceMessage = (e, received) => { | ||||
|   received({ | ||||
|     event: e.type, | ||||
|     payload: e.data, | ||||
|   }); | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * @param {string} streamingAPIBaseURL | ||||
|  * @param {string} accessToken | ||||
|  * @param {string} channelName | ||||
|  * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks | ||||
|  * @return {WebSocketClient | EventSource} | ||||
|  */ | ||||
| const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => { | ||||
|   const params = channelName.split('&'); | ||||
| 
 | ||||
|   channelName = params.shift(); | ||||
| 
 | ||||
|   if (streamingAPIBaseURL.startsWith('ws')) { | ||||
|     params.unshift(`stream=${stream}`); | ||||
|     const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); | ||||
| 
 | ||||
|     ws.onopen      = connected; | ||||
|  | @ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co | |||
|     return ws; | ||||
|   } | ||||
| 
 | ||||
|   stream = stream.replace(/:/g, '/'); | ||||
|   channelName = channelName.replace(/:/g, '/'); | ||||
| 
 | ||||
|   if (channelName.endsWith(':media')) { | ||||
|     channelName = channelName.replace('/media', ''); | ||||
|     params.push('only_media=true'); | ||||
|   } | ||||
| 
 | ||||
|   params.push(`access_token=${accessToken}`); | ||||
|   const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`); | ||||
| 
 | ||||
|   const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`); | ||||
| 
 | ||||
|   let firstConnect = true; | ||||
| 
 | ||||
|   es.onopen = () => { | ||||
|     if (firstConnect) { | ||||
|       firstConnect = false; | ||||
|  | @ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co | |||
|       reconnected(); | ||||
|     } | ||||
|   }; | ||||
|   for (let type of knownEventTypes) { | ||||
|     es.addEventListener(type, (e) => { | ||||
|       received({ | ||||
|         event: e.type, | ||||
|         payload: e.data, | ||||
| 
 | ||||
|   KNOWN_EVENT_TYPES.forEach(type => { | ||||
|     es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received)); | ||||
|   }); | ||||
|     }); | ||||
|   } | ||||
|   es.onerror = disconnected; | ||||
| 
 | ||||
|   es.onerror = /** @type {function(): void} */ (disconnected); | ||||
| 
 | ||||
|   return es; | ||||
| }; | ||||
|  |  | |||
|  | @ -30,7 +30,7 @@ class Form::CustomEmojiBatch | |||
|   private | ||||
| 
 | ||||
|   def custom_emojis | ||||
|     CustomEmoji.where(id: custom_emoji_ids) | ||||
|     @custom_emojis ||= CustomEmoji.where(id: custom_emoji_ids) | ||||
|   end | ||||
| 
 | ||||
|   def update! | ||||
|  |  | |||
|  | @ -173,11 +173,7 @@ Rails.application.routes.draw do | |||
|     get '/dashboard', to: 'dashboard#index' | ||||
| 
 | ||||
|     resources :domain_allows, only: [:new, :create, :show, :destroy] | ||||
|     resources :domain_blocks, only: [:new, :create, :show, :destroy, :update] do | ||||
|       member do | ||||
|         get :edit | ||||
|       end | ||||
|     end | ||||
|     resources :domain_blocks, only: [:new, :create, :show, :destroy, :update, :edit] | ||||
| 
 | ||||
|     resources :email_domain_blocks, only: [:index, :new, :create, :destroy] | ||||
|     resources :action_logs, only: [:index] | ||||
|  |  | |||
|  | @ -89,7 +89,7 @@ module Mastodon | |||
|             path_segments = object.key.split('/') | ||||
|             path_segments.delete('cache') | ||||
| 
 | ||||
|             if path_segments.size != 7 | ||||
|             unless [7, 10].include?(path_segments.size) | ||||
|               progress.log(pastel.yellow("Unrecognized file found: #{object.key}")) | ||||
|               next | ||||
|             end | ||||
|  | @ -133,7 +133,7 @@ module Mastodon | |||
|           path_segments = key.split(File::SEPARATOR) | ||||
|           path_segments.delete('cache') | ||||
| 
 | ||||
|           if path_segments.size != 7 | ||||
|           unless [7, 10].include?(path_segments.size) | ||||
|             progress.log(pastel.yellow("Unrecognized file found: #{key}")) | ||||
|             next | ||||
|           end | ||||
|  | @ -258,7 +258,7 @@ module Mastodon | |||
|       path_segments = path.split('/')[2..-1] | ||||
|       path_segments.delete('cache') | ||||
| 
 | ||||
|       if path_segments.size != 7 | ||||
|       unless [7, 10].include?(path_segments.size) | ||||
|         say('Not a media URL', :red) | ||||
|         exit(1) | ||||
|       end | ||||
|  | @ -311,7 +311,7 @@ module Mastodon | |||
|         segments = object.key.split('/') | ||||
|         segments.delete('cache') | ||||
| 
 | ||||
|         next if segments.size != 7 | ||||
|         next unless [7, 10].include?(segments.size) | ||||
| 
 | ||||
|         model_name = segments.first.classify | ||||
|         record_id  = segments[2..-2].join.to_i | ||||
|  |  | |||
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
		Loading…
	
		Reference in a new issue