diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..4646018
Binary files /dev/null and b/.DS_Store differ
diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index dfe0770..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1,2 +0,0 @@
-# Auto detect text files and perform LF normalization
-* text=auto
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bc65922
--- /dev/null
+++ b/README.md
@@ -0,0 +1,51 @@
+
+# Real-Time Data Pipeline with Dockerized Services
+
+## Overview
+
+This project packages a real-time data processing and visualization pipeline as a set of Docker services. A Python application simulates a random walk and produces a stream of data points. The stream is fed into a Kafka cluster of three brokers for robust data handling and redundancy. Telegraf acts as a data collection agent that subscribes to the Kafka topic and persists the data into InfluxDB, a time-series database optimized for high write loads and real-time analytics. Finally, Grafana, a leading open-source platform for monitoring and visualization, reads the stored data from InfluxDB and presents it in dashboards.
+
+## Architecture
+
+![Random Walk Simulation](architecture.png "Random Walk Data Pipeline's Architecture")
+- **Python Application**: Dockerized service that generates the random walk and emits a stream of data points.
+- **Kafka**: A high-throughput distributed messaging system that manages the data stream produced by the Python application. Deployed with three brokers to ensure data availability and fault tolerance.
+- **Kafka UI**: A web interface for inspecting what's happening inside the Kafka cluster.
+- **Telegraf**: Ingests data from Kafka and writes it to InfluxDB, keeping the data flow seamless and efficient.
+- **InfluxDB**: Stores the time-series data and provides fast data retrieval, which is essential for real-time analytics.
+- **Grafana**: Connects to InfluxDB and provides real-time visualization of the data, with support for building dashboards, setting alerts, and exploring metrics.
+
+## Workflow
+
+1. The Dockerized Python application generates the data and publishes it to Kafka.
+2. Kafka distributes the data across the brokers, maintaining its integrity and availability.
+3. Telegraf, configured within the same Docker network, subscribes to the Kafka topic, collects the data, and writes it into InfluxDB.
+4. InfluxDB, running as a Docker service, stores the data with high write and read efficiency.
+5. Grafana, also part of the Docker composition, reads the data from InfluxDB and provides a powerful interface for data visualization.
+
+## Docker Composition
+
+The project is orchestrated with Docker Compose, which manages the lifecycle of all the services involved. Each service is containerized, ensuring consistency across environments and simplifying setup and scaling. The `docker-compose.yml` file contains the configuration for each service, including environment variables, volume mappings, network settings, and dependencies.
+
+## Getting Started
+
+To deploy this project:
+
+1. Clone the repository to your local machine.
+2. Navigate to the directory containing the `docker-compose.yml` file.
+3. Run `docker-compose up` to start all the services.
+4. Once the services are up, access Grafana's web interface (port 3000 by default) to visualize the data. A quick way to confirm data is flowing is sketched below.
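+
+### Verifying the pipeline
+
+As a quick smoke test, you can read a few messages straight from the `random_walk` topic. The snippet below is a minimal sketch, not part of the repository: it assumes it runs inside the Compose network (for example via `docker compose exec` into the `app` container), since the brokers only advertise `kafka1:9092` internally.
+
+```python
+from confluent_kafka import Consumer
+
+# Hypothetical consumer settings; 'kafka1:9092' matches the producer's config.
+consumer = Consumer({
+    'bootstrap.servers': 'kafka1:9092',
+    'group.id': 'smoke-test',
+    'auto.offset.reset': 'earliest',
+})
+consumer.subscribe(['random_walk'])
+
+try:
+    for _ in range(5):
+        msg = consumer.poll(timeout=10.0)
+        if msg is None or msg.error():
+            continue  # no message yet, or a transient broker error
+        print(msg.value().decode('utf-8'))  # e.g. {"walk": -2}
+finally:
+    consumer.close()
+```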
+
+## Dependencies
+
+- Docker and Docker Compose must be installed on the machine where the project is deployed.
+
+## Contributing
+
+Contributions to the project are welcome: clone the repository, make your changes, and submit a pull request for review.
+
+## Conclusion
+
+This project demonstrates a complete end-to-end real-time data pipeline, showcasing Dockerized applications for data generation, streaming, storage, and visualization. It can serve as a template for similar applications where real-time data processing is required.
+
+For service-specific configuration details, see the files in each service's directory (`app/`, `kafka/`, `telegraf/`, `influxdb/`, `grafana/`).
diff --git a/app/Dockerfile b/app/Dockerfile
new file mode 100644
index 0000000..1a029f2
--- /dev/null
+++ b/app/Dockerfile
@@ -0,0 +1,9 @@
+FROM python:3.10
+
+WORKDIR /usr/src/app
+
+# Install the pinned dependency from requirements.txt so the image and the
+# requirements file stay in sync.
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY main.py .
+
+CMD ["python", "main.py"]
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..929346a
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,28 @@
+from random import uniform
+from time import sleep
+from json import dumps
+
+from confluent_kafka import Producer
+
+
+# Callback invoked once per produced message to report delivery status.
+def delivery_report(err, msg):
+    if err is not None:
+        print(f"An error occurred: {err}")
+    else:
+        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
+
+
+conf = {
+    'bootstrap.servers': 'kafka1:9092',
+}
+
+producer = Producer(conf)
+
+# Random walk thresholds: step down below 0.33, up above 0.66,
+# and stay put in between.
+prob = [0.33, 0.66]
+walk = 0
+
+while True:
+    n = uniform(0, 1)
+    if n < prob[0]:
+        walk -= 1
+    elif n > prob[1]:
+        walk += 1
+    data = dumps({"walk": walk})
+    producer.produce('random_walk', data.encode('utf-8'), callback=delivery_report)
+    producer.poll(0)  # serve the delivery callback
+    sleep(5)
diff --git a/app/requirements.txt b/app/requirements.txt
new file mode 100644
index 0000000..10b3bdd
--- /dev/null
+++ b/app/requirements.txt
@@ -0,0 +1 @@
+confluent-kafka==1.8.2
\ No newline at end of file
diff --git a/architecture.png b/architecture.png
new file mode 100644
index 0000000..ef674b1
Binary files /dev/null and b/architecture.png differ
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..8f87ecc
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,118 @@
+services:
+
+  app:
+    build: ./app
+    container_name: python_app
+    # Start producing only once the brokers report healthy.
+    depends_on:
+      kafka1:
+        condition: service_healthy
+      kafka2:
+        condition: service_healthy
+      kafka3:
+        condition: service_healthy
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.2.1
+    container_name: kafka1
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+    volumes:
+      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
+    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
+    healthcheck:
+      # The Confluent image ships the CLI tools without the .sh suffix.
+      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka1:9092"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+
+  kafka2:
+    image: confluentinc/cp-kafka:7.2.1
+    container_name: kafka2
+    environment:
+      KAFKA_NODE_ID: 2
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+    volumes:
+      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
+    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka2:9092"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+
+  kafka3:
+    image: confluentinc/cp-kafka:7.2.1
+    container_name: kafka3
+    environment:
+      KAFKA_NODE_ID: 3
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://kafka3:9092,CONTROLLER://kafka3:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+    volumes:
+      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
+    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka3:9092"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+
+  # A web UI service for inspecting the Kafka cluster.
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    container_name: kafka-ui
+    ports:
+      - "8888:8080" # Host port for accessing the Kafka UI
+    environment:
+      KAFKA_CLUSTERS_0_NAME: "local" # Cluster name
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:9092,kafka2:9092,kafka3:9092" # Kafka brokers
+    depends_on:
+      - kafka1
+      - kafka2
+      - kafka3
+
+  telegraf:
+    image: telegraf:1.19
+    volumes:
+      - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:rw
+    env_file:
+      - ./telegraf/.env
+      - ./influxdb/.env
+
+  influxdb:
+    image: influxdb:2.1.1
+    volumes:
+      - ./influxdb/entrypoint.sh:/docker-entrypoint-initdb.d/entrypoint.sh:ro
+      - influxdb:/var/lib/influxdb2:rw
+    env_file:
+      - ./influxdb/.env
+    # Use the absolute path of the mounted script so it is found regardless
+    # of the image's working directory.
+    entrypoint: ["/docker-entrypoint-initdb.d/entrypoint.sh"]
+    restart: on-failure:10
+    ports:
+      - 8086:8086
+
+  grafana:
+    image: grafana/grafana-oss:8.4.3
+    env_file:
+      - ./grafana/.env
+    volumes:
+      - grafana-storage:/var/lib/grafana:rw
+    depends_on:
+      - influxdb
+    ports:
+      # Compose interpolates ${GRAFANA_PORT} from the shell or a top-level
+      # .env file, not from grafana/.env, so default to 3000 when unset.
+      - ${GRAFANA_PORT:-3000}:3000
+
+volumes:
+  influxdb:
+  grafana-storage:
diff --git a/grafana/.env b/grafana/.env
new file mode 100644
index 0000000..38c5fc7
--- /dev/null
+++ b/grafana/.env
@@ -0,0 +1 @@
+GRAFANA_PORT=3000
\ No newline at end of file
diff --git a/influxdb/.env b/influxdb/.env
new file mode 100644
index 0000000..c06ea9a
--- /dev/null
+++ b/influxdb/.env
@@ -0,0 +1,18 @@
+DOCKER_INFLUXDB_INIT_MODE=setup
+DOCKER_INFLUXDB_INIT_CLI_CONFIG_NAME=configname
+
+
+DOCKER_INFLUXDB_INIT_USERNAME=admin
+DOCKER_INFLUXDB_INIT_PASSWORD=adminadmin
+# To generate a new token you can run: openssl rand -base64 32
+DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=
+
+
+DOCKER_INFLUXDB_INIT_ORG=myorg
+DOCKER_INFLUXDB_INIT_BUCKET=my_first_bucket
+
+# The retention period determines how long data stays in the database.
+DOCKER_INFLUXDB_INIT_RETENTION=4d
+
+DOCKER_INFLUXDB_INIT_PORT=8086
+DOCKER_INFLUXDB_INIT_HOST=influxdb
\ No newline at end of file
diff --git a/influxdb/entrypoint.sh b/influxdb/entrypoint.sh
new file mode 100755
index 0000000..c347389
--- /dev/null
+++ b/influxdb/entrypoint.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Exit immediately on errors, unset variables, or failed pipes.
+set -eu -o pipefail
+
+# If InfluxDB has already been initialised, skip setup and just start the server.
+if [ -f "/var/lib/influxdb2/influxd.bolt" ]; then
+    echo "InfluxDB is already set up."
+    exec influxd
+fi
+
+# With `set -u`, these references fail fast if any required variable is missing.
+export DOCKER_INFLUXDB_INIT_MODE=$DOCKER_INFLUXDB_INIT_MODE
+export DOCKER_INFLUXDB_INIT_USERNAME=$DOCKER_INFLUXDB_INIT_USERNAME
+export DOCKER_INFLUXDB_INIT_PASSWORD=$DOCKER_INFLUXDB_INIT_PASSWORD
+export DOCKER_INFLUXDB_INIT_ORG=$DOCKER_INFLUXDB_INIT_ORG
+export DOCKER_INFLUXDB_INIT_BUCKET=$DOCKER_INFLUXDB_INIT_BUCKET
+export DOCKER_INFLUXDB_INIT_RETENTION=$DOCKER_INFLUXDB_INIT_RETENTION
+export DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=$DOCKER_INFLUXDB_INIT_ADMIN_TOKEN
+export DOCKER_INFLUXDB_INIT_PORT=$DOCKER_INFLUXDB_INIT_PORT
+export DOCKER_INFLUXDB_INIT_HOST=$DOCKER_INFLUXDB_INIT_HOST
+
+# Start the server in the background, wait for its API to answer, then run
+# the one-time setup through the CLI.
+influxd &
+INFLUXD_PID=$!
+
+until influx ping --host "http://${DOCKER_INFLUXDB_INIT_HOST}:${DOCKER_INFLUXDB_INIT_PORT}" >/dev/null 2>&1; do
+    sleep 1
+done
+
+influx setup --skip-verify \
+    --bucket "${DOCKER_INFLUXDB_INIT_BUCKET}" \
+    --retention "${DOCKER_INFLUXDB_INIT_RETENTION}" \
+    --token "${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN}" \
+    --org "${DOCKER_INFLUXDB_INIT_ORG}" \
+    --username "${DOCKER_INFLUXDB_INIT_USERNAME}" \
+    --password "${DOCKER_INFLUXDB_INIT_PASSWORD}" \
+    --host "http://${DOCKER_INFLUXDB_INIT_HOST}:${DOCKER_INFLUXDB_INIT_PORT}" \
+    --force
+
+# Keep the server process in the foreground.
+wait "${INFLUXD_PID}"
diff --git a/kafka/run_workaround.sh b/kafka/run_workaround.sh
new file mode 100755
index 0000000..305a152
--- /dev/null
+++ b/kafka/run_workaround.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+# Workaround to run Confluent's cp-kafka image in KRaft (ZooKeeper-less) mode.
+
+# Drop the mandatory KAFKA_ZOOKEEPER_CONNECT check from the configure script.
+sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
+
+# Skip the "wait for ZooKeeper" step, since there is no ZooKeeper.
+sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
+
+# Format the KRaft storage directory with a fixed cluster ID before startup.
+echo "kafka-storage format --ignore-formatted -t NqnEdODVKkiLTfJvqd1uqQ== -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
diff --git a/telegraf/.env b/telegraf/.env
new file mode 100644
index 0000000..c5fcebe
--- /dev/null
+++ b/telegraf/.env
@@ -0,0 +1 @@
+DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=
\ No newline at end of file
diff --git a/telegraf/telegraf.conf b/telegraf/telegraf.conf
new file mode 100644
index 0000000..d204fd2
--- /dev/null
+++ b/telegraf/telegraf.conf
@@ -0,0 +1,113 @@
+# Telegraf Configuration
+#
+# Telegraf is entirely plugin driven. All metrics are gathered from the
+# declared inputs, and sent to the declared outputs.
+#
+# Plugins must be declared in here to be active.
+# To deactivate a plugin, comment out the name and any variables.
+#
+# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
+# file would generate.
+#
+# Environment variables can be used anywhere in this config file, simply surround
+# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
+# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
+
+
+# Global tags can be specified here in key="value" format.
+[global_tags]
+  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
+  # rack = "1a"
+  ## Environment variables can be used as tags, and throughout the config file
+  # user = "$USER"
+
+
+# Configuration for telegraf agent
+[agent]
+  ## Default data collection interval for all inputs
+  interval = "10s"
+  ## Rounds collection interval to 'interval'
+  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
+  round_interval = true
+
+  ## Telegraf will send metrics to outputs in batches of at most
+  ## metric_batch_size metrics.
+  ## This controls the size of writes that Telegraf sends to output plugins.
+  metric_batch_size = 1000
+
+  ## Maximum number of unwritten metrics per output. Increasing this value
+  ## allows for longer periods of output downtime without dropping metrics at the
+  ## cost of higher maximum memory usage.
+  metric_buffer_limit = 10000
+
+  ## Collection jitter is used to jitter the collection by a random amount.
+  ## Each plugin will sleep for a random time within jitter before collecting.
+  ## This can be used to avoid many plugins querying things like sysfs at the
+  ## same time, which can have a measurable effect on the system.
+  collection_jitter = "0s"
+
+  ## Collection offset is used to shift the collection by the given amount.
+  ## This can be used to avoid many plugins querying constrained devices
+  ## at the same time by manually scheduling them in time.
+  # collection_offset = "0s"
+
+  ## Default flushing interval for all outputs. Maximum flush_interval will be
+  ## flush_interval + flush_jitter
+  flush_interval = "10s"
+  ## Jitter the flush interval by a random amount. This is primarily to avoid
+  ## large write spikes for users running a large number of telegraf instances.
+  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+  flush_jitter = "0s"
+
+  ## Collected metrics are rounded to the precision specified. Precision is
+  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
+  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+  ##
+  ## By default or when set to "0s", precision will be set to the same
+  ## timestamp order as the collection interval, with the maximum being 1s:
+  ##   ie, when interval = "10s", precision will be "1s"
+  ##       when interval = "250ms", precision will be "1ms"
+  ##
+  ## Precision will NOT be used for service inputs. It is up to each individual
+  ## service input to set the timestamp at the appropriate precision.
+  precision = ""
+
+
+  ## Override default hostname, if empty use os.Hostname()
+  hostname = ""
+  ## If set to true, do not set the "host" tag in the telegraf agent.
+  omit_hostname = false
+
+###############################################################################
+#                            OUTPUT PLUGINS                                   #
+###############################################################################
+
+
+
+[[outputs.influxdb_v2]]
+  ## The URLs of the InfluxDB cluster nodes.
+  ##
+  ## Multiple URLs can be specified for a single cluster, only ONE of the
+  ## urls will be written to each interval.
+  ## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
+  urls = ["http://${DOCKER_INFLUXDB_INIT_HOST}:8086"]
+
+  ## Token for authentication.
+  token = "$DOCKER_INFLUXDB_INIT_ADMIN_TOKEN"
+
+  ## Organization is the name of the organization you wish to write to; must exist.
+  organization = "$DOCKER_INFLUXDB_INIT_ORG"
+
+  ## Destination bucket to write into.
+  bucket = "$DOCKER_INFLUXDB_INIT_BUCKET"
+
+###############################################################################
+#                            INPUT PLUGINS                                    #
+###############################################################################
+
+[[inputs.kafka_consumer]]
+  ## Kafka brokers.
+  brokers = ["kafka1:9092","kafka2:9092","kafka3:9092"]
+  consumer_group = "telegraf"
+  ## Topics to consume.
+  topics = ["random_walk"]
+  ## Start from the newest message when there is no committed offset.
+  offset = "newest"
+  ## The producer publishes JSON like {"walk": -2}; parse it so "walk"
+  ## becomes a numeric field that Grafana can graph.
+  data_format = "json"
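
Once the stack is up, the Telegraf-to-InfluxDB leg can be checked by querying the bucket directly. The following is a minimal sketch, not part of the repository: it assumes the `influxdb-client` Python package (`pip install influxdb-client`), the token, org, and bucket values from `influxdb/.env`, and InfluxDB's published port 8086 on localhost.

```python
from influxdb_client import InfluxDBClient

# Values below come from influxdb/.env; adjust them if you change that file.
client = InfluxDBClient(
    url="http://localhost:8086",
    token="yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=",  # DOCKER_INFLUXDB_INIT_ADMIN_TOKEN
    org="myorg",                                            # DOCKER_INFLUXDB_INIT_ORG
)

# Flux query: fetch the last 15 minutes of the "walk" field written by Telegraf.
query = '''
from(bucket: "my_first_bucket")
  |> range(start: -15m)
  |> filter(fn: (r) => r._field == "walk")
'''

for table in client.query_api().query(query):
    for record in table.records:
        print(record.get_time(), record.get_value())

client.close()
```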