From e81f9a75dc7271f0a83cc69eeadedd22d16e4b9d Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 11 Oct 2018 03:00:54 +0900 Subject: [PATCH] =?UTF-8?q?v10=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/reversi/back.ts | 24 ++-- src/modules/reversi/index.ts | 34 ++--- src/stream.ts | 238 +++++++++++++++++++++++------------ 3 files changed, 185 insertions(+), 111 deletions(-) diff --git a/src/modules/reversi/back.ts b/src/modules/reversi/back.ts index 18468a3..4b828f1 100644 --- a/src/modules/reversi/back.ts +++ b/src/modules/reversi/back.ts @@ -73,11 +73,11 @@ class Session { private onMessage = async (msg: any) => { switch (msg.type) { - case '_init_': this.onInit(msg); break; - case 'updateForm': this.onUpdateForn(msg); break; - case 'started': this.onStarted(msg); break; - case 'ended': this.onEnded(msg); break; - case 'set': this.onSet(msg); break; + case '_init_': this.onInit(msg.body); break; + case 'updateForm': this.onUpdateForn(msg.body); break; + case 'started': this.onStarted(msg.body); break; + case 'ended': this.onEnded(msg.body); break; + case 'set': this.onSet(msg.body); break; } } @@ -92,14 +92,14 @@ class Session { * フォームが更新されたとき */ private onUpdateForn = (msg: any) => { - this.form.find(i => i.id == msg.body.id).value = msg.body.value; + this.form.find(i => i.id == msg.id).value = msg.value; } /** * 対局が始まったとき */ private onStarted = (msg: any) => { - this.game = msg.body; + this.game = msg; // TLに投稿する this.postGameStarted().then(note => { @@ -215,14 +215,14 @@ class Session { let text: string; - if (msg.body.game.surrendered) { + if (msg.game.surrendered) { if (this.isSettai) { text = serifs.reversi.settaiButYouSurrendered(this.userName); } else { text = serifs.reversi.youSurrendered(this.userName); } - } else if (msg.body.winnerId) { - if (msg.body.winnerId == this.account.id) { + } else if (msg.winnerId) { + if (msg.winnerId == this.account.id) { if (this.isSettai) { text = serifs.reversi.iWonButSettai(this.userName); } else { @@ -252,9 +252,9 @@ class Session { * 打たれたとき */ private onSet = (msg: any) => { - this.o.put(msg.body.color, msg.body.pos); + this.o.put(msg.color, msg.pos); - if (msg.body.next === this.botColor) { + if (msg.next === this.botColor) { this.think(); } } diff --git a/src/modules/reversi/index.ts b/src/modules/reversi/index.ts index 9c0311e..dd7f42c 100644 --- a/src/modules/reversi/index.ts +++ b/src/modules/reversi/index.ts @@ -66,19 +66,13 @@ export default class ReversiModule implements IModule { } private onReversiGameStart = (game: any) => { + console.log('enter reversi game room'); + // ゲームストリームに接続 const gw = this.ai.connection.connectToChannel('gamesReversiGame', { - game: game.id + gameId: game.id }); - function send(msg) { - try { - gw.send(JSON.stringify(msg)); - } catch (e) { - console.error(e); - } - } - // フォーム const form = [{ id: 'publish', @@ -114,15 +108,16 @@ export default class ReversiModule implements IModule { // バックエンドプロセスに情報を渡す ai.send({ type: '_init_', - game, - form, - account: this.ai.account + body: { + game: game, + form: form, + account: this.ai.account + } }); ai.on('message', msg => { if (msg.type == 'put') { - send({ - type: 'set', + gw.send('set', { pos: msg.pos }); } else if (msg.type == 'ended') { @@ -133,24 +128,19 @@ export default class ReversiModule implements IModule { }); // ゲームストリームから情報が流れてきたらそのままバックエンドプロセスに伝える - gw.addEventListener('*', message => { + gw.addListener('*', message => { ai.send(message); }); //#endregion // フォーム初期化 setTimeout(() => { - send({ - type: 'initForm', - body: form - }); + gw.send('initForm', form); }, 1000); // どんな設定内容の対局でも受け入れる setTimeout(() => { - send({ - type: 'accept' - }); + gw.send('accept', {}); }, 2000); } diff --git a/src/stream.ts b/src/stream.ts index 3d184d2..490d769 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -10,6 +10,8 @@ import config from './config'; export default class Stream extends EventEmitter { private stream: any; private state: string; + private buffer: any[]; + private sharedConnectionPools: Pool[] = []; private sharedConnections: SharedConnection[] = []; private nonSharedConnections: NonSharedConnection[] = []; @@ -17,7 +19,7 @@ export default class Stream extends EventEmitter { super(); this.state = 'initializing'; - console.log('initializing stream'); + this.buffer = []; this.stream = new ReconnectingWebsocket(`${config.wsUrl}/streaming?i=${config.i}`, [], { WebSocket: WebSocket @@ -27,26 +29,27 @@ export default class Stream extends EventEmitter { this.stream.addEventListener('message', this.onMessage); } - public useSharedConnection = (channel: string): SharedConnection => { - const existConnection = this.sharedConnections.find(c => c.channel === channel); + @autobind + public useSharedConnection(channel: string): SharedConnection { + let pool = this.sharedConnectionPools.find(p => p.channel === channel); - if (existConnection) { - existConnection.use(); - return existConnection; - } else { - const connection = new SharedConnection(this, channel); - connection.use(); - this.sharedConnections.push(connection); - return connection; + if (pool == null) { + pool = new Pool(this, channel); + this.sharedConnectionPools.push(pool); } + + const connection = new SharedConnection(this, channel, pool); + this.sharedConnections.push(connection); + return connection; } @autobind public removeSharedConnection(connection: SharedConnection) { - this.sharedConnections = this.sharedConnections.filter(c => c.id !== connection.id); + this.sharedConnections = this.sharedConnections.filter(c => c !== connection); } - public connectToChannel = (channel: string, params?: any): NonSharedConnection => { + @autobind + public connectToChannel(channel: string, params?: any): NonSharedConnection { const connection = new NonSharedConnection(this, channel, params); this.nonSharedConnections.push(connection); return connection; @@ -54,7 +57,7 @@ export default class Stream extends EventEmitter { @autobind public disconnectToChannel(connection: NonSharedConnection) { - this.nonSharedConnections = this.nonSharedConnections.filter(c => c.id !== connection.id); + this.nonSharedConnections = this.nonSharedConnections.filter(c => c !== connection); } /** @@ -66,12 +69,18 @@ export default class Stream extends EventEmitter { this.state = 'connected'; this.emit('_connected_'); - console.log('stream connected'); + + // バッファーを処理 + const _buffer = [].concat(this.buffer); // Shallow copy + this.buffer = []; // Clear buffer + _buffer.forEach(data => { + this.send(data); // Resend each buffered messages + }); // チャンネル再接続 if (isReconnect) { - this.sharedConnections.forEach(c => { - c.connect(); + this.sharedConnectionPools.forEach(p => { + p.connect(); }); this.nonSharedConnections.forEach(c => { c.connect(); @@ -86,7 +95,6 @@ export default class Stream extends EventEmitter { private onClose() { this.state = 'reconnecting'; this.emit('_disconnected_'); - console.log('stream disconnected'); } /** @@ -98,9 +106,19 @@ export default class Stream extends EventEmitter { if (type == 'channel') { const id = body.id; - const connection = this.sharedConnections.find(c => c.id === id) || this.nonSharedConnections.find(c => c.id === id); - connection.emit(body.type, body.body); - connection.emit('*', { type, body }); + + let connections: Connection[]; + + connections = this.sharedConnections.filter(c => c.id === id); + + if (connections.length === 0) { + connections = [this.nonSharedConnections.find(c => c.id === id)]; + } + + connections.filter(c => c != null).forEach(c => { + c.emit(body.type, body.body); + c.emit('*', { type: body.type, body: body.body }); + }); } else { this.emit(type, body); this.emit('*', { type, body }); @@ -117,6 +135,12 @@ export default class Stream extends EventEmitter { body: payload }; + // まだ接続が確立されていなかったらバッファリングして次に接続した時に送信する + if (this.state != 'connected') { + this.buffer.push(data); + return; + } + this.stream.send(JSON.stringify(data)); } @@ -130,19 +154,131 @@ export default class Stream extends EventEmitter { } } -abstract class Connection extends EventEmitter { +class Pool { public channel: string; public id: string; - protected params: any; protected stream: Stream; + private users = 0; + private disposeTimerId: any; + private isConnected = false; - constructor(stream: Stream, channel: string, params?: any) { + constructor(stream: Stream, channel: string) { + this.channel = channel; + this.stream = stream; + + this.id = Math.random().toString(); + } + + @autobind + public inc() { + if (this.users === 0 && !this.isConnected) { + this.connect(); + } + + this.users++; + + // タイマー解除 + if (this.disposeTimerId) { + clearTimeout(this.disposeTimerId); + this.disposeTimerId = null; + } + } + + @autobind + public dec() { + this.users--; + + // そのコネクションの利用者が誰もいなくなったら + if (this.users === 0) { + // また直ぐに再利用される可能性があるので、一定時間待ち、 + // 新たな利用者が現れなければコネクションを切断する + this.disposeTimerId = setTimeout(() => { + this.disconnect(); + }, 3000); + } + } + + @autobind + public connect() { + this.isConnected = true; + this.stream.send('connect', { + channel: this.channel, + id: this.id + }); + } + + @autobind + private disconnect() { + this.isConnected = false; + this.disposeTimerId = null; + this.stream.send('disconnect', { id: this.id }); + } +} + +abstract class Connection extends EventEmitter { + public channel: string; + protected stream: Stream; + public abstract id: string; + + constructor(stream: Stream, channel: string) { super(); this.stream = stream; this.channel = channel; + } + + @autobind + public send(id: string, typeOrPayload, payload?) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.stream.send('ch', { + id: id, + type: type, + body: body + }); + } + + public abstract dispose(): void; +} + +class SharedConnection extends Connection { + private pool: Pool; + + public get id(): string { + return this.pool.id; + } + + constructor(stream: Stream, channel: string, pool: Pool) { + super(stream, channel); + + this.pool = pool; + this.pool.inc(); + } + + @autobind + public send(typeOrPayload, payload?) { + super.send(this.pool.id, typeOrPayload, payload); + } + + @autobind + public dispose() { + this.pool.dec(); + this.removeAllListeners(); + this.stream.removeSharedConnection(this); + } +} + +class NonSharedConnection extends Connection { + public id: string; + protected params: any; + + constructor(stream: Stream, channel: string, params?: any) { + super(stream, channel); + this.params = params; this.id = Math.random().toString(); + this.connect(); } @@ -157,59 +293,7 @@ abstract class Connection extends EventEmitter { @autobind public send(typeOrPayload, payload?) { - const type = payload === undefined ? typeOrPayload.type : typeOrPayload; - const body = payload === undefined ? typeOrPayload.body : payload; - - this.stream.send('channel', { - id: this.id, - type: type, - body: body - }); - } - - public abstract dispose(): void; -} - -export class SharedConnection extends Connection { - private users = 0; - private disposeTimerId: any; - - constructor(stream: Stream, channel: string) { - super(stream, channel); - } - - @autobind - public use() { - this.users++; - - // タイマー解除 - if (this.disposeTimerId) { - clearTimeout(this.disposeTimerId); - this.disposeTimerId = null; - } - } - - @autobind - public dispose() { - this.users--; - - // そのコネクションの利用者が誰もいなくなったら - if (this.users === 0) { - // また直ぐに再利用される可能性があるので、一定時間待ち、 - // 新たな利用者が現れなければコネクションを切断する - this.disposeTimerId = setTimeout(() => { - this.disposeTimerId = null; - this.removeAllListeners(); - this.stream.send('disconnect', { id: this.id }); - this.stream.removeSharedConnection(this); - }, 3000); - } - } -} - -export class NonSharedConnection extends Connection { - constructor(stream: Stream, channel: string, params?: any) { - super(stream, channel, params); + super.send(this.id, typeOrPayload, payload); } @autobind