sptk2 logo
SPTK Home Page
SynchronizedQueue.h
1/*
2╔══════════════════════════════════════════════════════════════════════════════╗
3║ SIMPLY POWERFUL TOOLKIT (SPTK) ║
4╟──────────────────────────────────────────────────────────────────────────────╢
5║ copyright © 1999-2022 Alexey Parshin. All rights reserved. ║
6║ email alexeyp@gmail.com ║
7╚══════════════════════════════════════════════════════════════════════════════╝
8┌──────────────────────────────────────────────────────────────────────────────┐
9│ This library is free software; you can redistribute it and/or modify it │
10│ under the terms of the GNU Library General Public License as published by │
11│ the Free Software Foundation; either version 2 of the License, or (at your │
12│ option) any later version. │
13│ │
14│ This library is distributed in the hope that it will be useful, but │
15│ WITHOUT ANY WARRANTY; without even the implied warranty of │
16│ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library │
17│ General Public License for more details. │
18│ │
19│ You should have received a copy of the GNU Library General Public License │
20│ along with this library; if not, write to the Free Software Foundation, │
21│ Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. │
22│ │
23│ Please report all bugs and problems to alexeyp@gmail.com. │
24└──────────────────────────────────────────────────────────────────────────────┘
25*/
26
27#pragma once
28
29#include <functional>
30#include <mutex>
31#include <queue>
32#include <sptk5/sptk.h>
33#include <sptk5/threads/Semaphore.h>
34
35namespace sptk {
36
47template<class T>
49{
50public:
51 virtual ~SynchronizedQueue() = default;
52
60 using CallbackFunction = std::function<bool(T& item)>;
61
70 void push(T&& data)
71 {
72 std::unique_lock lock(m_mutex);
73 m_queue->push(std::move(data));
74 lock.unlock();
75 m_semaphore.post();
76 }
77
85 void push(const T& data)
86 {
87 std::unique_lock lock(m_mutex);
88 m_queue->push(data);
89 lock.unlock();
90 m_semaphore.post();
91 }
92
101 bool pop(T& item, std::chrono::milliseconds timeout)
102 {
103 if (m_semaphore.sleep_for(timeout))
104 {
105 std::scoped_lock lock(m_mutex);
106 if (!m_queue->empty())
107 {
108 item = std::move(m_queue->front());
109 m_queue->pop();
110 return true;
111 }
112 }
113 return false;
114 }
115
121 virtual void wakeup()
122 {
123 m_semaphore.post();
124 }
125
129 bool empty() const
130 {
131 std::scoped_lock lock(m_mutex);
132 return m_queue->empty();
133 }
134
138 size_t size() const
139 {
140 std::scoped_lock lock(m_mutex);
141 return m_queue->size();
142 }
143
147 void clear()
148 {
149 std::scoped_lock lock(m_mutex);
150 m_queue = std::make_shared<std::queue<T>>();
151 }
152
162 bool each(const CallbackFunction& callbackFunction)
163 {
164 std::scoped_lock lock(m_mutex);
165
166 auto newQueue = std::make_shared<std::queue<T>>();
167
168 // Iterating through queue until callback returns false
169 bool rc = true;
170 while (m_queue->size())
171 {
172 T& item = m_queue->front();
173 m_queue->pop();
174 newQueue->push(item);
175 // When rc switches to false, don't execute callback
176 // for the remaining queue items
177 if (rc)
178 {
179 try
180 {
181 rc = callbackFunction(item);
182 }
183 catch (const Exception&)
184 {
185 rc = false;
186 }
187 }
188 }
189
190 m_queue = newQueue;
191
192 return rc;
193 }
194
195private:
199 mutable std::mutex m_mutex;
200
204 Semaphore m_semaphore;
205
209 std::shared_ptr<std::queue<T>> m_queue {std::make_shared<std::queue<T>>()};
210};
214} // namespace sptk
SPTK generic exception class.
Definition: Exception.h:56
Generic unnamed semaphore class.
Definition: Semaphore.h:52
bool sleep_for(std::chrono::milliseconds timeout)
Wait until semaphore value is greater than zero, or until timeout interval is passed.
void post()
Post the semaphore.
Definition: SynchronizedQueue.h:49
size_t size() const
Definition: SynchronizedQueue.h:138
void push(T &&data)
Definition: SynchronizedQueue.h:70
virtual void wakeup()
Definition: SynchronizedQueue.h:121
void clear()
Definition: SynchronizedQueue.h:147
bool each(const CallbackFunction &callbackFunction)
Definition: SynchronizedQueue.h:162
bool pop(T &item, std::chrono::milliseconds timeout)
Definition: SynchronizedQueue.h:101
bool empty() const
Definition: SynchronizedQueue.h:129
std::function< bool(T &item)> CallbackFunction
Definition: SynchronizedQueue.h:60
void push(const T &data)
Definition: SynchronizedQueue.h:85

Fri Oct 14 2022 09:58:32: SPTK 5.4.1