组内做大数据,需要kafka写入数据,最近在看python正好,练练手,网上找了一圈,都是用的pykafka,经过一整圈的安装,最终搞定,代码如下
#coding:u8
import sys
import time
import random
import datetime
import MySQLdb
import codecs
from pykafka import KafkaClient
import logging
import json
import threading
'''
******************
'''
ad=[]
try:
  ini=file("set.txt")
  ad=ini.readline().splitlines()
  ini.close
except Exception as e:
  print "open settings file Error:",type(e)
  ad=["192.168.1.121:9092"]
print "open ini file"
try:
  client = KafkaClient(hosts = ad[0])
  print "Topics:",client.topics
  topic  = client.topics["mytopic"]
except Exception as e:
  print "Opening kafka Error:%s" %(type(e))
  sys.exit(1)
print "before threading"

try:
  with tp.get_sync_producer() as producer:
    producer.produce(str(dct2))
 except Exception as e:
    print "Error:" ,type(e)

print "ini consumer"
  while 1==1:
    print "nn",type(consumer)
    for message in consumer:
      print "mm"
      if message is not None:
        print message.offset, message.value
except Exception as e:
  print e,type(e)

运行结果,可以列出topic,写入的数据也没有报错信息。但是,消费者取不到数据,无论是kafka直接取,还是python写消费者代码。
后来采用了 kafkapython 正常,代码如下:

#coding:utf-8
import sys
import time
import random
import datetime
import codecs
import kafka.kafkaProducer
import logging
import json
import threading

ad=[]
try:
  ini=file("set.txt")
  ad=ini.readline().splitlines()
  ini.close
except Exception as e:
  ad=["192.168.1.121:9092,192.168.1.122.9092"]
  #print "open settings file Error:%d,%s" %(e.args[0],e.args[1])
  print "Opening settings file Error:",e,type(e)
print "Opened ini file"
'''
try:
  client = KafkaClient(hosts = ad[0])
  print "Topics:",client.topics
  topic  = client.topics["mytopic"]
except Exception as e:
  print "Opening kafka Error:%s" %(e.args[0])
  sys.exit(1)
print "before threading"
'''
try:
  producer = KafkaProducer(bootstrap_servers=ad[0], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
except Exception as e:
  print "Opening kafka Error:",e,type(e)
  sys.exit(1)

print "before threading"

threads=[]
for i in range(0,12):
  try:
    threads.append(threading.Thread(target=tf,args=(producer,i)))
    threads[i].start()
  except Exception as e:
    print "Treand error at Thread:%d:%s,%s" %(i,e,type(e))

print "main thread is ended"

代码均有所节略。

Python操作Kafka爬坑相关推荐

  1. Python 操作 Kafka --- kafka-python

    kafka-python:https://github.com/dpkp/kafka-python kafka-python 文档:https://kafka-python.readthedocs.i ...

  2. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  3. Python操作kafka实现数据定时推送和获取,并解决中文乱码问题

    上篇介绍了kafka环境的搭建以及实现了Python如何简单操作kafka.本篇实现如何定时向kafka推送和接收数据,并解决了中文乱码问题. 设置Producer每隔5秒推送一次数据,Consume ...

  4. python操作mysql数据库—坑吭

    一.原始代码 #coding=utf-8import MySQLdb from cProfile import Profileclass MySQLUtils():def __init__(self) ...

  5. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  6. 使用Python读写kafka

    目录 1. Kafka安装与使用 1.1 下载 1.2 安装 1.3 配置 1.4 运行 1.4.1 启动zookeeper 1.4.2 启动kafka 1.5 第一个消息 1.5.1 创建一个top ...

  7. 使用python读取kafka实时topic数据demo,包括安装kafka module

    1. 安装kafka module kafka-python安装,转载:https://blog.csdn.net/see_you_see_me/article/details/78468421 1. ...

  8. python操作excel及爬取美赛证书

    今天美赛成绩出来了,S奖,有点遗憾.但工作还是要继续,因为要下载校队的获奖证书,所以使用python爬虫来爬取证书,省点事. python操作excel 在这里我选择pandas库来操作excel,详 ...

  9. python闲鱼爬虫_Python 爬虫爬坑路(二)——B站图片,咸鱼的正确 GET 姿势

    前言 昨天在写完 入门级爬虫之后 ,马上就迫不及待的着手开始写 B站的图片爬虫了,真的很喜欢这个破站呢 (〜 ̄△ ̄)〜 这里不涉及到 Python 爬虫的高级技巧,没有使用框架,没有考虑反爬机制,没有 ...

最新文章

  1. Python之路【第一篇】:Python简介和入门
  2. 软件工程第二次作业——模仿主流网站:搜狗输入法
  3. yii2之原生sql
  4. 前端HTML5css3阴影凹凸文字
  5. 登陆mysql服务器命令_Mysql服务器登陆,启动,停止等基本操作命令介绍(Linux/Centos环境)...
  6. 【Java从0到架构师】Spring - IoC 控制反转、DI 依赖注入
  7. FGMap学习之--快速入门
  8. 对vue饿了么项目重构之后的一些理解
  9. 用Perl编写Apache模块续 - SVNAuth
  10. .NET发送邮件代码--測試通過
  11. 光伏发电极其并网控制技术 最大功率点跟踪
  12. 计算机地图制图符号制作的心得,计算机地图制图实习报告.docx
  13. google开发者大会的倒计时动画,没有用Flash
  14. shell 脚本中常用的列表
  15. mysql like模糊查询like %someTitle%效率低下
  16. 理解 Flexbox:你需要知道的一切
  17. Maxwell参数化建模
  18. 快来加入木东居士的数据交流群吧~
  19. Kali Linux入门教程(非常详细)从零基础入门到精通,看完这一篇就够了。
  20. 总结Criteria的简要使用概述(Hibernate5.2+)

热门文章

  1. Objective-C浮点数转化整数(向上取整、向下取整)
  2. javascript知识点记录(2)
  3. [转]listview中设置背景图片后 拉动变黑
  4. 《当程序员的那些狗日日子》(三十六)无名的配角
  5. Microsoft Jet SQL 参考在线手册
  6. 给定两个整数m和n,求出m~n这段连续的整数中所有偶数的平方和以及所有奇数的立方和。
  7. python变量标识符_python中的变量和标识符
  8. java添加信息_java – 向异常添加信息
  9. android activity之间传递对象,Android Activity之间的数据传递
  10. oracle v sql不存在,程序包oracle.sql不存在