Skip to content

Commit 58e0a5e

Browse files
committed
Merge pull request #2 from awslabs/ThreadPool
Thread pool based executor implementation
2 parents 07987db + 275b1ba commit 58e0a5e

File tree

6 files changed

+257
-11
lines changed

6 files changed

+257
-11
lines changed

aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,22 @@
1515

1616
#pragma once
1717

18-
1918
#include <aws/core/Core_EXPORTS.h>
20-
21-
22-
#include <functional>
2319
#include <aws/core/utils/memory/stl/AWSFunction.h>
20+
#include <aws/core/utils/memory/stl/AWSQueue.h>
21+
#include <aws/core/utils/memory/stl/AWSVector.h>
22+
#include <functional>
23+
#include <mutex>
24+
2425

2526
namespace Aws
2627
{
2728
namespace Utils
2829
{
2930
namespace Threading
3031
{
32+
class ThreadTask;
33+
3134
/**
3235
* Interface for implementing an Executor, to implement a custom thread execution strategy, inherit from this class
3336
* and override SubmitToThread().
@@ -63,7 +66,52 @@ namespace Aws
6366
public:
6467
DefaultExecutor() {}
6568
protected:
66-
bool SubmitToThread(std::function<void()>&&);
69+
bool SubmitToThread(std::function<void()>&&) override;
70+
};
71+
72+
enum class OverflowPolicy
73+
{
74+
QUEUE_TASKS_EVENLY_ACCROSS_THREADS,
75+
REJECT_IMMEDIATELY
76+
};
77+
78+
/**
79+
* Thread Pool Executor implementation.
80+
*/
81+
class AWS_CORE_API PooledThreadExecutor : public Executor
82+
{
83+
public:
84+
PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS);
85+
~PooledThreadExecutor();
86+
87+
/**
88+
* Rule of 5 stuff.
89+
* Don't copy or move
90+
*/
91+
PooledThreadExecutor(const PooledThreadExecutor&) = delete;
92+
PooledThreadExecutor& operator =(const PooledThreadExecutor&) = delete;
93+
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
94+
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;
95+
96+
protected:
97+
bool SubmitToThread(std::function<void()>&&) override;
98+
99+
private:
100+
Aws::Queue<std::function<void()>*> m_tasks;
101+
std::mutex m_queueLock;
102+
std::mutex m_syncPointLock;
103+
std::condition_variable m_syncPoint;
104+
Aws::Vector<ThreadTask*> m_threadTaskHandles;
105+
size_t m_poolSize;
106+
OverflowPolicy m_overflowPolicy;
107+
108+
/**
109+
* Once you call this, you are responsible for freeing the memory pointed to by task.
110+
*/
111+
std::function<void()>* PopTask();
112+
bool HasTasks();
113+
114+
friend class ThreadTask;
67115
};
68116

69117

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
#pragma once
17+
18+
#include <aws/core/Core_EXPORTS.h>
19+
#include <functional>
20+
#include <thread>
21+
#include <atomic>
22+
23+
namespace Aws
24+
{
25+
namespace Utils
26+
{
27+
namespace Threading
28+
{
29+
class PooledThreadExecutor;
30+
31+
class AWS_CORE_API ThreadTask
32+
{
33+
public:
34+
ThreadTask(PooledThreadExecutor& executor);
35+
~ThreadTask();
36+
37+
/**
38+
* Rule of 5 stuff.
39+
* Don't copy or move
40+
*/
41+
ThreadTask(const ThreadTask&) = delete;
42+
ThreadTask& operator =(const ThreadTask&) = delete;
43+
ThreadTask(ThreadTask&&) = delete;
44+
ThreadTask& operator =(ThreadTask&&) = delete;
45+
46+
void StopProcessingWork();
47+
48+
protected:
49+
void MainTaskRunner();
50+
51+
private:
52+
std::atomic<bool> m_continue;
53+
PooledThreadExecutor& m_executor;
54+
std::thread m_thread;
55+
};
56+
}
57+
}
58+
}

aws-cpp-sdk-core/source/utils/logging/AWSLogging.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616

1717
#include <aws/core/utils/logging/AWSLogging.h>
18-
1918
#include <aws/core/utils/logging/LogSystemInterface.h>
2019

2120
#include <memory>

aws-cpp-sdk-core/source/utils/threading/Executor.cpp

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,96 @@
1414
*/
1515

1616
#include <aws/core/utils/threading/Executor.h>
17-
18-
17+
#include <aws/core/utils/threading/ThreadTask.h>
1918
#include <thread>
2019

20+
static const char* POOLED_CLASS_TAG = "PooledThreadExecutor";
21+
2122
using namespace Aws::Utils::Threading;
2223

2324
bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
2425
{
2526
std::thread t(fx);
2627
t.detach();
2728
return true;
28-
}
29+
}
30+
31+
PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
32+
m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
33+
{
34+
for (size_t index = 0; index < m_poolSize; ++index)
35+
{
36+
m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this));
37+
}
38+
}
39+
40+
PooledThreadExecutor::~PooledThreadExecutor()
41+
{
42+
for(auto threadTask : m_threadTaskHandles)
43+
{
44+
threadTask->StopProcessingWork();
45+
}
46+
47+
m_syncPoint.notify_all();
48+
49+
for (auto threadTask : m_threadTaskHandles)
50+
{
51+
Aws::Delete(threadTask);
52+
}
53+
54+
while(m_tasks.size() > 0)
55+
{
56+
std::function<void()>* fn = m_tasks.front();
57+
m_tasks.pop();
58+
59+
if(fn)
60+
{
61+
Aws::Delete(fn);
62+
}
63+
}
64+
65+
}
66+
67+
bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
68+
{
69+
//avoid the need to do copies inside the lock. Instead lets do a pointer push.
70+
std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn));
71+
72+
{
73+
std::lock_guard<std::mutex> locker(m_queueLock);
74+
75+
if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_poolSize >= m_tasks.size())
76+
{
77+
return false;
78+
}
79+
80+
m_tasks.push(fnCpy);
81+
}
82+
83+
m_syncPoint.notify_one();
84+
85+
return true;
86+
}
87+
88+
std::function<void()>* PooledThreadExecutor::PopTask()
89+
{
90+
std::lock_guard<std::mutex> locker(m_queueLock);
91+
92+
if (m_tasks.size() > 0)
93+
{
94+
std::function<void()>* fn = m_tasks.front();
95+
if (fn)
96+
{
97+
m_tasks.pop();
98+
return fn;
99+
}
100+
}
101+
102+
return nullptr;
103+
}
104+
105+
bool PooledThreadExecutor::HasTasks()
106+
{
107+
std::lock_guard<std::mutex> locker(m_queueLock);
108+
return m_tasks.size() > 0;
109+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
#include <aws/core/utils/threading/ThreadTask.h>
17+
#include <aws/core/utils/threading/Executor.h>
18+
19+
using namespace Aws::Utils;
20+
using namespace Aws::Utils::Threading;
21+
22+
ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_executor(executor), m_thread(std::bind(&ThreadTask::MainTaskRunner, this))
23+
{
24+
}
25+
26+
ThreadTask::~ThreadTask()
27+
{
28+
StopProcessingWork();
29+
m_thread.join();
30+
}
31+
32+
void ThreadTask::MainTaskRunner()
33+
{
34+
while (m_continue)
35+
{
36+
while (m_continue && m_executor.HasTasks())
37+
{
38+
auto fn = m_executor.PopTask();
39+
if(fn)
40+
{
41+
(*fn)();
42+
Aws::Delete(fn);
43+
}
44+
}
45+
46+
std::unique_lock<std::mutex> locker(m_executor.m_syncPointLock);
47+
if(m_continue)
48+
{
49+
m_executor.m_syncPoint.wait(locker);
50+
}
51+
}
52+
}
53+
54+
void ThreadTask::StopProcessingWork()
55+
{
56+
m_continue = false;
57+
}

aws-cpp-sdk-dynamodb-integration-tests/TableOperationTest.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
#include <aws/core/client/CoreErrors.h>
1919
#include <aws/core/auth/AWSCredentialsProviderChain.h>
2020
#include <aws/core/http/HttpTypes.h>
21-
#include <aws/dynamodb/DynamoDBClient.h>
22-
#include <aws/dynamodb/DynamoDBErrors.h>
21+
#include <aws/core/utils/memory/AWSMemory.h>
2322
#include <aws/core/utils/UnreferencedParam.h>
2423
#include <aws/core/utils/Outcome.h>
2524
#include <aws/core/utils/memory/stl/AWSSet.h>
2625
#include <aws/core/utils/memory/stl/AWSStringStream.h>
2726
#include <aws/core/utils/ratelimiter/DefaultRateLimiter.h>
27+
#include <aws/core/utils/threading/Executor.h>
28+
#include <aws/dynamodb/DynamoDBClient.h>
29+
#include <aws/dynamodb/DynamoDBErrors.h>
2830
#include <aws/dynamodb/model/CreateTableRequest.h>
2931
#include <aws/dynamodb/model/DeleteTableRequest.h>
3032
#include <aws/dynamodb/model/DescribeTableRequest.h>
@@ -159,6 +161,7 @@ class TableOperationTest : public ::testing::Test {
159161
config.readRateLimiter = m_limiter;
160162
config.writeRateLimiter = m_limiter;
161163
config.httpLibOverride = transferType;
164+
config.executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(ALLOCATION_TAG, 25);
162165

163166
//to test proxy functionality, uncomment the next two lines.
164167
//config.proxyHost = "localhost";

0 commit comments

Comments
 (0)