실시간 변동 데이터 처리를 위한 스트리밍 데이터 처리 기법
![[[6.1] 스트리밍 데이터 처리 (1).png]]
-- 웹 사이트 클릭 로그가 스트림으로 실시간으로 들어온다.
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:01" }
{ "user_id": 204, "page": "/main", "timestamp": "2024-04-25 10:00:03" }
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:05" }
SELECT
page,
COUNT(*) AS click_count
FROM clicks
GROUP BY
-- 10초 간격의 텀블링 윈도우를 정의
TUMBLE(timestamp, INTERVAL '10' SECOND),
page;
-- 0 ~ 10분 사이 윈도우1
-- 10 ~ 20분 사이 윈도우2
-- Apache Flink SQL, Kafk
a SQL(ksqlDB)
-- 10:00:10 시점에 출력
{ "window_end": "10:00:10", "page": "/products/1", "click_count": 15 }
{ "window_end": "10:00:10", "page": "/main", "click_count": 8 }
-- 10:00:20 시점에 출력
{ "window_end": "10:00:20", "page": "/cart", "click_count": 12 }-- 웹 사이트 클릭 로그가 스트림으로 실시간으로 들어온다.
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:01" }
{ "user_id": 204, "page": "/main", "timestamp": "2024-04-25 10:00:03" }
{ "user_id": 101, "page": "/products/1", "timestamp": "2024-04-25 10:00:05" }
SELECT
page,
COUNT(*) AS click_count
FROM clicks
GROUP BY
-- 10초 간격의 텀블링 윈도우를 정의
TUMBLE(timestamp, INTERVAL '10' SECOND),
page;
-- 0 ~ 10분 사이 윈도우1
-- 10 ~ 20분 사이 윈도우2
-- Apache Flink SQL, Kafa SQL(ksqlDB)
-- 10:00:10 시점에 출력
{ "window_end": "10:00:10", "page": "/products/1", "click_count": 15 }
{ "window_end": "10:00:10", "page": "/main", "click_count": 8 }
-- 10:00:20 시점에 출력
{ "window_end": "10:00:20", "page": "/cart", "click_count": 12 }
수천억대 데이터 처리를 위한 초대용량 배치 처리
-- 스케줄러 & 스크립트의 조합
INSERT INTO daily_product_sales (product_id, total_sales_amount, total_sales_count, target_date)
SELECT
product_id,
SUM(sale_price * quantity),
SUM(quantity),
'2024-05-20'
FROM
orders
WHERE
created_at >= '2024-05-20 00:00:00' AND created_at < '2024-05-21 00:00:00'
GROUP BY
product_id;
-- 1. 처리 시간 증가
-- 2. DB 부하로 인한 서비스 영향도
-- 3. 장애 대응에 대한 문제
-- 맵 리듀스 : 거대한 데이터 처리 작업을 위해, 분할 정복 방식으로 해결하는 개념
-- 아파치 스파크 : 맵 리듀스를 구현하고, 인메모리 기술로 성능을 극대화 하는 시스템
-- 1. 계산 위임
-- 2. 병렬 데이터 로딩
-- 1. MySQL의 'orders' 테이블을 Spark에서 병렬로 읽어와 임시 뷰로 정의
CREATE OR REPLACE TEMPORARY VIEW orders_view
USING "jdbc"
OPTIONS (
url "jdbc:mysql://your-mysql-server:3306/service_db",
dbtable "orders",
user "spark_user",
password "spark_password",
-- ▼ 병렬 읽기를 위한 핵심 옵션들 ▼
partitionColumn "order_id", -- 데이터를 나눌 기준 컬럼 (숫자 타입, PK 권장)
lowerBound "1", -- 기준 컬럼의 최소값
upperBound "100000000", -- 기준 컬럼의 최대값
numPartitions "100" -- 데이터를 몇 개로 나눌지 (파티션 개수)
);
-- Executor 1이 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 1 AND order_id < 1000000;
-- Executor 2가 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 1000000 AND order_id < 2000000;
-- Executor 100이 실행하는 쿼리
SELECT * FROM orders WHERE order_id >= 99000000 AND order_id <= 100000000;
-- 2. 실제 계산은 Spark 클러스터에서 병렬로 수행
-- (이 쿼리는 더 이상 MySQL이 아닌 Spark 엔진이 실행)
CREATE OR REPLACE TEMPORARY VIEW daily_summary AS
SELECT
product_id,
SUM(sale_price * quantity) AS total_amount,
COUNT(*) AS total_count
FROM
orders_view
WHERE
created_at >= '2024-05-20 00:00:00' AND created_at < '2024-05-21 00:00:00'
GROUP BY
product_id;
-- 3. 집계된 결과를 다시 다른 DB 테이블에 저장
INSERT INTO analytics_db.daily_product_sales
SELECT * FROM daily_summary;
-- Predicate, PushDown
-- 데이터 셔플링
실시간 데이터 동기화를 위한 디자인 설계 패턴
![[[6.3] 운영 레벨의 다양한 시스템 아키텍처 [ 실시간 데이터 동기화 ].png]] ![[[6.3] Debezium 동작 과정.png]]
-- 1. 이중 쓰기 작업
-- Anti-pattern: Dual Writes
public void registerProduct(Product product) {
-- 1. MySQL에 저장
mysqlRepository.save(product);
-- 2. Elasticsearch에 인덱싱
try {
elasticsearchClient.index(product);
} catch (Exception e) {
-- ??? 어떻게 처리해야 할까? 롤백? 재시도? 무시?
}
-- 3. 캐시 업데이트
redisClient.update(product);
}
-- 2. 배치 폴링
-- 5분마다 실행되는 스케줄러
SELECT * FROM products WHERE updated_at > '5분 전 시간';
-- Debezium
{
"schema": { ... }, // 메시지 구조를 정의하는 스키마 정보
"payload": {
"before": { // 변경 전 데이터
"id": 123,
"name": "Old Product Name",
"price": 10000.00
},
"after": { // 변경 후 데이터
"id": 123,
"name": "New Product Name",
"price": 12000.00
},
"source": { // 변경이 발생한 소스 정보
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql.prod-db.main",
"ts_ms": 1678886400000,
"db": "mydatabase",
"table": "products",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000123", // Binlog 파일명
"pos": 4567, // Binlog 위치
"row": 0
},
"op": "u", // Operation 타입: 'c'(create), 'u'(update), 'd'(delete)
"ts_ms": 1678886400500, // Debezium이 이벤트를 처리한 시간
"transaction": null
}
}
-- 1. 멱등성 보장
-- 2. 메시지 처리 순서 보장
견고한 비동기 작업을 위한 MySQL 작업 큐 활용법
작업큐로 mysql을?
-- Anti-pattern: Synchronous Processing
public void signUp(User user) {
-- 1. 사용자 정보를 DB에 저장 (빠름)
userRepository.save(user);
-- 2. 환영 이메일 발송 (느릴 수 있음)
emailService.sendWelcomeEmail(user.getEmail());
-- 3. 신규 가입 쿠폰 발급 (느릴 수 있음)
couponService.issueSignUpCoupon(user.getId());
-- 4. 모든 작업이 끝나야 사용자에게 응답이 감
}
CREATE TABLE `jobs` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`job_type` VARCHAR(50) NOT NULL COMMENT '작업 종류 (e.g., SEND_EMAIL, ISSUE_COUPON)',
`payload` JSON NOT NULL COMMENT '작업에 필요한 데이터',
`status` VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '작업 상태 (PENDING, RUNNING, DONE, FAILED)',
`priority` INT NOT NULL DEFAULT 100 COMMENT '우선순위 (낮을수록 높음)',
`retry_count` INT NOT NULL DEFAULT 0 COMMENT '재시도 횟수',
`last_error_message` TEXT NULL COMMENT '마지막 에러 메시지',
`run_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '실행될 시간 (지연 작업용)',
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
INDEX `idx_status_priority_runat` (`status`, `priority`, `run_at`) COMMENT '워커가 작업을 가져가기 위한 복합 인덱스'
);
BEGIN; -- 트랜잭션 시작
-- 1. 핵심 비즈니스 로직
INSERT INTO users (name, email) VALUES ('John Doe', 'john.doe@example.com');
SET @user_id = LAST_INSERT_ID();
-- 2. 비동기 작업 등록
INSERT INTO jobs (job_type, payload, priority)
VALUES ('SEND_WELCOME_EMAIL', JSON_OBJECT('user_id', @user_id), 100);
INSERT INTO jobs (job_type, payload, priority)
VALUES ('ISSUE_SIGNUP_COUPON', JSON_OBJECT('user_id', @user_id), 200);
COMMIT; -- 트랜잭션 종료
--
BEGIN; -- 트랜잭션 시작
-- 1. 처리할 작업을 찾고, 해당 행에 배타적 락(Exclusive Lock)을 건다.
-- 다른 워커는 이 행에 접근할 수 없게 된다.
SET @job_id = (
SELECT id FROM jobs
WHERE status = 'PENDING' AND run_at <= NOW()
ORDER BY priority ASC, id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED -- MySQL 8.0+ : 락이 걸린 행은 건너뛰고 다음 행을 찾음
);
-- 2. 만약 처리할 작업이 있다면 (job_id가 NULL이 아니라면)
IF @job_id IS NOT NULL THEN
-- 상태를 'RUNNING'으로 변경하여 다른 워커가 가져가지 못하도록 명시한다.
UPDATE jobs SET status = 'RUNNING' WHERE id = @job_id;
END IF;
COMMIT; -- 트랜잭션 종료. 락이 해제된다.
-- 실패 처리 로직 (의사 코드)
SET @new_retry_count = retry_count + 1;
SET @delay = POWER(10, @new_retry_count); -- 10, 100, 1000... 초 지연 (예시)
UPDATE jobs
SET
status = 'PENDING', -- 재시도를 위해 PENDING으로 변경
retry_count = @new_retry_count,
last_error_message = '...',
run_at = NOW() + INTERVAL @delay SECOND -- 다음 실행 시간을 뒤로 미룸
WHERE id = @job_id;
MySQL과 NoSQL 결합한 현대적인 시스템 아키텍처
![[[6.5] (1).png]]
-- 1. MySQL & NoSQL
-- > MySQL : 상품의 정보만을 저장
-- > MongoDB : 상품별로 다른 옵션이나 상세 설명과 같은 스키마를 가진다.
-- 2. 장바구니나, 세션 관리
-- > Redis
-- 3. Full Text 검색
-- WHERE comment LIKE '%검색어%'
-- > ElasticSearch
-- --------
-- 1. 시스템 요구사항을 고려해라
-- 2. 분산 트랜잭션