# Pastebin dMijsA9v def _produce_message_via_rest(self, topic, message): import requests, json KAFKA_REST_URL = "" KAFKA_X_AUTH_TOKEN = "" LOG.info(" Inside produce_message_via_rest") LOG.info(" Kafka REST url is ", KAFKA_REST_URL) proxies = { 'http': 'http://x.x.x.x:3128', 'https': 'http://x.x.x.x:3128', } # Metric payload to be sent to Kafka message_payload = { "value": { "type": "text", "data": json.dumps(message.decode('utf-8')), } } # Sent message via POST call to Kafka REST API response = requests.post( KAFKA_REST_URL, headers={ "Content-Type": "application/json", "Accept": "application/json", "X-Auth-Token": KAFKA_X_AUTH_TOKEN, }, data=json.dumps(message_payload), proxies=proxies, ) LOG.info(response.__dict__) # Check the response status code if response.status_code == 200: LOG.info(' | Message published successfully') else: LOG.info(' | Failed to publish message. Status code:', response.status_code)