C++ 템플릿을 활용해 여러 데이터 타입으로 재사용이 가능하고 멀티 스레드환경에서도 안전하게 사용할 수 있는 링버퍼 클래스를 작성해보았다.
내부적으로 고정 크기의 std::vector 버퍼를 사용하며, 일반적인 vector가 앞의 요소를 제거하면 메모리 이동 비용이 발생하는 것과 달리 head, tail 포인터 순환 구조를 통해 메모리를 효율적으로 사용할 수 있다.
이미 STL에 존재하는 컨테이너인 std::deque(Not thread-safe)와 std::mutex를 함께 쓰는 방법도 있긴 하지만, 직접 구현하는 링버퍼 클래스에는 다음과 같은 장점이 존재한다.
1. 고정 크기 버퍼를 사용하므로 예측이 가능
std::deque의 경우 capacity를 초과할 때마다 메모리를 재할당하거나 재배치하는 과정이 일어나서 비용이 크고 예측이 어렵다. 반면 아래 구현한 링버퍼는 생성 시점에 고정 크기의 메모리를 한번만 할당한다.
2. thread-safe 기능 포함
클래스 내부에 std::mutex를 포함시켜 외부에서 별도로 락을 관리하지 않아도 된다.
3. overflow / underflow 직접 관리 가능
링버퍼가 꽉 찼을 때나 비었을 때 영원히 대기할지 아니면 실패할지 등의 동작을 직접 정의할 수 있다.
아래에서도 블락킹 방식의 push() / pop()과 논블락킹 방식의 try_push() / try_pop() 두 가지를 구현해 놓았다.
4. API 확장 가능
직접 구현한 클래스이므로 필요에 따라 얼마든지 확장 가능하다. 예를 들면, 링버퍼가 꽉 찼을 때 가장 오래된 데이터부터 덮어쓰는 방식의 force_push()라든가, 데이터를 꺼내지 않고 보기만 하는 front() 함수 등을 필요한 경우 쉽게 추가 구현할 수 있다.
Ringbuffer.h
템플릿 클래스여서 컴파일러가 인스턴스화할 때 구현부를 알아야 하므로 헤더에 다 구현한다.
#pragma once
#include <vector>
#include <mutex>
#include <condition_variable>
// 템플릿 클래스는 컴파일러가 인스턴스화할 때 구현부를 알아야 하므로 헤더에 구현을 포함한다.
template <typename T>
class RingBuffer {
public:
explicit RingBuffer(size_t capacity) : m_buffer(capacity), m_head(0), m_tail(0), m_full(false) {}
bool isEmpty() const {
std::lock_guard<std::mutex> lock(m_mutex);
return !m_full && (m_head == m_tail);
}
bool isFull() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_full;
}
size_t size() const {
std::lock_guard<std::mutex> lock(m_mutex);
size_t ret;
if (m_full) ret = m_buffer.size();
else if (m_head >= m_tail) ret = m_head - m_tail;
else ret = m_buffer.size() + m_head - m_tail;
return ret;
}
size_t capacity() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_buffer.size();
}
void clear() {
std::lock_guard<std::mutex> lock(m_mutex);
m_head = 0;
m_tail = 0;
m_full = false;
m_cv_notFull.notify_all();
}
// blocking 방식 push, 버퍼가 가득 차면 자리가 날 때까지 대기
void push(T item) {
// condition_variable과 함께 mutex를 쓸 때는 unique_lock을 써야 wait 내부에서 unlock/lock이 가능하다.
std::unique_lock<std::mutex> lock(m_mutex);
m_cv_notFull.wait(lock, [this]() { return !m_full; });
m_buffer[m_head] = std::move(item);
advanceHead();
m_cv_notEmpty.notify_one();
}
// blocking 방식 pop, 버퍼가 비었으면 데이터가 들어올 때까지 대기
void pop(T& out) {
std::unique_lock<std::mutex> lock(m_mutex);
m_cv_notEmpty.wait(lock, [this]() { return (m_head != m_tail) || m_full; });
out = std::move(m_buffer[m_tail]);
advanceTail();
m_cv_notFull.notify_one();
}
// non-blocking 방식 push, 데이터 삽입 성공 여부를 bool로 리턴
bool try_push(T item) {
std::lock_guard<std::mutex> lock(m_mutex);
bool ret = false;
if (!m_full) {
ret = true;
m_buffer[m_head] = std::move(item);
advanceHead();
m_cv_notEmpty.notify_one();
}
return ret;
}
// non-blocking 방식 pop, 데이터 추출 성공 여부를 bool로 리턴하고, 데이터는 out 참조 파라미터를 통해 전달
bool try_pop(T& out) {
std::lock_guard<std::mutex> lock(m_mutex);
bool ret = false;
if ((m_head != m_tail) || m_full) {
ret = true;
out = std::move(m_buffer[m_tail]);
advanceTail();
m_cv_notFull.notify_one();
}
return ret;
}
private:
std::vector<T> m_buffer;
size_t m_head, m_tail;
bool m_full;
// mutable 키워드를 사용하면 const로 선언한 멤버함수 안에서도 이 변수를 수정할 수 있다.
mutable std::mutex m_mutex;
std::condition_variable m_cv_notFull, m_cv_notEmpty;
void advanceHead() {
m_head = (m_head + 1) % m_buffer.size();
m_full = (m_head == m_tail);
}
void advanceTail() {
m_tail = (m_tail + 1) % m_buffer.size();
m_full = false;
}
};
API의 이름은 vector나 deque 같은 표준 컨테이너에 있는 함수 이름을 참고해서 지었다.
✓ Blocking vs Non-Blocking
Blocking 방식의 push() / pop()과 Non-Blocking 방식의 try_push() / try_pop()을 둘 다 구현해두었긴 한데, 실제로는 거의 Non-Blocking 방식의 try_* 계열 함수를 사용하게 될 것같다. 아무래도 자료구조 단인 링버퍼 내부에서 블락하는 것보다는 윗단에서 리턴값을 보고 처리하도록 제어권을 넘기는 편이 낫기 때문이다.
✓ Non-Blocking API 사용 시 false 리턴 가능성
try_* 계열 함수 사용 시 false 리턴할 가능성은 얼마나 있을까? try_push() 함수의 경우는 개발 과정에서 충분히 테스트를 하고 링버퍼의 크기를 넉넉하게 설정해놨다면, 실제 운용 시 버퍼가 가득차서 false 리턴하는 일은 거의 발생하지 않을거다.
한편, try_pop() 함수의 경우는 사용자가 size()를 먼저 호출하고 데이터가 있을 때만 try_pop()을 호출하면 블락킹될 일이 없을것 같지만, 이건 단일 스레드 환경에서만 그렇다. 멀티 스레드 환경에서는 "데이터가 있는지 확인" 하는 것과 "데이터를 꺼내는" 행위가 하나의 lock 안에서 원자적으로 처리되는게 아닌 이상, 그 사이 찰나의 순간 다른 소비자 스레드가 먼저 데이터를 채가면 false 리턴하는 경우도 생길 수 있다.
사용 예시
다음은 위에서 정의한 클래스를 활용해서 struct Data 타입을 5개 담을 수 있는 크기의 링버퍼를 만들고, 1개의 생산자 스레드와 2개의 소비자 스레드에서 데이터를 주고받는 예시이다.
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <string>
#include "RingBuffer.h"
struct Data {
int id;
float value;
};
// Data 구조체 타입으로 크기 5의 링버퍼 생성
RingBuffer<Data> rb(5);
std::mutex cout_mutex; // 출력 충돌 방지용 뮤텍스
void producer(int id) {
for (int i = 0; i < 10; ++i) {
Data data{ id * 100 + i, i * 0.1f + 1.0f };
rb.push(data);
{
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "[Producer] Sent { id: "
+ std::to_string(data.id) + ", value: "
+ std::to_string(data.value) + " }" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(150));
Data data;
rb.pop(data);
{
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << " [Consumer " + std::to_string(id) + "] Got { id: "
+ std::to_string(data.id) + ", value: "
+ std::to_string(data.value) + " }" << std::endl;
}
}
}
int main() {
std::thread p(producer, 1);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p.join();
c1.join();
c2.join();
return 0;
}

'프로그래밍 > C++' 카테고리의 다른 글
| C++ ] 함수 템플릿, 클래스 템플릿 (0) | 2025.12.30 |
|---|---|
| C++ ] 싱글톤(Singleton) 클래스 (0) | 2025.12.15 |
| C++ ] constexpr (1) | 2025.11.30 |
| C++] fstream의 open mode (0) | 2025.07.24 |
| C++] Intel hex to bin 변환 프로그램 (TI C2000 시리즈) (0) | 2025.05.24 |