Reactive-Programming

HotPublisher, ColdPublisher

caporatang 2024. 4. 17. 23:27
반응형

Hot, Cold Publisher

Cold Publisher ?

  • Reactive streams 예시 코드처럼 subscribe가 시작되는 순간 데이터를 생성해서 전송하는 케이스를 Cold Publisher라고 한다.
    Cold Publisher는 파일 읽기, 웹 API 요청 등 subscriber에 따라 독립적인 데이터 스트림을 제공한다.

Hot Publisher ?

  • subscriber가 없더라도 데이터를 생성하고 stream에 push하는 publisher이다. 트위터 게시글 읽기, 공유 리소스 변화 등 여러 subscriber에게 동일한 데이터를 전달한다.

ColdPublisher 구현

public class SimpleColdPublisher implements Flow.Publisher<Integer>{
    // Subscriber 하는 순간부터 데이터를 내려주는 퍼블리셔. -> 동일한 데이터가 보장되는 경우
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var iterator = Collections.synchronizedList(
    // 생성한 값. -> subscribe 하게 된다면 출력될 값 -> cold publisher
                IntStream.range(1, 10).boxed().collect(Collectors.toList()) 
        ).iterator();
        var subscription = new SimpleColdSubscription(iterator, subscriber);
        subscriber.onSubscribe(subscription);
    }

    @RequiredArgsConstructor
    public class SimpleColdSubscription implements Flow.Subscription {
        private final Iterator<Integer> iterator;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();

        @Override
        public void request(long n) {
       // 새로운 스레드에서 요청 받은 만큼 값을 넘기는 형태
            executorService.submit(() -> {
                for (int i = 0; i < n; i++) {
                    if(iterator.hasNext()) {
                        var number = iterator.next();
                        iterator.remove();
                        subscriber.onNext(number);
                    } else {
                        // iterator 값이 없어지면 shutdown
                        subscriber.onComplete();
                        executorService.shutdown();
                        break;
                    }

                }
            });
        }

        @Override
        public void cancel() {
            subscriber.onComplete();
        }
    }
}

Subscriber

@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {
// Publisher에에게 넘겨받을 subscription을 처리하는 SubsCriber 

    private Flow.Subscription subscription;
    private final String name;

    public SimpleNamedSubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
        log.info("onSubscribe");
    }

    @Override
    public void onNext(Object item) {
        log.info("name: {}, onNext: {}", name, item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("onError: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }

    public void cancel() {
        log.info("cancel");
        this.subscription.cancel();
    }
}

ColdPublisher 실행 클래스

public class SimpleColdPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        // create publisher
        var publisher = new SimpleColdPublisher();

        // create subscriber1
        var subscriber = new SimpleNamedSubscriber<Integer>("subscriber1");
        publisher.subscribe(subscriber);

        // 5초 뒤에
        Thread.sleep(5000);

        // create subscriber2
        var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
        publisher.subscribe(subscriber2);
    }
}
  • ColdPublisher는 subscribe한 순간에 데이터를 만들어서 내려주는 Publisher이기 때문에 실행 결과는
    OnNext가 실행되는 만큼 즉, 1 ~ 10 까지를 2번 출력하고 onComplete가 호출되고 shutdown된다.

HotPublisher

@Slf4j
public class SimpleHotPublisher implements Flow.Publisher<Integer>{
    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();
    private final Future<Void> task;
    private static List<Integer> numbers = new ArrayList<>();
    private static List<SimpleHotSubscription> subscriptions = new ArrayList<>();

    public SimpleHotPublisher() {
    // 누군가가 구독을 하지 않는다고 하더라도 값은 계속 생성되고 있어야한다.
    // ex) SNS를 생각해보면 사용자가 접속을 하지 않는다고 SNS에 새로운 글이나 뭔가 활동이 없는것이 아니다.
    // SimpleHotPublisher가 생성되는 순간 계속해서 값이 생성
        numbers.add(1);
        task = publisherExecutor.submit(() -> {
            for (int i = 2;!Thread.interrupted(); i++) {
                numbers.add(i);
                // 새로운 값이 생성되면 전달.
                subscriptions.forEach(SimpleHotSubscription::wakeUp);

                Thread.sleep(100);
            }
           return null;
        });
    }

    public void shutdown() {
        this.task.cancel(true);
        publisherExecutor.shutdown();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
// 새로운 값이 생겼다. -> subscriber 들에게 새로운 값이 생성 되었음을 전파해야함.
// 전파할때, subscriber들에게 요청한 값이 있는지 확인하고 줄수있는 상황이면 바로 줘버리고, 
// 만약 따로 request가 오지 않았다면 새로운 값이 추가 되었음을 알고만 있어.. 라는 설정이 필요함

        // subscriber에게 줄 subscription 생성
        var subscription = new SimpleHotSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscriptions.add(subscription);
    }

