실시간 변동 데이터 처리를 위한 스트리밍 데이터 처리 기법

![[[6.1] 스트리밍 데이터 처리 (1).png]]

-- 웹 사이트 클릭 로그가 스트림으로 실시간으로 들어온다.

{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:01" }
{ "user_id": 204, "page": "/main", "timestamp": "2024-04-25 10:00:03" }
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:05" }


SELECT
    page,
    COUNT(*) AS click_count
FROM clicks
GROUP BY
    -- 10초 간격의 텀블링 윈도우를 정의
    TUMBLE(timestamp, INTERVAL '10' SECOND),
    page;

-- 0 ~ 10분 사이 윈도우1
-- 10 ~ 20분 사이 윈도우2

-- Apache Flink SQL, Kafk

a SQL(ksqlDB)

-- 10:00:10 시점에 출력
{ "window_end": "10:00:10", "page": "/products/1", "click_count": 15 }
{ "window_end": "10:00:10", "page": "/main", "click_count": 8 }

-- 10:00:20 시점에 출력
{ "window_end": "10:00:20", "page": "/cart", "click_count": 12 }-- 웹 사이트 클릭 로그가 스트림으로 실시간으로 들어온다.
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:01" }
{ "user_id": 204, "page": "/main", "timestamp": "2024-04-25 10:00:03" }
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:05" }


SELECT
    page,
    COUNT(*) AS click_count
FROM clicks
GROUP BY
    -- 10초 간격의 텀블링 윈도우를 정의
    TUMBLE(timestamp, INTERVAL '10' SECOND),
    page;

-- 0 ~ 10분 사이 윈도우1
-- 10 ~ 20분 사이 윈도우2

-- Apache Flink SQL, Kafa SQL(ksqlDB)

-- 10:00:10 시점에 출력
{ "window_end": "10:00:10", "page": "/products/1", "click_count": 15 }
{ "window_end": "10:00:10", "page": "/main", "click_count": 8 }

-- 10:00:20 시점에 출력
{ "window_end": "10:00:20", "page": "/cart", "click_count": 12 }

수천억대 데이터 처리를 위한 초대용량 배치 처리

-- 스케줄러 & 스크립트의 조합

INSERT INTO daily_product_sales (product_id, total_sales_amount, total_sales_count, target_date)
SELECT
    product_id,
    SUM(sale_price * quantity),
    SUM(quantity),
    '2024-05-20'
FROM
    orders
WHERE
    created_at >= '2024-05-20 00:00:00' AND created_at < '2024-05-21 00:00:00'
GROUP BY
    product_id;


-- 1. 처리 시간 증가
-- 2. DB 부하로 인한 서비스 영향도
-- 3. 장애 대응에 대한 문제

-- 맵 리듀스 : 거대한 데이터 처리 작업을 위해, 분할 정복 방식으로 해결하는 개념
-- 아파치 스파크 : 맵 리듀스를 구현하고, 인메모리 기술로 성능을 극대화 하는 시스템
-- 1. 계산 위임
-- 2. 병렬 데이터 로딩

-- 1. MySQL의 'orders' 테이블을 Spark에서 병렬로 읽어와 임시 뷰로 정의
CREATE OR REPLACE TEMPORARY VIEW orders_view
USING "jdbc"
OPTIONS (
  url              "jdbc:mysql://your-mysql-server:3306/service_db",
  dbtable          "orders",
  user             "spark_user",
  password         "spark_password",

  -- ▼ 병렬 읽기를 위한 핵심 옵션들 ▼
  partitionColumn  "order_id",         -- 데이터를 나눌 기준 컬럼 (숫자 타입, PK 권장)
  lowerBound       "1",                -- 기준 컬럼의 최소값
  upperBound       "100000000",        -- 기준 컬럼의 최대값
  numPartitions    "100"               -- 데이터를 몇 개로 나눌지 (파티션 개수)
);

-- Executor 1이 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 1 AND order_id < 1000000;

-- Executor 2가 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 1000000 AND order_id < 2000000;

-- Executor 100이 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 99000000 AND order_id <= 100000000;

-- 2. 실제 계산은 Spark 클러스터에서 병렬로 수행
--    (이 쿼리는 더 이상 MySQL이 아닌 Spark 엔진이 실행)
CREATE OR REPLACE TEMPORARY VIEW daily_summary AS
SELECT
    product_id,
    SUM(sale_price * quantity) AS total_amount,
    COUNT(*) AS total_count
FROM
    orders_view
WHERE
    created_at >= '2024-05-20 00:00:00' AND created_at < '2024-05-21 00:00:00'
GROUP BY
    product_id;

-- 3. 집계된 결과를 다시 다른 DB 테이블에 저장
INSERT INTO analytics_db.daily_product_sales
SELECT * FROM daily_summary;

-- Predicate, PushDown
-- 데이터 셔플링

실시간 데이터 동기화를 위한 디자인 설계 패턴

![[[6.3] 운영 레벨의 다양한 시스템 아키텍처 [ 실시간 데이터 동기화 ].png]] ![[[6.3] Debezium 동작 과정.png]]

-- 1. 이중 쓰기 작업
 
-- Anti-pattern: Dual Writes
public void registerProduct(Product product) {
    --  1. MySQL에 저장
    mysqlRepository.save(product);
 
-- 2. Elasticsearch에 인덱싱
    try {
        elasticsearchClient.index(product);
} catch (Exception e) {
        -- ??? 어떻게 처리해야 할까? 롤백? 재시도? 무시?
    }
 
--  3. 캐시 업데이트
    redisClient.update(product);
}
 
-- 2. 배치 폴링
 
-- 5분마다 실행되는 스케줄러
SELECT * FROM products WHERE updated_at > '5분 전 시간';
 
-- Debezium
 
{
  "schema": { ... }, // 메시지 구조를 정의하는 스키마 정보
  "payload": {
    "before": { // 변경 전 데이터
      "id": 123,
      "name": "Old Product Name",
      "price": 10000.00
    },
    "after": { // 변경 후 데이터
      "id": 123,
      "name": "New Product Name",
      "price": 12000.00
    },
    "source": { // 변경이 발생한 소스 정보
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "mysql.prod-db.main",
      "ts_ms": 1678886400000,
      "db": "mydatabase",
      "table": "products",
      "server_id": 1,
      "gtid": null,
      "file": "mysql-bin.000123", // Binlog 파일명
      "pos": 4567,               // Binlog 위치
      "row": 0
    },
    "op": "u", // Operation 타입: 'c'(create), 'u'(update), 'd'(delete)
    "ts_ms": 1678886400500, // Debezium이 이벤트를 처리한 시간
    "transaction": null
  }
}
 
-- 1. 멱등성 보장
-- 2. 메시지 처리 순서 보장
 

견고한 비동기 작업을 위한 MySQL 작업 큐 활용법

작업큐로 mysql을?

-- Anti-pattern: Synchronous Processing
public void signUp(User user) {
    -- 1. 사용자 정보를 DB에 저장 (빠름)
    userRepository.save(user);
 
-- 2. 환영 이메일 발송 (느릴 수 있음)
    emailService.sendWelcomeEmail(user.getEmail());
 
-- 3. 신규 가입 쿠폰 발급 (느릴 수 있음)
    couponService.issueSignUpCoupon(user.getId());
 
-- 4. 모든 작업이 끝나야 사용자에게 응답이 감
}
 
CREATE TABLE `jobs` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `job_type` VARCHAR(50) NOT NULL COMMENT '작업 종류 (e.g., SEND_EMAIL, ISSUE_COUPON)',
    `payload` JSON NOT NULL COMMENT '작업에 필요한 데이터',
    `status` VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '작업 상태 (PENDING, RUNNING, DONE, FAILED)',
    `priority` INT NOT NULL DEFAULT 100 COMMENT '우선순위 (낮을수록 높음)',
    `retry_count` INT NOT NULL DEFAULT 0 COMMENT '재시도 횟수',
    `last_error_message` TEXT NULL COMMENT '마지막 에러 메시지',
    `run_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '실행될 시간 (지연 작업용)',
    `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    INDEX `idx_status_priority_runat` (`status`, `priority`, `run_at`) COMMENT '워커가 작업을 가져가기 위한 복합 인덱스'
);
 
