링버퍼를 이해하기 위해선 먼저 큐에 대한 개념을 알고 있어야 한다. 큐는 FIFO(First In First Out) 구조로 먼저들어온 데이터가 먼저 나가는 단순한 구조이다.
이와 비교되는 자료구조는 LIFO(Last In First Out) 구조인 스택이 있다. 스택은 나중에 들어온 데이터가 먼저 나오는 구조로 함수 호출을 생각하면 된다. 함수가 호출되면 함수의 매개변수, 호출이 끝난 뒤 돌아갈 반환 주소값, 함수에서 선언된 지역 변수 등이 스택에 넣어지고 함수에서 리턴 시 스택에서 반환 주소값을 POP하여 해당 함수가 호출되기 이전 상태로 되돌아 갈 수 있다.
다시 돌아와서 링버퍼란 고정 크기의 큐를 마치 처음과 끝이 연결된 것처럼 사용하는 구조를 말한다. 그리고 버퍼에 쓰고 읽을 위치를 알기 위한 head 와 tail 포인터가 존재한다.
링버퍼를 구현하려고 할 때 기본적으로 있어야할 API는 다음과 같다.
- isEmpty() : 버퍼가 비었는지 확인한다. head == tail 이면 비어있는 상태.
- enQueue() : 데이터를 버퍼에 입력한다. 그리고 head 포인터를 하나 증가한다.
- deQueue() : 버퍼로부터 데이터를 출력한다. 그리고 tail 포인터를 하나 증가한다.
- isAvailable() : 버퍼에 데이터가 몇개 있는지 확인한다. 없으면 0 리턴
다음은 사이즈가 8인 링버퍼이다.
1. 처음 링버퍼를 생성 후 초기화했을 때의 상태이다. head, tail 포인터가 모두 배열의 0번째 요소를 가리키고 있다. head == tail 이니 현재 버퍼가 비어있는 상태이다.
2. 하나의 데이터가 들어왔다. 버퍼의 head 포인터 위치에 H를 입력하고 head 포인터를 하나 증가한다.
3. 버퍼의 tail 포인터 위치에서 H를 빼서 출력하고 tail 포인터를 하나 증가한다. 다시 head == tail 이 되어 버퍼가 비어있는 상태가 되었다.
4. 버퍼에서 출력해야할 프로세스가 busy한 상태라서 데이터가 계속 입력되기만 하는 상황을 가정해보자. ELLO가 입력되고 head 포인터는 인덱스 5를 가르키고 있다.
5. 다시 busy하지 않은 상태로 돌아와 출력할 여유가 생겼다. ELLO를 출력하고 tail 포인터는 head 와 같은 위치를 가르켜 버퍼가 다시 비어서 출력할 게 없는 상태가 됐다.
6. 공백과 WORLD!\n 입력이 들어왔다. 현재 프로세스가 busy한 상태여서 출력은 되지 않고 있으며, head 포인터가 가르키는 자리에 데이터를 하나씩 입력하고 head 포인터를 그 앞으로 증가 시킨다.
그런데 마지막 '\n'까지 버퍼에 입력하고 head 포인터를 증가해야 할까? 지금은 버퍼가 데이터로 꽉찬 상황인데 이렇게하면 버퍼가 비어있는 상태랑 구분을 하지 못하게 된다. head == tail 이 되기 때문이다. 결국 버퍼가 비어있다고 생각해 WORLD!\n 은 출력되지 못할것이다.
7. 이를 해결하기 위해서는 버퍼가 비어 있는 상태(empty)와 가득 찬 상태(full)를 명확히 구분할 수 있어야한다. 두 상태 모두에서 head == tail 조건이 성립할 수 있기 때문에, 이를 구분하지 못하면 논리적으로 문제가 발생한다. 이 상황을 해결하기 위한 방법은 여러가지가 있을 수 있는데 대표적으로 몇가지를 살펴 볼 수 있다.
1) 버퍼의 슬롯 하나를 의도적으로 비워두는 방식
- (head + 1) % len == tail → full
- head == tail → empty
가장 일반적인 방식으로 버퍼의 슬롯 하나를 항상 쓰지 않고 비워두어 full과 empty 상태를 구분한다. 즉, 버퍼의 물리적인 크기가 N이라면 실제로 저장할 수 있는 데이터는 최대 N-1개다. 데이터가 N-1개까지 꽉 찬 경우, tail 포인터가 전진해 공간이 확보되기 전까지는 새로운 입력을 받지 않는다.
이 방식의 장점은 단순한 조건식으로 full/empty 상태를 구분할 수 있다는 점이다. 특히 단일 생산자-단일 소비자 환경에서는 head와 tail이 각각 독립적으로만 접근되기 때문에, 별도의 락 없이도 thread-safe한 구현이 가능하다.
2) full 인 상태를 알기 위해 full flag를 두는 방법
- full == true → full
- (head == tail) && (full == false) → empty
head == tail 조건이 성립할 수 있는 상황에서 full 또는 empty 상태를 명확히 구분하기 위해 별도의 full 플래그를 둔다. 이 방식은 버퍼 공간을 100% 활용할 수 있다는 점에서는 효율적이다.
하지만 enqueue와 dequeue 모두에서 full 플래그의 상태를 변경해야 하므로, 생산자와 소비자가 동시에 접근하는 환경이라면 플래그 자체가 임계 구역(Critical Section)이 된다. 따라서 thread-safe한 구현을 위해 mutex 등 동기화 메커니즘을 도입해야 하는 요구사항이 생긴다.
3) 오래된 데이터를 overwrite하는 방법
- full 또는 empty 상태 판별은 1)번과 동일한 조건을 사용한다.
- 다만 full 상태에서 enqueue 시 head 위치에 새 데이터를 저장하고, head와 tail을 동시에 한 칸씩 이동시켜서 가장 오래된 데이터를 자동으로 폐기한다.
이 방식은 데이터 손실을 감수하더라도 입력을 중단시키지 않고 지속적으로 처리해야 하는 시스템에서 유용하다.
셋 중에선 1)번 방식이 가장 단순하면서도 안정적으로 사용할 수 있는 방식이다. 아래 코드에서도 1)번 방법을 사용한다.
다시 위의 예시 케이스로 돌아가면 '!' 문자까지 입력된 시점에서 (head + 1) % len == tail 조건이 성립하므로, 버퍼는 full 상태로 간주되어 더 이상 입력이 이루어지지 않는다. 즉, 그 다음에 입력되어야 할 '\n' 문자는 버퍼에 저장되지 않는다. 이후 버퍼에서 데이터가 출력되어 tail 포인터가 앞으로 이동하면 비어 있는 슬롯이 생기게 되고, 그때서야 새로운 데이터를 다시 입력할 수 있다.
Reference : https://embeddedartistry.com/blog/2017/05/17/creating-a-circular-buffer-in-c-and-c/
소스코드
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdbool.h>
#define BUF_LEN 5
typedef struct {
unsigned int head;
unsigned int tail;
char data[BUF_LEN];
} t_ringBuf;
bool isRbAvailable(t_ringBuf* rb)
{
bool ret = false;
ret = (BUF_LEN + rb->head - rb->tail) % BUF_LEN;
return ret;
}
void rbEnQueue(t_ringBuf* rb, char c) {
unsigned int i = (unsigned int)(rb->head + 1) % BUF_LEN;
// 버퍼가 꽉찼는지 검사
if (i != rb->tail)
{
rb->data[rb->head] = c;
rb->head = i;
}
else
{
printf("버퍼가 꽉참\n");
}
}
char rbDeQueue(t_ringBuf* rb) {
char c = '\0';
// 버퍼가 비었는지 검사
if (rb->head == rb->tail)
{
printf("버퍼가 비어있음\n");
}
else
{
c = rb->data[rb->tail];
rb->tail = (unsigned int)(rb->tail + 1) % BUF_LEN;
}
return c;
}
void printRb(t_ringBuf* rb) {
int i = rb->tail;
if (rb->head == rb->tail)
{
printf("버퍼가 비어있음\n");
}
else
{
while (i != rb->head)
{
printf("%c ", rb->data[i]);
i = (i + 1) % BUF_LEN;
}
printf("\n");
}
}
int main()
{
t_ringBuf BUFF = { 0, 0, { 0 } };
t_ringBuf* rb_handle = &BUFF;
bool exit = false;
while (!exit) {
int menu;
char c;
printf("1. 삽입 , 2. 삭제, 3. 출력, 4. 종료\n");
scanf("%d", &menu);
rewind(stdin);
switch (menu)
{
case 1:
printf("문자 입력 : ");
scanf("%c", &c);
rbEnQueue(rb_handle, c);
rewind(stdin);
break;
case 2:
rbDeQueue(rb_handle);
break;
case 3:
printRb(rb_handle);
break;
case 4:
exit = true;
break;
}
}
return 0;
}
링버퍼의 크기가 5인 경우, 'hello'를 입력하면 슬롯 하나를 비워두는 구조적 제약으로 인해 'hell'까지만 저장되고, 'o'는 입력되지 않는다. 이 상태에서 dequeue를 통해 'h'를 제거하면 비어 있는 슬롯이 생기고, 그제서야 'o'를 추가할 수 있게 된다.
만약 3)번 방식처럼 오래된 데이터를 덮어쓰는 방식으로 구현하려면, enqueue 함수를 다음과 같이 수정하면 된다. 다만 실제로는 대부분 1)번 또는 2)번 방식을 사용하며, 버퍼가 자주 가득 차는 경우에는 구조를 변경하기보다는 버퍼의 크기를 확장하는 방식으로 대응하는 것이 일반적이다.
void rbEnQueue(t_ringBuf* rb, char c) {
unsigned int i = (unsigned int)(rb->head + 1) % BUF_LEN;
// 버퍼가 꽉찬경우
if (i == rb->tail)
{
printf("버퍼가 꽉차서 오래된 데이터 overwrite함\n");
rb->tail = (unsigned int)(rb->tail + 1) % BUF_LEN;
}
rb->data[rb->head] = c;
rb->head = i;
}
🔍 이 링버퍼는 언제 사용될까?
주로 통신 시스템에서 연속적인 데이터 스트림을 처리할 때 유용하게 사용된다. 특히 펌웨어 같이 하드웨어 리소스가 제한된 상황에서는 링버퍼를 구현이 필수적이라고 할 수 있다.
🔍 링버퍼 사용시 얻을 수 있는 이점은?
1. 데이터 수신과 처리의 분리
인터럽트 핸들러는 가능한 한 빠르게 실행을 종료해야 한다. 링버퍼를 사용하면 인터럽트 핸들러에서는 단순히 데이터를 버퍼에 저장하는 작업만 수행하고, 데이터의 실제 처리는 메인 루프나 별도의 작업 스레드에서 처리할 수 있다.
2. 동기화 문제 최소화
링버퍼를 사용하면 생산자(Enqueue 작업이 일어나는 인터럽트 핸들러)와 소비자(Dequeue 작업이 일어나는 메인 루프)가 동시에 데이터를 안전하게 읽고 쓸 수 있다. 단일 생산자 단일 소비자 모델에서는 각각이 head와 tail 포인터만 독립적으로 조작하기 때문에 복잡한 동기화 메커니즘 없이도 데이터 무결성을 유지할 수 있다.
3. 데이터 손실 관리
링버퍼는 버퍼가 가득 찬 경우 더 이상 데이터를 저장하지 않거나, 오래된 데이터를 덮어쓰는 방식 중 하나를 선택할 수 있다. 어느 방식을 선택하든 데이터 손실 발생 가능성을 시스템이 예측할 수 있으므로, 요구사항에 따라 버퍼 크기를 조정하는 등 대응이 가능하다.
4. 효율적인 메모리 사용
링버퍼는 고정 크기의 메모리 블록을 순환 방식으로 사용하므로, 메모리 할당과 해제를 반복하지 않고 일정한 메모리 사용량을 유지할 수 있다. 이 점은 임베디드 시스템에서 매우 유리하다.
'임베디드 개발 > 펌웨어' 카테고리의 다른 글
CAN 통신 (Controller Area Network) (1) | 2023.01.24 |
---|---|
unsigned int 형 tick 변수가 오버플로우 나는 날 (0) | 2023.01.07 |
CAN 통신 ] PCAN 사용하기 (2) | 2022.12.22 |
정수 리터럴 뒤에 접미사 UL을 붙이는 이유 (0) | 2022.12.14 |
ELF 파일 포맷 (0) | 2022.12.02 |