Forward logs from Alibaba Simple Log Service to QRadar

In this tech blog, we delve into the process of forwarding logs from Alibaba Simple Log Service (SLS) to QRadar, IBM's leading security information and event management (SIEM) solution. Join us as we explore how this integration enables organizations to enhance their security posture and gain valuable insights into their cloud-based environments.

ALIBABA CLOUD

Abhishek Gupta

8/18/2023 · 5 min read

Alibaba Simple Log Service

Syslog is a widely used logging standard that is supported by most security information and event management (SIEM) systems, such as IBM QRadar and HP ArcSight. This post describes how to ship logs from Log Service to a SIEM system over Syslog.

Alibaba Log Service SLS

Background information

  1. Syslog is defined in RFC 5424 and RFC 3164. RFC 3164 was published in 2001, and RFC 5424, which obsoletes it, was published in 2009. We recommend that you use RFC 5424 because it addresses several limitations of RFC 3164, for example by adding structured data and timestamps that include the year and time zone. For more information, see RFC 5424 and RFC 3164.

  2. Syslog over TCP/TLS: Syslog defines the standard format of log messages, and those messages can be transported over either UDP or TCP. UDP does not guarantee delivery, so messages can be lost silently, while TCP provides reliable, ordered delivery. RFC 5425 defines the use of Transport Layer Security (TLS) to enable secure transport of Syslog messages. If your SIEM system supports TCP or TLS, we recommend that you send Syslog messages over TCP or TLS. For more information, see RFC 5425.

  3. Syslog facility: identifies the kind of program or subsystem that generated a message, a convention inherited from early versions of UNIX. You can use user (user-level messages, facility code 1) as the default facility. For more information, see Program components.

  4. Syslog severity: the severity level of a Syslog message, from 0 (Emergency) to 7 (Debug). You can assign a higher severity (a lower numeric value) to logs that contain specific content, based on your business requirements.
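The facility and severity described above are combined into the PRI value at the start of every Syslog message: PRI = facility × 8 + severity. The following is a minimal sketch, using only the constants from Python's standard `logging.handlers.SysLogHandler`, of how an RFC 5424 line is assembled. The helpers `format_rfc5424` and `pick_severity`, the "failed" keyword rule, and the sample hostname `ecs-01` are hypothetical illustrations, not the Log Service SDK's implementation.

```python
from datetime import datetime, timezone
from logging.handlers import SysLogHandler

NIL = "-"  # RFC 5424 NILVALUE for header fields that are not set


def pick_severity(message):
    """Hypothetical rule: raise the severity of messages that mention a failure."""
    if "failed" in message.lower():
        return SysLogHandler.LOG_WARNING  # 4
    return SysLogHandler.LOG_INFO  # 6


def format_rfc5424(message, facility=SysLogHandler.LOG_USER, severity=None,
                   timestamp=None, hostname=NIL, app_name=NIL):
    """Assemble one line: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG."""
    if severity is None:
        severity = pick_severity(message)
    pri = facility * 8 + severity  # same value as (facility << 3) | severity
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)
    ts = timestamp.isoformat()  # RFC 3339 style, e.g. 2023-08-18T02:00:00+00:00
    # PROCID, MSGID and STRUCTURED-DATA are left as NILVALUE in this sketch.
    return "<{0}>1 {1} {2} {3} {4} {5} {6} {7}".format(
        pri, ts, hostname, app_name, NIL, NIL, NIL, message)


ts = datetime(2023, 8, 18, 2, 0, 0, tzinfo=timezone.utc)
print(format_rfc5424("user login ok", timestamp=ts, hostname="ecs-01", app_name="sls"))
# <14>1 2023-08-18T02:00:00+00:00 ecs-01 sls - - - user login ok
print(format_rfc5424("login failed", timestamp=ts, hostname="ecs-01", app_name="sls"))
# <12>1 2023-08-18T02:00:00+00:00 ecs-01 sls - - - login failed
```

With the user facility (1), an informational message gets PRI 14 and a warning gets PRI 12, which is how a content-based rule can surface selected logs at a higher severity in the SIEM.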

Steps to forward logs from Alibaba Log Service to QRadar

A. Create a RAM user
  1. Obtain an AccessKey pair that is used to access Log Service.

  2. You can use the AccessKey pair of a Resource Access Management (RAM) user to access a Log Service project. For more information, see AccessKey pair and Configure an AccessKey pair for a RAM user to access a source Logstore and a destination Logstore.

  3. You can use the permission assistant feature to grant permissions to a RAM user. For more information, see Use the permission assistant to grant permissions. The following example shows a common permission policy that is configured for a RAM user.

{
  "Version": "1",
  "Statement": [
    {
      "Action": [
        "log:ListShards",
        "log:GetCursorOrData",
        "log:GetConsumerGroupCheckPoint",
        "log:UpdateConsumerGroup",
        "log:ConsumerGroupHeartBeat",
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ListConsumerGroup",
        "log:CreateConsumerGroup"
      ],
      "Resource": [
        "acs:log:*:*:project/ecs-project-log-services/logstore/ecs-log-metricstore/*"
      ],
      "Effect": "Allow"
    }
  ]
}
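If you forward logs from several projects or Logstores, you might generate this policy per target instead of editing the JSON by hand. The following is a minimal sketch; `build_consumer_policy`, `my-project`, and `my-logstore` are hypothetical names, and the resource pattern simply mirrors the example policy, so check it against the RAM documentation for your own setup.

```python
import json

# Actions copied from the example policy (consumer-group read access).
CONSUMER_ACTIONS = [
    "log:ListShards",
    "log:GetCursorOrData",
    "log:GetConsumerGroupCheckPoint",
    "log:UpdateConsumerGroup",
    "log:ConsumerGroupHeartBeat",
    "log:ConsumerGroupUpdateCheckPoint",
    "log:ListConsumerGroup",
    "log:CreateConsumerGroup",
]


def build_consumer_policy(project, logstore):
    """Return a policy dict shaped like the example, scoped to one project and Logstore."""
    # Same resource pattern as the example: everything under the given Logstore.
    resource = "acs:log:*:*:project/{0}/logstore/{1}/*".format(project, logstore)
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": list(CONSUMER_ACTIONS),
                "Resource": [resource],
                "Effect": "Allow",
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical names; the printed JSON has the same shape as the example policy.
    print(json.dumps(build_consumer_policy("my-project", "my-logstore"), indent=2))
```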

B. Open firewall rules

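Allow outbound traffic from the ECS instance (or Function Compute) to the Log Service endpoint of your region, for example ap-southeast-1-intranet.log.aliyuncs.com for internal-network access in Singapore (port 80 for HTTP or 443 for HTTPS), and to the QRadar syslog listener on port 514 (TCP or UDP, matching the protocol you configure in the script). Make sure the QRadar host also accepts inbound syslog traffic on that port.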
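To confirm the rules before starting the job, you can test TCP reachability from the host that will run the script. The following is a minimal sketch built on Python's standard `socket.create_connection`; `can_reach` is a hypothetical helper, and because it only opens a TCP connection it cannot confirm a UDP syslog path.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the name and completes the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refusals and timeouts
        return False
```

For example, run `can_reach("ap-southeast-1-intranet.log.aliyuncs.com", 80)` and `can_reach("<QRadar address>", 514)` on the ECS instance. A `False` result points to a firewall, security group, or DNS issue rather than a problem in the forwarding script.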

C. Create the following script on an ECS instance or in Function Compute

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import ConsumerProcessorBase, ConsumerWorker, LogHubConfig, CursorPosition
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# configure logging: rotate the log file at 100 MB, keep 5 backups, and also log to the console
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    This consumer forwards logs to a remote Syslog server, such as QRadar.
    """

    def __init__(self, target_setting=None):
        """
        Validate the Syslog target settings and open a test connection.

        :param target_setting: dict with host, port, protocol and the optional keys defined in get_monitor_option()
        """
        super(SyncData, self).__init__()  # remember to call the base class __init__

        if not target_setting:
            raise ValueError("You need to configure settings of remote target")
        if not isinstance(target_setting, dict):
            raise ValueError("The settings should be a dict that includes the necessary address and credentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path = self.option.get('cert_path', None)

        # try a connection so that a misconfigured target fails fast
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path):
            pass

    def process(self, log_groups, check_point_tracker):
        # flatten the log groups into a list of dicts (the SDK method name is spelled "flattern")
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # Put your sync code here to send to remote.
                    # Each log is a flat dict (note: all strings are unicode), for example:
                    # Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    # Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                    # use the log's own time as the Syslog timestamp and drop it from the message body
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    # serialize the remaining fields as "<sep>key=value" pairs
                    io = six.StringIO()
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.error("Failed to send data to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # TODO: add error handling or retries here
            raise

        logger.info("Complete send data to remote")

        # save the checkpoint only after the whole batch has been sent
        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # Basic options
    ##########################

    # load connection info and the consumer group name from environment variables
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    if not (endpoint and accessKeyId and accessKey and project and logstore and consumer_group):
        raise ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # Some advanced options
    ##########################

    # Don't hard-code the consumer name, especially when you run this program in parallel:
    # deriving it from the process ID keeps each consumer name unique within the group.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # This option is used only for initialization; it is ignored once the consumer group has been created and each shard has started to be consumed.
    # It can be "begin", "end", or a specific time in ISO format; the time refers to when Log Service received the log.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # If a client does not send a heartbeat within heartbeat_interval * 2 seconds, the server considers it offline and reassigns its shards to another consumer.
    # So don't set the heartbeat interval too small when network bandwidth or consumption performance is poor.
    heartbeat_interval = 20

    # if data arrives infrequently, don't set this too small (<1s)
    data_fetch_interval = 1

    # create one consumer in the consumer group
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # syslog options
    settings = {
        "host": "1.2.3.4",  # required, replace with the address of your QRadar syslog listener
        "port": 514,  # required, port
        "protocol": "tcp",  # required: tcp, udp, or tls (Python 3 only)
        "sep": "||",  # required, separator for key=value pairs
        "cert_path": None,  # optional, certificate path when TLS is configured
        "timeout": 120,  # optional, default 120
        "facility": syslogclient.FAC_USER,  # optional, default None means syslogclient.FAC_USER
        "severity": syslogclient.SEV_INFO,  # optional, default None means syslogclient.SEV_INFO
        "hostname": None,  # optional, default is the local hostname
        "tag": None  # optional, tag for the log, default -
    }

    return option, settings


def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,))
    worker.start(join=True)


if __name__ == '__main__':
    main()
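The core transformation in `process()`, which pops `__time__` and serializes the remaining fields, can be checked in isolation before you connect to QRadar. The following is a minimal sketch that reproduces it on a plain dict, outside the SDK. `log_to_syslog_body` is a hypothetical name, and unlike the script it converts `__time__` to a UTC-aware datetime, so the result does not depend on the host's time zone.

```python
from datetime import datetime, timezone


def log_to_syslog_body(log, sep="||"):
    """Mirror SyncData.process(): take __time__ out, join the rest as sep-prefixed key=value pairs."""
    fields = dict(log)  # copy so the caller's dict is left unchanged
    timestamp = datetime.fromtimestamp(int(fields.pop("__time__")), tz=timezone.utc)
    body = "".join("{0}{1}={2}".format(sep, k, v) for k, v in fields.items())
    return timestamp, body


ts, body = log_to_syslog_body({"__time__": "1692324000", "__topic__": "audit", "user": "alice"})
print(ts.isoformat(), body)
# 2023-08-18T02:00:00+00:00 ||__topic__=audit||user=alice
```

Because every field is prefixed with the separator, the message body starts with `||`. Choose a separator that never occurs inside field values so that the pairs can be split unambiguously on the SIEM side.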

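D. Install the dependencies, set the environment variables, and start the job by running the script (saved as sync_data.py).

pip3 install aliyun-log-python-sdk six
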
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<Consumer group name, such as sync_data_to_qradar>

python3 sync_data.py
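Before pointing the job at QRadar, it can help to see exactly what goes over the wire. One option is a throwaway TCP receiver: set "host" to 127.0.0.1, "port" to the receiver's port, and "protocol" to "tcp" in the script's settings, then inspect the raw bytes. The following is a minimal sketch using only the standard library; `RawPrintHandler`, `start_test_receiver`, and port 5514 are hypothetical choices, and the receiver prints bytes as they arrive without interpreting any syslog framing.

```python
import socketserver
import threading


class RawPrintHandler(socketserver.BaseRequestHandler):
    """Collect and print the raw bytes of one TCP connection, whatever the framing."""

    def handle(self):
        while True:
            chunk = self.request.recv(4096)
            if not chunk:  # empty read: the client closed the connection
                break
            self.server.received.append(chunk)
            print(chunk.decode("utf-8", "replace"), end="")


def start_test_receiver(host="127.0.0.1", port=5514):
    """Start a threaded TCP receiver in the background and return the server object."""
    server = socketserver.ThreadingTCPServer((host, port), RawPrintHandler)
    server.received = []  # byte chunks seen so far, across all connections
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server
```

In a separate Python session, run `server = start_test_receiver()` followed by `input("Press Enter to stop\n")` and then `server.shutdown()`. Start `sync_data.py` in another terminal, and switch the settings back to the QRadar address once the output looks right.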