본문 바로가기
프로그래밍/C++

C++ ] 템플릿을 활용한 thread-safe 링버퍼 클래스

by eteo 2026. 2. 5.
반응형

 

 

C++ 템플릿을 활용해 여러 데이터 타입으로 재사용이 가능하고 멀티 스레드환경에서도 안전하게 사용할 수 있는 링버퍼 클래스를 작성해보았다.

 

내부적으로 고정 크기의 std::vector 버퍼를 사용하며, 일반적인 vector가 앞의 요소를 제거하면 메모리 이동 비용이 발생하는 것과 달리 head, tail 포인터 순환 구조를 통해 메모리를 효율적으로 사용할 수 있다.

 

이미 STL에 존재하는 컨테이너인 std::deque(Not thread-safe)와 std::mutex를 함께 쓰는 방법도 있긴 하지만, 직접 구현하는 링버퍼 클래스에는 다음과 같은 장점이 존재한다.

 

 

1. 고정 크기 버퍼를 사용하므로 예측이 가능

std::deque의 경우 capacity를 초과할 때마다 메모리를 재할당하거나 재배치하는 과정이 일어나서 비용이 크고 예측이 어렵다. 반면 아래 구현한 링버퍼는 생성 시점에 고정 크기의 메모리를 한번만 할당한다.

 

2. thread-safe 기능 포함

클래스 내부에 std::mutex를 포함시켜 외부에서 별도로 락을 관리하지 않아도 된다.

 

3. overflow / underflow 직접 관리 가능

링버퍼가 꽉 찼을 때나 비었을 때 영원히 대기할지 아니면 실패할지 등의 동작을 직접 정의할 수 있다.

아래에서도 블락킹 방식의 push() / pop()과 논블락킹 방식의 try_push() / try_pop() 두 가지를 구현해 놓았다.

 

4. API 확장 가능

직접 구현한 클래스이므로 필요에 따라 얼마든지 확장 가능하다. 예를 들면, 링버퍼가 꽉 찼을 때 가장 오래된 데이터부터 덮어쓰는 방식의 force_push()라든가, 데이터를 꺼내지 않고 보기만 하는 front() 함수 등을 필요한 경우 쉽게 추가 구현할 수 있다.

 

 

 

 

 

 

 

Ringbuffer.h

템플릿 클래스여서 컴파일러가 인스턴스화할 때 구현부를 알아야 하므로 헤더에 다 구현한다.

 

#pragma once

#include <vector>
#include <mutex>
#include <condition_variable>

// 템플릿 클래스는 컴파일러가 인스턴스화할 때 구현부를 알아야 하므로 헤더에 구현을 포함한다.
template <typename T>
class RingBuffer {
public:
    explicit RingBuffer(size_t capacity) : m_buffer(capacity), m_head(0), m_tail(0), m_full(false) {}

    bool isEmpty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return !m_full && (m_head == m_tail);
    }

    bool isFull() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_full;
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        size_t ret;
        if (m_full) ret = m_buffer.size();
        else if (m_head >= m_tail) ret = m_head - m_tail;
        else ret = m_buffer.size() + m_head - m_tail;
        return ret;
    }

    size_t capacity() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_buffer.size();
    }

    void clear() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_head = 0;
        m_tail = 0;
        m_full = false;
        m_cv_notFull.notify_all();
    }

    // blocking 방식 push, 버퍼가 가득 차면 자리가 날 때까지 대기
    void push(T item) {
        // condition_variable과 함께 mutex를 쓸 때는 unique_lock을 써야 wait 내부에서 unlock/lock이 가능하다.
        std::unique_lock<std::mutex> lock(m_mutex);

        m_cv_notFull.wait(lock, [this]() { return !m_full; });

        m_buffer[m_head] = std::move(item);
        advanceHead();

        m_cv_notEmpty.notify_one();
    }

    // blocking 방식 pop, 버퍼가 비었으면 데이터가 들어올 때까지 대기
    void pop(T& out) {
        std::unique_lock<std::mutex> lock(m_mutex);

        m_cv_notEmpty.wait(lock, [this]() { return (m_head != m_tail) || m_full; });

        out = std::move(m_buffer[m_tail]);
        advanceTail();

        m_cv_notFull.notify_one();
    }

    // non-blocking 방식 push, 데이터 삽입 성공 여부를 bool로 리턴
    bool try_push(T item) {
        std::lock_guard<std::mutex> lock(m_mutex);
        bool ret = false;
        if (!m_full) {
            ret = true;
            m_buffer[m_head] = std::move(item);
            advanceHead();

            m_cv_notEmpty.notify_one();
        }
        return ret;
    }

    // non-blocking 방식 pop, 데이터 추출 성공 여부를 bool로 리턴하고, 데이터는 out 참조 파라미터를 통해 전달
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(m_mutex);
        bool ret = false;

        if ((m_head != m_tail) || m_full) {
            ret = true;
            out = std::move(m_buffer[m_tail]);
            advanceTail();

            m_cv_notFull.notify_one();
        }

        return ret;
    }

