#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date  : 2022/02/14 22:30
# @Desc  : 进程写Kafka,线程调接口from confluent_kafka import Producer
import json
import timeit,time,os
from faker import Faker
import multiprocessing as mp
import random
import datetime
import string
import threading
import math
import requests
import logging
import logging.handlersf = Faker(locale='zh-CN')
class logs(object):def __init__(self, level, logger=None):self.logger = loggerself.logger = logging.getLogger(logger)# 设置输出的等级LEVELS = {'NOSET': logging.NOTSET,'DEBUG': logging.DEBUG,'INFO': logging.INFO,'WARNING': logging.WARNING,'ERROR': logging.ERROR,'CRITICAL': logging.CRITICAL}# 创建文件目录logs_dir = "log"if os.path.exists(logs_dir) and os.path.isdir(logs_dir):passelse:os.mkdir(logs_dir)# 修改log保存位置timestamp = time.strftime("%Y-%m-%d", time.localtime())logfilename = "log-%s.log" % timestamplogfilepath = os.path.join(logs_dir, logfilename)rotatingFileHandler = logging.handlers.RotatingFileHandler(filename=logfilepath,maxBytes=1024 * 1024 * 50,backupCount=500)# 设置输出格式formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')rotatingFileHandler.setFormatter(formatter)# 控制台句柄console = logging.StreamHandler()Level = LEVELS.get(level)console.setLevel(Level)console.setFormatter(formatter)# 添加内容到日志句柄中self.logger.addHandler(rotatingFileHandler)self.logger.addHandler(console)self.logger.setLevel(Level)# 解决重复日志问题self.logger.handlers = self.logger.handlers[:1]def info(self, message):self.logger.info(message)def debug(self, message):self.logger.debug(message)def warning(self, message):self.logger.warning(message)def error(self, message):self.logger.error(message)class CreateIp(object):'''IP随机'''def __init__(self):self.ipv4_prov_prefix = {'10': ['192.168.169.', '192.168.239.', '192.168.135.'],'11': ['192.168.197.', '192.168.243.', '192.168.128.', '192.168.165.'],'13': ['192.168.212.', '192.168.132.', '192.168.166.'],'17': ['192.168.229.', '112.224.240.', '192.168.142.', '192.168.177.', '192.168.198.', '192.168.200.'],'18': ['192.168.217.', '192.168.238.', '192.168.133.', '192.168.167.', '192.168.201.', '192.168.202.'],'19': ['192.168.168.', '192.168.242.', '192.168.241.', '192.168.134.'],'30': ['192.168.178.', '192.168.244.', '192.168.143.'],'31': ['192.168.222.', '192.168.237.', '192.168.129.', '192.168.1168.', '192.168.221.'],'34': ['192.168.176.', '192.168.245.', '192.168.246.', '192.168.232.', '192.168.131.'],'36': ['192.168.168.', '192.168.144.', '192.168.180.'],'38': ['192.168.226.', '192.168.145.', '192.168.181.', '192.168.182.'],'50': ['192.168.185.', '192.168.147.'],'51': ['192.168.216.', '192.168.130.', '192.168.184.', '192.168.199.', '192.168.203.', '192.168.204.','192.168.205.', '192.168.214.', '192.168.215.'],'59': ['192.168.186.', '192.168.148.'],'70': ['192.168.194.', '192.168.156.'],'71': ['192.168.173.', '192.168.139.'],'74': ['192.168.219.', '192.168.140.', '192.168.174.', '192.168.218.'],'75': ['192.168.183.', '192.168.146.'],'76': ['192.168.234.', '192.168.141.', '192.168.175.', '192.168.207.', '192.168.208.', '192.168.223.'],'168': ['192.168.191.', '192.168.153.'],'81': ['192.168.231.', '192.168.149.', '192.168.187.', '192.168.209.', '192.168.230.'],'83': ['192.168.211.', '192.168.150.', '192.168.188.', '192.168.210.'],'84': ['192.168.192.', '192.168.154.'],'85': ['192.168.228.', '192.168.151.', '192.168.189.', '192.168.227.'],'86': ['192.168.225.', '192.168.152.', '192.168.190.', '192.168.224.'],'87': ['192.168.193.', '192.168.236.', '192.168.235.', '192.168.155.'],'88': ['192.168.195.', '192.168.157.'],'89': ['192.168.213.', '192.168.233.', '192.168.158.', '192.168.196.'],'90': ['192.168.171.', '192.168.137.'],'91': ['192.168.192.', '192.168.240.', '192.168.136.', '192.168.170.'],'97': ['192.168.172.', '192.168.138.']}def create_ipv4(self,roam_type, prov):if roam_type == 0:ipv4_add = random.choice(self.ipv4_prov_prefix[prov]) + str(random.randint(1, 125))elif roam_type == 1:del self.ipv4_prov_prefix[prov]new_prov = str(random.sample(self.ipv4_prov_prefix.keys(),1)[0])ipv4_add = random.choice(self.ipv4_prov_prefix[new_prov]) + str(random.randint(1, 125))return ipv4_adddef getRandomString(number):'''随机字符串'''rule = string.ascii_letters + string.digitsstr = random.sample(rule, number)return "".join(str)def randomtimes(start, end, n, frmt="%Y-%m-%d %H:%M:%S"):'''随机时间区间'''stime = datetime.datetime.strptime(start, frmt)etime = datetime.datetime.strptime(end, frmt)time_datetime=[random.random() * (etime - stime) + stime for _ in range(n)]time_str=[t.strftime(frmt) for t in time_datetime]return time_str[0]def create_phone():'''随机手机号'''# 第二位数字second = [3, 4, 5, 7, 8][random.randint(1, 4)]#第三位数字third = {0: random.randint(0, 9),4: [5, 7, 9][random.randint(0, 2)],5: [i for i in range(10) if i != 4][random.randint(0, 8)],7: [i for i in range(10) if i not in [4, 9]][random.randint(0, 7)],8: random.randint(0, 9), }[second]# 最后八位数字suffix = random.randint(9999999, 100000000)# 拼接手机号return "1{}{}{}".format(second, third, suffix)# 这里的参数包括一个基准点,和一个距离基准点的距离
def generate_random_gps(base_log=None, base_lat=None, radius=None):'''# 随机经纬度# 这里的参数包括一个基准点,和一个距离基准点的距离'''if base_log == None or base_lat == None:base_log = 136.55491base_lat = 49.919034radius = 1000000elif radius == None:print('距离半径不可为空')radius_in_degrees = radius / 111300u = float(random.uniform(0.0, 1.0))v = float(random.uniform(0.0, 1.0))w = radius_in_degrees * math.sqrt(u)t = 2 * math.pi * vx = w * math.cos(t)y = w * math.sin(t)longitude = y + base_loglatitude = x + base_lat# 这里是想保留6位小数点loga = '%.6f' % longitudelata = '%.6f' % latitudereturn loga, latadef delivery_report(err, msg):""" Called once for each message produced to indicate delivery result.Triggered by poll() or flush(). """if err is not None:#print('Message delivery failed: {}'.format(err))with open('failed.log','a') as fobj:fobj.write(str(err)+'\n')else:#print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))with open('Message_delivery.log','a') as fobj:fobj.write('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())+'\n')def c_kafka(*args, **kwds):processId = os.getpid()print('Process', processId)topic,pool_data,sleep_time,count,lock,log_grade = argslog = logs(log_grade, __name__)starttotal = timeit.default_timer()try:p = Producer(kwds)except Exception as e:log.error('Process- {} -kafka:连接失败-{}'.format(processId,e))for i in range(count):start = timeit.default_timer()nowtime = time.time()dateArray = datetime.datetime.fromtimestamp(nowtime)nowtimeapi = dateArray.strftime("%Y-%m-%d %H:%M:%S")ip = CreateIp()prov = random.sample(['10', '11', '13', '17', '18', '19', '30', '31', '34', '36', '38', '50', '51', '59', '70', '71', '74','75', '76', '79', '81', '83', '84', '85', '86', '87', '88', '89', '90', '91', '97'], 1)[0]roam_type = random.randint(0, 1)cdr = {"titleName": "01","mobile": create_phone(),"provinceCode": "013","cityCode": "130","netType": "112_3001","loginTime": nowtime,"loginType": "01","loginState": "01","imei": '0'+create_phone(),"userIp": ip.create_ipv4(roam_type, prov),"appid": "ppp111","iccid": "ppp111","imsi": "ppp111","mac": "F4:BF:80:0E:25:6F","meid": "ppp111","lat": generate_random_gps()[1],"lon": generate_random_gps()[0],"deviceBrand": "HUAWEI","deviceModel": "HMA-AL00","os": "android","osVersion": "android8.2biiiopp","screen": "2244*1080","memorySpace": "1.61 GB","phoneSpace": "52.92 GB","version": "android@8.2buuupouu"}api_data = {"UNI_BSS_HEAD": {"APP_ID": "tyfkpAPPID","TIMESTAMP": "{}.429".format(nowtimeapi),"TRANS_ID": "2021811061409363562951","TOKEN": "6dc6f60246bf79cfc2c513fea5194402"},"UNI_BSS_BODY": {"LOGIN_USER_RISK_CONTROL_REQ": {"USER_ID": cdr["mobile"],"RISK_CONTROL_CODE": "PloyEventIdCE001","HANDLE_TIME": nowtimeapi,"USER_IP": cdr["userIp"],"LON": cdr["lon"],"LAT": cdr["lat"],"IMEI":cdr["imei"]}}}# json_cdr = json.dumps(cdr,indent = 4)json_api = json.dumps(api_data, indent=4)log.info('Process- {} -Time consuming data construction:{}'.format(processId,timeit.default_timer() - start))p.poll(0)p.produce(topic, json.dumps(cdr).encode('utf-8'), callback=delivery_report)putkafkatime = timeit.default_timer()json_api = {'key': [putkafkatime, json_api]}if sleep_time > 0:p.flush()pool_data.put_nowait(json_api)time.sleep(int(sleep_time))else:p.flush()pool_data.put_nowait(json_api)end = timeit.default_timer() - starttotalreturn [processId, end, count]class Creat_Thread(threading.Thread):def __init__(self, t_msg, pool_data, lock, url, sleeptime,threadsleeptime,log_grade):threading.Thread.__init__(self)self.t_msg = t_msgself.pool_data = pool_dataself.lock = lockself.url = urlself.sleeptime = sleeptimeself.threadsleeptime = threadsleeptimeself.log = logs(log_grade, __name__)def run(self):start = timeit.default_timer()count = 0while True:count += 1if self.pool_data.empty():if count <= self.threadsleeptime:self.log.warning('Thread- {} - Wait for 1 second when the queue is empty'.format(self.t_msg))time.sleep(1)continueelse:self.log.error('Thread- {} - All items have been taken off the queue'.format(self.t_msg))breakelse:try:dataFromQueue = self.pool_data.get_nowait()except Exception as e:passelse:end1 = timeit.default_timer()if (end1 - dataFromQueue['key'][0]) >= self.sleeptime:self.log.warning('Thread- {} - Call interval:{}'.format(self.t_msg,(end - dataFromQueue['key'][0])))headers = {'Content-Type': 'application/json','User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0','Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'}end = timeit.default_timer()try:rsp = requests.post(url=self.url, data=dataFromQueue['key'][1], headers=headers, verify=False,timeout=10)  # 请求except Exception as e:self.log.info('Thread- {} - Interface call failed:{}'.format(self.t_msg,e))self.log.info('Thread- {} - Asynchronous call time consuming:{}'.format(self.t_msg, (timeit.default_timer() - end)))rsp_json = json.dumps(rsp.json(), indent=4, ensure_ascii=False)  # 响应文本json化# print(dataFromQueue['key'][1])print(rsp_json)response = {'msg':dataFromQueue['key'][1],'rsp':rsp.json()}with open('response', 'a') as fobj:fobj.write(json.dumps(response, indent=4) + '\n')else:self.pool_data.put_nowait(dataFromQueue)self.log.warning('Thread- {} - Rewrite queue'.format(self.t_msg))self.log.error('Thread- {} - Total asynchronous call time:{}'.format(self.t_msg,timeit.default_timer() - start))def thread_run(thread_num,pool_data,lock,url,sleeptime,threadsleeptime,log_grade):t_msg = 0threads = []for tName in range(thread_num):t_msg += 1thread = Creat_Thread(t_msg,pool_data,lock,url,sleeptime,threadsleeptime,log_grade)thread.start()threads.append(thread)if __name__ == "__main__":fake = Faker(locale='zh_CN')print('父进程', os.getpid())print('Resource preparation in progress')# -------------------------------日志级别-----------------------------------#log_grade = 'DEBUG'# -------------------------------进程配置-----------------------------------#count_ms = 200  # 消息数processes = 10  # 进程数 至少为2count = int(count_ms / (processes - 1))remainder = count_ms % (processes - 1)manager = mp.Manager()pool_data = manager.Queue()lock = manager.Lock()  # 初始化一把锁# -------------------------------kafka配置-----------------------------------#sleep_time = 0  # 推Kafka休眠topic = 'topicname'conf = {"bootstrap.servers": "192.18.0.82:3007","security.protocol": "SASL_PLAINTEXT","sasl.mechanisms": "SCRAM-SHA-256","sasl.username": "test","sasl.password": "text",'queue.buffering.max.kbytes': 2000000,'queue.buffering.max.messages': 1000000}#-----------------------------接口线程调用配置--------------------------------#thread_num = 100  # 调用API资源线程数url = 'http://192.168.1.1:5555/' #生产sleeptime = 5 #设置 等待调用接口时间threadsleeptime = 10 #线程等待时间# 创建新线程threads = []t_msg = 0  # 自定义线程号 Python的太难获取#--------------------------------------------------------------------------## 创建进程result = []pool = mp.Pool(processes=processes)  # processes_num 进程池数for p_name in range(processes-1):if (p_name+1) == processes-1 and remainder != 0:count += remainderresult.append(pool.apply_async(func=c_kafka, args=(topic, pool_data, sleep_time,count,lock,log_grade),kwds={**conf}))else:result.append(pool.apply_async(func=c_kafka, args=(topic, pool_data, sleep_time,count,lock,log_grade), kwds={**conf}))result.append(pool.apply_async(func=thread_run, args=(thread_num,pool_data,lock,url,sleeptime,threadsleeptime,log_grade),))pool.close()pool.join()for res in range(len(result)-1):process, rtime ,count = result[res].get()print("process(%s) done. --Running time: %s Seconds" % (process, rtime),count)

