package connection import "sync" type MessageBuffer struct { messages []message getIndex int insertIndex int size int newDataInserted chan bool firstWriteHappened bool cond *sync.Cond } type message struct { content string new bool } func newMessageBuffer(size int) *MessageBuffer { cond := sync.NewCond(&sync.Mutex{}) return &MessageBuffer{ messages: make([]message, size), size: size, getIndex: 0, insertIndex: 0, newDataInserted: make(chan bool), firstWriteHappened: false, cond: cond, } } func (b *MessageBuffer) Insert(msg string) { b.cond.L.Lock() defer b.cond.L.Unlock() oldMessage := b.messages[b.insertIndex] b.messages[b.insertIndex] = message{content: msg, new: true} if b.firstWriteHappened && b.insertIndex == b.getIndex && oldMessage.new { // insertIndex caught up with getIndex b.getIndex = b.incrementAndWrapIndex(b.getIndex) } b.insertIndex = b.incrementAndWrapIndex(b.insertIndex) b.firstWriteHappened = true b.cond.Broadcast() } func (b *MessageBuffer) Get() (string, error) { b.cond.L.Lock() defer b.cond.L.Unlock() if !b.firstWriteHappened { b.cond.Wait() } var msg *message for { msg = &b.messages[b.getIndex] if msg.new { msg.new = false break } b.cond.Wait() } b.getIndex = b.incrementAndWrapIndex(b.getIndex) return msg.content, nil } func (b MessageBuffer) incrementAndWrapIndex(index int) int { newIndex := index + 1 if newIndex == b.size { newIndex = 0 } return newIndex }