
Roman
23.03.2018
18:33:43
?
а я уже запарился конкурентный код писать..

Daniel
23.03.2018
18:34:10
боюсь, будут рейсы

m
23.03.2018
18:35:19
рейсы по wg?

Google

Daniel
23.03.2018
18:41:22
не
нам нужен двухпозиционный триггер
у нас горутина должна встать на задержку после того, как отработает
и сняться с нее, когда скажут
это, собственно, означает обратную связь
мы из горутины должны проинформировать управляющую процедуру, что вся обработка закончена, и мы готовы к новой итерации
так?

m
23.03.2018
18:56:06
переписал свой пример
хотя, фигня всёравно выходит.
но я тоже поужинал и теперь уже не помошник.


Roman
23.03.2018
19:02:03
короче..
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
"time"
)
type Dam struct {
lock sync.Mutex
queue []chan struct{}
}
func NewDam() *Dam {
return &Dam{
lock: sync.Mutex{},
queue: make([]chan struct{}, 0),
}
}
func (dam *Dam) Await(timeout time.Duration) error {
newChan := make(chan struct{})
dam.lock.Lock()
dam.queue = append(dam.queue, newChan)
dam.lock.Unlock()
select {
case <-newChan:
return nil
case <-time.After(timeout):
return fmt.Errorf("timeout!")
}
}
func (dam *Dam) Flush() int {
dam.lock.Lock()
defer dam.lock.Unlock()
totalCount := len(dam.queue)
for _, waitChan := range dam.queue {
close(waitChan)
}
// Reset queue
dam.queue = make([]chan struct{}, 0)
return totalCount
}
func main() {
dam := NewDam()
go func() {
// Flush the dam every 3 seconds
for {
time.Sleep(3 * time.Second)
totalFlushed := dam.Flush()
log.Println("Dam flushed! accumulated:", totalFlushed)
}
}()
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
go func() {
creationTime := time.Now()
fmt.Println("Added", creationTime)
if err := dam.Await(time.Second * 2); err != nil {
fmt.Println("timed out", creationTime)
return
}
fmt.Println("Finished!", creationTime)
}()
}
if err := scanner.Err(); err != nil {
panic(err)
}
}
реализация дамбы

Google

Roman
23.03.2018
19:03:05
думаю теперь как убрать слайс

m
23.03.2018
19:10:25
слайс с локом - это очень похоже на канал. ?

Roman
23.03.2018
19:16:02
Await блокирует главный поток, не даёт flush вызвать пока не затаймаутит
из-за совместного лока поверх канала
без лока здесь тоже нельзя поскольку происходит мутация канала которая должна быть синхронизирована, нельзя ресетить канал, пока кто-то его прослушивает

Daniel
23.03.2018
19:20:01
я вот забыл спросить - это все зачем?

Roman
23.03.2018
19:22:44

Daniel
23.03.2018
19:23:53
эмм
а подробнее?

Roman
23.03.2018
19:25:32
там довольно сложный механизм автоматического восстановления соединения, который значительно упрощает работу с сокетами.
как только клиент понимает что сервер ушёл - он запускает горутину по восстановлению соединения, которая при завершении "флашит дамбу"
а дамбда это по сути queue запросов от клиента. Т.е. user code пишет в очередь запросов с помощью client.Request(timeout) которые либо таймаутят выходя с очереди дамбы, либо пробиваются до сервера когда дамба флашится (когда горутине по восстановлению связи удаётся подсоединиться)
приложение например выполнило 4 запроса на сервер, но вдруг, прям до этого, сервер пошёл на перезагрузку.. timeout у запросов скажем 60 секунд. Если в течении 60 секунд сервер перезапускается - то все запросы встреливают, если нет то возвращают timeout ошибку

Daniel
23.03.2018
19:27:56
я не понимаю, зачем это. зачем дамба и горутина, когда можно синхронно запустить тот самый механизм восстановления

Roman
23.03.2018
19:28:30

Daniel
23.03.2018
19:28:33
ага

Roman
23.03.2018
19:28:57
тогда придётся самому ручками писать перезапуска... при каждой ошибке прописывать reconnect логику

Daniel
23.03.2018
19:29:44
чего-то я не понимаю

Roman
23.03.2018
19:29:48
чего я хочу предотвратить.. webwire client даёт гарантию что соединение будет восстановлено автоматически, тебе как пользователю библиотеки об этом даже думать не надо

Daniel
23.03.2018
19:29:58
зачем вообще такой странный механизм, с дамбой?
но

Google

Daniel
23.03.2018
19:30:35
у нас есть буферизованный канал - это очередь запросов

Roman
23.03.2018
19:30:41