private:
    std::vector<T> m_buffer;
    size_t m_head, m_tail;
    bool m_full;
    // mutable 키워드를 사용하면 const로 선언한 멤버함수 안에서도 이 변수를 수정할 수 있다.
    mutable std::mutex m_mutex;
    std::condition_variable m_cv_notFull, m_cv_notEmpty;

    void advanceHead() {
        m_head = (m_head + 1) % m_buffer.size();
        m_full = (m_head == m_tail);
    }

    void advanceTail() {
        m_tail = (m_tail + 1) % m_buffer.size();
        m_full = false;
    }
};

 

 

API의 이름은 vector나 deque 같은 표준 컨테이너에 있는 함수 이름을 참고해서 지었다.

 

Blocking vs Non-Blocking

Blocking 방식의 push() / pop()과 Non-Blocking 방식의 try_push() / try_pop()을 둘 다 구현해두었긴 한데, 실제로는 거의 Non-Blocking 방식의 try_* 계열 함수를 사용하게 될 것같다. 아무래도 자료구조 단인 링버퍼 내부에서 블락하는 것보다는 윗단에서 리턴값을 보고 처리하도록 제어권을 넘기는 편이 낫기 때문이다.

 

Non-Blocking API 사용 시 false 리턴 가능성

try_* 계열 함수 사용 시 false 리턴할 가능성은 얼마나 있을까? try_push() 함수의 경우는 개발 과정에서 충분히 테스트를 하고 링버퍼의 크기를 넉넉하게 설정해놨다면, 실제 운용 시 버퍼가 가득차서 false 리턴하는 일은 거의 발생하지 않을거다.

 

한편, try_pop() 함수의 경우는 사용자가 size()를 먼저 호출하고 데이터가 있을 때만 try_pop()을 호출하면 블락킹될 일이 없을것 같지만, 이건 단일 스레드 환경에서만 그렇다. 멀티 스레드 환경에서는 "데이터가 있는지 확인" 하는 것과 "데이터를 꺼내는" 행위가 하나의 lock 안에서 원자적으로 처리되는게 아닌 이상, 그 사이 찰나의 순간 다른 소비자 스레드가 먼저 데이터를 채가면 false 리턴하는 경우도 생길 수 있다.

 

 

 

 

 

 

 

 

사용 예시

다음은 위에서 정의한 클래스를 활용해서 struct Data 타입을 5개 담을 수 있는 크기의 링버퍼를 만들고, 1개의 생산자 스레드와 2개의 소비자 스레드에서 데이터를 주고받는 예시이다.

 

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <string>
#include "RingBuffer.h"

struct Data {
    int id;
    float value;
};

// Data 구조체 타입으로 크기 5의 링버퍼 생성
RingBuffer<Data> rb(5);

std::mutex cout_mutex; // 출력 충돌 방지용 뮤텍스

void producer(int id) {
    for (int i = 0; i < 10; ++i) {
        Data data{ id * 100 + i, i * 0.1f + 1.0f };
        rb.push(data);

        {
            std::lock_guard<std::mutex> lock(cout_mutex);
            std::cout << "[Producer] Sent { id: "
                + std::to_string(data.id) + ", value: "
                + std::to_string(data.value) + " }" << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(int id) {
    for (int i = 0; i < 5; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
        Data data;
        rb.pop(data);

        {
            std::lock_guard<std::mutex> lock(cout_mutex);
            std::cout << "  [Consumer " + std::to_string(id) + "] Got { id: "
                + std::to_string(data.id) + ", value: "
                + std::to_string(data.value) + " }" << std::endl;
        }

    }
}

int main() {
    std::thread p(producer, 1);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);

    p.join();
    c1.join();
    c2.join();

    return 0;
}

 

 

반응형