This commit is contained in:
azertop 2024-01-01 19:05:40 +01:00
parent 7462aaa375
commit 17578c3441
14 changed files with 372 additions and 2 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

2
.gitattributes vendored
View File

@ -1,2 +0,0 @@
# Auto detect text files and perform LF normalization
* text=auto

51
README.md Normal file
View File

@ -0,0 +1,51 @@
# Real-Time Data Pipeline with Dockerized Services
## Overview
This project encapsulates a real-time data processing and visualization pipeline within a Docker ecosystem. A Python application simulates a random walk and streams the generated data points into a Kafka cluster of three brokers for robust, redundant data handling. Telegraf acts as the collection agent: it subscribes to the Kafka topic and persists the data into InfluxDB, a time-series database optimized for high write loads and real-time analytics. Finally, Grafana, a leading open-source platform for monitoring and visualization, reads the stored data from InfluxDB and presents it in a user-friendly dashboard.
## Architecture
![Random Walk Simulation](architecture.png "Random Walk Data Pipeline's Architecture")
- **Python Application**: Dockerized service that simulates a random walk and emits a stream of data points.
- **Kafka**: A high-throughput distributed messaging system that manages the data stream provided by the Python application. Implemented with 3 brokers to ensure data availability and fault tolerance.
- **Kafka UI**: A web interface for visualizing what's happening inside the Kafka cluster (brokers, topics, and messages).
- **Telegraf**: Ingests data from Kafka and writes to InfluxDB, ensuring that the data flow is seamless and efficient.
- **InfluxDB**: Stores the time-series data and provides capabilities for fast data retrieval, which is essential for real-time analytics.
- **Grafana**: Connects to InfluxDB and provides real-time visualization of the data, with capabilities for creating comprehensive dashboards, setting alerts, and exploring metrics.
## Workflow
1. The Dockerized Python application generates the data and publishes it to Kafka.
2. Kafka receives the data and ensures it's distributed appropriately across the brokers, maintaining the data's integrity and availability.
3. Telegraf, configured within the same Docker network, subscribes to Kafka topics, collects the data, and writes it into InfluxDB.
4. InfluxDB, running as a Docker service, stores the data with high write and read efficiency.
5. Grafana, also part of the Docker composition, reads the data from InfluxDB and provides a powerful interface for data visualization.
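To verify steps 1–2 independently of the rest of the stack, a minimal consumer sketch could look like the following. This is an illustration, not part of the repo: it assumes the `confluent-kafka` package, and it must run from a container attached to the Compose network (the brokers advertise container-internal hostnames such as `kafka1`).

```python
from confluent_kafka import Consumer

# Hypothetical debugging consumer; broker and topic names come from this repo.
consumer = Consumer({
    'bootstrap.servers': 'kafka1:9092',
    'group.id': 'debug-consumer',     # any unused group id works
    'auto.offset.reset': 'earliest',  # start from the beginning of the topic
})
consumer.subscribe(['random_walk'])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to one second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(msg.value().decode('utf-8'))  # e.g. {"walk": -2}
finally:
    consumer.close()
```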
## Docker Composition
The project is orchestrated using Docker Compose, which manages the lifecycle of all the services involved. Each service is containerized, ensuring consistency across different environments and simplifying the setup and scaling processes. The `docker-compose.yml` file includes the configuration for each service, including environment variables, volume mappings, network settings, and dependencies.
## Getting Started
To deploy this project:
1. Clone the repository to your local machine.
2. Navigate to the directory containing the `docker-compose.yml` file.
3. Run `docker-compose up` to start all the services.
4. Once the services are up, access Grafana's web interface to visualize the data.
## Dependencies
- Docker and Docker Compose must be installed on the machine where the project is deployed.
## Contributing
Contributions to the project are welcome. Contributors can clone the repository, make their changes, and submit a pull request for review.
## Conclusion
This project demonstrates a complete end-to-end real-time data pipeline, showcasing the power of Dockerized applications for data generation, streaming, storage, and visualization. It provides a template for similar applications where real-time data processing is required.
For service-specific configuration details, refer to the files under each service's directory (`app/`, `kafka/`, `telegraf/`, `influxdb/`, `grafana/`).

9
app/Dockerfile Normal file
View File

@ -0,0 +1,9 @@
FROM python:3.10
WORKDIR /usr/src/app
# Install the dependency pinned in requirements.txt
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["python", "main.py"]

28
app/main.py Normal file
View File

@ -0,0 +1,28 @@
from random import uniform
from time import sleep
from json import dumps

from confluent_kafka import Producer

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure.
    if err is not None:
        print(f"An error occurred: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

conf = {
    'bootstrap.servers': 'kafka1:9092',
}
producer = Producer(conf)

# Step down when n < 0.33, up when n > 0.66, otherwise stay put.
prob = [0.33, 0.66]
walk = 0
while True:
    n = uniform(0, 1)
    if n < prob[0]:
        walk -= 1
    elif n > prob[1]:
        walk += 1
    data = dumps({"walk": walk})
    producer.produce('random_walk', data.encode('utf-8'), callback=delivery_report)
    producer.poll(0)  # serve delivery callbacks
    sleep(5)
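A quick back-of-the-envelope check of the thresholds above (a standalone sketch, not part of the app) shows the walk is not quite symmetric:

```python
# Step probabilities implied by prob = [0.33, 0.66] with n ~ uniform(0, 1):
p_down = 0.33                # n < 0.33 -> step down
p_up = 1 - 0.66              # n > 0.66 -> step up (~0.34)
p_stay = 1 - p_down - p_up   # otherwise stay (~0.33)

# Expected movement per 5-second tick: about +0.01, i.e. a slow upward drift.
print(f"P(down)={p_down:.2f}  P(stay)={p_stay:.2f}  P(up)={p_up:.2f}")
print(f"expected drift per step: {p_up - p_down:+.2f}")
```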

1
app/requirements.txt Normal file
View File

@ -0,0 +1 @@
confluent-kafka==1.8.2

BIN
architecture.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 711 KiB

118
docker-compose.yml Normal file
View File

@ -0,0 +1,118 @@
services:
  app:
    build: ./app
    container_name: python_app
  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka1:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  kafka2:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka2:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  kafka3:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9092,CONTROLLER://kafka3:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./kafka/run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server=kafka3:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  # This service provides a graphical interface for the Kafka cluster
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8888:8080" # Host port for accessing the Kafka UI
    environment:
      KAFKA_CLUSTERS_0_NAME: "local" # Cluster display name
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:9092,kafka2:9092,kafka3:9092" # Kafka brokers
    depends_on:
      - kafka1
      - kafka2
      - kafka3
  telegraf:
    image: telegraf:1.19
    volumes:
      - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:rw
    env_file:
      - ./telegraf/.env
      - ./influxdb/.env
  influxdb:
    image: influxdb:2.1.1
    volumes:
      - ./influxdb/entrypoint.sh:/docker-entrypoint-initdb.d/entrypoint.sh:ro
      - influxdb:/var/lib/influxdb2:rw
    env_file:
      - ./influxdb/.env
    entrypoint: ["./entrypoint.sh"]
    restart: on-failure:10
    ports:
      - 8086:8086
  grafana:
    image: grafana/grafana-oss:8.4.3
    env_file:
      - ./grafana/.env
    volumes:
      - grafana-storage:/var/lib/grafana:rw
    depends_on:
      - influxdb
    ports:
      # Note: ${GRAFANA_PORT} is resolved by Compose variable interpolation, which
      # reads the shell environment or a top-level .env file, not the service-level
      # env_file above; export GRAFANA_PORT=3000 (or copy grafana/.env to ./.env).
      - ${GRAFANA_PORT}:3000
volumes:
  influxdb:
  grafana-storage:
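As a quick smoke test of the three-broker cluster, the `confluent-kafka` AdminClient can list brokers and topics. This is a sketch, not part of the composition: the brokers publish no host ports and advertise container-internal hostnames, so it must run from a container attached to the Compose network.

```python
from confluent_kafka.admin import AdminClient

# Hypothetical smoke test, run from inside the Compose network.
admin = AdminClient({'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092'})
metadata = admin.list_topics(timeout=10)

print("broker ids:", sorted(metadata.brokers))  # expect [1, 2, 3]
print("topics:", list(metadata.topics))         # 'random_walk' appears once the app produces
```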

1
grafana/.env Normal file
View File

@ -0,0 +1 @@
GRAFANA_PORT=3000

18
influxdb/.env Normal file
View File

@ -0,0 +1,18 @@
DOCKER_INFLUXDB_INIT_MODE=setup
DOCKER_INFLUXDB_INIT_CLI_CONFIG_NAME=configname
DOCKER_INFLUXDB_INIT_USERNAME=admin
DOCKER_INFLUXDB_INIT_PASSWORD=adminadmin
# To generate a new token, you can use: openssl rand -base64 32
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=
DOCKER_INFLUXDB_INIT_ORG=myorg
DOCKER_INFLUXDB_INIT_BUCKET=my_first_bucket
# The retention period determines how long data stays in the database
DOCKER_INFLUXDB_INIT_RETENTION=4d
DOCKER_INFLUXDB_INIT_PORT=8086
DOCKER_INFLUXDB_INIT_HOST=influxdb

25
influxdb/entrypoint.sh Executable file
View File

@ -0,0 +1,25 @@
#!/bin/bash
# Exit immediately if a command fails or an unset variable is referenced
set -eu -o pipefail
# Check if InfluxDB is already set up
if [ -f "/var/lib/influxdb2/influxd.bolt" ]; then
echo "InfluxDB is already set up."
exit 0
fi
# Re-export the required environment variables (set -u aborts here if any is missing)
export DOCKER_INFLUXDB_INIT_MODE=$DOCKER_INFLUXDB_INIT_MODE
export DOCKER_INFLUXDB_INIT_USERNAME=$DOCKER_INFLUXDB_INIT_USERNAME
export DOCKER_INFLUXDB_INIT_PASSWORD=$DOCKER_INFLUXDB_INIT_PASSWORD
export DOCKER_INFLUXDB_INIT_ORG=$DOCKER_INFLUXDB_INIT_ORG
export DOCKER_INFLUXDB_INIT_BUCKET=$DOCKER_INFLUXDB_INIT_BUCKET
export DOCKER_INFLUXDB_INIT_RETENTION=$DOCKER_INFLUXDB_INIT_RETENTION
export DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=$DOCKER_INFLUXDB_INIT_ADMIN_TOKEN
export DOCKER_INFLUXDB_INIT_PORT=$DOCKER_INFLUXDB_INIT_PORT
export DOCKER_INFLUXDB_INIT_HOST=$DOCKER_INFLUXDB_INIT_HOST
# Run the initial InfluxDB setup using the CLI
influx setup --skip-verify --bucket ${DOCKER_INFLUXDB_INIT_BUCKET} --retention ${DOCKER_INFLUXDB_INIT_RETENTION} --token ${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN} --org ${DOCKER_INFLUXDB_INIT_ORG} --username ${DOCKER_INFLUXDB_INIT_USERNAME} --password ${DOCKER_INFLUXDB_INIT_PASSWORD} --host http://${DOCKER_INFLUXDB_INIT_HOST}:8086 --force
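Once the stack is up, the setup can be checked from the host with the `influxdb-client` Python package (an assumption, not part of this repo), using the token, org, and bucket defined in `influxdb/.env`:

```python
from influxdb_client import InfluxDBClient

# Hypothetical verification script; port 8086 is published to the host in docker-compose.yml.
client = InfluxDBClient(
    url="http://localhost:8086",
    token="yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=",  # DOCKER_INFLUXDB_INIT_ADMIN_TOKEN
    org="myorg",
)

# Flux query: everything written to the bucket in the last 15 minutes.
tables = client.query_api().query('from(bucket: "my_first_bucket") |> range(start: -15m)')
for table in tables:
    for record in table.records:
        print(record.get_time(), record.get_field(), record.get_value())
```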

7
kafka/run_workaround.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/sh
# Workaround to run the cp-kafka image in ZooKeeper-less (KRaft) mode:
# drop the ZooKeeper requirement from the container's configure step,
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
# skip the ZooKeeper readiness check,
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
# and format the KRaft storage directory with a fixed cluster ID before startup.
echo "kafka-storage format --ignore-formatted -t NqnEdODVKkiLTfJvqd1uqQ== -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure

1
telegraf/.env Normal file
View File

@ -0,0 +1 @@
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=yxbiitUY9gktf5F0pHOdJGqbrzKuzqwhFiKZuboCD80=

113
telegraf/telegraf.conf Normal file
View File

@ -0,0 +1,113 @@
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Collection offset is used to shift the collection by the given amount.
## This can be used to avoid many plugins querying constraint devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Collected metrics are rounded to the precision specified. Precision is
## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
##
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s:
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
##
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
[[outputs.influxdb_v2]]
## The URLs of the InfluxDB cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://${DOCKER_INFLUXDB_INIT_HOST}:8086"]
## Token for authentication.
token = "$DOCKER_INFLUXDB_INIT_ADMIN_TOKEN"
## Organization is the name of the organization you wish to write to; must exist.
organization = "$DOCKER_INFLUXDB_INIT_ORG"
## Destination bucket to write into.
bucket = "$DOCKER_INFLUXDB_INIT_BUCKET"
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["kafka1:9092","kafka2:9092","kafka3:9092"]
consumer_group = "telegraf"
## Topics to consume.
topics = ["random_walk"]
offset = "newest"
data_format = "value"
data_type = "string"
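## Note: with data_format = "value" and data_type = "string", each Kafka message
## is stored in InfluxDB as a single string field holding the raw JSON payload,
## e.g. {"walk": 3}. If a numeric field is preferred for graphing in Grafana,
## Telegraf's JSON parser (data_format = "json") could be used instead; that is
## a suggested alternative, not part of this setup.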