elasticsearch 로 하고 있는것들
최초 개발이유
1. 통합 로그 검색 시스템이 필요 (장애나 이슈 발생시 연관된 수많은 Node(서버,스위치,스토리지 등등) 의 Log를 일일이 접속해서 들여다본다는건 노가다 + 눈알 빠짐)
2. 공짜면서 운영이 쉬워야함 (splunk 등도있으나 결국 비용 발생) / lucene query는 간단하고 배우기 쉽다.
이외 elasticsearch 의 장점 및 기능은 인터넷 검색 …..
1. SaaS 서비스 동시 접속자(ccu) 체크
접속자별 지도 표기(geoip)
하루에 한번 powershell 로 전체 VM(Guest) 리스트 parsing -> VM 리스트 (대략 10,000대 정도) -> 리눅스 script (winexe 활용) -> -> VM별 특정 프로세스(application) ESTABILISH 카운트 -> IP리스트만 parsing 하여 파일로 저장 -> -> Logstash (파싱,Geoip) -> elasticsearch 저장 -> grafana view |
2. 전체 호스트 서버(hypervisor) 중요 시스템 syslog 저장 및 알람
리눅스 서버 : rsyslog 사용
윈도우 서버 : winlogbeat 사용
winlogbeat.yml
winlogbeat.event_logs:
– name: System
level: critical, error, warning, information
event_id: 5120, 1129, 129, 61110, 61117, 153, 17, 6006, 1074 <-중요한 이벤트 code만
ignore_older: 168h
output.elasticsearch:
hosts:
– 1xx.xx.xx:9200
logging.to_files: true
logging.files:
path: C:/ProgramData/winlogbeat/Logs
logging.level: info
알람 : 윈도우: 특정 이벤트 code 예: 5120, 6006 중요한것들
스토리지, 스위치 logs : syslog -> rsyslog VM + filebeat -> logstash VM -> elasticsearch VM
알람 : 스토리지 : disk fault, Queue depth over, scsi.cmd.notReadyCondition <- 서비스 운영상 중요한 로그들 (이외 다수)
3. IP 관리 자동화
filebeat 사용
filebeat.yml 핵심 요약
– input_type: log
paths:
– /root/…/ipam/filebeat/*-sumfile.log
document_type: ipam
– input_type: log
paths:
– /root/…/ipam/filebeat/*-offip.log
document_type: offip
output.logstash:
hosts: [“1xx.xxx.xx:5000”]
매일 하루에 한번 98개 IP 대역에 대하여 -> masscan 으로 특정 Port 및 ICMP 체크
-> 사용중인 IP, 사용 Off IP Parsing 하여 file로 저장 -> filebeat -> logstash grok 파싱 ->
-> elastic 저장 -> grafana view
view 부분은 elastic kibana로도 가능하지만 개인적으로 grafana가 더 쉽고 편해서..
——————————————————————————————————
logstash 환경 설정
input {
beats {
port => 5000
}
file {
type => “Saas”
path => “/data/script/GEOIP/log/ip-list”
start_position => “beginning”
}
}
filter {
if [type] == “apache” {
grok {
match => { “message” => “%{COMBINEDAPACHELOG}” }
}
}
if [type] == “syslog” {
grok {
match => [ “message”, “%{SYSLOGLINE}” ]
}
}
if [type] == “ipam” {
grok {
patterns_dir => “/data/pattern/ipam”
match => [ “message”, “%{IPAM}” ]
}
}
if [type] == “offip” {
grok {
patterns_dir => “/data/pattern/ipam”
match => [ “message”, “%{IPAM}” ]
}
}
if [type] == “Saas” {
grok {
match => { “message” => “%{IP:clientip}” }
}
geoip {
source => “clientip”
}
}
}
output {
if [type] == “apache” {
elasticsearch { hosts => [“x.x.x.x:9200”]
index => “filebeat-%{+YYYY.MM.dd}”
}
}
if [type] == “syslog” {
elasticsearch { hosts => [“x.x.x.x:9200”]
index => “syslog-%{+YYYY.MM.dd}”
}
}
if [type] == “ipam” {
elasticsearch { hosts => [“x.x.x.x:9200”]
index => “ipam-%{+YYYY.MM.dd}”
}
}
if [type] == “offip” {
elasticsearch { hosts => [“x.x.x.x:9200”]
index => “offip-%{+YYYY.MM.dd}”
}
}
if [type] == “Saas” {
elasticsearch { hosts => [“x.x.x.x:9200”]
index => “Saas-%{+YYYY.MM.dd}”
}
}
}
ELK 활용 사례 / 엘라스틱서치 활용사례
전체 서비스 평일 평균 엘라스틱서치 클러스터로 쌓이는 전체 보안 이벤트 로그는 아래와 같은 상황이다.
날짜별 Document 수는 3억2천~3억4천건 정도… 가끔 불특정 다수 해외 IP에서 지속적으로 스캔성 공격 시도를 할때는 늘어나는 경우도 있다.
사람이 일일이 이렇게 방대한 로그를 다 들여다보기란 바닷가 백사장에서 바늘 찾기다. 어떻게 하면 좀더 효율적으로 보안 위협 로그를 선별 할수 있을까?
현재 구성은 IDC 전체 구간 각 서비스별 보안장비 -> arcsight -> ELK 스택(elasticsearch) 이다.
요약하자면, 모든 보안 이벤트 로그를 엘라스틱서치로 받고 있다. 여유가 있다면 xpack 을 유료로 구매하여 Watcher (알림 통보)를 사용하면 된다. 방대한 보안 이벤트중 오픈 소스 “elastalert” 를 사용하여 보안 위협에 대하여 자동으로 알려주는
elasticsearch 활용 사례에 대하여 포스팅한다.
elastalert 설치 과정 type에 대한 설명도 생략….
인터넷 서치하면 많이 나온다.
알려진 C&C 서버로의 통신 탐지
– type 은 blacklist
– blacklist 파일 내용과 매칭되면 알람을 통보해준다.
– c2-list.txt 내용은 아래와 같이 IP를 적어주면된다.
name: “C2 서버 통신 탐지” type: blacklist use_local_time: false use_strftime_index: true index: xxxx-%Y.%m.%d blacklist: – “!file /root/…/elastalert/list/c2-list.txt” # 특정 source IP 및 특정 목적지 Port 는 제외 filter: – query: query_string: query: “NOT sourceAddress:(59.x.x.x OR 1.x.x.x OR 10.20.x.x) AND NOT destinationPort:(25 OR 443 OR 3389)” compare_key: “destinationAddress” alert_text: “Match Count: {0} \n{1} {2} {3} {4} {5} {6} {7} {8} {9}” alert_text_type: “alert_text_only” alert_text_args: [“num_matches”, “@timestamp”, “deviceHostName”, “deviceAction”, “source.country_code2”, “sourceAddress”, “sourcePort”, “destination.country_code2”, “destinationAddress”, “destinationPort”] # 아래는 알림에 포함하고 싶은 field만 정의 #include: [“deviceHostName”, “deviceAction”, “source.country_code2”, “sourceAddress”, “sourcePort”, “destination.country_code2”, “destinationAddress”, “destinationPort”] alert: – slack slack: slack_webhook_url: “https://hooks.slack.com/services/T9YJY3HV0/BBBH341PT/…………………………………………….” email: – “user1@xxxxxxxx.com” – “user2@xxxxxxxx.com” |
탐지 Notification
slack
스캔성 공격 탐지 IP
– type 은 frequency
– 조건은 1분에 1000건 이상 해외 IP중에 accept 된 source IP 탐지후 알람
name: “1분 동안 1000회 이상 접속 Source IP” type: frequency num_events: 1000 timeframe: minutes: 1 use_strftime_index: true index: xxxx-%Y.%m.%d # 해외 IP중 accept 된것만 그리고 icmp는 제외 filter: – query: query_string: query: “NOT source.country_code2:KR AND NOT deviceAction:(deny OR drop OR Block) AND _exists_:source.country_code2 AND NOT destinationPort:2048” top_count_keys: – sourceAddress #top_count_number: 2 realert: minutes: 5 query_key: “sourceAddress” ################ Alert ################# alert_text: “Match Count: {0} \n{1} {2} {3} {4} {5} {6} {7} {8} {9}” alert_text_type: “alert_text_only” alert_text_args: [“num_matches”, “@timestamp”, “deviceHostName”, “deviceAction”, “source.country_code2”, “sourceAddress”, “sourcePort”, “destination.country_code2”, “destinationAddress”, “destinationPort”] alert: – slack slack: slack_webhook_url: “https://hooks.slack.com/services/T9YJY3HV0/BBBH341PT………………………………….” email: – “example01@mail.com” – “example02@mail.com” |
탐지 알람 화면
slack
보안 장비 장애 알람 (특정 이벤트 발생시)
– type은 frequency (다른 type도 가능)
– 조건 특정 필드에 critical 이라고 매칭 되면 알람 통보
name: “보안장비 이슈 발생” type: frequency use_strftime_index: true index: xxxx-%Y.%m.%d num_events: 1 timeframe: minutes: 2 # critical 문자열이 매칭되면 알람 통보 filter: – query: query_string: query: “(level:critical OR pri:critical) AND NOT deviceAction:update” alert_text: “Match Count: {0} \n{1} {2} {3} {4} {5} {6} {7}” alert_text_type: “alert_text_only” alert_text_args: [“num_matches”, “@timestamp”, “deviceHostName”, “deviceAction”, “pri”, “level”, “hbdn_reason”, “reason”] alert: – slack slack: slack_webhook_url: “https://hooks.slack.com/services/T9YJY3HV0/BBBH34…………………………………………………..” email: – “example1@mail.com” – “example2@mail.com” – “example3@mail.com” |
탐지 알람 화면
slack
elastalert 시작은 각 OS 별로 service에 등록하여 실행하는 방법도 있지만
각 rule별로 백그라운드로 실행하면 여러가지 장점이 있다.
예: c2서버 통신 접속 탐지 시작 스크립트
cat elastalert-c2
#/bin/bash
DIR=”/root/…/elastalert”
NAMEs=”c2″
CONFs=”$NAMEs.yaml”
RULEs=”$NAMEs.rule.yaml”
LOGs=”$NAMEs.log”
nohup /usr/local/bin/elastalert –verbose –start NOW –config $DIR/etc/$CONFs –rule $DIR/rules/$RULEs > $DIR/log/$LOGs 2>&1 &
sleep 2
ps aux | grep $NAMEs.rule.yaml | grep -v “grep”
elastalert 프로세스가 아래와 같이 실행된다.
이와 같이 실시간 발생되는 보안 위협에 대하여 elastalert 을 활용하여 알람을 자동화 한다면
여러가지 측면에서 업무 효율성과 실용성을 높일수 있다.
하지만 결국 정.오탐에 대한 판단은 사람이 해야한다.
AI, 머신러닝, 인텔리전스 등 요즘 보안 업계에서도 핫 트렌드다.
하지만 결국 판단과 결정은 사람이 해야한다.
위와 같은 기술을 접목시킨 보안 장비를 여러번 POC, BMT 해 보았으나, 개인적으로 대부분 마음에 들지 않는다
기술이 발전하면 어느정도 편해지고 사람이 하는 일을 덜순 있지만 특히나 보안 이벤트 로그는
너무나도 방대하고 다양하고 벤더(장비) 별로 모두 틀리기 때문에 AI,머신러닝을 도입한다해도
완벽하게 100% 보안 위협을 탐지 한다는건 어불성설이다.
당연히 100%는 없다.
신중히 선택해서 적재적소에 잘 활용한다면 없는것 보다는 있는게 낫다고 본다.
어느 정도 사람이 할일을 덜어 준다는것은 부정할수없다.
하지만, 결국 사람이 판단하고 고민하고 결정해야 하는 일이기에 해킹 피해를 최소화 하고
신속한 대응을 위해서는 보안 장비에 의존하기 보다는 보안 관리자의 거시적 관점에서의
분석 역량과 전체를 바라볼수 있는 통찰력을 기르는게 더 중요하다고 생각한다.