Многопоточность очереди событий / задач C++

Ниже представлена ​​реализация, для threads которой не требуется метод queue functionProxy. Хотя добавлять c++ новые методы проще, это все cpp равно беспорядочно.

Boost threads :: Bind и "Futures", похоже, многое thread из этого уберут. Думаю, я pthreads посмотрю на код повышения multithread и посмотрю, как он работает. Всем thread спасибо за ваши предложения.

GThreadObject.h

#include 

using namespace std;

class GThreadObject
{

    template 
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argTwo, int arg2);


private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue jobQueue;
    void functionTwoInternal(int argTwo, int arg2);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include 
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit i would like to remove
     * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize
     * Subsequent data sizes would need to be added with an else if
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter*)receivedEvent->data));
    }

    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, (char*)argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();

}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))>hreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))>hreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include 
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

Изменить: для cpp полноты картины я реализовал multi-threaded Boost :: bind. Ключевые отличия:

queue > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(>hreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function func = jobQueue.front();
    func();
}

Использование c++ ускоренной реализации для multithread 10 000 000 итераций functionOne() заняло threading ~ 19 секунд. Однако реализация cxx без ускорения заняла всего queue ~ 6.5 секунд. Так примерно multithread в 3 раза медленнее. Я предполагаю, что pthreads поиск хорошей очереди без multithread блокировки будет самым большим pthreads препятствием для производительности. Но multithread разница все равно довольно cxx большая.

c++

multithreading

queue

pthreads

2022-01-03T15:48:21+00:00