The
I understood the front thing, I was just voicing my inconvenience. For now I did this:
void producer(const std::string& incoming_message_to_queue_process)
{
    if (!spsc_queue.empty())
    {
        // queue is not empty: just enqueue and return
        spsc_queue.push(incoming_message_to_queue_process);
        return;
    }
    spsc_queue.push(incoming_message_to_queue_process);
    class_variable = incoming_message_to_queue_process; // second copy of the message
    consume(class_variable); // here I needed front()
}
void consume(std::string something)
{
    class_variable.clear();
    spsc_queue.pop();
    // do something
    if (!spsc_queue.empty())
        consume(spsc_queue.front());
}
The problem is that I now have two allocations, and I'm not sure this is completely safe. class_variable is a normal std::string. I first thought of wrapping it as a char* in an atomic, then wondered how to manage the length. I'm just doing whatever gets me to the end of the POC for now and will smooth everything out later.
But I'm wary of this, as those weird ordering issues are difficult to catch in tests, and I don't have a deep understanding of them either.
The
This is working right now for large messages (~100 MB). I'm going to test small messages (<1 KB) next, but at a higher rate per second.
This part has been biting me for a week.
The
Going to look into move semantics with it, hope it supports them.
Aditya
Hello, give me some tips to master C programming!
zahra
Has anyone ever worked with the CGAL library?
Ludovic 'Archivist'
and that is only if "front" here means the value you just pushed
Ludovic 'Archivist'
or it would lock
Ludovic 'Archivist'
even your "empty" method is technically fuzzy
Ludovic 'Archivist'
I have an example of a disruptor, but it is bad (by that I mean suboptimal, I need to fix the memory orders because they are all seq_cst at the moment)
https://git.nekoit.xyz/Archivist/SnugLog/raw/branch/main/LibSnugLog/include/disruptor.h
Ludovic 'Archivist'
please note that it is not generic (and was written by mr crazy that wants to mount disruptors as memory mapped files)
The
There's a consume function offered by spsc_queue, where you can pass in a function that uses the front element. So instead of what I'm doing right now I could do
Queue_.consume(&async_func)
Not sure about its safety/correctness etc.
The
Say 5k * 10 clients = 50k/sec should be alright
The
But each client gets its own queue too
Ludovic 'Archivist'
man, with these loads, you may as well use a mutex and get everything correct
Ludovic 'Archivist'
is order of messages important?
The
Yeah, I was thinking of it, but there's no ready-made lib..
So if I make my own..
Wrap push, pop, front and empty in a mutex and done, right?
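A minimal sketch of what such a mutex-wrapped queue could look like, using only the standard library. The class name `LockedQueue` and its methods are hypothetical, not taken from Boost or any other library. The "was it empty?" check and the push happen under the same lock here, so no separate empty() call can race with a push from another thread.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical mutex-protected FIFO; names are illustrative only.
class LockedQueue {
public:
    // Pushes a message and reports whether the queue was empty beforehand,
    // i.e. whether the caller should kick off consumption.
    bool push(std::string msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        const bool was_empty = items_.empty();
        items_.push_back(std::move(msg));
        return was_empty;
    }

    // Removes and returns the oldest message, or std::nullopt if empty.
    std::optional<std::string> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return std::nullopt;
        std::string msg = std::move(items_.front());
        items_.pop_front();
        return msg;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<std::string> items_;
};
```

try_pop() moves the message out instead of exposing front(), so the consumer owns the string and no reference into the queue outlives the lock.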
Ludovic 'Archivist'
yeah, a locked queue would do better here, but your use case looks very smelly
Ludovic 'Archivist'
a queue is supposed to be fire and forget
The
Basically I have a multithreaded system sending events to this websocket layer, which I'm running in a single-threaded I/O context. The problem arises when those incoming events cause races, even after I put all of them through a single queue..
Because the io_context runs in a different thread from the one sending the events..
Plus the async asio functions don't take ownership of the object, or I would fire and forget. I've got to keep it around until the async_write completes..
Then I have a wide range of message sizes: yesterday I tested from a few bytes up to 150 MB. For throughput I would like to say 5000, but last year I made a mistake and hit much above that.. crashed the prod DB lol
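One common way to handle "keep it around until the write completes" is an outbox owned by the connection, where the front element stays in place until the completion handler runs. Below is a minimal sketch of that idea without asio: `FakeSocket`, `start_write`, `complete` and `Connection` are hypothetical stand-ins, and every call is assumed to happen on the single I/O thread.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an asynchronous write: it only remembers a
// pointer to the buffer and a completion handler, it does not copy the data.
struct FakeSocket {
    const std::string* in_flight = nullptr;
    std::function<void()> on_done;
    std::vector<std::string> sent;

    void start_write(const std::string& buf, std::function<void()> done) {
        in_flight = &buf;
        on_done = std::move(done);
    }

    // Simulates the I/O layer finishing the pending write.
    void complete() {
        sent.push_back(*in_flight);
        in_flight = nullptr;
        auto done = std::move(on_done);
        on_done = nullptr;
        done();
    }
};

class Connection {
public:
    explicit Connection(FakeSocket& s) : socket_(s) {}

    // Assumed to run on the I/O thread only.
    void send(std::string msg) {
        const bool idle = outbox_.empty();
        outbox_.push_back(std::move(msg));
        if (idle) write_front();  // otherwise a write is already in flight
    }

    std::size_t pending() const { return outbox_.size(); }

private:
    void write_front() {
        // outbox_.front() stays alive until the handler pops it; push_back
        // on a std::deque does not invalidate references to elements.
        socket_.start_write(outbox_.front(), [this] {
            outbox_.pop_front();
            if (!outbox_.empty()) write_front();
        });
    }

    FakeSocket& socket_;
    std::deque<std::string> outbox_;
};
```

With this shape the message is stored once, and other threads only need a way to hand `send` calls over to the I/O thread.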
Ludovic 'Archivist'
Is your concern the lifetime of the consumer?
The
Yes
Ludovic 'Archivist'
Ah, then don't solve your problem with a queue
Ludovic 'Archivist'
the queue is to transfer data from the web framework to the connection bound keep alive producer
Ludovic 'Archivist'
not to keep the consumer alive
Ludovic 'Archivist'
does your framework have an event loop?
The
You want me to use a single variable? Coz that's what I did above
Adeyemo
This conversation is very interesting I must say and I'm learning a whole lot here.
The
It's asio.. I can go that way, you can put the incoming function wholesale onto the current execution strand (thread), but I didn't want to do it that way..
Ludovic 'Archivist'
and when the connection with the client is lost, you tag that with an atomic bool inside of it for it to stop
The
Though I don't completely understand what you mean
Ludovic 'Archivist'
making the consumer a self-fulfilling prophecy of sorts
Ludovic 'Archivist'
and on the web framework side, I store a (mutex protected) set of all the games (or clients) to which I send the messages
Ludovic 'Archivist'
(here the "tick" method is something like the following:)
void game::tick(std::shared_ptr<game> self) {
    process_inbox();
    process_gameloop();
    process_outbox();
}
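One possible reading of the tick idea, sketched under assumptions: the object keeps itself alive by passing its own shared_ptr into the next scheduled tick, and an atomic bool ends the chain once the client is gone. `EventLoop`, `Game`, `stopped` and `ticks` are hypothetical names for illustration; the real framework's event loop would take the place of `EventLoop`.

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical single-threaded event loop standing in for the framework's.
struct EventLoop {
    std::deque<std::function<void()>> tasks;

    void post(std::function<void()> f) { tasks.push_back(std::move(f)); }

    // Runs at most one queued task; returns false if nothing was queued.
    bool run_one() {
        if (tasks.empty()) return false;
        auto f = std::move(tasks.front());
        tasks.pop_front();
        f();
        return true;
    }
};

struct Game {
    explicit Game(EventLoop& l) : loop(l) {}

    EventLoop& loop;
    std::atomic<bool> stopped{false};  // set when the client connection is lost
    int ticks = 0;                     // stands in for inbox/gameloop/outbox work

    static void tick(std::shared_ptr<Game> self) {
        // Not rescheduling drops this reference; once no one else holds
        // the object, it is destroyed.
        if (self->stopped.load()) return;
        ++self->ticks;
        Game& g = *self;
        // The queued task owns the shared_ptr, keeping the object alive
        // until the next tick runs.
        g.loop.post([self = std::move(self)]() mutable {
            tick(std::move(self));
        });
    }
};
```

The first call would be something like `Game::tick(std::make_shared<Game>(loop))`; after that, the only thing keeping the object alive is the pending task in the loop.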
The
I could do queue_.consume_one, it feels similar..
See the code for consume_one here: https://www.boost.org/doc/libs/1_59_0/boost/lockfree/spsc_queue.hpp
The
Producer and consumer are on different threads here
Ludovic 'Archivist'
and this handles a few thousand clients with around 100k messages per second per CPU core
The
Isn't the locked queue slow? If each of your connections did 5k messages per second, would it still work?
The
I have a similar architecture in the rest of the code, it's just this one point that keeps tripping me up
Ludovic 'Archivist'
(while running a literal interpreter in the hot loop of "tick", may I mention)
Ludovic 'Archivist'
(they just are not very comfy)
Ludovic 'Archivist'
I just do not have to deal with the multithread scheduling myself