BEGIN; -- 트랜잭션 시작
 
-- 1. 핵심 비즈니스 로직
INSERT INTO users (name, email) VALUES ('John Doe', 'john.doe@example.com');
SET @user_id = LAST_INSERT_ID();
 
-- 2. 비동기 작업 등록
INSERT INTO jobs (job_type, payload, priority)
VALUES ('SEND_WELCOME_EMAIL', JSON_OBJECT('user_id', @user_id), 100);
 
INSERT INTO jobs (job_type, payload, priority)
VALUES ('ISSUE_SIGNUP_COUPON', JSON_OBJECT('user_id', @user_id), 200);
 
COMMIT; -- 트랜잭션 종료
 
--
 
BEGIN; -- 트랜잭션 시작
 
-- 1. 처리할 작업을 찾고, 해당 행에 배타적 락(Exclusive Lock)을 건다.
--    다른 워커는 이 행에 접근할 수 없게 된다.
SET @job_id = (
    SELECT id FROM jobs
    WHERE status = 'PENDING' AND run_at <= NOW()
    ORDER BY priority ASC, id ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED -- MySQL 8.0+ : 락이 걸린 행은 건너뛰고 다음 행을 찾음
);
 
-- 2. 만약 처리할 작업이 있다면 (job_id가 NULL이 아니라면)
IF @job_id IS NOT NULL THEN
  -- 상태를 'RUNNING'으로 변경하여 다른 워커가 가져가지 못하도록 명시한다.
UPDATE jobs SET status = 'RUNNING' WHERE id = @job_id;
END IF;
 
COMMIT; -- 트랜잭션 종료. 락이 해제된다.
 
 
-- 실패 처리 로직 (의사 코드)
SET @new_retry_count = retry_count + 1;
SET @delay = POWER(10, @new_retry_count); -- 10, 100, 1000... 초 지연 (예시)
 
UPDATE jobs
SET
    status = 'PENDING', -- 재시도를 위해 PENDING으로 변경
    retry_count = @new_retry_count,
    last_error_message = '...',
    run_at = NOW() + INTERVAL @delay SECOND -- 다음 실행 시간을 뒤로 미룸
WHERE id = @job_id;
 

MySQL과 NoSQL 결합한 현대적인 시스템 아키텍처

![[[6.5] (1).png]]

-- 1. MySQL & NoSQL
-- > MySQL : 상품의 정보만을 저장
-- > MongoDB : 상품별로 다른 옵션이나 상세 설명과 같은 스키마를 가진다.

-- 2. 장바구니나, 세션 관리
-- > Redis

-- 3. Full Text 검색
-- WHERE comment LIKE '%검색어%'
-- > ElasticSearch


-- --------

-- 1. 시스템 요구사항을 고려해라
-- 2. 분산 트랜잭션