C++ ] std::condition_variable 사용법
std::condition_variable은 C++에서 스레드 간의 동기화를 위해 사용되는 매커니즘 중 하나로, 한 스레드가 특정 조건이 충족될 때까지 대기하도록 하고, 다른 스레드가 그 조건을 충족시켰을 때 대기 중인 스레드를 깨우는 방식으로 사용한다.
std::condition_variable은 주로 std::mutex와 함께 사용되며, 뮤텍스의 소유권을 관리하기 위해 std::unique_lock<std::mutex> 클래스와 결합하여 사용된다.
std::condition_variable 사용 패턴
- 스레드는 std::unique_lock<std::mutex>를 사용하여 뮤텍스를 잠근다.
- 스레드는 std::condition_variable의 대기 함수(wait, wait_for, wait_until)를 호출하여 특정 조건이 충족될 때까지 대기한다. 이 과정에서 대기 함수는 자동으로 뮤텍스를 해제하고, 조건이 충족되면 다시 뮤텍스를 잠근다.
- 다른 스레드가 조건을 충족시키고 notify_one 또는 notify_all 메서드를 호출하여 대기 중인 스레드를 깨운다.
- 깨어난 스레드는 필요한 작업을 수행한 후 뮤텍스를 해제한다.
이러한 방식으로 std::condition_variable은 스레드 간의 효율적인 통신과 동기화를 가능하게 하며, 멀티스레딩 프로그램에서 경쟁 조건(Race Condition) 없이 안전하게 임계영역(Critical Section)에 있는 공유 자원(Shared Resource)에 접근하도록 한다.
헤더 및 주요 함수
condition_variable을 사용하기 위해 다음의 헤더를 포함한다.
#include <condition_variable>
wait(unique_lock<mutex>& lock)
넘겨받은 unique_lock을 해제하고 스레드를 바로 대기 상태로 전환한다. 그리고 조건 변수에 신호가 도착하면, unique_lock을 다시 잠그고 함수를 반환한다.
아래의 조건 검사를 두번째 인수로 받는 함수와 달리 무조건 대기 상태로 들어가고, 조건 변수에서 신호를 받았을 때 바로 대기 상태에서 벗어나게 된다.
- lock: std::unique_lock<std::mutex> 타입의 락으로 함수가 호출될 때 잠겨 있어야 하며, 대기 상태로 들어가기 전에 자동으로 해제된다.
예시.
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck);
wait(unique_lock<mutex>& lock, Predicate pred)
조건 함수 pred가 처음부터 true라면 대기 상태에 들어가지 않고 바로 반환되며, false라면 unique_lock을 해제하고 스레드를 대기 상태로 만든다. 조건 변수에 신호가 오고 pred 함수가 true를 반환하면 unique_lock을 다시 잠그고 함수가 반환된다.
이 버전의 wait 함수는 위의 wait 함수와 달리 pred 조건이 true이면 바로 리턴되며, 조건 변수에 신호가 와서 대기 상태에서 깨어났을 때도 pred 함수가 true를 반환해야만 대기 상태에서 벗어나게 된다. 한편, 이 함수가 반복문 안에 있을 경우, pred 조건이 true를 만족하면 대기 상태에 들어가지 않고 계속 실행될 수도 있다.
- 파라미터: lock: std::unique_lock<std::mutex> 타입의 락.
- pred: bool을 반환하는 조건 함수. 이 조건이 true를 반환할 때까지 스레드는 대기 상태를 유지한다.
예시.
bool ready = false;
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, []{ return ready; });
wait_for(unique_lock<mutex>& lock, const chrono::duration& rel_time, Predicate pred = Predicate())
지정된 시간 rel_time 동안 또는 조건 pred가 true가 될 때까지 대기하고, 시간이 만료되거나 조건이 충족되면 대기 상태를 벗어난다.
- lock: std::unique_lock<std::mutex> 타입의 락
- rel_time: 대기할 최대 시간
- pred: (선택적) bool을 반환하는 조건 함수
예시.
std::unique_lock<std::mutex> lck(mtx);
cv.wait_for(lck, std::chrono::seconds(1), []{ return ready; });
notify_one()
대기 중인 스레드 중 하나에게 신호를 보내 깨운다.
조건변수 사용시 wait 함수와 함께 검사 조건(Predicate)을 인자로 받은 경우, 대기 상태에서 깨어난 스레드는 그 조건을 재평가 하여 대기 상태를 벗어날지 여부를 결정한다.
notify_all()
대기 중인 모든 스레드에게 신호를 보내 깨운다.
조건변수 사용시 wait 함수와 함께 검사 조건(Predicate)을 인자로 받은 경우, 대기 상태에서 깨어난 스레드는 그 조건을 재평가 하여 대기 상태를 벗어날지 여부를 결정한다.
std::condition_variable 사용 예시
다음은 이해를 돕기 위해 예시로 만들어보았다.
- 사용자 입력을 통해 동적으로 생산자 스레드를 생성하고 생산자 스레드는 사용자 입력에 따른 데이터를 반복 생성하여 공유 큐에 삽입 후 소비자 스레드를 깨운다.
- 소비자 스레드는 해당 큐에서 데이터를 꺼내 소비하고 다시 신호를 받고 조건 검사를 만족할 때까지 대기한다.
- 사용자가 'q'를 입력하는 경우 모든 스레드의 작업을 종료하고 프로그램을 종료한다.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace std;
mutex mtx;
condition_variable cv;
queue<int> dataQueue;
bool finished = false; // 작업 완료 플래그
// 수정된 생산자 스레드 함수
void producer(int id) {
for (int i = 1; i <= 5; ++i) {
this_thread::sleep_for(chrono::milliseconds(100)); // 생산 속도 조절을 위한 딜레이
{
lock_guard<mutex> lck(mtx);
int data = i * 100 + id;
dataQueue.push(data);
cout << "Producer " << id << " added data: " << data << endl;
}
cv.notify_one(); // 하나의 대기 중인 소비자 스레드를 깨움
}
}
// 소비자 스레드 함수
void consumer() {
while (true) {
unique_lock<mutex> lck(mtx);
cv.wait(lck, [] { return !dataQueue.empty() || finished; });
if (!dataQueue.empty()) {
int data = dataQueue.front();
dataQueue.pop();
cout << "Consumer consumed data: " << data << endl;
} else if (finished) {
break; // 모든 작업이 완료되었고 큐가 비어있다면 루프 종료
}
}
}
int main() {
vector<thread> threads;
thread consumerThread(consumer);
while (true) {
cout << "Enter ID (or 'q' to quit): ";
string input;
cin >> input;
if (input == "q") {
break; // 입력이 'q'면 루프 종료
}
int id = stoi(input);
// 사용자가 입력한 id에 대해 생산자 스레드 생성
threads.emplace_back(producer, id);
}
// 프로그램 종료 준비
finished = true;
cv.notify_all();
// 모든 생산자 스레드가 종료될 때까지 기다림
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// 소비자 스레드가 종료될 때까지 기다림
if (consumerThread.joinable()) {
consumerThread.join();
}
return 0;
}