기본적으로
자바
코드를 기반으로 설명됩니다.
공유 자원
이라고 한다.공유 자원
에 여러 프로세스 or 스레드가 접근하는 경우, 실행에 문제가 될 수 있습니다.임계 구역(critical section)
이라고 한다.임계 구역
의 코드를 실행해 문제가 발생하는 것을 레이스 컨디션
이라고 한다.+1
증가 연산을 하는 작업 (쓰레드) 2개가 있습니다.실제로는, 엄청 미세한 타이밍으로 엄청 빠르게 처리하기 때문에 사진과 완전 동일하게 처리한다고 할 수는 없다. 설명을 위한 예시
val = 2
라고 생각하겠지만, 1
이 나오게 된다.val
을 메모리에서 읽을때, 0
으로 읽고 증가하여 저장하였기 때문이다.공유자원
인 val에 접근하는데, 읽고 증감하여 저장하는 임계 구역
의 코드가 문제되어 레이스 컨디션
이 발생 하였다.더 극단적으로 동기화 문제를 보여주기 위해서, 하나의 스레드는 공유 데이터
static int count
를 10,000 만큼 증가 시키도록 구성하였다.
Counter
public class Counter implements Runnable {
@Override
public void run() {
for(int i=0; i<100_000; i++) {
Main.count = Main.count + 1;
}
}
}
Main
public class Main {
public static int count = 0;
public static final int THREAD_COUNT = 2;
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for(int i=0; i<THREAD_COUNT; i++) {
executor.submit(counter);
}
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(10);
}
System.out.println("최종 val 값: " + count);
}
}
결과
최종 val 값: 133684 //이 값은 랜덤임
레이스 컨디션
문제는 멀티쓰레드 환경의 구조를 가지고 있는 여러 개발 환경에서 문제가 된다.임계 구역
을 정의하고 해당 구역은 순차적으로
실행 되게 해야 한다.동기화(synchronization)
을 진행해줘야 한다.동기화
를 하는 여러가지 기법을 소개한다.상호 배제
를 보장하는 동기화 도구를 말합니다.
- 임계 영역에 접근하고자 한다면 반드시 락(Lock)을 획득(acquire) 해야 한다
- 임계 구역에서의 작업이 끝났다면 락을 해제(release) 해야 한다.
acquire()
함수는 락 획득을 위한 함수, release()
는 락 반납을 위한 함수이다.acquire()
을 시도하는데, 이미 다른 쓰레드에서 공유 자원의 락을 가지고 있다면, 락 획득을 할때 까지 대기 한다.당연히, 무한 대기를 하게 된다면 의미 없이 쓰레드가 계속 자원을 소모하게 되어 낭비가 된다. 보통 시간 혹은 횟수를 제한 두어 timeout 등을 발생시킨다.
다양한 프로그래밍 언어에서는 acquire()
와 release()
를 제공하고 있어, 직접 구현할 필요가 없다.
Java 에서는, ReentrantLock
을 통해 뮤텍스 락을 지원한다.
다음은 Java의 ReentrantLock
을 활용하여 뮤텍스 락을 구현한 모습이다.
MutexCounter
public class MutexCounter implements Runnable{
private final ReentrantLock lock;
public MutexCounter(ReentrantLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 100_000; i++) {
lock.lock();
try {
Main.count = Main.count + 1;
} finally {
lock.unlock();
}
}
}
}
Main
public class Main {
public static int count = 0;
public static final int THREAD_COUNT = 2;
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
MutexCounter counter = new MutexCounter(lock);
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for(int i=0; i<THREAD_COUNT; i++) {
executor.submit(counter);
}
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(10);
}
System.out.println("최종 val 값: " + count);
}
}
결과
최종 val 값: 200000
wait()
, signal()
) 을 사용한다.Semaphore
를 직접적으로 지원해주고, wait()
, signal()
과 동일한 기능을 하는 acquire()
, release()
를 제공한다.Main
public class Main {
public static int count = 0;
public static final int THREAD_COUNT = 2;
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
SemaphoreCounter counter = new SemaphoreCounter(semaphore);
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for(int i=0; i<THREAD_COUNT; i++) {
executor.submit(counter);
}
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(10);
}
System.out.println("최종 val 값: " + count);
}
}
SemaphoreCounter
public class SemaphoreCounter implements Runnable {
private final Semaphore semaphore;
public SemaphoreCounter(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
for (int i = 0; i < 100_000; i++) {
try {
semaphore.acquire();
Main.count = Main.count + 1;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}
}
}
조건 변수
를 이해해야 한다.조건 변수
란, 실행 순서를 제어 하기 위한 동기화 도구이다.조건 변수
는 지금 살펴보고 있는 동기화 기법이 아닌, 스레드 동기화
를 위한 고급 기능이다.세마포
방식과 뮤텍스
방식은 락이 획득 가능한가 만 판단하여 처리를 하였다면, 조건 변수는 signal()
, wait()
, notify()
등을 통해 특정 조건을 통해 대기/실행 상태를 변경해주는 고급 동기화 도구
다
wait()
: 호출한 프로세스 및 스레드의 상태를 대기 상태로 전환하는 함수signal()
: wait()로 일시 중지된 프로세스 및 스레드의 실행을 재개하는 함수생산자 소비자
패턴이다.
대표적인 예가 생산자-소비자 패턴이다.
조건 변수
를 활용하여 실행 순서를 제어하면 정확히 필요한 순간에만 쓰레드를 깨워 효율적으로 동기화가 가능하다.Condition
을 통해 조건 변수를 사용할 수 있다.ConditionExample
public class ConditionExample {
private final Queue<Integer> buffer = new LinkedList<>();
private final int CAPACITY = 2;
private final int WORK_COUNT = 5;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
// 로그 출력 메서드
private void log(String msg) {
String time = new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
Thread t = Thread.currentThread();
String threadName = t.getName();
Thread.State state = t.getState();
System.out.println("[" + time + "][" + threadName + "][" + state + "] " + msg);
}
// 생산자
class Producer extends Thread {
@Override
public void run() {
for (int i = 1; i <= WORK_COUNT; i++) {
lock.lock();
try {
while (buffer.size() == CAPACITY) {
log("버퍼가 가득 참, Producer 대기상태 진입");
notFull.await();
}
buffer.offer(i);
log("생산: " + i + " (버퍼: " + buffer.size() + ")");
notEmpty.signal();
} catch (InterruptedException e) {
log("생산자 인터럽트 발생");
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
System.out.println("=====Producer 작업 완료=====");
}
}
// 소비자
class Consumer extends Thread {
@Override
public void run() {
for (int i = 1; i <= WORK_COUNT; i++) {
lock.lock();
try {
while (buffer.isEmpty()) {
log("버퍼가 빔, Consumer 대기상태 진입");
notEmpty.await();
}
int value = buffer.poll();
log("소비: " + value + " (버퍼: " + buffer.size() + ")");
notFull.signal();
} catch (InterruptedException e) {
log("소비자 인터럽트 발생");
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
System.out.println("=====Consumer 작업 완료=====");
}
}
public void runExample() throws InterruptedException {
Producer producer = new Producer();
Consumer consumer = new Consumer();
producer.setName("Producer");
consumer.setName("Consumer");
producer.start();
consumer.start();
producer.join();
consumer.join();
log("종료");
}
}
Main
public static void main(String[] args) throws InterruptedException {
ConditionExample example = new ConditionExample();
example.runExample();
}
실행 결과
[21:42:03.267][Producer][RUNNABLE] 생산: 1 (버퍼: 1)
[21:42:03.276][Producer][RUNNABLE] 생산: 2 (버퍼: 2)
[21:42:03.276][Producer][RUNNABLE] 버퍼가 가득 참, Producer 대기상태 진입
[21:42:03.277][Consumer][RUNNABLE] 소비: 1 (버퍼: 1)
[21:42:03.277][Consumer][RUNNABLE] 소비: 2 (버퍼: 0)
[21:42:03.277][Consumer][RUNNABLE] 버퍼가 빔, Consumer 대기상태 진입
[21:42:03.277][Producer][RUNNABLE] 생산: 3 (버퍼: 1)
[21:42:03.278][Producer][RUNNABLE] 생산: 4 (버퍼: 2)
[21:42:03.278][Producer][RUNNABLE] 버퍼가 가득 참, Producer 대기상태 진입
[21:42:03.278][Consumer][RUNNABLE] 소비: 3 (버퍼: 1)
[21:42:03.278][Consumer][RUNNABLE] 소비: 4 (버퍼: 0)
[21:42:03.278][Consumer][RUNNABLE] 버퍼가 빔, Consumer 대기상태 진입
[21:42:03.279][Producer][RUNNABLE] 생산: 5 (버퍼: 1)
=====Producer 작업 완료=====
[21:42:03.279][Consumer][RUNNABLE] 소비: 5 (버퍼: 0)
=====Consumer 작업 완료=====
[21:42:03.279][main][RUNNABLE] 종료
TMI) Condition 은 Thread safe한 자료구조 등을 만들때 사용된다. 실제로
ArrayBlockingQueue
에는 Condtion을 활용하여 멀티 스레드에서 안정적인 동작을 하도록 만들어 두었다.
조건 변수
를 활용하여 동기화를 시키는 기법이 바로 모니터 기법
이다모니터
내로 진입하고, 이 모니터 내에서 실행 중인 프로세스 및 쓰레드는 큐에 쌓여 하나씩 실행된다.
조건 변수
와 함께 활용되면 실행 순서 제어를 위한 동기화도 구현이 가능하다synchronized
키워드와 synchronized 블록
으로 이를 구현할 수 있다.생산자 소비자
문제를 synchronized
로 푼다면 다음과 같다.
MonitorExample
public class MonitorExample {
private final Queue<Integer> buffer = new LinkedList<>();
private final int CAPACITY = 2;
private final int WORK_COUNT = 5;
// 로그 출력 메서드
private void log(String msg) {
String time = new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
Thread t = Thread.currentThread();
String threadName = t.getName();
Thread.State state = t.getState();
System.out.println("[" + time + "][" + threadName + "][" + state + "] " + msg);
}
// 생산자
class Producer extends Thread {
@Override
public void run() {
for (int i = 1; i <= WORK_COUNT; i++) {
synchronized (buffer) {
while (buffer.size() == CAPACITY) {
log("버퍼가 가득 참, Producer 대기상태 진입");
try {
buffer.wait();
} catch (InterruptedException e) {
log("생산자 인터럽트 발생");
throw new RuntimeException(e);
}
}
buffer.offer(i);
log("생산: " + i + " (버퍼: " + buffer.size() + ")");
buffer.notifyAll(); // 상태 변화 알림
}
}
System.out.println("=====Producer 작업 완료=====");
}
}
// 소비자
class Consumer extends Thread {
@Override
public void run() {
for (int i = 1; i <= WORK_COUNT; i++) {
synchronized (buffer) {
while (buffer.isEmpty()) {
log("버퍼가 빔, Consumer 대기상태 진입");
try {
buffer.wait();
} catch (InterruptedException e) {
log("소비자 인터럽트 발생");
throw new RuntimeException(e);
}
}
int value = buffer.poll();
log("소비: " + value + " (버퍼: " + buffer.size() + ")");
buffer.notifyAll(); // 상태 변화 알림
}
}
System.out.println("=====Consumer 작업 완료=====");
}
}
public void runExample() throws InterruptedException {
Producer producer = new Producer();
Consumer consumer = new Consumer();
producer.setName("Producer");
consumer.setName("Consumer");
producer.start();
consumer.start();
producer.join();
consumer.join();
log("종료");
}
}
ReentrantLock
으로 임계 영역을 보호하고, Conditon
을 활용해 조건 변수를 구현하였다.synchronized
블록을 사용해 임계 영역 보호와 조건 변수를 동시에 활용하였다.구분 | 모니터(Monitor) | 세마포어(Semaphore) | 뮤텍스(Mutex) |
---|---|---|---|
목적 | 상호배제 + 조건 동기화(순서 제어) | 동시 접근 허용(카운터 기반) | 상호배제(1명만 진입) |
구성 | 락 + 조건변수 | 카운터 | 락 |
동작 | 1명만 진입, 조건 대기/신호 가능 | 여러 명 동시 진입(카운터만큼) | 1명만 진입 |
자바 예시 | synchronized/wait/notify | java.util.concurrent.Semaphore | ReentrantLock 등 |
레이스 컨디션
이 발생하지 않는 것을 의미한다.ArrayList
, Integer
, Long
, HashMap
등 자주 사용되는 대부분의 자료 구조등의 class는 thread-safe 하지 않다.Vector
, Collections.synchronizedList()
, ConcurrentHashMap
, AtomicInteger
, AtomicLong
등이 있다.TMI)
AtomicInteger
와AtomicLong
등은 CAS(Compare-And-Swap, 비교 후 교환) 기반의 원자적 연산을 통해 thread-safe를 보장한다.
관련된 내용은 (Java) 멀티 스레드 동기화와 원자적 연산 (CAS)에서 더 자세히 확인할 수 있다.
또한, CAS는 “낙관적 락(Optimistic Lock)” 방식이기 때문에, 이번에 다룬 락 기법(뮤텍스, 모니터 등)과는 동작 방식이 조금 다르다.