Daniel
23.03.2018
19:31:12
у нас есть соединение, которое, если порвалось, занимается восстановлением, а не чтением из канала
а как восстановилось - читает из канала
почему нельзя сделать так?

Roman
23.03.2018
19:31:36
у нас есть буферизованный канал - это очередь запросов
это не совсем то, буферизированный канал выстреливает тогда, когда набирается определённое колво элементов, а тут дамба пробивается когда соединение восстанавливается и колво конкурентных запросов изначально неизвестно

Daniel
23.03.2018
19:31:48
а?!
что значит "канал выстреливает"?

Roman
23.03.2018
19:32:14
пишет
в receiver
как нейрон

Daniel
23.03.2018
19:32:28
ох

Daniel
23.03.2018
19:32:36
есть соединение
так?

Roman
23.03.2018
19:32:42
так

Daniel
23.03.2018
19:32:50
есть горутина читатель и горутина писатель
так?

Roman
23.03.2018
19:32:55
нет

Daniel
23.03.2018
19:32:58
а как?

Roman
23.03.2018
19:34:13
множество горутин могут одновременно делать запросы на сервер: https://github.com/qbeon/webwire-go/blob/master/examples/echo/client/client.go#L45
webwire больше похож на дуплексный HTTP, нежели на традиционные сокеты, сокет он абстрагирует

Google

Daniel
23.03.2018
19:35:25
а это надо зачем-то?
почему не сделать одну горутину, которая делает запросы, и не засылать ей запросы каналом?

Roman
23.03.2018
19:37:28
т.е. следственно мы можем параллельно замультиплексить несаколько запросов:
for reqData := range requestsToMake {
go func() {
reply, err := client.Request("givemesomething", reqData)
switch err := err.(type) {
case nil:
// got a reply!
case webwire.ReqErr:
// got an error reply
case webwire.ReqTimeoutErr:
// request timed out
}
}()
}

Daniel
23.03.2018
19:37:51
никакой параллельности на одном соединение не будет палюбэ

Roman
23.03.2018
19:37:54
а если в это время сервер например недоступен, то эти запросы аккумулируются и будут ожидать соединения
дождутся? пошлют и запрос и получат reply
нет дождутся? timeout

Daniel
23.03.2018
19:38:09
еще раз

Roman
23.03.2018
19:38:39
как HTTP2

Admin
ERROR: S client not available

Daniel
23.03.2018
19:39:16
это означает, что запускать более одной горутины-писателя не имеет смысла
правильно?

Greg
23.03.2018
19:40:25
Там zmq рекламировали, как консервативную штуку
Тогда уж наследника юзать
http://nanomsg.org/
И на Go полная pure имплементация
https://github.com/go-mangos/mangos

Roman
23.03.2018
19:41:48
а это надо зачем-то?
по сути это замена HTTP2 которая умеет в обе стороны...
с HTTP2 нам пришлось бы посылать запросы по HTTP, а сигналы от сервера получать по websocket, потому-что ни у HTTP1.1 ни HTTP2 нет обратной связи, собственно почему сокеты и появились на свет...
но это два отдельных канала, их нужно отдельно аутентифицировать и много ещё нюансов, которые мне все дико надоели и я решил свершить мини-революцию, реализовать абстракцию над вебсокетами, которая даст аутентификацию, сессии, автовосстановление как соединения так и сессии, request-reply, graceful shutdown и прочее другое..
собственно получилось https://github.com/qbeon/webwire-go
а... ну и конечно-же оффициальная поддержка JavaScript чтоб всё как из одной литой формы
короче чуя я надо FAQ писать

Google

Daniel
23.03.2018
19:43:34
я не понимаю, зачем там асинхронность
все остальное я понимаю хорошо
а вот асинхронность - нет

Roman
23.03.2018
19:44:07

Greg
23.03.2018
19:44:24
Нет, они об одном и том же
Просто различается громодзкость решения

Roman
23.03.2018
19:45:26
в webwire другой набор фич реализован, в README я надеюсь всё описал)

Daniel
23.03.2018
19:45:59
еще раз - зачем асинхронность?

Roman
23.03.2018
19:46:45
самый плизкий конкурент это socket.io
но сокет.ио не обладает нужным набором фич и оффициальной поддержкой Go ну и многое другое..

Daniel
23.03.2018
19:47:29
в чем угодно

Kirill
23.03.2018
19:47:42
еще центрифуга похожа

Daniel
23.03.2018
19:48:21
"asynchronous duplex messaging library" - вот почему asynchronous?

Greg
23.03.2018
19:48:27
Там нет стримов
Всегда request-reply

Roman
23.03.2018
19:49:22

Kirill
23.03.2018
19:49:22

Roman
23.03.2018
19:49:55