The
I understood the front thing, I was just voicing my inconvenience. For now I did:
producer(incoming_message_to_queue_process) {
    if (!spsc_queue.empty()) {
        spsc_queue.push(incoming_message_to_queue_process)
        return;
    }
    spsc_queue.push(incoming_message_to_queue_process)
    class_variable.copy(incoming_message_to_queue_process)
    consume(class_variable) // here I needed front
}
consume(std::string something) {
    class_variable.clear()
    spsc_queue.pop()
    // do something
    if spsc_queue has more data -> consume(spsc_queue.front())
}
The problem is that I now have two allocations, and I'm not sure this is completely safe. class_variable is a normal std::string. I first thought of wrapping it as a char* in an atomic, then wondered how to manage the length. I'm just doing whatever gets me to the end of the POC for now and will smooth everything out later. But I'm wary of this, as those weird ordering issues are difficult to catch in tests, and I don't have a deep understanding of them either.
The
This is working right now for large messages (~100 MB). I'm going to test small messages (<1 KB) at a higher rate per second. This part has been biting me for a week.
The
No this is completely wrong. It is not thread safe. Why can't the consumer just pop from the queue or use try_pop?
pop will make it a temp variable, and this feeds into async_write, which doesn't hold its own variables.
Anonymous
pop will make it a temp variable, and this feeds into async_write, which doesn't hold its own variables.
I don't get what you are saying. If you have another asynchronous operation that is called from the consumer thread, then only pop makes sense, as the popped value can then be passed on safely to other threads, leaving the queue in a valid state for further processing. If you are concerned about lifetimes, then I would suggest using a unique_ptr/shared_ptr to store the elements inside the queue.
Anonymous
can't pass shared_ptr to the async op.. only a buffer..
Well then your async op needs to be redesigned
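One common way around the "only a buffer" restriction, sketched here under assumptions: keep the popped message in a std::shared_ptr and capture that pointer in the completion handler, so the bytes outlive the asynchronous write even though the write itself only sees a raw pointer and a length. `fake_async_write`, `run_pending`, `pending`, `wire`, and `send` are hypothetical stand-ins for an asio-style write and its event loop, not real library calls.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an asio-style async write: it records only a raw
// pointer and a length, and runs the completion handler later.
struct PendingWrite {
    const char* data;
    std::size_t size;
    std::function<void(std::size_t)> on_done;
};

std::vector<PendingWrite> pending;  // hypothetical "in flight" operations
std::string wire;                   // what the fake socket has received

void fake_async_write(const char* data, std::size_t size,
                      std::function<void(std::size_t)> on_done) {
    pending.push_back({data, size, std::move(on_done)});
}

// Hypothetical event-loop step: perform the writes, then call the handlers.
void run_pending() {
    std::vector<PendingWrite> ops = std::move(pending);
    pending.clear();
    for (auto& op : ops) {
        wire.append(op.data, op.size);
        op.on_done(op.size);
    }
    // ops is destroyed here, which destroys the handlers and their captures.
}

// The message is owned by a shared_ptr. The handler captures a copy of that
// pointer, so the bytes stay valid until the handler has been destroyed.
void send(std::shared_ptr<const std::string> msg) {
    const char* data = msg->data();
    std::size_t size = msg->size();
    fake_async_write(data, size, [msg](std::size_t /*bytes_written*/) {
        // Nothing to do: holding msg in the capture is the whole point.
    });
}
```

With this shape the consumer can pop from the queue into a shared_ptr and forget about it; the last owner is the handler of the write that used it.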
The
I'm going to look into move semantics with it. I hope it supports them.
Aditya
Hello, give me some tips to master C programming!
Aditya
See linked message
I don't understand 😔
The
I don't understand 😔
Sorry, pinned message
Aditya
Sorry, pinned message
I am restricted there
zahra
Has anyone ever worked with the CGAL library?
Ludovic 'Archivist'
and that is only if "front" here means the value you just pushed
The
A lock-free queue cannot both work for your usecase and be lockfree. You can use a disruptor to get similar effects and use your "front" method before validating the transaction but that would not be lock-free
I made a change to the message you replied to. spsc_queue is the Boost lock-free one. If I make the class variable an atomic pointer, this should work, right? I tested without the atomic yesterday and it worked fine at ~5000 msgs/sec sustained. But yes, you never know from tests, as ordering issues will show up under weird CPU conditions.
Ludovic 'Archivist'
or it would lock
Ludovic 'Archivist'
even your "empty" method is technically fuzzy
The
with a disruptor, it would instead look like:
producer(incoming_message) {
    disruptor.write_transaction(incoming_message.size(), [&](auto buffer) {
        std::copy(incoming_message.begin(), incoming_message.end(), buffer.begin());
        // Do whatever you want to the incoming message or buffer
    });
}
1. Do you have a disruptor lib for C++? 2. My producer and consumer are running on different threads, so that buffer is the problem: it's written to in the producer and emptied/consumed in the consumer. It's the same problem here, in that the buffer has to be thread safe. I already have queues feeding the "producer" in disruptor event style.
The
your "front" method doesn't ensure the preservation of the actual front value. It would be just as safe as the front method on a container from another thread
It's on the consumer. The Boost docs say front is guaranteed on the consumer, and empty is fine too according to them, though I'm not sure how fine.
Ludovic 'Archivist'
I have an example of a disruptor, but it is bad (by that I mean suboptimal, I need to fix the memory orders because they are all seq_cst at the moment) https://git.nekoit.xyz/Archivist/SnugLog/raw/branch/main/LibSnugLog/include/disruptor.h
Ludovic 'Archivist'
please note that it is not generic (and was written by mr crazy that wants to mount disruptors as memory mapped files)
The
There's a consume function offered by spsc_queue, where you can pass in a function that uses the front element. So instead of what I'm doing right now, I could do Queue_.consume(&async_func). Not sure about safety/correctness, etc.
The
wait, is your usecase 5000 messages per second or less?
It can be more. Right now I'm testing with a single client connected.
The
Say 5k * 10 clients = 50k/sec should be alright
The
But each client gets its own queue too
Ludovic 'Archivist'
But each client gets its own queue too
I was about to say that it seemed that way
Ludovic 'Archivist'
man, with these loads, you may as well use a mutex and get everything correct
Ludovic 'Archivist'
is order of messages important?
The
Yeah, I was thinking of it, but there's no ready-made lib. So if I make my own, I wrap push, pop, front, and empty in a mutex and I'm done, right?
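A minimal sketch of such a mutex-wrapped queue, assuming C++11 or later. `LockedQueue` and `try_pop` are illustrative names, not from any library. One deliberate difference from the list above: front and pop are merged into a single `try_pop`, so that no other thread can run between "look at the front" and "remove it".

```cpp
#include <deque>
#include <mutex>
#include <utility>

template <typename T>
class LockedQueue {
public:
    // Safe to call from any thread.
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));
    }

    // Moves the front element into out and removes it, all under one lock.
    // Returns false (and leaves out untouched) when the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    // Only a snapshot: the answer can be stale as soon as the lock is released.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.empty();
    }

private:
    mutable std::mutex mutex_;
    std::deque<T> items_;
};
```

Because `try_pop` hands the caller its own copy, the queue no longer has to keep the element alive while it is being written out.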
The
is order of messages important?
I think not right now, as long as they don't get jumbled within the millisecond range.
Ludovic 'Archivist'
yeah, a locked queue would do better here, but your usecase looks very smelly
Ludovic 'Archivist'
a queue is supposed to be fire and forget
The
yeah, a locked queue would do better here, but your usecase looks very smelly
Basically, I have a multithreaded system sending events to this websocket layer, which I'm running in a single-threaded I/O context. The problem arises when those incoming events cause races, even after I put all of them through a single queue, because the io_context runs in a different thread from the ones sending the events. Plus, asio's async functions don't take ownership of the object, or I would fire and forget. I have to keep it alive until the async_write completes. Then I have a wide range of message sizes: yesterday I tested from a few bytes to 150 MB. As for throughput, I would like to say 5000, but last year I made a mistake and hit much above that, which crashed the prod DB lol.
Ludovic 'Archivist'
Is your concern the lifetime of the consumer?
The
Yes
Ludovic 'Archivist'
Ah, then don't solve your problem with a queue
Ludovic 'Archivist'
the queue is to transfer data from the web framework to the connection bound keep alive producer
Ludovic 'Archivist'
not to keep the consumer alive
Ludovic 'Archivist'
does your framework have an event loop?
The
You want me to use a single variable? Coz that's what I did above
Adeyemo
This conversation is very interesting I must say and I'm learning a whole lot here.
The
does your framework have an event loop?
It's asio. I can go that way: you can put the incoming function wholesale onto the current execution strand (thread). But I didn't want to do it that way.
Ludovic 'Archivist'
Ludovic 'Archivist'
and when the connection with the client is lost, you tag that with an atomic bool inside of it for it to stop
The
No, the idea is to make the consumer process an object that keeps itself alive by rescheduling itself
Here the object again turns into a queue lol. I have streaming, so as I was thinking earlier, I could do a struct of char*, size, and an atomic bool for whether it's in use, and reuse it. But for reuse I need to check that it's not in use before freeing the char block.
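A rough sketch of that reusable-slot idea, under assumptions. `Slot`, `try_fill`, and `release` are hypothetical names, and a std::string replaces the char*/size pair so the length and the storage are managed together. The flag only guards reuse of the bytes; telling the consumer that a filled slot is ready still needs its own synchronization (for example, posting to the event loop), which is not shown.

```cpp
#include <atomic>
#include <cstddef>
#include <string>

// One reusable buffer. in_use == false means the producer may overwrite it;
// in_use == true means a write may still be reading from it.
struct Slot {
    std::string bytes;                // owns the storage and its length
    std::atomic<bool> in_use{false};

    // Producer side: claim the slot, then copy the message in.
    // Returns false if the previous write has not released the slot yet.
    bool try_fill(const char* data, std::size_t size) {
        bool expected = false;
        // Acquire pairs with the release in release(), so the previous
        // reader is completely done before the bytes are overwritten.
        if (!in_use.compare_exchange_strong(expected, true,
                                            std::memory_order_acquire,
                                            std::memory_order_relaxed)) {
            return false;
        }
        bytes.assign(data, size);
        return true;
    }

    // Consumer side: call once the write using bytes has completed.
    void release() {
        in_use.store(false, std::memory_order_release);
    }
};
```

A single slot means the producer has to drop or retry a message whenever `try_fill` returns false, which is where a queue tends to creep back in.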
The
Though I don't completely understand what you mean
Ludovic 'Archivist'
Here the object again turns into a queue lol. I have streaming, so as I was thinking earlier, I could do a struct of char*, size, and an atomic bool for whether it's in use, and reuse it. But for reuse I need to check that it's not in use before freeing the char block.
my mmo game handles it like that:
void game::run(std::shared_ptr<game> self) {
    using namespace std::chrono_literals;
    if (ended.load()) return;
    tick(self);
    auto self_runner = [=]() { self->run(self); };
    drogon::app().getLoop()->runAfter(500us, self_runner);
}
Ludovic 'Archivist'
making the consumer a self-fulfilling prophecy of sort
Ludovic 'Archivist'
and on the web framework, I store a (mutex protected) set of all the game (or clients) to which I send the messages
Ludovic 'Archivist'
(here the "tick" method is something like the following:)
void game::tick(std::shared_ptr<game> self) {
    process_inbox();
    process_gameloop();
    process_outbox();
}
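The self-rescheduling pattern described above can be tried without any framework. The sketch below is an assumption-laden stand-in: `Loop` is a hypothetical single-threaded event loop (a plain FIFO of callbacks, with no delay like runAfter), and `Client` plays the role of the game object. The only thing keeping a `Client` alive is the shared_ptr captured by its next scheduled callback.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded event loop: a FIFO of callbacks.
struct Loop {
    std::queue<std::function<void()>> tasks;

    void post(std::function<void()> f) { tasks.push(std::move(f)); }

    // Runs at most max_tasks callbacks and returns how many actually ran.
    int run(int max_tasks) {
        int ran = 0;
        while (ran < max_tasks && !tasks.empty()) {
            auto f = std::move(tasks.front());
            tasks.pop();
            f();
            ++ran;
        }
        return ran;
    }
};

struct Client {
    std::atomic<bool> ended{false};  // set when the connection is lost
    int ticks = 0;

    void tick() { ++ticks; }         // inbox, work, outbox would go here

    // Does one tick, then reschedules itself. The lambda's copy of self is
    // what keeps the object alive between ticks.
    static void run(Loop& loop, std::shared_ptr<Client> self) {
        if (self->ended.load()) return;  // no reschedule: the chain ends here
        self->tick();
        loop.post([&loop, self] { run(loop, self); });
    }
};
```

Once `ended` is set, the next callback returns without rescheduling, the last shared_ptr is dropped, and the object is destroyed without anyone having to track its lifetime explicitly.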
The
I could do queue_.consume_one, it feels similar. Look here for the code of consume_one: https://www.boost.org/doc/libs/1_59_0/boost/lockfree/spsc_queue.hpp
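To make the consume_one shape concrete, here is a minimal single-producer/single-consumer ring buffer with a functor-taking `consume_one`. It is a simplified stand-in written for illustration, not Boost's implementation; `SpscRing` is a hypothetical name, and the ring holds at most N - 1 elements.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <utility>

template <typename T, std::size_t N>
class SpscRing {
public:
    // Producer thread only. Returns false when the ring is full.
    bool push(T value) {
        std::size_t tail = tail_.load(std::memory_order_relaxed);
        std::size_t next = (tail + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[tail] = std::move(value);
        tail_.store(next, std::memory_order_release);  // publish the element
        return true;
    }

    // Consumer thread only. Calls f on the front element while it is still
    // in the ring, then frees the slot. Returns false when the ring is empty.
    template <typename F>
    bool consume_one(F&& f) {
        std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;
        f(slots_[head]);
        head_.store((head + 1) % N, std::memory_order_release);  // free slot
        return true;
    }

private:
    std::array<T, N> slots_{};
    std::atomic<std::size_t> head_{0};  // next slot to read
    std::atomic<std::size_t> tail_{0};  // next slot to write
};
```

Note what this implies for an asynchronous write: the reference passed to the functor is only valid until the functor returns. If the functor merely starts a write and returns, the slot can be reused while the write is still reading from it, so the functor would have to move or copy the element into something that outlives the call.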
Ludovic 'Archivist'
Self is the one containing the data, right? So you basically have no threads to sync across, right?
the framework thread and the game scheduled object, the event loop is a work stealing scheduler
The
but why use a queue if you always consume_one on the producer?
I have incoming streams -> they call this producer function, which queues the message up if the consumer is busy -> the consumer function loops back on itself until the queue is finished
The
Producer and consumer are on different threads here
Ludovic 'Archivist'
I have incoming streams -> they call this producer function, which queues the message up if the consumer is busy -> the consumer function loops back on itself until the queue is finished
I do that too: the web part bunches up the messages in a queue in my game object, and the game object keeps scheduling itself until the web framework tells it the client disconnected. Both may or may not be scheduled on different threads. The data is transmitted with a locked queue, but the locked queue is independent of the lifetime-preserving system.
Ludovic 'Archivist'
and this handles a few thousand clients with around 100k messages per second per CPU core
The
(here the "tick" method is something like the following:)
void game::tick(std::shared_ptr<game> self) {
    process_inbox();
    process_gameloop();
    process_outbox();
}
Is this a different thread here? I am unable to see where you pass data across threads, and I'm not getting which one is the different thread.
The
Isn't the locked queue slow? If each of your connections did 5k messages per second, would it work?
Ludovic 'Archivist'
Isn't the locked queue slow? If each of your connections did 5k messages per second, would it work?
well, it scales easily to 400k messages per second on a quad core machine from 2011 with a few hundred users
The
I have a similar architecture in the rest of the code. It's just that one point that keeps tripping me up.
Ludovic 'Archivist'
(while running a literal interpreter in the hot loop of "tick" may i mention)
Ludovic 'Archivist'
I have a similar architecture in the rest of the code. It's just that one point that keeps tripping me up.
make use of ioservices and threads to run your client processes, they are made for that in asio
Ludovic 'Archivist'
(they just are not very comfy)
Ludovic 'Archivist'
I just do not have to deal with the multithread scheduling myself