Hacer frente a los registros sin sentido con Kafka
*Este es el tercer artículo de una serie acerca de la inteligencia sobre amenazas eficiente. Consulta la [parte 1](https://www.fastly.com/blog/lean-threat-intelligence-part-1-plan) y la [parte 2](https://www.fastly.com/blog/lean-threat-intelligence-part-2-foundation).*
En [Inteligencia sobre amenazas eficiente \(parte 2\): la base](https://www.fastly.com/blog/lean-threat-intelligence-part-2-foundation), explicamos cómo creamos nuestro sistema de gestión de registros, Graylog, con Chef. A continuación, vamos a ver cómo creamos una canalización de mensajes que nos permite enrutar los mensajes hacia diferentes puntos de conexión para analizarlos o enriquecerlos. Alojar únicamente Graylog tiene una limitación, y es que debes configurar todos tus hosts para que envíen mensajes a la instancia de Graylog, y no puedes modificar, enriquecer ni inspeccionar ninguno de estos registros durante el proceso. En esta entrada del blog voy a explicar en detalle cómo abordamos ese problema implementando una cola de mensajes de Kafka, que formaba parte de nuestro diagrama de sistemas original:
![systems](//images.contentful.com/6pk8mg3yh2ee/4MbYA7VwgEyyK0aWOyQsuA/1f88db5a85742f440bfa8ac00d28a04b/systems.png)
En esta entrada, contaré por qué las colas de mensajes son un componente necesario para mover una inmensa cantidad de datos a un punto centralizado. La cola de mensajes de Kafka no solo logra este objetivo, sino que también ayuda a Fastly a buscar en los registros los indicadores de riesgo antes de que se inserten en Graylog.
### Colas de mensajes
En el contexto de nuestro diagrama de sistemas, una cola de mensajes nos permite centralizar los registros como mensajes en un solo servicio. En cuanto llegan al servicio en cuestión, hay diversos resultados posibles. Esto tiene varias ventajas para nuestra configuración de clústeres de registro:
1. **Escalabilidad:** a medida que aumenta el volumen de registros/mensajes, podemos agregar más nodos para reforzar el rendimiento de la cola de mensajes.
2. **Servicios de desvinculación:** los mensajes se pueden enviar o extraer de la cola. Este punto central de recopilación es muy interesante para los consumidores y los productores de datos porque *casi todo* depende de la integración con la cola de mensajes, y no tienes que crear integraciones con cada uno de los distintos servicios.
3. **Canalizaciones lógicas:** en cuanto al último punto, podría haber muchas partes interesadas en usar un fragmento de datos de tu cola de mensajes. Algunas de ellas también podrían querer insertar de nuevo en la cola un mensaje enriquecido o modificado, en el que también estuviera interesado otro servicio posterior. Estas canalizaciones son fáciles de entender si se vuelve a la cola una y otra vez, en lugar de crear una canalización confusa y complicada con muchas dependencias.
Si quieres más información sobre las colas de mensajes, encontrarás buenas explicaciones en los sitios de cloudamqp1 e IBM2.
Después de valorar varias tecnologías de cola de mensajes, nos decantamos por Kafka. Kafka tiene una historia plagada de éxitos en empresas de enorme prestigio3, pero la elegimos principalmente por su modelo publish/subscribe4, su estrategia de cola de registros5, su rendimiento6 y su facilidad de integración con Graylog y con otras tecnologías que utilizamos. Toda esta funcionalidad representa una ventaja para la escalabilidad de Fastly y la gran cantidad de datos que tenemos que procesar. Datadog7 tiene una fantástica introducción a Kafka y sus características, pero por ahora vamos a pasar directamente a cómo crear un nodo de Kafka.
### Desplegar Kafka mediante Vagrant
Añade las siguientes líneas \(o crea una nueva para probar Kafka\) al archivo Berksfile de la [parte 2](https://www.fastly.com/blog/lean-threat-intelligence-part-2-foundation):
cookbook 'apt'
cookbook 'runit'
cookbook 'apache_kafka'
cookbook 'zookeeper', git: 'git://github.com/evertrue/zookeeper-cookbook'
Usaremos la guía `apache_kafka` ascendente y la guía `zookeeper` de `evertrue`. Con esto tenemos `recipes` suficientes para iniciar un despliegue de un nodo único de Kafka.
Añade las siguientes líneas al archivo Vagrantfile:
# -* -mode: ruby -*-
# vi: set ft=ruby :
Vagrant.configure(2) do |config|
config.vm.define 'kafka01' do |kafka_config|
kafka_config.vm.box = 'ubuntu/trusty64'
kafka_config.berkshelf.enabled = true
kafka_config.berkshelf.berksfile_path = './Berksfile'
kafka_config.vm.network 'private_network', ip: '192.168.50.101'
kafka_config.vm.provider 'virtualbox' do |v|
v.memory = 512
end
kafka_config.vm.provision :chef_solo do |chef|
chef.add_recipe('apt')
chef.add_recipe('java')
chef.add_recipe('zookeeper')
chef.add_recipe('zookeeper::service')
chef.add_recipe('apache_kafka')
chef.json = {
'apt': {
'compile_time_update': true
},
'java': {
'oracle': {
'accept_oracle_download_terms': true,
},
'install_flavor': 'oracle',
'jdk_version': 8
},
'apache_kafka': {
'scala_version': '2.11',
'version': '0.9.0.0',
'checksum': '6e20a86cb1c073b83cede04ddb2e92550c77ae8139c4affb5d6b2a44447a4028',
'md5_checksum': '084fb80cdc8c72dc75bc3519a5d2cc5c'
}
}
end
end
end
Esto generará un virtualbox vm en Ubuntu 14\.04, actualizará el equipo mediante `apt-get update`, e instalará Java, Zookeeper y Kafka. Cada nodo de Kafka envía estadísticas como el estado del nodo, información sobre topics e información sobre el consumidor a Zookeeper. Si quieres saber más sobre esta relación, consulta el artículo7 de Datadog.
En cuanto tengas listos los archivos Vagrantfile y Berksfile, escribe `vagrant up kafka01`. Inicia sesión en `vagrant ssh kafka01` y ejecuta `sudo netstat -apunt`. Deberías ver a Zookeeper a la escucha en el puerto 2181 y a Kafka en el puerto 9092\. Verifica la conexión de Kafka con Zookeeper. Para ello, comprueba si hay un puerto etéreo en el equipo local conectado al puerto 2181\.
vagrant@vagrant-ubuntu-trusty-64:~$ sudo netstat -apunt | grep 2181
tcp6 0 0 :::2181 :::* LISTEN 15745/java
tcp6 0 0 127.0.0.1:51867 127.0.0.1:2181 ESTABLISHED 15806/java
tcp6 0 0 127.0.0.1:2181 127.0.0.1:51867 ESTABLISHED 15745/java
### Lectura y escritura
Dado que Kafka es un gestor de registros y una cola de mensajes, vamos a enviarle algunos registros para que los almacene como mensajes. Kafka utiliza el paradigma publish/subscribe, con el que se publican mensajes en un `topic` al que hay que suscribirse para recibirlos. Los topics pueden crearse de forma automática cuando un lector o un escritor se conectan a Kafka, pero vamos a crear uno de manera explícita.
Primero, escribe `sudo /usr/local/kafka/kafka_2.11-0.9.0.0/bin/kafka-topics.sh --create -topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181`
A continuación, escribe `sudo /usr/local/kafka/kafka_2.11-0.9.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181`
Deberías ver lo siguiente:
vagrant@vagrant-ubuntu-trusty-64:~$ sudo /usr/local/kafka/kafka_2.11-0.9.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Este resultado se explica claramente en el sitio web8 de Kafka:
>
> Vamos a explicar el resultado. La primera línea contiene un resumen de todas las particiones, y cada línea adicional ofrece información sobre una partición. Como solo tenemos una partición para este topic, solo hay una línea.
> El «leader» \(líder\) es el nodo responsable de todas las lecturas y escrituras de una partición concreta. Cada nodo será el líder de una parte de las particiones seleccionada al azar.
> «replicas» es la lista de nodos que replican el registro de esta partición independientemente de que sean el «leader» o incluso de si están activos.
> «sr» es el conjunto de réplicas «in\-sync». Se trata del subconjunto de la lista de réplicas que están activas y al alcance del líder.
Para interactuar con Kafka tanto para la lectura como para la escritura, me gusta usar `kafkacat`. La línea de comandos consumer/producer puede contener errores y `kafkacat` tiene un potente conjunto de herramientas de argumentos de línea de comandos que facilita muchísimo la interacción. El siguiente fragmento instala git, las bibliotecas necesarias de Kafka, clona el código, lo genera y lo inserta en `/bin`.
sudo apt-get install git librdkafka-dev libyajl-dev -y
git clone https://github.com/edenhill/kafkacat.git
cd kafkacat
./bootstrap.sh
sudo mv kafkacat /bin
Escribe un mensaje JSON en un archivo `echo '{"msg":"foo"}' >> foo` y, a continuación, escríbelo en tu clúster de Kafka `kafkacat -P -b localhost -t test -p 0 foo`. Esto especifica un agente en localhost, un topic específico \(en nuestro caso `test`\), una partición y un nombre de archivo. Para leer los mensajes de este topic, ejecuta `kafkacat -b localhost -t test`. Debería devolvernos lo siguiente:
![kafkacat](//images.contentful.com/6pk8mg3yh2ee/4Taczhm33qYKomGyMMKsgI/c87ced0fcd09e21c00984fb25d520c38/kafkacat.png)
\* Nota: Tengo dos mensajes en el topic como resultado de la prueba, pero cada ejecución del primer comando `kafkacat` colocará un mensaje en la cola.
En este resultado puede observarse lo siguiente: los mensajes en la cola, empezando desde el primer mensaje en offset 0 hasta el último mensaje en offset N, donde N es el número total de mensajes producidos en el topic. Cuando colocamos un mensaje en un topic, se le asigna un offset dentro de la cola de mensajes. Esto puede ser confuso, porque en las colas tradicionales de informática no se suelen rastrear los offsets ni solemos ocuparnos de ellos. Como lector de Kafka, puedes especificar un punto de comprobación en un topic con un offset determinado y empezar a leer a partir de ese offset. Este mecanismo ayuda a prevenir la pérdida de datos y los errores en caso de que se interrumpa el servicio. Tu consumidor puede recibir un offset en el pasado y empezar a trabajar a partir de esa posición hasta el final de la lista. En teoría, en el futuro podremos almacenar mensajes de Kafka durante un largo periodo de tiempo y realizar análisis detallados de los datos de registro.
### Kafka y Graylog
La integración de Kafka con Graylog es nativa, por lo que puedes consumir de inmediato mensajes de topics de Kafka con una nueva entrada, y Graylog la gestionará en segundo plano. Para habilitar la entrada de Kafka, ve a System \-> Inputs.
![graylog](//images.contentful.com/6pk8mg3yh2ee/1aYnf8VukGACUcsAYssY4u/bab9eced50b9c30a2830ac546d97fa90/graylog.png)
En el menú desplegable «Launch new input», escribe «Kafka» y verás tres tipos de entrada de Kafka. La opción que ahora necesitamos es Raw/Plaintext de Kafka; selecciona esta opción e indica los servidores de Zookeeper con los puertos en forma de lista CSV. En «Topic Filter Regex», indica el topic del que quieres leer y haz clic en «Launch». Para ver los mensajes que lleguen de Kafka, haz clic en «Show received messages» y, a continuación, podrás leer el cuerpo del mensaje y aplicar transformaciones para poder hacer búsquedas en los campos.
### Usos de Kafka en la inteligencia sobre amenazas
Este tipo de cola de mensajes tiene sus ventajas para los programas de inteligencia sobre amenazas, ya que puedes usarla como intermediaria entre tu SIEM y los diferentes puntos de conexión de recopilación de registros. Los recopiladores de registros producen datos en un topic de Kafka mientras que tu SIEM es consumidor de cada uno de esos topics. Eso facilita la búsqueda de datos en tu SIEM, porque puedes afinar la búsqueda de topics de Kafka, que pueden representar un tipo de registro.
Además, la relación entre el productor y el consumidor dentro de Kafka no tiene por qué ser de uno a uno. Un mismo topic puede ser leído por muchos consumidores. Por ejemplo, puedes establecer un topic de Kafka que se llame «ssh\-logins» para que lo consuma el departamento de seguridad y el departamento de TI. Ambos departamentos tienen consumidores independientes que realizan un seguimiento de sus propios offsets dentro de la cola y que pueden hacer lo que quieran con los datos que consumen.
Y, por último, Kafka se puede utilizar como una canalización de enriquecimiento. A partir de los puntos anteriores, se puede establecer una canalización de topics, consumidores y productores que enriquezcan los datos antes de que lleguen a un SIEM. Existen varias formas de enriquecer datos: añadir datos sobre reputación de IP, DNS pasivos o información de hash de archivo recopilada con antelación a partir de fuentes de distribución de datos gratuitas o comerciales. Además, los consumidores de Kafka pueden leer los mensajes, procesarlos y luego generar sus propias alertas observando los comportamientos estadísticos o los mensajes entre los dispositivos. Fastly utiliza Kafka como cola de mensajes intermediaria entre Graylog y otros servicios de enriquecimiento. Como hemos visto, en el diagrama de sistemas colocamos Kafka entre el servicio de procesamiento de flujos y nuestro gestor de registros Graylog.
![systems](//images.contentful.com/6pk8mg3yh2ee/4MbYA7VwgEyyK0aWOyQsuA/1f88db5a85742f440bfa8ac00d28a04b/systems.png)
Podemos usar los topics para crear canalizaciones de procesamiento que permitan enriquecer continuamente los datos de Kafka hasta que lleguen al final de una canalización, que no es más que otro topic de Kafka que consume Graylog. Veamos este ejemplo de canalización:
Registros del WAF \-> Syslog \-> Envío a Kafka \-> Extracción de dirección IP \-> Enriquecimiento mediante la base de datos de inteligencia sobre amenazas \-> Envío de vuelta a Kafka \-> Envío a Graylog
Un mensaje de syslog de un registro del WAF \(por ejemplo, un registro de cadena de agente\-usuario bloqueado\) se puede enviar al topic de Kafka «waf\-logs». Un consumidor puede suscribirse a «waf\-logs» y usar regexes para extraer las direcciones IP de ese registro bloqueado. Ese mismo consumidor puede consultar una base de datos para enriquecer el registro del WAF con información contextual de reputación sobre la combinación usuario\-agente/IP. El consumidor envía el registro enriquecido a un topic aparte, «enriched\-waf\-log», donde Graylog puede consumir el registro y añadirlo a sus índices. Por último, Graylog puede configurarse para que envíe alertas sobre cualquier mensaje consumido de «enriched\-waf\-log». Como resultado, el registro en Graylog tendrá información contextual y enriquecida sobre el evento, lo que facilita a los miembros de operaciones la respuesta al evento.
### A continuación: enviar registros a Kafka
Ahora que ya has cargado las recetas de Kafka en tu archivo Berksfile y has insertado la definición Vagrant del entorno de Kafka, puedes habilitar las entradas de Kafka en tu instancia de Graylog, así como inspeccionar los datos en Graylog y aplicar transformaciones para extraer los mensajes y que puedan buscarse fácilmente en Graylog.
En mi próxima entrada hablaré sobre cómo enviar registros fácilmente a Kafka desde tus hosts de syslog. Decidir cómo quieres hacer llegar los registros a Kafka puede ser una toda una aventura. Existen bibliotecas de código y complementos de syslog para escribir datos en un topic, pero también hay que ocuparse de la seguridad de esos registros cuando están en tránsito. Fastly utiliza Filebeat y Logstash para enviar y transformar los registros en un formato presentable en Kafka. Esto permite a nuestros consumidores de Kafka descendentes extraer datos relevantes de los registros para realizar el enriquecimiento, lo que, a su vez, facilita la búsqueda en Graylog. ¡No os lo perdáis\!
Referencias \(pueden estar en inglés\)
1[https://www.cloudamqp.com/blog/2014\-12\-03\-what\-is\-message\-queuing.html](https://www.cloudamqp.com/blog/2014-12-03-what-is-message-queuing.html)
2[https://www.ibm.com/support/knowledgecenter/SSFKSJ\_8\.0\.0/com.ibm.mq.pro.doc/q002620\_.htm](https://www.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.pro.doc/q002620_.htm)
3[https://cwiki.apache.org/confluence/display/KAFKA/Powered\+By](https://cwiki.apache.org/confluence/display/KAFKA/Powered+By)
4[http://kafka.apache.org/documentation.html\#intro\_topics](http://kafka.apache.org/documentation.html#intro_topics)
5[http://kafka.apache.org/documentation.html\#uses\_logs](http://kafka.apache.org/documentation.html#uses_logs)
6[https://engineering.linkedin.com/kafka/benchmarking\-apache\-kafka\-2\-million\-writes\-second\-three\-cheap\-machines](https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
7[https://www.datadoghq.com/blog/monitoring\-kafka\-performance\-metrics/](https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/)
8http://kafka.apache.org/081/quickstart.html
9[http://kafka.apache.org/documentation.html\#intro\_distribution](http://kafka.apache.org/documentation.html#intro_distribution)