AI & Data를 활용하는 기술경영자

Stream Processing 본문

Data Engineering

Stream Processing

Data_Lover 2022. 11. 1. 15:27
728x90

Stream Processing

데이터 스트리밍 플랫폼은 이벤트와 프로세스를 수집하거나, 이벤트 스트림을 변환하며 이벤트 스트림 프로세싱은 데이터 스트림에서 패턴을 찾는데 사용될 수 있다.

Flume

https://flume.apache.org

 

Welcome to Apache Flume — Apache Flume

Welcome to Apache Flume Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault t

flume.apache.org

정의

 

로그 데이터를 수집하는 기술로 여러 서버에서 생산된 대용량 로그 데이터를 효과적으로 ingest하여 HDFS과 같은 원격 목적지에 데이터를 전송하는 기능을 제공합니다.

즉, 여러 서버에 설치되는 소프트웨어로 수집한 로그를 원격지의 Data Lake로 전송하는 비동기 채널입니다.

 

특징

  • 자바로 작성된 유연한 로그 수집하는 툴이다.
  • 구조가 단순하고 유연하여 다양한 유형의 스트리밍 데이터 플로우 아키텍처(Streaming Data Flow)를 구성할 수 있다.
  • 신뢰성, 규모 확장성 및 기능 확정성을 확보할 수 있게 된다.
  • 서버 장애 시에도 수집된 로그 유실을 방지할 수 있고, Scale-up/ Scale-down방식의 확장을 모두 지원합니다.
  • 즉, 새로운 기능을 쉽게 커스터마이징 할 수 있습니다.
  • 분산 환경에서 로그를 모으는 소프트웨어이다.
  • 여러 곳에 위치하는 로그를 한 곳으로 모을 수 있다.
  • 로그를 배치로 한 번에 보내는 것이 아닌 스트리밍하게 지속적으로 보낸다.
  • 비동기 방식으로 처리한다.
  • 로그를 수집하는 역할과 로그를 전송하는 역할은 개별적으로 움직인다.(Source와 Sink는 개별적인 Thread이다)

구성 요소

  • Source: 외부 event가 생성되어 수집되는 영역이다
    • 1개로 구성되어있고, 복수 Channel 지정
  • Channel: Source와 Sink간의 버퍼 구간
    • 채널 별로 1개의 Sink 지정
  • Sink: 수집된 로그/이벤트를 HDFS와 같은 목적지에 전달한다
  • Interceptor: 수집된 로그/이벤트를 가공하는 부분

작동 과정

작동원리: 수집 대상 데이터(로그/이벤트) 생성하여 Source에 수신하고 그 메시지를 Channel에 전달한다. 그 이후 Sink가 메시지를 가져와서 목적지에 데이터를 전달/저장 한다.

설치

https://flume.apache.org/download.html

 

Download — Apache Flume

 

flume.apache.org

$ cd /usr/local/lib 
#Flume 설치 디렉터리 /usr/local/lib/flume로 가정합니다. 
#이 디렉터리는 아래에서 {FLUME_HOME}으로 참조하겠습니다.


$ sudo tar -zxvf apache-flume-1.8.0-bin.tar.gz
$ sudo mv apache-flume-1.8.0 flume

 

Flume 기본 설정과 실행 방법

Flume 설정 및 실행 예제

  • Agent Node
    • 로그가 발생하는 서버
    • WAS 로그가 저장되는 디렉터리를 spooling하여 로그 메모리 채널에 전송
    • Avro Sink가 채널의 로그를 Collecting node로 전송
    • port: 4545
  • Collecting node
    • Avro Source: Server A로부터 Avro통신을 통해서 로그 수집하여 메모리 채널에 전달한다
    • HDFS Sink: 수집된 이벤트를 HDFS에 저장

Agent 노드 Flume 설정 및 실행

 

"""
로그를 수집하는 에이전트 노드에 다음과 같은 Flume 설정을 추가합니다. 파일명은 flume.conf입니다. 
다음 예제는 /data/waslogs에 추가되는 로그 파일을 컬렉트 노드에 전송하는 설정입니다. 
전송 포맷은 avro입니다.

이 설정 파일은 /usr/local/lib/flume/conf에 하는 것으로 가정합니다.
"""
agentDataSource.sources = logsrc
agentDataSource.channels = logChannel
agentDataSource.sinks = avroSink

# Source : 지정된 디렉터리에 추가되는 파일 로그 전송, 전송후 파일명 변경
agentDataSource.sources.logsrc.type = spooldir
agentDataSource.sources.logsrc.channels = logChannel
agentDataSource.sources.logsrc.spoolDir = /data/waslogs

# Sink : Avro
agentDataSource.sinks.avroSink.type = avro
agentDataSource.sinks.avroSink.channel = logChannel
agentDataSource.sinks.avroSink.hostname = 142.3.3.1
agentDataSource.sinks.avroSink.port = 4545

# Channel : 메모리 채널
agentDataSource.channels.logChannel.type = memory
agentDataSource.channels.logChannel.capacity = 100
################################################

 

에이전트 노드의 flume은 다음과 같은 명령으로 실행됩니다.

cd /usr/local/lib/flume
./bin/flume-ng agent --conf-file ./conf/flume.conf --name agent01

Flume의 유연한 구성

 

Flume은 다양한 Source와 Sink 타입을 제공합니다. 이런 구현체를 이용하여 다양한 형태의 Data Flow를 디자인할 수 있습니다. 아래의 이미지는 일반적인 Flume 데이터 흐름 모델입니다.

 

