diff --git a/package-lock.json b/package-lock.json index 5cd3af3..939657b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -37,9 +37,9 @@ "integrity": "sha1-nbVjk33YaRX2kJK8QyWdL0hXjkE=" }, "@types/ws": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/@types/ws/-/ws-5.1.2.tgz", - "integrity": "sha512-NkTXUKTYdXdnPE2aUUbGOXE1XfMK527SCvU/9bj86kyFF6kZ9ZnOQ3mK5jADn98Y2vEUD/7wKDgZa7Qst2wYOg==", + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-6.0.1.tgz", + "integrity": "sha512-EzH8k1gyZ4xih/MaZTXwT2xOkPiIMSrhQ9b8wrlX88L0T02eYsddatQlwVFlEPyEqV0ChpdpNnE51QPH6NVT4Q==", "requires": { "@types/events": "*", "@types/node": "*" @@ -107,6 +107,11 @@ "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha1-x57Zf380y48robyXkLzDZkdLS3k=" }, + "autobind-decorator": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/autobind-decorator/-/autobind-decorator-2.1.0.tgz", + "integrity": "sha512-bgyxeRi1R2Q8kWpHsb1c+lXCulbIAHsyZRddaS+agAUX3hFUVZMociwvRgeZi1zWvfqEEjybSv4zxWvFV8ydQQ==" + }, "aws-sign2": { "version": "0.7.0", "resolved": "https://registry.npmjs.org/aws-sign2/-/aws-sign2-0.7.0.tgz", @@ -430,9 +435,9 @@ "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==" }, "reconnecting-websocket": { - "version": "4.0.0-rc5", - "resolved": "https://registry.npmjs.org/reconnecting-websocket/-/reconnecting-websocket-4.0.0-rc5.tgz", - "integrity": "sha512-ew+Twq9j66vhRtW9mT0xIgkLCQsDpslAideVYuB1JjW4U9wm27XZfA786K6pCKcUFkDWmktL+uI92ITLdn2eOQ==" + "version": "4.1.5", + "resolved": "https://registry.npmjs.org/reconnecting-websocket/-/reconnecting-websocket-4.1.5.tgz", + "integrity": "sha512-NZvNhK+N2Z/hTCJb/xDDfK7zfypoDBKajLwMe7vDzQfRIUl5vyGWYATFuMcoEc393LipmU9rzVMczU1zybhW4w==" }, "request": { "version": "2.87.0", diff --git a/package.json b/package.json index 2aacf5f..261e7fc 100644 --- a/package.json +++ b/package.json @@ -11,11 +11,12 @@ "@types/promise-retry": "1.1.2", "@types/seedrandom": "2.4.27", "@types/ws": "6.0.1", + "autobind-decorator": "2.1.0", "lokijs": "1.5.5", "mecab-async": "0.1.2", "misskey-reversi": "0.0.5", "promise-retry": "1.1.1", - "reconnecting-websocket": "4.0.0-rc5", + "reconnecting-websocket": "4.1.5", "request": "2.87.0", "request-promise-native": "1.0.5", "seedrandom": "2.4.3", diff --git a/src/ai.ts b/src/ai.ts index 9bb7152..9cfbdfd 100644 --- a/src/ai.ts +++ b/src/ai.ts @@ -1,7 +1,6 @@ // AI CORE import * as loki from 'lokijs'; -import * as WebSocket from 'ws'; import * as request from 'request-promise-native'; import config from './config'; import IModule from './module'; @@ -9,26 +8,15 @@ import MessageLike from './message-like'; import { FriendDoc } from './friend'; import { User } from './misskey/user'; import getCollection from './utils/get-collection'; -const ReconnectingWebSocket = require('reconnecting-websocket'); +import Stream from './stream'; /** * 藍 */ export default class 藍 { public account: User; - - /** - * ホームストリーム - */ - private connection: any; - - /** - * ローカルタイムラインストリーム - */ - private localTimelineConnection: any; - + public connection: Stream; private modules: IModule[] = []; - public db: loki; private contexts: loki.Collection<{ @@ -65,86 +53,35 @@ export default class 藍 { }); //#endregion + // Init stream + this.connection = new Stream(); + + //#region Main stream + const mainStream = this.connection.useSharedConnection('main'); + + // メンションされたとき + mainStream.on('mention', data => { + if (data.userId == this.account.id) return; // 自分は弾く + if (data.text.startsWith('@' + this.account.username)) { + this.onMention(new MessageLike(this, data, false)); + } + }); + + // 返信されたとき + mainStream.on('reply', data => { + if (data.userId == this.account.id) return; // 自分は弾く + this.onMention(new MessageLike(this, data, false)); + }); + + // メッセージ + mainStream.on('messagingMessage', data => { + if (data.userId == this.account.id) return; // 自分は弾く + this.onMention(new MessageLike(this, data, true)); + }); + //#endregion + // Install modules this.modules.forEach(m => m.install(this)); - - //#region Home stream - this.connection = new ReconnectingWebSocket(`${config.wsUrl}/?i=${config.i}`, [], { - WebSocket: WebSocket - }); - - this.connection.addEventListener('open', () => { - console.log('home stream opened'); - }); - - this.connection.addEventListener('close', () => { - console.log('home stream closed'); - this.connection._shouldReconnect && this.connection._connect() - }); - - this.connection.addEventListener('message', message => { - const msg = JSON.parse(message.data); - - this.onMessage(msg); - }); - //#endregion - - //#region Local timeline stream - this.localTimelineConnection = new ReconnectingWebSocket(`${config.wsUrl}/local-timeline?i=${config.i}`, [], { - WebSocket: WebSocket - }); - - this.localTimelineConnection.addEventListener('open', () => { - console.log('local-timeline stream opened'); - }); - - this.localTimelineConnection.addEventListener('close', () => { - console.log('local-timeline stream closed'); - this.localTimelineConnection._shouldReconnect && this.localTimelineConnection._connect() - }); - - this.localTimelineConnection.addEventListener('message', message => { - const msg = JSON.parse(message.data); - - this.onLocalNote(msg.body); - }); - //#endregion - } - - private onMessage = (msg: any) => { - switch (msg.type) { - // メンションされたとき - case 'mention': { - if (msg.body.userId == this.account.id) return; // 自分は弾く - if (msg.body.text.startsWith('@' + this.account.username)) { - this.onMention(new MessageLike(this, msg.body, false)); - } - break; - } - - // 返信されたとき - case 'reply': { - if (msg.body.userId == this.account.id) return; // 自分は弾く - this.onMention(new MessageLike(this, msg.body, false)); - break; - } - - // メッセージ - case 'messaging_message': { - if (msg.body.userId == this.account.id) return; // 自分は弾く - this.onMention(new MessageLike(this, msg.body, true)); - break; - } - - default: - break; - } - } - - private onLocalNote = (note: any) => { - this.modules.filter(m => m.hasOwnProperty('onLocalNote')).forEach(m => { - return m.onLocalNote(note); - }); } private onMention = (msg: MessageLike) => { diff --git a/src/module.ts b/src/module.ts index 0f1ac16..0fdb025 100644 --- a/src/module.ts +++ b/src/module.ts @@ -5,7 +5,6 @@ export default interface IModule { name: string; install?: (ai: 藍) => void; onMention?: (msg: MessageLike) => boolean | Result; - onLocalNote?: (note: any) => void; onReplyThisModule?: (msg: MessageLike, data?: any) => void | Result; } diff --git a/src/modules/reversi/back.ts b/src/modules/reversi/back.ts index db87908..18468a3 100644 --- a/src/modules/reversi/back.ts +++ b/src/modules/reversi/back.ts @@ -74,7 +74,7 @@ class Session { private onMessage = async (msg: any) => { switch (msg.type) { case '_init_': this.onInit(msg); break; - case 'update-form': this.onUpdateForn(msg); break; + case 'updateForm': this.onUpdateForn(msg); break; case 'started': this.onStarted(msg); break; case 'ended': this.onEnded(msg); break; case 'set': this.onSet(msg); break; diff --git a/src/modules/reversi/index.ts b/src/modules/reversi/index.ts index 6f6423a..baa0cd8 100644 --- a/src/modules/reversi/index.ts +++ b/src/modules/reversi/index.ts @@ -1,6 +1,5 @@ import * as childProcess from 'child_process'; -const ReconnectingWebSocket = require('reconnecting-websocket'); -import 藍 from '../../ai'; +\import 藍 from '../../ai'; import IModule from '../../module'; import serifs from '../../serifs'; import config from '../../config'; @@ -24,24 +23,13 @@ export default class ReversiModule implements IModule { this.ai = ai; - this.reversiConnection = new ReconnectingWebSocket(`${config.wsUrl}/games/reversi?i=${config.i}`, [], { - WebSocket: WebSocket - }); + this.reversiConnection = this.ai.connection.useSharedConnection('gamesReversi'); - this.reversiConnection.addEventListener('open', () => { - console.log('reversi stream opened'); - }); + // 招待されたとき + this.reversiConnection.on('invited', msg => this.onReversiInviteMe(msg.parent)); - this.reversiConnection.addEventListener('close', () => { - console.log('reversi stream closed'); - this.reversiConnection._shouldReconnect && this.reversiConnection._connect() - }); - - this.reversiConnection.addEventListener('message', message => { - const msg = JSON.parse(message.data); - - this.onReversiConnectionMessage(msg); - }); + // マッチしたとき + this.reversiConnection.on('matched', msg => this.onReversiGameStart(msg)); } public onMention = (msg: MessageLike) => { @@ -62,26 +50,6 @@ export default class ReversiModule implements IModule { } } - private onReversiConnectionMessage = (msg: any) => { - switch (msg.type) { - - // 招待されたとき - case 'invited': { - this.onReversiInviteMe(msg.body.parent); - break; - } - - // マッチしたとき - case 'matched': { - this.onReversiGameStart(msg.body); - break; - } - - default: - break; - } - } - private onReversiInviteMe = async (inviter: any) => { console.log(`Someone invited me: @${inviter.username}`); @@ -99,8 +67,8 @@ export default class ReversiModule implements IModule { private onReversiGameStart = (game: any) => { // ゲームストリームに接続 - const gw = new ReconnectingWebSocket(`${config.wsUrl}/games/reversi-game?i=${config.i}&game=${game.id}`, [], { - WebSocket: WebSocket + const gw = this.ai.connection.connectToChannel('gamesReversiGame', { + game: game.id }); function send(msg) { @@ -111,89 +79,79 @@ export default class ReversiModule implements IModule { } } - gw.addEventListener('open', () => { - console.log('reversi game stream opened'); - - // フォーム - const form = [{ - id: 'publish', - type: 'switch', - label: '藍が対局情報を投稿するのを許可', - value: true + // フォーム + const form = [{ + id: 'publish', + type: 'switch', + label: '藍が対局情報を投稿するのを許可', + value: true + }, { + id: 'strength', + type: 'radio', + label: '強さ', + value: 3, + items: [{ + label: '接待', + value: 0 }, { - id: 'strength', - type: 'radio', - label: '強さ', - value: 3, - items: [{ - label: '接待', - value: 0 - }, { - label: '弱', - value: 2 - }, { - label: '中', - value: 3 - }, { - label: '強', - value: 4 - }, { - label: '最強', - value: 5 - }] - }]; + label: '弱', + value: 2 + }, { + label: '中', + value: 3 + }, { + label: '強', + value: 4 + }, { + label: '最強', + value: 5 + }] + }]; - //#region バックエンドプロセス開始 - const ai = childProcess.fork(__dirname + '/back.js'); + //#region バックエンドプロセス開始 + const ai = childProcess.fork(__dirname + '/back.js'); - // バックエンドプロセスに情報を渡す - ai.send({ - type: '_init_', - game, - form, - account: this.ai.account - }); - - ai.on('message', msg => { - if (msg.type == 'put') { - send({ - type: 'set', - pos: msg.pos - }); - } else if (msg.type == 'ended') { - gw.close(); - - this.onGameEnded(game); - } - }); - - // ゲームストリームから情報が流れてきたらそのままバックエンドプロセスに伝える - gw.addEventListener('message', message => { - const msg = JSON.parse(message.data); - ai.send(msg); - }); - //#endregion - - // フォーム初期化 - setTimeout(() => { - send({ - type: 'init-form', - body: form - }); - }, 1000); - - // どんな設定内容の対局でも受け入れる - setTimeout(() => { - send({ - type: 'accept' - }); - }, 2000); + // バックエンドプロセスに情報を渡す + ai.send({ + type: '_init_', + game, + form, + account: this.ai.account }); - gw.addEventListener('close', () => { - console.log('reversi game stream closed'); - gw._shouldReconnect && gw._connect() + ai.on('message', msg => { + if (msg.type == 'put') { + send({ + type: 'set', + pos: msg.pos + }); + } else if (msg.type == 'ended') { + gw.dispose(); + + this.onGameEnded(game); + } }); + + // ゲームストリームから情報が流れてきたらそのままバックエンドプロセスに伝える + gw.addEventListener('*', message => { + ai.send(message); + }); + //#endregion + + // フォーム初期化 + setTimeout(() => { + send({ + type: 'initForm', + body: form + }); + }, 1000); + + // どんな設定内容の対局でも受け入れる + setTimeout(() => { + send({ + type: 'accept' + }); + }, 2000); } private onGameEnded(game: any) { diff --git a/src/modules/welcome/index.ts b/src/modules/welcome/index.ts index 17af303..5953071 100644 --- a/src/modules/welcome/index.ts +++ b/src/modules/welcome/index.ts @@ -8,6 +8,10 @@ export default class WelcomeModule implements IModule { public install = (ai: 藍) => { this.ai = ai; + + const tl = this.ai.connection.useSharedConnection('localTimeline'); + + tl.on('note', this.onLocalNote); } public onLocalNote = (note: any) => { diff --git a/src/serifs.ts b/src/serifs.ts index 3c0880e..b645a6f 100644 --- a/src/serifs.ts +++ b/src/serifs.ts @@ -14,6 +14,14 @@ export default { goodMorning: (tension, name) => name ? `おはようございます、${name}!${tension}` : `おはようございます!${tension}`, + /* + goodMorning: { + normal: (tension, name) => name ? `おはようございます、${name}!${tension}` : `おはようございます!${tension}`, + + hiru: (tension, name) => name ? `おはようございます、${name}!${tension}もうお昼ですよ?${tension}` : `おはようございます!${tension}もうお昼ですよ?${tension}`, + }, +*/ + goodNight: name => name ? `おやすみなさい、${name}!` : 'おやすみなさい!', okaeri: { diff --git a/src/stream.ts b/src/stream.ts new file mode 100644 index 0000000..3d184d2 --- /dev/null +++ b/src/stream.ts @@ -0,0 +1,221 @@ +import autobind from 'autobind-decorator'; +import { EventEmitter } from 'events'; +import * as WebSocket from 'ws'; +const ReconnectingWebsocket = require('reconnecting-websocket'); +import config from './config'; + +/** + * Misskey stream connection + */ +export default class Stream extends EventEmitter { + private stream: any; + private state: string; + private sharedConnections: SharedConnection[] = []; + private nonSharedConnections: NonSharedConnection[] = []; + + constructor() { + super(); + + this.state = 'initializing'; + console.log('initializing stream'); + + this.stream = new ReconnectingWebsocket(`${config.wsUrl}/streaming?i=${config.i}`, [], { + WebSocket: WebSocket + }); + this.stream.addEventListener('open', this.onOpen); + this.stream.addEventListener('close', this.onClose); + this.stream.addEventListener('message', this.onMessage); + } + + public useSharedConnection = (channel: string): SharedConnection => { + const existConnection = this.sharedConnections.find(c => c.channel === channel); + + if (existConnection) { + existConnection.use(); + return existConnection; + } else { + const connection = new SharedConnection(this, channel); + connection.use(); + this.sharedConnections.push(connection); + return connection; + } + } + + @autobind + public removeSharedConnection(connection: SharedConnection) { + this.sharedConnections = this.sharedConnections.filter(c => c.id !== connection.id); + } + + public connectToChannel = (channel: string, params?: any): NonSharedConnection => { + const connection = new NonSharedConnection(this, channel, params); + this.nonSharedConnections.push(connection); + return connection; + } + + @autobind + public disconnectToChannel(connection: NonSharedConnection) { + this.nonSharedConnections = this.nonSharedConnections.filter(c => c.id !== connection.id); + } + + /** + * Callback of when open connection + */ + @autobind + private onOpen() { + const isReconnect = this.state == 'reconnecting'; + + this.state = 'connected'; + this.emit('_connected_'); + console.log('stream connected'); + + // チャンネル再接続 + if (isReconnect) { + this.sharedConnections.forEach(c => { + c.connect(); + }); + this.nonSharedConnections.forEach(c => { + c.connect(); + }); + } + } + + /** + * Callback of when close connection + */ + @autobind + private onClose() { + this.state = 'reconnecting'; + this.emit('_disconnected_'); + console.log('stream disconnected'); + } + + /** + * Callback of when received a message from connection + */ + @autobind + private onMessage(message) { + const { type, body } = JSON.parse(message.data); + + if (type == 'channel') { + const id = body.id; + const connection = this.sharedConnections.find(c => c.id === id) || this.nonSharedConnections.find(c => c.id === id); + connection.emit(body.type, body.body); + connection.emit('*', { type, body }); + } else { + this.emit(type, body); + this.emit('*', { type, body }); + } + } + + /** + * Send a message to connection + */ + @autobind + public send(typeOrPayload, payload?) { + const data = payload === undefined ? typeOrPayload : { + type: typeOrPayload, + body: payload + }; + + this.stream.send(JSON.stringify(data)); + } + + /** + * Close this connection + */ + @autobind + public close() { + this.stream.removeEventListener('open', this.onOpen); + this.stream.removeEventListener('message', this.onMessage); + } +} + +abstract class Connection extends EventEmitter { + public channel: string; + public id: string; + protected params: any; + protected stream: Stream; + + constructor(stream: Stream, channel: string, params?: any) { + super(); + + this.stream = stream; + this.channel = channel; + this.params = params; + this.id = Math.random().toString(); + this.connect(); + } + + @autobind + public connect() { + this.stream.send('connect', { + channel: this.channel, + id: this.id, + params: this.params + }); + } + + @autobind + public send(typeOrPayload, payload?) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.stream.send('channel', { + id: this.id, + type: type, + body: body + }); + } + + public abstract dispose(): void; +} + +export class SharedConnection extends Connection { + private users = 0; + private disposeTimerId: any; + + constructor(stream: Stream, channel: string) { + super(stream, channel); + } + + @autobind + public use() { + this.users++; + + // タイマー解除 + if (this.disposeTimerId) { + clearTimeout(this.disposeTimerId); + this.disposeTimerId = null; + } + } + + @autobind + public dispose() { + this.users--; + + // そのコネクションの利用者が誰もいなくなったら + if (this.users === 0) { + // また直ぐに再利用される可能性があるので、一定時間待ち、 + // 新たな利用者が現れなければコネクションを切断する + this.disposeTimerId = setTimeout(() => { + this.disposeTimerId = null; + this.removeAllListeners(); + this.stream.send('disconnect', { id: this.id }); + this.stream.removeSharedConnection(this); + }, 3000); + } + } +} + +export class NonSharedConnection extends Connection { + constructor(stream: Stream, channel: string, params?: any) { + super(stream, channel, params); + } + + @autobind + public dispose() { + this.removeAllListeners(); + this.stream.send('disconnect', { id: this.id }); + this.stream.disconnectToChannel(this); + } +}