링 버퍼를 구현하기위해선 큐를 먼저 알아야 한다. 큐는 FIFO(First In First Out) 구조로 먼저들어온 데이터가 먼저 나가는 구조이다.
이와 비교되는 자료구조는 LIFO(Last In First Out) 구조인 스택이 있다. 스택은 나중에 들어온 데이터가 먼저 나오는 구조로 함수 호출을 생각하면 된다. 함수가 호출되면 함수의 매개변수, 호출이 끝난 뒤 돌아갈 반환 주소값, 함수에서 선언된 지역 변수 등이 스택에 넣어지고 함수에서 리턴 시 스택에서 반환 주소값을 POP하여 해당 함수가 호출되기 이전 상태로 되돌아 갈 수 있다.
다시 돌아와서 링버퍼란 고정 크기의 큐를 마치 처음과 끝이 연결된 것처럼 사용하는 구조를 말한다. 그리고 버퍼에 쓰고 읽을 위치를 알기 위한 head 와 tail 포인터가 존재한다.
링버퍼를 구현하려고 할 때 기본적으로 있어야할 API는 다음과 같다.
- isEmpty() : 버퍼가 비었는지 확인한다. head == tail 이면 비어있는 상태.
- enQueue() : 데이터를 버퍼에 입력한다. 그리고 head 포인터를 하나 증가한다.
- deQueue() : 버퍼로부터 데이터를 출력한다. 그리고 tail 포인터를 하나 증가한다.
- isAvailable() : 버퍼에 데이터가 몇개 있는지 확인한다. 없으면 0 리턴
다음은 사이즈가 8인 링버퍼이다.
1. 처음 링버퍼를 생성 후 초기화했을 때의 상태이다. head, tail 포인터가 모두 배열의 0번째 요소를 가리키고 있다. head == tail 이니 현재 버퍼가 비어있는 상태이다.
2. 하나의 데이터가 들어왔다. 버퍼의 head 포인터 위치에 H를 입력하고 head 포인터를 하나 증가한다.
3. 버퍼의 tail 포인터 위치에서 H를 빼서 출력하고 tail 포인터를 하나 증가한다. 다시 head == tail 이 되어 버퍼가 비어있는 상태가 되었다.
4. 버퍼에서 출력해야할 프로세서가 busy한 상태라서 데이터가 계속 입력되기만 하는 상황을 가정해보자. ELLO가 입력되고 head 포인터는 인덱스 5를 가르키고 있다.
5. 다시 busy하지 않은 상태로 돌아와 출력할 여유가 생겼다. ELLO를 출력하고 tail 포인터는 head 와 같은 위치를 가르켜 버퍼가 다시 비어서 출력할 게 없는 상태가 됐다.
6. 공백과 WORLD!\n 입력이 들어왔다. 현재 프로세서가 busy한 상태여서 출력은 되지 않고 있으며, head 포인터가 가르키는 자리에 데이터를 하나씩 입력하고 head 포인터를 그 앞으로 증가 시킨다.
그런데 마지막 개행문자까지 버퍼에 입력하고 head 포인터를 증가해야 할까? 지금은 버퍼가 데이터로 꽉찬 상황인데 이렇게하면 버퍼가 비어있는 상태랑 구분을 하지 못하게 된다. head == tail 이 되기 때문이다. 결국 버퍼가 비어있다고 생각해 WORLD!\n 은 출력되지 못할것이다.
7. 이를 해결하기 위해서는 full 과 empty 상태를 구분할 수 있어야한다. 해결법은 여러개 있을 수 있는데 대표적으로 2가지를 볼 수 있다.
1) 버퍼의 slot 하나를 쓰지 않고 남겨두는 방법
- head+1 == tail → full
- head == tail → empty
2) full 인 상태를 알기 위해 full flag를 두는 방법
- full flag == true → full
- (head == tail) && !full → empty
3) 오래된 데이터를 overwrite하는 방법
- head+1 == tail → full
- head == tail → empty
일단 1번의 경우 버퍼에 쓰는 작업도 한군데서, 그리고 버퍼에서 읽는 작업도 다른 한군데서 한다고 할 때, 버퍼에 쓸 때는 head 포인터만 건드리고 버퍼에서 읽을 때는 tail 포인터만 건드리니 lock을 사용하지 않고도 thread-safe하게 구현할 수 있다.
반면 2번 full flag 를 쓰는 경우 slot 하나를 낭비하지 않을 수 있지만 쓸 때와 읽을 때 둘 다 full flag를 조작해야하니 읽고 쓰는 것 둘 다 polling으로 하지 않는 이상 full flag가 critical section(임계구역)이 되어 mutex(mutual exclusion : 상호배제)를 구현해야 하는 요구사항이 생긴다.
그리고 3번은 full과 empty 상태를 구분하는 법은 1번과 같다. 다만 enqueue 시에 버퍼가 full상태면 head 포인터 위치에 데이터를 입력하고 head와 tail 포인터 둘 다 증가시킨다. 그러면 원래 tail 포인터가 가르켰던 제일 오래된 데이터는 더이상 읽을 수 없게 된다.
비선점형 OS에선 1번 방법을 쓰는게 비교적 간단하다. 아래에서도 1번 방법을 사용한다.
!까지만 입력하고 난 뒤에 head+1 == tail 인 버퍼가 꽉 찬 상태이므로 더 이상 입력하지 않는다. 버퍼에서 출력이 일어나고 tail 포인터가 앞으로 이동하면 그때서야 버퍼에 입력할 수 있다.
Reference : https://embeddedartistry.com/blog/2017/05/17/creating-a-circular-buffer-in-c-and-c/
소스코드
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdbool.h>
#define BUF_LEN 5
typedef struct {
unsigned int head;
unsigned int tail;
char data[BUF_LEN];
} t_ringBuf;
bool isRbAvailable(t_ringBuf* rb)
{
bool ret = false;
ret = (BUF_LEN + rb->head - rb->tail) % BUF_LEN;
return ret;
}
void rbEnQueue(t_ringBuf* rb, char c) {
unsigned int i = (unsigned int)(rb->head + 1) % BUF_LEN;
// 버퍼가 꽉찼는지 검사
if (i != rb->tail)
{
rb->data[rb->head] = c;
rb->head = i;
}
else
{
printf("버퍼가 꽉참\n");
}
}
char rbDeQueue(t_ringBuf* rb) {
char c = '\0';
// 버퍼가 비었는지 검사
if (rb->head == rb->tail)
{
printf("버퍼가 비어있음\n");
}
else
{
c = rb->data[rb->tail];
rb->tail = (unsigned int)(rb->tail + 1) % BUF_LEN;
}
return c;
}
void printRb(t_ringBuf* rb) {
int i = rb->tail;
if (rb->head == rb->tail)
{
printf("버퍼가 비어있음\n");
}
else
{
while (i != rb->head)
{
printf("%c ", rb->data[i]);
i = (i + 1) % BUF_LEN;
}
printf("\n");
}
}
int main()
{
t_ringBuf BUFF = { 0, 0, { 0 } };
t_ringBuf* rb_handle = &BUFF;
bool exit = false;
while (!exit) {
int menu;
char c;
printf("1. 삽입 , 2. 삭제, 3. 출력, 4. 종료\n");
scanf("%d", &menu);
rewind(stdin);
switch (menu)
{
case 1:
printf("문자 입력 : ");
scanf("%c", &c);
rbEnQueue(rb_handle, c);
rewind(stdin);
break;
case 2:
rbDeQueue(rb_handle);
break;
case 3:
printRb(rb_handle);
break;
case 4:
exit = true;
break;
}
}
return 0;
}
링버퍼의 크기가 5라면 hello 를 입력한다고 했을 때 slot 하나를 못쓰니 hell 밖에 입력이 안된다. deQueue로 h를 빼줘야만 o를 입력할 수 있다.
만약 3번의 방식으로 구현하려면 EnQueue 함수를 아래와 같이 수정해주면 된다. 그치만 보통은 1, 2번 방법으로 구현하고 버퍼가 꽉차는 경우가 생길 때는 버퍼사이즈를 늘려서 해결하는 게 좋을 것 같다.
void rbEnQueue(t_ringBuf* rb, char c) {
unsigned int i = (unsigned int)(rb->head + 1) % BUF_LEN;
// 버퍼가 꽉찬경우
if (i == rb->tail)
{
printf("버퍼가 꽉차서 오래된 데이터 overwrite함\n");
rb->tail = (unsigned int)(rb->tail + 1) % BUF_LEN;
}
rb->data[rb->head] = c;
rb->head = i;
}
그럼 이 링버퍼는 언제 사용되나? 주로 통신쪽에서 연속적인 데이터 스트림을 처리하는 데 유용하게 사용된다. 펌웨어 쪽에서는 직접 구현이 필수적이라고 할 수 있다.
링버퍼 사용시 얻을 수 있는 이점은 뭘까?
1. 데이터 수신과 처리의 분리
인터럽트 핸들러는 가능한 빨리 반환되어야 하는데 링버퍼를 사용하면 인터럽트 핸들러 안에서는 데이터를 버퍼에 저장하는 간단한 작업만 수행하고, 데이터 처리 작업은 나중에 메인 루프에서 수행할 수 있다.
2. 동기화 문제 최소화
링버퍼를 사용하면 생산자(Enqueu 작업이 일어나는 인터럽트 핸들러)와 소비자(Dequeu 작업이 일어나는 메인 루프)가 동시에 데이터를 안전하게 읽고 쓸 수 있다. 단일 생산자 단일 소비자 모델에서는 각각 head와 tail 포인터를 독립적으로 변경하기 때문에 복잡한 동기화 메커니즘 없이도 데이터 무결성을 유지할 수 있다.
3. 데이터 손실 방지
링버퍼는 버퍼가 가득 차면 더이상 데이터를 쓰지 않거나 덮어쓰기를 할 수 있으므로 데이터 손실을 방지하거나, 최소한 데이터 손실이 발생할 가능성을 미리 예측하여 버퍼 사이즈를 늘리는 결정을 할 수 있다.
4. 효율적인 메모리 사용
링버퍼는 정해진 크기의 메모리 블록을 순환하며 사용하므로 제한된 자원을 효율적으로 사용할 수 있다.
'임베디드 개발 > 펌웨어' 카테고리의 다른 글
CAN 통신 (Controller Area Network) (1) | 2023.01.24 |
---|---|
unsigned int 형 tick 변수가 오버플로우 나는 날 (0) | 2023.01.07 |
CAN 통신 ] PCAN 사용하기 (2) | 2022.12.22 |
정수 리터럴 뒤에 접미사 UL을 붙이는 이유 (0) | 2022.12.14 |
ELF 파일 포맷 (0) | 2022.12.02 |