Flume Data Flow

  • A 모델: Consolidation Model
    • 여러 서버로부터 로그를 통합하여 수집하고 저장하는 모델
    • 각 서버에 Flume Agent가 설치되어 로그를 통합 Flume에 저장
    • 통합 Flume는 지정된 목적지에 저장
  • B 모델: HA 모델
    • A모델에서 통합 Flume 장애의 SPOF(단일 장애 포인트)에 대한 보완
    • 고가용성을 위해서 통합 Flume을 이중화
  • C 모델: Multi-Target Model
    • A 모델에서 목적지를 복수로 지정
  • D 모델: Flafka Model
    • Flume이 Kafka의 Producer와 Consumer 역할 수행
    • 통합 Flume을 Kafka로 대체하여 고가용성 및 확장성, 유연성을 확보
    • Flume을 이용하여 Kafka 개발 요소 제거

 

Flume와 Kafka의 결합

 

배경

Flume는 Channel로 메모리와 파일 그리고 JDBC를 제공하기에, 메모리 타입은 처리 성능이 좋지만 장애 발생 시 데이터가 유실이 될 수 있는 데이터 안정성 측면에서 큰 단점을 갖고 있습니다.

 

반대로, 파일 타입을 사용하게 될 경우 데이터의 안정성은 향상이 되지만 성능이 크게 떨어질 수 있기에 고가용성 모드로 관리하기가 어렵습니다.

 

위와 같은 문제로 인해서, Flume와 Kafka를 결합하는 아이디어가 탄생을 하게 되었습니다.(D모델)

 

정리

Flume의 장점은 다양한 소스와 목적지에 대한 컴포넌트가 이미 구현되어 있기에  Flume 설치 및 설정만으로 작업을 완료할 수 있습니다.

Flume의 단점은 데이터를 저장하는 부분에서 장애가 발생할 경우, 데이터 유실의 가능성이 있고 확장 구성이 복잡하다는 것입니다.

 

Kafka의 장점은 저장된 데이터를 안전하게 관리할 수 있고, 구성이 간단하고 확장성이 좋다는 것입니다. Kafka의 단점은 데이터 수집기(producer)와 데이터 처리기(Consumer)를 대부분 사용자가 구현해야 한다는 것입니다.

이 구 컴포넌트를 함께 사용하면 각자의 취약점을 보완하고 강점을 부각할 수 있습니다. Kafka의 확장성과 관리 편의성 그리고 데이터 안정성을 확보하면서, Flume 컴포넌트 구성을 통해서 사용 편의성을 높일 수 있습니다.

 

Flume 모니터링

  • Ganglia
  • JMX
  • JSON Reporting

flume 실행 시 -Dflume.monitoring.type=ganglia 옵션을 추가하여 Ganglia에서 모니터링이 가능합니다.

 

flume은 자바로 개발되어 있기에 JMX로 모니터링 가능합니다. JMX로 모니터링하기 위해서는 환경 변수로 다음과 같은 JAVA_OPTS 옵션을 추가하는 방법을 추천합니다.

export JAVA_OPTS=”-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false”

Flume은 JSON 리포팅 기능을 제공합니다. 이 기능을 상용하기 위해서는 flume 실행 시 -Dflume.monitoring.type=http 옵션을 추가하여 웹 기반 모니터링이 가능합니다.

 

리포팅 기본 포트는 41414이며 변경 가능합니다. http://<모니터링 대상 Flum IP>:41414/metrics 호출하면 아래와 같은 정보가 출력됩니다.

{
   "SINK.avroSink":{
      "BatchCompleteCount":"133",
      "ConnectionFailedCount":"0",
      "EventDrainAttemptCount":"13300",
      "ConnectionCreatedCount":"1",
      "Type":"SINK",
      "BatchEmptyCount":"0",
      "ConnectionClosedCount":"0",
      "EventDrainSuccessCount":"13300",
      "StopTime":"0",
      "StartTime":"1398215901251",
      "BatchUnderflowCount":"0"
   },
   "SOURCE.otvSource":{
      "OpenConnectionCount":"0",
      "Type":"SOURCE",
      "AppendBatchAcceptedCount":"133",
      "AppendBatchReceivedCount":"133",
      "EventAcceptedCount":"13300",
      "AppendReceivedCount":"0",
      "StopTime":"0",
      "StartTime":"1398215901332",
      "EventReceivedCount":"13300",
      "AppendAcceptedCount":"0"
   },
   "CHANNEL.otvChannel":{
      "EventPutSuccessCount":"13300",
      "ChannelFillPercentage":"0.0",
      "Type":"CHANNEL",
      "EventPutAttemptCount":"13300",
      "ChannelSize":"0",
      "StopTime":"0",
      "StartTime":"1398215901247",
      "EventTakeSuccessCount":"13300",
      "ChannelCapacity":"100",
      "EventTakeAttemptCount":"13301"
   }
}

 

Spark

Flink

 

Fluentd

728x90

'Data Engineering' 카테고리의 다른 글

Database Management System(DBMS)  (0) 2022.11.07
Batch Processing  (0) 2022.11.01
Event Streaming  (1) 2022.11.01
빅데이터 플랫폼(아키텍처) 이해하기  (0) 2022.10.19
빅데이터 처리(Spark & Hadoop)  (0) 2022.10.19