본문 바로가기
프로그래밍/C++

C++ ] std::condition_variable 사용법

by eteo 2024. 5. 12.

 

 

 

std::condition_variable은 C++에서 스레드 간의 동기화를 위해 사용되는 매커니즘 중 하나로, 한 스레드가 특정 조건이 충족될 때까지 대기하도록 하고, 다른 스레드가 그 조건을 충족시켰을 때 대기 중인 스레드를 깨우는 방식으로 사용한다.

 

std::condition_variable은 주로 std::mutex와 함께 사용되며, 뮤텍스의 소유권을 관리하기 위해 std::unique_lock<std::mutex> 클래스와 결합하여 사용된다.

 

 


std::condition_variable 사용 패턴

  1. 스레드는 std::unique_lock<std::mutex>를 사용하여 뮤텍스를 잠근다.
  2. 스레드는 std::condition_variable의 대기 함수(wait, wait_for, wait_until)를 호출하여 특정 조건이 충족될 때까지 대기한다. 이 과정에서 대기 함수는 자동으로 뮤텍스를 해제하고, 조건이 충족되면 다시 뮤텍스를 잠근다.
  3. 다른 스레드가 조건을 충족시키고 notify_one 또는 notify_all 메서드를 호출하여 대기 중인 스레드를 깨운다.
  4. 깨어난 스레드는 필요한 작업을 수행한 후 뮤텍스를 해제한다. 

 

이러한 방식으로 std::condition_variable은 스레드 간의 효율적인 통신과 동기화를 가능하게 하며, 멀티스레딩 프로그램에서 경쟁 조건(Race Condition) 없이 안전하게 임계영역(Critical Section)에 있는 공유 자원(Shared Resource)에 접근하도록 한다.

 

 

 


헤더 및 주요 함수

 

condition_variable을 사용하기 위해 다음의 헤더를 포함한다.

 

#include <condition_variable>

 

 

 

 


wait(unique_lock<mutex>& lock)

넘겨받은 unique_lock을 해제하고 스레드를 바로 대기 상태로 전환한다. 그리고 조건 변수에 신호가 도착하면, unique_lock을 다시 잠그고 함수를 반환한다.

아래의 조건 검사를 두번째 인수로 받는 함수와 달리 무조건 대기 상태로 들어가고, 조건 변수에서 신호를 받았을 때 바로 대기 상태에서 벗어나게 된다.

  • lock: std::unique_lock<std::mutex> 타입의 락으로 함수가 호출될 때 잠겨 있어야 하며, 대기 상태로 들어가기 전에 자동으로 해제된다.

 

예시.

std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck);

 

 

 

 


wait(unique_lock<mutex>& lock, Predicate pred) 

조건 함수 pred가 처음부터 true라면 대기 상태에 들어가지 않고 바로 반환되며, false라면 unique_lock을 해제하고 스레드를 대기 상태로 만든다. 조건 변수에 신호가 오고 pred 함수가 true를 반환하면 unique_lock을 다시 잠그고 함수가 반환된다.

이 버전의 wait 함수는 위의 wait 함수와 달리 pred 조건이 true이면 바로 리턴되며, 조건 변수에 신호가 와서 대기 상태에서 깨어났을 때도 pred 함수가 true를 반환해야만 대기 상태에서 벗어나게 된다. 한편, 이 함수가 반복문 안에 있을 경우, pred 조건이 true를 만족하면 대기 상태에 들어가지 않고 계속 실행될 수도 있다.

 

  • 파라미터: lock: std::unique_lock<std::mutex> 타입의 락. 
  • pred: bool을 반환하는 조건 함수. 이 조건이 true를 반환할 때까지 스레드는 대기 상태를 유지한다.

 

예시.

bool ready = false;
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, []{ return ready; });

 

 

 

 


wait_for(unique_lock<mutex>& lock, const chrono::duration& rel_time, Predicate pred = Predicate())

지정된 시간 rel_time 동안 또는 조건 pred가 true가 될 때까지 대기하고, 시간이 만료되거나 조건이 충족되면 대기 상태를 벗어난다.

  • lock: std::unique_lock<std::mutex> 타입의 락
  • rel_time: 대기할 최대 시간
  • pred: (선택적) bool을 반환하는 조건 함수

 

예시.

std::unique_lock<std::mutex> lck(mtx);
cv.wait_for(lck, std::chrono::seconds(1), []{ return ready; });

 

 


notify_one() 

대기 중인 스레드 중 하나에게 신호를 보내 깨운다.

조건변수 사용시 wait 함수와 함께 검사 조건(Predicate)을 인자로 받은 경우, 대기 상태에서 깨어난 스레드는 그 조건을 재평가 하여 대기 상태를 벗어날지 여부를 결정한다.

 

 


notify_all()

대기 중인 모든 스레드에게 신호를 보내 깨운다.

조건변수 사용시 wait 함수와 함께 검사 조건(Predicate)을 인자로 받은 경우, 대기 상태에서 깨어난 스레드는 그 조건을 재평가 하여 대기 상태를 벗어날지 여부를 결정한다.

 

 

 

 


std::condition_variable 사용 예시

 

다음은 이해를 돕기 위해 예시로 만들어보았다.

 

- 사용자 입력을 통해 동적으로 생산자 스레드를 생성하고 생산자 스레드는 사용자 입력에 따른 데이터를 반복 생성하여 공유 큐에 삽입 후 소비자 스레드를 깨운다. 

- 소비자 스레드는 해당 큐에서 데이터를 꺼내 소비하고 다시 신호를 받고 조건 검사를 만족할 때까지 대기한다.

- 사용자가 'q'를 입력하는 경우 모든 스레드의 작업을 종료하고 프로그램을 종료한다.

 

 

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace std;

mutex mtx;
condition_variable cv;
queue<int> dataQueue;
bool finished = false; // 작업 완료 플래그

// 수정된 생산자 스레드 함수
void producer(int id) {
    for (int i = 1; i <= 5; ++i) {
        this_thread::sleep_for(chrono::milliseconds(100)); // 생산 속도 조절을 위한 딜레이
        {
            lock_guard<mutex> lck(mtx);
            int data = i * 100 + id;
            dataQueue.push(data);
            cout << "Producer " << id << " added data: " << data << endl;
        }
        cv.notify_one(); // 하나의 대기 중인 소비자 스레드를 깨움
    }
}

// 소비자 스레드 함수
void consumer() {
    while (true) {
        unique_lock<mutex> lck(mtx);
        cv.wait(lck, [] { return !dataQueue.empty() || finished; });
        if (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            cout << "Consumer consumed data: " << data << endl;
        } else if (finished) {
            break; // 모든 작업이 완료되었고 큐가 비어있다면 루프 종료
        }
    }
}

int main() {
    vector<thread> threads;
    thread consumerThread(consumer);

    while (true) {
        cout << "Enter ID (or 'q' to quit): ";
        string input;
        cin >> input;
        
        if (input == "q") {
            break; // 입력이 'q'면 루프 종료
        }
        
        int id = stoi(input);
        // 사용자가 입력한 id에 대해 생산자 스레드 생성
        threads.emplace_back(producer, id);
    }

    // 프로그램 종료 준비
    finished = true;
    cv.notify_all();

    // 모든 생산자 스레드가 종료될 때까지 기다림
    for (auto& t : threads) {
        if (t.joinable()) {
            t.join();
        }
    }

    // 소비자 스레드가 종료될 때까지 기다림
    if (consumerThread.joinable()) {
        consumerThread.join();
    }

    return 0;
}