python3多进程写Kafka异步线程调用接口相关推荐

  1. 手写 kafka 异步回调

    玩过 kafka 的小伙伴相信对 kafka 生产者和消费者的异步回调记忆犹新,不由的赞叹这种设计模式真的超赞,心想如果我也能写出这样的异步回调那该多好,今天他来了!!! Callback 模仿 ka ...

  2. python3多进程写时拷贝_Python实现多进程的详解(附示例)

    本篇文章给大家带来的内容是关于Python实现多进程的详解(附示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. fork函数创建子进程 基本使用 Linux 操作系统提供了一个 ...

  3. Python多进程写Kafka

    #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: 风过无言花易落 # @Date : 2020/11/02 14:00 # @Desc ...

  4. python3多进程写时拷贝_python多进程实现复制文件

    [Python] 纯文本查看 复制代码''' 多进程实现复制文件 步骤: (1)获得源文件夹路径: (2)获取源文件夹下各文件的文件名 (3)获得目标文件夹 (4)复制文件(文件夹不能复制) 新增内容 ...

  5. python3多进程写时拷贝_python利用进程池,多进程拷贝文件

    #!/usr/bin/evn python #Author:ELSON_ZENG import os,time import multiprocessing def copy_file(queue, ...

  6. 异步线程RequestContextHolder为空问题

    一.问题 由于session是线程安全的,所以无法直接在各个线程中传递数据,所以在服务间异步线程调用时,就会导致session丢失的问题出现 二.异常复现 package com.xx.control ...

  7. 记录uni-app弹框事件无生命周期问题;uni-popup-dialog打开触发事件;uni-popup-dialog调用接口时机

    项目需求:点击页面的 品牌型号 按钮,打开弹框,将 车架号码 参数传入接口获取到对应的 品牌型号列表,在进行选择后关闭弹框. 实际开发中,我在父组件里面引入了弹框子组件:诡异的事情发生了: 在小程序页 ...

  8. java异步线程池同时请求多个接口数据

    java异步线程池同时请求多个接口数据 一.适合的使用场景 复杂的网页爬虫,如要同时请求多个不同网页的数据,并且需要执行不同的数据处理,这个是非常合适的,执行线程传递的参数到最后callback是会附 ...

  9. 普歌-腾讯云短信+使用node发送短信(3种方法API、SDK)、封装工具、搭建web服务、写接口、调用接口发送短信、时效性判断、验证验证码的正确性(下)

    普歌-结合腾讯云短信服务+node搭建一个简单的发送短信web小项目 涉及技术: 腾讯云服务 后端服务:node+express 前端搭建:html+js 前言:本来这篇博客应该很早就发了,中间有一些 ...

最新文章

  1. swift String
  2. linux bind源码安装,linux下bind的安装
  3. pdf python 位置_如何使用PDFMiner获取PDF中文本的位置?
  4. 【转载】C++运算符之类型转换
  5. 如何从rpm包中提取文件
  6. PyQt5 Pyinstaller时出现错误Cannot find PyQt5 plugin directories
  7. PHP-FPM,Nginx,FastCGI 之间的关系
  8. 【二分法】计蒜客:二分快速幂
  9. 第16 17章节-Python3.5-Django知识点整理 15
  10. 【干货】python正则表达式应用笔记
  11. arm裸板驱动总结(makefile+lds链接脚本+裸板调试)
  12. 【热门主题:银魂win7主题】
  13. 正则方程手写(初步实现)
  14. wxjava 多商户 微信支付在springBoot项目中使用以及血泪教训
  15. 异数OS-织梦师-异数OS虚拟容器交换机(七) 走进4Tbps网络应用时代,加速5G应用真正落地
  16. Stm32中英文手册官网免费
  17. 对输入的两个分数选择‘+’、‘-’、‘*’、‘/’四则运算,并以分数形式输出结果。输入:第一行先输入整数T,表示总共有T组数据。接下来共T行,每行输入分数形式的算术表达式。 输出:最简分数形
  18. 马斯克要买地当「城主」/ 苹果手表引入ChatGPT/ 小鹏辟谣多名核心高管离职…今日更多新鲜事在此...
  19. xctf攻防世界 MISC高手进阶区 saleae
  20. 下列HTML标签是段落标签的是,HTML段落标签

热门文章

  1. 浅谈DevSecOps工具链中的源代码安全保障
  2. 解决Deprecated: Methods with the same name as their class will not be constructors in报错方案
  3. 二叉树的前序,中序,后序遍历Java实现
  4. cookies是什么
  5. springsecurity忽视拦截静态资源
  6. 打造高效能团队之测试能力提升
  7. 前端修仙路之筑基(CSS篇)
  8. 【R语言】使用nnet过程中报错Error in eval(predvars, data, env) : object ‘naulong‘ not found
  9. win7专业版找不到ie浏览器
  10. QQ第三方登陆流程详解