Configuring Logstash with Winston for Efficient Logging

Posted on Feb 6, 2024

Introduction to Winston and Logstash

Winston is a popular logging library for Node.js, known for its flexibility and ease of use. It allows developers to send log messages to multiple transports (such as the console, files, and remote services) at different severity levels.

Logstash is a server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to a “stash” like Elasticsearch. It’s a powerful tool for managing logs and event data.

Creating a Custom Logstash Transport for Winston

To integrate Winston with Logstash, we’ll create a custom transport that sends log messages from Winston to Logstash over TCP. This approach provides a reliable and efficient way to process and analyze logs in real time.

Step 1: Define the LogstashTransport Class

First, we need to create a custom transport class, LogstashTransport, which extends the base Transport class from the winston-transport package. This custom transport handles the connection to Logstash and forwards log messages to it.

import * as Transport from 'winston-transport';
import { Socket } from 'net';

interface LogstashTransportOptions extends Transport.TransportStreamOptions {
  host: string;
  port: number;
}

class LogstashTransport extends Transport {
  private logstashHost: string;
  private logstashPort: number;
  private client: Socket;

  constructor(options: LogstashTransportOptions) {
    super(options);
    this.logstashHost = options.host;
    this.logstashPort = options.port;
    this.client = new Socket();
    this.connect();
  }

  // Establish the TCP connection to Logstash; on errors, recreate the
  // socket and retry so the transport recovers from transient failures.
  private connect(): void {
    this.client.on('error', (err: Error) => {
      console.error('Logstash connection error:', err);
      this.client.destroy();
      setTimeout(() => {
        this.client = new Socket(); // Reinitialize the socket
        this.connect();
      }, 1000);
    });

    this.client.connect(this.logstashPort, this.logstashHost, () => {
      console.log('Connected to Logstash');
    });
  }

  log(info: any, callback: () => void): void {
    // Let Winston know the entry has been handled.
    setImmediate(() => this.emit('logged', info));

    // Logstash's json_lines codec expects newline-delimited JSON.
    const message = JSON.stringify(info);
    if (!this.client.destroyed) {
      this.client.write(message + '\n', callback);
    } else {
      console.error('Logstash client is not connected.');
      callback();
    }
  }
}
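
Before wiring the transport into a full Elastic stack, it can be useful to exercise it against a local stand-in for Logstash. The sketch below (an assumption for local testing, not part of the pipeline itself) starts a plain TCP server on port 5000, the same port the Logstash configuration later in this post listens on, and prints each newline-delimited JSON log line it receives:

import { createServer } from 'net';

// Minimal stand-in for Logstash: accept TCP connections on port 5000
// and print every newline-delimited JSON log line received.
const server = createServer((socket) => {
  let buffer = '';
  socket.on('data', (chunk) => {
    buffer += chunk.toString();
    let newlineIndex = buffer.indexOf('\n');
    while (newlineIndex !== -1) {
      const line = buffer.slice(0, newlineIndex);
      buffer = buffer.slice(newlineIndex + 1);
      if (line.trim()) {
        console.log('Received log:', JSON.parse(line));
      }
      newlineIndex = buffer.indexOf('\n');
    }
  });
});

server.listen(5000, () => console.log('Stand-in Logstash listening on port 5000'));

Pointing LOGSTASH_HOST at localhost and LOGSTASH_PORT at 5000 makes the transport's output visible immediately, so you can confirm the JSON shape before the real pipeline is involved.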

Step 2: Configure Winston to Use the Custom Transport

After defining the custom transport, we need to configure Winston to use it. We’ll do this by creating a logger instance and adding the LogstashTransport to its transports.

import { createLogger } from 'winston';

export function startLogger() {
  const logger = createLogger({
    transports: [
      new LogstashTransport({
        // Fall back to local defaults when the environment variables are unset.
        host: process.env.LOGSTASH_HOST ?? 'localhost',
        port: Number(process.env.LOGSTASH_PORT ?? 5000),
      }),
    ],
  });

  return logger;
}
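
With the logger in place, application code can log as usual; each call is serialized to a single JSON line that Logstash parses with the json_lines codec. A small usage sketch (the messages and metadata fields are purely illustrative):

const logger = startLogger();

// Each call produces one JSON line (message, level, and metadata) on the TCP socket.
logger.info('User signed in', { userId: 42 });
logger.error('Payment failed', { orderId: 'A-1001', reason: 'card_declined' });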

Configuring Logstash

To receive and process logs sent by Winston, Logstash needs to be configured to listen for incoming TCP connections on the specified port. Below is a minimal Logstash configuration that listens on port 5000 and forwards logs to Elasticsearch.

Create a file called logstash.conf with the configuration below, and add any filters you need in the filter block for additional processing.

input {
  tcp {
    port => 5000
    codec => json_lines
  }
}

filter {
}

output {
  elasticsearch {
    index => "logstash-%{+YYYY.MM.dd}"
    hosts => "${ELASTIC_HOSTS}"
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASSWORD}"
    cacert => "certs/ca/ca.crt"
  }
}
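
The empty filter block is a placeholder. As an illustration only (the field and tag names here are assumptions, not required by the pipeline), a mutate filter could stamp every event with the originating service:

filter {
  mutate {
    add_field => { "service" => "nodejs-app" }
    add_tag => ["winston"]
  }
}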

Additional Config

docker-compose.yml

version: '3.8'

services:
  setup:
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
    user: "0"
    command: >
      bash -c '
        if [ x${ELASTIC_PASSWORD} == x ]; then
          echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
          exit 1;
        elif [ x${KIBANA_PASSWORD} == x ]; then
          echo "Set the KIBANA_PASSWORD environment variable in the .env file";
          exit 1;
        fi;
        if [ ! -f config/certs/ca.zip ]; then
          echo "Creating CA";
          bin/elasticsearch-certutil ca --silent --pem -out config/certs/ca.zip;
          unzip config/certs/ca.zip -d config/certs;
        fi;
        if [ ! -f config/certs/certs.zip ]; then
          echo "Creating certs";
          echo -ne \
          "instances:\n"\
          "  - name: es01\n"\
          "    dns:\n"\
          "      - es01\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: kibana\n"\
          "    dns:\n"\
          "      - kibana\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          > config/certs/instances.yml;
          bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
          unzip config/certs/certs.zip -d config/certs;
        fi;
        echo "Setting file permissions"
        chown -R root:root config/certs;
        find . -type d -exec chmod 750 \{\} \;;
        find . -type f -exec chmod 640 \{\} \;;
        echo "Waiting for Elasticsearch availability";
        until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
        echo "Setting kibana_system password";
        until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
        echo "All done!";
      '      
    healthcheck:
      test: ["CMD-SHELL", "[ -f config/certs/es01/es01.crt ]"]
      interval: 1s
      timeout: 5s
      retries: 120

  es01:
    depends_on:
      setup:
        condition: service_healthy
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    labels:
      co.elastic.logs/module: elasticsearch
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata01:/usr/share/elasticsearch/data
    ports:
      - ${ES_PORT}:9200
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - discovery.type=single-node
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es01/es01.key
      - xpack.security.http.ssl.certificate=certs/es01/es01.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es01/es01.key
      - xpack.security.transport.ssl.certificate=certs/es01/es01.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
    mem_limit: ${ES_MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

  kibana:
    depends_on:
      es01:
        condition: service_healthy
    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
    labels:
      co.elastic.logs/module: kibana
    volumes:
      - certs:/usr/share/kibana/config/certs
      - kibanadata:/usr/share/kibana/data
    ports:
      - ${KIBANA_PORT}:5601
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=https://es01:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
      - XPACK_SECURITY_ENCRYPTIONKEY=${ENCRYPTION_KEY}
      - XPACK_ENCRYPTEDSAVEDOBJECTS_ENCRYPTIONKEY=${ENCRYPTION_KEY}
      - XPACK_REPORTING_ENCRYPTIONKEY=${ENCRYPTION_KEY}
    mem_limit: ${KB_MEM_LIMIT}
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

  logstash01:
    ports:
      - 5000:5000
    depends_on:
      es01:
        condition: service_healthy
      kibana:
        condition: service_healthy
    image: docker.elastic.co/logstash/logstash:${STACK_VERSION}
    labels:
      co.elastic.logs/module: logstash
    user: root
    volumes:
      - certs:/usr/share/logstash/certs
      - logstashdata01:/usr/share/logstash/data
      - "./logstash_ingest_data/:/usr/share/logstash/ingest_data/"
      - "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
    environment:
      - xpack.monitoring.enabled=false
      - ELASTIC_USER=elastic
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - ELASTIC_HOSTS=https://es01:9200


  nodejs:
    image: local/app:latest
    ports: 
      - "3030:3030"
    restart: always
    environment:
      # PORT
      PORT: 3030

      # OTHERS
      LOGSTASH_HOST: logstash01
      LOGSTASH_PORT: 5000

volumes:
  certs:
    driver: local
  esdata01:
    driver: local
  kibanadata:
    driver: local
  logstashdata01:
    driver: local

networks:
  default:
    name: elastic
    external: false

.env

# Project namespace (defaults to the current folder name if not set)
#COMPOSE_PROJECT_NAME=myproject

# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD=changeme

# Password for the 'kibana_system' user (at least 6 characters)
KIBANA_PASSWORD=changeme

# Version of Elastic products
STACK_VERSION=8.7.1

# Set the cluster name
CLUSTER_NAME=docker-cluster

# Set to 'basic' or 'trial' to automatically start the 30-day trial
LICENSE=basic
#LICENSE=trial

# Port to expose Elasticsearch HTTP API to the host
ES_PORT=9200

# Port to expose Kibana to the host
KIBANA_PORT=5601

# Increase or decrease based on the available host memory (in bytes)
ES_MEM_LIMIT=1073741824
KB_MEM_LIMIT=1073741824
LS_MEM_LIMIT=1073741824

# SAMPLE Predefined Key only to be used in POC environments
ENCRYPTION_KEY=c34d38b3a14956121ff2170e5030b471551370178f43e5626eec58b04a30fae2 # Change it
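
Running and Verifying the Stack

With the .env file in place, bring the stack up with docker compose up -d. Once Elasticsearch is healthy, a quick way to confirm that Winston's logs are arriving is to list the daily indices created by the elasticsearch output block above. The example below uses -k to skip TLS verification for a quick local check (alternatively, point --cacert at a copy of ca.crt), and assumes the placeholder password from .env; replace changeme with the ELASTIC_PASSWORD you actually set.

docker compose up -d

# List the logstash-* indices; a growing docs.count means logs are flowing.
curl -sk -u elastic:changeme "https://localhost:9200/_cat/indices/logstash-*?v"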