    private static class SimpleHotSubscription implements Flow.Subscription {
// HotPublisher의 특징은 subscriber한 순간부터 데이터를 받아간다는 특징이 있음
// 쌓이고 있는 중간에 subscribe 한다면 쌓이는 데이터 중간부터 값을 받아감
// -> subscription이 최초에 생성되었을 때는, numbers(publisher가 계속 쌓고있는 데이터) 의 마지막 값을 가르켜야 한다.
// 각각의 subscriber는 개인마다 offset이 따로 존재 해야 한다.


// 현재 마지막으로 전달한 부분 -> 실제로 데이터를 준 마지막 데이터
        private int offset; 

// Hot Publisher가 item을 더 이상 생성하지 않는 상황에 100개의 요청이 왔다면 줄 수 없고, 이 정보를 미리 저장해둘 변수
// -> 요청이 이만큼 필요해!
        private int requiredOffset;

// 여러 subscriber에 대한 데이터를 저장해둘 객체
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor(); 

        public SimpleHotSubscription(Flow.Subscriber<? super Integer> subscriber) {
    // 생성자로 인덱스의 마지막 값을 집어넣음
    // 객체가 생성될 때 마지막 인덱스의 값을 가지고 생성 
    // 뭔가 작업이 생긴다면 그 이후부터 값을 가져간다.
            int lastElementIndex = numbers.size() -1;
            this.offset = lastElementIndex;
            this.requiredOffset = lastElementIndex;
            this.subscriber = subscriber;
        }
        @Override
        public void request(long n) {
            // 요청이 n개 만큼 들어오면 requiredOffset에 추가
            requiredOffset += n;

            // offset은 numbers의 사이즈보다 커지면 안된다.
            // 생성된 값보다 커지면 당연히 안된다..
            onNextWhilePossible();
        }

        @Override
        public void cancel() {
            // 더이상 받지 않겠다.
            this.subscriber.onComplete();
            if (subscriptions.contains(this)) {
                subscriptions.remove(this);
            }
            // 별도로 동작하던 스레드를 shutdown
            subscriptionExecutorService.shutdown(); 
        }

        public void wakeUp() {
            // 새로운 값이 생겼으니, subscriber에게 값을 전달 가능한 상황이면 전달해라.
            onNextWhilePossible();
        }

        private void onNextWhilePossible() {
            // 계속 가능한 시간동안 onNext를 하겠다는 함수
            subscriptionExecutorService.submit(() -> {
                while(offset < requiredOffset && offset < numbers.size()) {
                    var item = numbers.get(offset);
                    subscriber.onNext(item);
                    offset++;
                }
            });
        }

    }

}

HotPublisher main

public class SimpleHotPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        // prepare publisher
        var publisher = new SimpleHotPublisher();

        // prepare subscriber1
        var subscriber = new SimpleNamedSubscriber<>("subscriber1"); // subscribe 하는 동안 값이 출력 될거고,
        publisher.subscribe(subscriber);

        // cancel after 5s
        Thread.sleep(5000);
        // 종료 시키고 나면,
        subscriber.cancel(); 

        // prepare subscriber 2, 3
        // 처음부터 시작하고 조회하는게 아니라 1에서 하던 작업에 이어서 조회하고 출력하게 될것
        var subscriber2 = new SimpleNamedSubscriber<>("subscriber2");
        var subscriber3 = new SimpleNamedSubscriber<>("subscriber3");
        publisher.subscribe(subscriber2);
        publisher.subscribe(subscriber3);

        // cancel after 5s
        Thread.sleep(5000); // 5초가 딜레이 되면서 1초동안 데이터가 더 생성되었을거고 그 사이에는 공백이 있을것
        subscriber2.cancel();
        subscriber3.cancel();

        Thread.sleep(1000);

        var subscriber4 = new SimpleNamedSubscriber<>("subscriber4");
        publisher.subscribe(subscriber4);

        //cancel after 5s
        Thread.sleep(5000);
        subscriber4.cancel();

        // shutdown publisher
        publisher.shutdown();
    }
}

  • main 코드를 보면 subscriber2, 3이 실행한 후, 둘다 cancel onComplete가 호출된 후 1초 후에 subscriber4가 동작한다.
    이때 onNext값은 1초 동안 10개가 만들어져서 중간이 비어있게 되는데 이는, offSet기반이고 subscription에서 publisher가 요청을 하지 않더라도 데이터를 계속 생성하고 있기 떄문이다.
  • HotPublisher는 결국, 데이터는 계속 만들어지고 subscribe하는 순간 offset을 넘기고
    offSet과 RequiredOffset을 기반으로 그 위치부터 데이터를 가져가서 데이터를 사용하게 된다.
반응형

'Reactive-Programming' 카테고리의 다른 글

Netty - EventLoop  (0) 2024.05.08
Project reactor  (0) 2024.04.21
Reactive_streams  (0) 2024.04.17
Reactive programming  (0) 2024.04.16
Reactive manifesto  (0) 2024.04.15