mchess-server/connection/message_buffer.go

83 lines
1.6 KiB
Go

package connection
import "sync"
type MessageBuffer struct {
messages []message
getIndex int
insertIndex int
size int
newDataInserted chan bool
firstWriteHappened bool
cond *sync.Cond
}
type message struct {
content string
new bool
}
func newMessageBuffer(size int) *MessageBuffer {
cond := sync.NewCond(&sync.Mutex{})
return &MessageBuffer{
messages: make([]message, size),
size: size,
getIndex: 0,
insertIndex: 0,
newDataInserted: make(chan bool),
firstWriteHappened: false,
cond: cond,
}
}
func (b *MessageBuffer) Insert(msg string) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
oldMessage := b.messages[b.insertIndex]
b.messages[b.insertIndex] = message{content: msg, new: true}
if b.firstWriteHappened &&
b.insertIndex == b.getIndex &&
oldMessage.new { // insertIndex caught up with getIndex
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
}
b.insertIndex = b.incrementAndWrapIndex(b.insertIndex)
b.firstWriteHappened = true
b.cond.Broadcast()
}
func (b *MessageBuffer) Get() (string, error) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
if !b.firstWriteHappened {
b.cond.Wait()
}
var msg *message
for {
msg = &b.messages[b.getIndex]
if msg.new {
msg.new = false
break
}
b.cond.Wait()
}
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
return msg.content, nil
}
func (b MessageBuffer) incrementAndWrapIndex(index int) int {
newIndex := index + 1
if newIndex == b.size {
newIndex = 0
}
return newIndex
}