Yuriy
Всем доброго времени суток. Есть тут те кто пользуется openresty?
Надеюсь да. возник следующий вопрос
Есть websocket
когда клиент к нму подключается в loop передается дескриптор на redis pubsub
(Сначала создаю redis клиента, подписваюсь, затем запускаю loop и мониторю
redisClient:read_reply())
если Reply я его обрабатываю.
то есть
while true do
local data,err = redisClent:read_reply()
if not err then
local handledReply = myHander(data)
ws:send_text(handledReply)
end
end
то есть, получается, Что мой handler на какое то время блокирует цикл и если от redis придет новое сообщение во время обработки то я его банально не увижу.
Собсно вопрос - как сделать нееблокирующую обработку myHander?
P.S я думал по поводу ngx.timer.at
но на сколько я вижу по его описанию - в него нельзя будет передать дескриптор ws чтобы отправить ответ.
Snusmumriken
Ну, первое что приходит в голову - прикрутить что-нибудь типа lua lanes, которая будет крутить кучу потоков параллельно, и диспетчер тредов, с подобной схемой:
1. Основной тред - чисто приёмо-передатчик между пулом потоков и клиентами, крутится в бесконечном цикле, принимает-отправляет.
2. Треды, которые выполняют грязную работу.
Основной цикл отправляет задачу тредам, ждёт пока те не отправят ему ответ и возвращает его. Чот такое.
У меня просто подобная схема работает в очень большом количестве мест, недостатков особых не вижу, возможно, resty не очень потянет ибо ждёт полного завершения задачи прежде чем посылать новый запрос, надо в настройках покопаться, там вроде можно задать количество потоков.
Snusmumriken
А, всё проще.
while true do
local data,err = redisClent:read_reply()
if not err then
local handledReply = myHander(data)
ws:send_text(handledReply)
end
end
В общем, если придёт ответ от редиски, ты его увидишь даже если хендлер заблокирует поток на пол часа (даже если отвалится по таймауту, ответ должен остаться).
TCP-соединениями занимается операционная система, а не твоё приложение, и она же записывает их в буферы TCP-стека, параллельно твоему приложению. Это не UDP, где если тебе наприсылали больше чем 64кб (которые предоставляет ос под UDP), то следующие ты не увидишь.
Поэтому в принципе, имхо можно расслабиться. Или ты проверял что теряется ответ?
Yuriy
Нет. Я не проверял. Я проверял что timer вызваться не может, ну и логично было бы для меня подумать что раз чтение ответа крутится к цикле, то цикл должен опрашивать каждый промежуток времени и в цикле операции блокирвоать не желательно
Yuriy
то есть я правильно понимаю из твоего сообщения что когда ответ от редики придет об будет валяться какое то время в TCP буффере?
Snusmumriken
Ага.
Yuriy
типа встроенной очереди. Оке. Я услышал. Это радует))
Snusmumriken
Если редиска вышлет 1000мб - то не факт что ось это потянет (при каждом receive - ось передаёт нам кусок буфера, и вычищает его, типа, зачем ей хранить лишнее). А небольшие куски будут болтаться.
Почитай про TCP-стек, довольно полезная инфа, особенно для сетевиков.
Yuriy
Я просто думал уже на ноду увдить эту обработку, хотя я думаю что там под капотом callback точно так же устроен
Да я в курсе по поводу TCP. Я просто не знаю на сколко косокеты openresty глубоко работают с TCP пэтому и беспокоюсь
1000 Мб там точно не будет. До 1000 знаков.
Snusmumriken
А так - неблокирующие пулы это особое искусство.
Можно на корутинах разбить задачу на кучу поздадач. Пример корутин - например вот тут:
https://www.lua.org/pil/9.4.html
Просто твой хендлер будет условно после каждого мало-мальски долгого действия выставлять coroutine.yield, и пока он не вернёт человеческий результат - твой цикл будет постоянно опрашивать то его, то редисковый сокет.
Я сам корутины не очень люблю ибо похоже на кучу goto в программе не по делу, и чуть что пригребаю набор тяжёлых потоков, но под простые задачи корутины пойдут.
Yuriy
аха. Я знаю, я их пользвоал на несолкьих задачах.
Я не пользовался lines - вот видимо пришел мой час)))
vvzvlad
Задача: один процесс/поток на lua записывает что-то в БД/память/файл, другие потоки/процессы сразу же получают коллбек "записано новое". емкость буфера — смех, 500кб хватит(на самом деле и 10кб). в приоритете нетребовательность к ресурсам, ибо процессор слабый. скорость реакции в пределах пары мс. где-чем это лучше сделать?
Snusmumriken
> емкость буфера — смех, 500кб
Но зато за него отвечает ось, и она запросит дальнейшие пакеты после того как мы получим наш 500-кб кусочек, вместо того чтобы нафиг всё обрубить. UDP просто продолжал бы слать пакеты после переполнения, и они уходили бы вникуда, пришлось бы мутить свои протоколы для того чтобы это восстановить.
Я этим уже занимался, у меня есть мой собсный RUDP-протокол на луа, с байтоёбством, это СЛОЖНО )))
И то, там нет, например, автобалансировки скорости, кроме тех случаев когда пачка пакетов не дошла.
vvzvlad
> емкость буфера — смех, 500кб
Но зато за него отвечает ось, и она запросит дальнейшие пакеты после того как мы получим наш 500-кб кусочек, вместо того чтобы нафиг всё обрубить. UDP просто продолжал бы слать пакеты после переполнения, и они уходили бы вникуда, пришлось бы мутить свои протоколы для того чтобы это восстановить.
Я этим уже занимался, у меня есть мой собсный RUDP-протокол на луа, с байтоёбством, это СЛОЖНО )))
И то, там нет, например, автобалансировки скорости, кроме тех случаев когда пачка пакетов не дошла.
а ты точно мне отвечаешь?)) там не будет других пакетов. там общение по uart с железкой с радио-интерфейсом, там пакеты по 1к максимум и их единицы в секунду максимум.
Snusmumriken
А, пардон, мне показалось что ты отвечаешь Юрке.
vvzvlad
не, я опять велосипеды строю
Snusmumriken
Ну тебе хватит простого однопоточного приложения.
Чтобы жрало мало - надыбай где-нибудь слип хоть в каком-то виде. Если есть jit - можно так:
local ffi = require("ffi")
ffi.cdef[[
void Sleep(int ms);
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
]]
os.sleep = ffi.os == "Windows" and ffi.C.Sleep
or function(s) ffi.C.poll(nil, 0, s) end
while true do
local data = Receive(0) -- неблокирующе, желательно
while data do
doSomething(data)
data = Receive(0)
end
os.sleep(10) -- в мс, достаточно чтобы разгрузить проц
end
Ну типа такой цикл без задержек примет и обработает кучу данных, а потом, если нет данных, будет уходить в слип.
Snusmumriken
Если нет jit - напиши dll/so-либу со слипом, это десять минуточек, или заюзай luasocket : )
vvzvlad
ммм... У меня есть библиотека luars232, которая читает и пишет в uart(=RS232). У нее блокирующее чтение и нет буфера, и жесткие требования насчет скорости, и большую часть времени она занимается тем, что в бесконечном цикле проверяет, не пришли ли ей данные. Если пришли — то она их разбирает, передает в функцию, функция согласно своей логике собирает пакет и отправляет его обратно в порт. Работает, по получается вещь в себе — чтение блокирует сам скрипт, а другой скрипт порт открыть уже не может, там доступ монопольный.
Я хочу сделать отдельный скрипт/поток/тред, который бы занимался только чтением-записью в порт(не сетевой, а RS232-порт, физический). Т.е. он мониторит порт, ему пришли данные — он записал их в бд-шку, в табличку "принято". другому скрипту пришел коллебек, он данные распарсил, в табличку "отправить" бд записал, что ему отправить надо — оп, первому скрипту тоже пришел коллбек, он в порт выплюнул, то, что пришло. Третий скрипт хочет что-то записать, он тоже в табличку "отправить" засунул, и первый скрипт точно так же отправил.
Вот я и хочу понять, как мне этот механизм передачи сообщений между потоками сделать, чтобы ресурсов много не ело, и задержка была не очень большая.
Snusmumriken
А, ну тады lanes.
Ты прост говорил что у тебя буфер на 500кб, если он есть и он даёт возможность неблокирующего чтения - это офигенно )))
Если блокирует - посмотри, насколько загружен процессор в процессе приёма. Если на 50-100% - это риально трешёвая либа, перепиши.
Ну и lanes, да. Если под твою архитектуру нет lanes - заюзай luasocket и несколько открытых приложений: приложение-приёмник с uart и приложения которым приёмник пересылает.
Что-то типа микросервисов. Можно ещё развлечься на тему "подписок на сообщения", мол, когда запускаешь приёмник он начинает слушать порт, а когда запускаешь всё остальное - они подключаются к приёмнику и подписываются на приём данных от него.
vvzvlad
нет, мне нужен буфер на 500кб максиум, больше сообщений там единовременно никогда не будет.
>>Если блокирует - посмотри, насколько загружен процессор в процессе приёма.
когда? при приеме с uart? нет, там нормально, там пара процентов.
Snusmumriken
Норм. Тогда это и есть твой слип : )
Ну, знаешь? Типа слип в вендовой cmd - это пинг сервера в течение n секунд. А тут у тебя слип пока ты принимаешь с uart.
Кстати, у тебя там таймер задаётся, сколько времени принимать с uart'а?
vvzvlad
>>Если под твою архитектуру нет lanes - заюзай luasocket и несколько открытых приложений: приложение-приёмник с uart и приложения которым приёмник пересылает.
А я не хочу пересылать. Я хочу что-то типа таблички в базе данных, куда кто угодно может записать и кто угодно может прочитать. но с коллбеками, которые бы вызывались при записи.
vvzvlad
Snusmumriken
Ну тебе хватит простого однопоточного приложения.
Чтобы жрало мало - надыбай где-нибудь слип хоть в каком-то виде. Если есть jit - можно так:
local ffi = require("ffi")
ffi.cdef[[
void Sleep(int ms);
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
]]
os.sleep = ffi.os == "Windows" and ffi.C.Sleep
or function(s) ffi.C.poll(nil, 0, s) end
while true do
local data = Receive(0) -- неблокирующе, желательно
while data do
doSomething(data)
data = Receive(0)
end
os.sleep(10) -- в мс, достаточно чтобы разгрузить проц
end
Ну типа такой цикл без задержек примет и обработает кучу данных, а потом, если нет данных, будет уходить в слип.
У меня есть ощущение, что ты что-то усложняешь : )
Вот тут тогда - приём на протяжении одной мс, если приняли - пишем в базу, другие приложения подхватят.
Запись в базу - это быстро.
Snusmumriken
Думаю что это - вообще всё что тебе нужно.
while true do
local data = Receive(0.1)
while data do
writeToBase(data)
data = Receive(0.1)
end
end
Snusmumriken
Или тебе интересно как сделать колбеки в других приложениях?
vvzvlad
а чтение? проверять наличие новых сообщений так же?
Snusmumriken
> Receive(0.1) - чтение с uart, замени своей функцией чтения с uart.
vvzvlad
не. мне надо проверить, нет ли чего нового в бд, чтобы отправить.
Snusmumriken
Ммм. Счётчик количества записей в бд.
Если кол-во записей изменилось - прочитать все новые записи, обработать их.
Как вариант, в базе хранится статус записи, и когда ты берёшь запись каким-то приложением на обработку - помечаешь её как "в работе" (и ещё время смены статуса, желательно), а когда завершаешь работу с ней - "обработано".
Приложения обработки, соответственно, делают запрос на записи, которые не в работе, или те, у которых статус "в работе" держится слишком долго (на случай внезапного вырубания железки, чтобы не морочиться выставлением статусов тому, что было в обработке в момент вырубания).
vvzvlad
ну, т.е.
while true do
data_received = uart_receive(0.1)
if (data_received ~= nil)
db_write(data_received)
end
data_for_send = db_read()
if (data_send ~= nil)
uart_send(data_for_send)
end
end
Snusmumriken
А зачем приёмнику что-то принимать из базы? Он ещё ответы возвращает?
Если так - то да, чот такое.
Правда, я бы сделал чуть иначе, чтобы за один такт обрабатывало сразу кучу всего:
while true do
-- приняли и записали в базу
-- всё что нам присылают
local data = uart_receive(0.1)
while data do
db_write(data_received)
data = uart_receive(0.1)
end
-- считали первую попавшуюся "выполненную"
-- подразумевается что db_read удаляет ту
-- что выполнена, в общем, как итератор
data = db_read()
while data do
uart_send(data)
data = db_read(0.1)
end
end
vvzvlad
У меня двухсторонее общение с железкой. От нее можно что-то принять, а потом ей что-то отправить, и вернуться опять в цикл ожидания нового приема. это работает, но я не могу отправить ей просто сообщение, потому что цикл чтения блокирует весь поток. а я бы хотел запустить mqtt-сервер или tcp-сервер, который точно так же блокирует текущий поток. вместо в одном скрипте они работать не будут
поэтому я хочу сделать отдельный поток на чтение-запись и общаться с ним записью-чтением в бд.
чтобы был поток, который общается с uart, а был отдельно tcp-сервер, который бы писал в бд, и отдельно mqtt-сервер, который бы тоже читал-писал в бд.
Snusmumriken
Расслабься, хоть какой-то железный буфер там должен присутствовать в любом случае, железки работают параллельно друг от друга.
vvzvlad
в этом варианте я боюсь, что несколько скриптов, которые в цикле делают db_read() сожрут все ресурсы. поэтому я хотел коллбеки.
vvzvlad
ааа
Snusmumriken
Твоя база умеет посылать сигналы в момент записи? Или она сама умеет запускать приложения на запись?
vvzvlad
у меня нет базы
vvzvlad
я хочу ее сделать, поэтому и спрашиваю, что выбрать
Snusmumriken
Да что угодно, хоть пара папок с файлами на приём/передачу.
Приложения мониторят папку, если видят в ней файл - забирают его содержимое и переименовывают, типа чтобы другим приложениям не досталось. Тоже кручение в цикле, мониторинг количества и обработка на тот момент, когда появился файл. Ну и слип, конечно. Слип - это важно, без него твои бесконечные циклы сожрут 100% проца без толку.
Вместо папок можно сокеты и связь по ним, без бд. Вместо db_read() и db_write() - socket.read()/socket.write()
Расслабляемся и делаем максимально просто.
vvzvlad
но как же... коллбеки... асинхронность...
Snusmumriken
Сокеты блин!
Чувак, тот факт что у тебя куча приложений запущено - уже говорит о асинхронности, панимаищь?
Колбеки сам сделаешь на приём данных из базы/сокета.
vvzvlad
1)сокет — это fifo-буфер?
vvzvlad
2)Могу ли я записать в сокет один раз, а прочитать его содержимое двумя разными потоками?
vvzvlad
3)что будет, если два потока будут одновременно писать в один сокет?
vvzvlad
4)надо ли иметь открытый на другом конце сокет для записи в него?
Snusmumriken
*приёмник uart*
while true do
local data = uart_receive(0.1)
while data do
socket_send(data)
data = uart_receive(0.1)
end
data = socket_read()
while data do
uart_send(data)
data = socket_read(0.1)
end
end
*обработчики*
while true do
local data = socket_receive()
while data do
answer = CALLBACK(data)
socket_send(answer)
end
end
vvzvlad
да, я понимаю, как это работает. мне тонкости не очень ясны.
Snusmumriken
1. Сокет - это fifo-буфер. То что первым прислали - будет первым обработано.
2. Сокет - это связь между ДВУМЯ приложениями. Чтобы переслать сразу нескольким - пересылай сразу нескольким.
3. Тебе будет заебись и с UDP. Если несколько потоков пишут в один слушающий сокет - они сохранятся в буфере. Чтобы обработать ВСЁ что нам прислали "за этот такт" - вот эта штука и есть:
local data = socket_receive()
while data do
answer = CALLBACK(data)
socket_send(answer)
end
4. Сокеты не открыты для записи и не закрыты. У них режим "слушать" - типа, запросить у ОС порт, все данные приходящие на который будут передаваться именно нашему приложению, а не кому-то ещё. Тебе, для простоты, достаточно открыть слушающий на обработчиках по списку.
vvzvlad
я имею ввиду, запись в сокет — атомарная операция? т.е. пока один поток пишет, второй ждет? я просто из железа пришел, и там запись в порты не всегда атомарная, там легко могут перепутаться два пакета, если писать их однвременно.
Snusmumriken
Нет, буферизированная.
Читай стак TCP/IP.
Snusmumriken
Оно многослойное.
https://ru.wikipedia.org/wiki/TCP/IP
vvzvlad
ага
Snusmumriken
Карочи, сначала - читай и пробуй, потом спрашивай.
Я могу дать много информации по сетям, но она бесполезна пока ты сам не поймёшь всю вот эту базу, а спрашивать у меня базу - тратить моё время, ибо её много но она тупая.
Snusmumriken
Вот тебе книжка для лёгкого чтения:
http://naymov.com/edu/ukit/olifer.pdf
Ты близок к железу, поэтому тебе будет гораздо проще.
fgntfg
Чем мультикаст от броадкаста отличается?
На каком уровне модели oci работает свитч? А L3? А роутер?
fgntfg
Можно ответить просебя
Snusmumriken
Мультикаст - отправка нескольким чувакам, броадкаст - отправка вообще всем в подсети (UDP на броадкаст-адрес, например).
Свитч - канальный уровень osi, работает на MAC-адресах. Роутер - сетевой уровень, айпишники и маки, анализ содержимого подсети, выдача айпишников если dhcp включен.
L3 - не понял.
fgntfg
L3 switch
Snusmumriken
Ну какой-то особый свитч, про который я не читал. Хм.
fgntfg
Работает на сетевом уровне.
Snusmumriken
Ну типа что-то между свитчом и роутером, кароч.
fgntfg
Есть IPv4, есть IPv6, куда дели v5?
Snusmumriken
Вникуда : )
Экспериментальный, типа, как расширение ipv4.
Snusmumriken
Да нет, вроде всё просто.
Snusmumriken
Можно задавать вопросы, типа "когда выгодно использовать звёздчатую архитектуру сети, а когда - распределённую". Но тут можно найти аналогии в программировании, поэтому не так весело.
Snusmumriken
Ты шо, одмен? )))
Может тебе ещё принести ведро джиттера? ))
Snusmumriken
Вот есть windows 8, а есть - windows 10. Куда дели windows 9? ))
mva
mva
могу даже пруф дать
mva
а вот 7, 8 и 9 - нету :)
Snusmumriken
IPv4+IPv6=IPv10
fgntfg
Я такую хрень щас админю, что врагу не пожелаешь
Snusmumriken
Нашёл у себя в подъезде бумажную версию. Типа, у дома привычка оставлять ненужное но потенциально ценное барахло.
Lucky me!
Snusmumriken
Пофигу : )
Олифер даёт наиболее полную инфу, начиная с низов, и это не изменится в ближайшее время.
Хотя первые две трети книги работают "историей развития сетей", это просто интересно.