当前位置: 首页 > news >正文

做收集信息的网站最新国内新闻10条

做收集信息的网站,最新国内新闻10条,创意设计报告模板,做wish如何利用数据网站进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地#xff0c;命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写#xff0c;java改成…进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写java改成了kotlin 配置文件 # Required connection configs for Kafka producer, consumer, and admin bootstrap.servers security.protocolSASL_SSL sasl.mechanismsPLAIN sasl.username sasl.password group.id auto.offset.resetearliest # Best practice for higher availability in librdkafka clients prior to 1.7 session.timeout.ms45000 import timefrom confluent_kafka import Producer, Consumer import asyncio import threadingclass KafkaClient:def __init__(self, config_file):self.config self.read_config(config_file)def read_config(self, config_file):config {}with open(config_file) as fh:for line in fh:line line.strip()if len(line) ! 0 and line[0] ! #:parameter, value line.strip().split(, 1)config[parameter] value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer Producer(self.config)# Produces a sample messageproducer.produce(topic, keykey, valuevalue)print(fProduced message to topic {topic}: key {key:12} value {value:12})# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callbackNone, group_idpython-group-1, auto_offset_resetearliest):# Sets the consumer group ID and offsetself.config[group.id] group_idself.config[auto.offset.reset] auto_offset_resetconsumer Consumer(self.config)consumer.subscribe([topic])loop asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callbackNone):thread threading.Thread(targetself.consume_async, args(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg consumer.poll(1.0)if msg is not None:breakif not msg.error():key msg.key().decode(utf-8)value msg.value().decode(utf-8)print(fConsumed message: key {key:12} value {value:12})except KeyboardInterrupt:passfinally:consumer.close()config_file_path .\\client.properties topic test key key value valuekafka_client KafkaClient(config_file_path) kafka_client.produce(topic, key, value) thread kafka_client.consume(topic, consume_async) 配置文件 # Required connection configs for Kafka producer, consumer, and admin bootstrap.servers security.protocolSASL_SSL sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required usernameGHFXZDIOMQW3IPKA passwordTimUk7hj/EwTiB031lA95LeKfXN3t2DdnwizhKx37wFxZKMLGEqTOnneTKrlQQ; sasl.mechanismPLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookupuse_all_dns_ips key.serializerorg.apache.kafka.common.serialization.StringSerializer value.serializerorg.apache.kafka.common.serialization.StringSerializer # Best practice for higher availability in Apache Kafka clients prior to 3.0 session.timeout.ms45000 topic group.id auto.offset.resetearliest key.deserializerorg.apache.kafka.common.serialization.StringDeserializer value.deserializerorg.apache.kafka.common.serialization.StringDeserializer # Best practice for Kafka producer to prevent data loss acksall java(kotiln) import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import java.io.Closeable import java.io.FileInputStream import java.io.IOException import java.nio.file.Files import java.nio.file.Paths import java.time.Duration import java.util.*class KafkaClientT, V : Closeable {private var producer: KafkaProducerT, V? nullprivate var fileConfig: Properties? nullval TOPIC topicprivate val DURATION 100Lprivate val POOLSIZE 10private val DISPATCHER newFixedThreadPoolContext(POOLSIZE, CoroutinePool)private val SCOPE CoroutineScope(DISPATCHER)constructor(configPath: String? null, config: Properties? null) {if (config null configPath null) throw Exception(dont have any config)var config1 Properties()if (configPath ! null) {fileConfig readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config ! null) {config1.putAll(config)}producer KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecordsT, V) - Unit) {val consumer: KafkaConsumerT, V KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecordsT, V consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException($configFile not found.)}val config Properties()FileInputStream(configFile).use { inputStream - config.load(inputStream) }return config}override fun close() {producer?.close()} }fun main() {val cli KafkaClientString, String(D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties)cli.consume {println(test beg)for (record in it) {println(String.format(Consumed message from topic %s: key %s value %s, cli.TOPIC, record.key(), record.value()))}println(test end)}// Give some time for the consumer to startThread.sleep(2000)cli.produce(key1, test)// Give some time for the consumer to consume the messageThread.sleep(5000) }
http://www.hkea.cn/news/14576329/

相关文章:

  • 自己做网站需要做啥郑州比较大的网络公司
  • 包装公司网站模板动漫做羞羞的网站
  • 专业外贸公司网站业务平台低价
  • 电话投放小网站郑州企业网站seo
  • 网站新闻后台怎么做网站一键备案
  • 阜宁专业做网站的公司短视频seo营销
  • 百家利网站开发深圳物流公司联系电话
  • 做动态图网站越秀金融大厦北塔
  • 广西建设科技与建筑节能协会网站如何用html制作网站
  • 做分色找工作网站wordpress 绑定多个二级域名
  • 上海做网站比较好的公司公司邮箱怎么申请的
  • 做一些网站的弹出页面模板网站没有源代码
  • 南京网站开发公司排名国际品牌的ui设计公司
  • 中文网站建设小组oa系统登录入口
  • 智慧政务网站怎么做wordpress 管理员权限
  • 怎么制作学校网站wordpress rest post
  • 来安县城乡规划建设局网站成都建设网站专业公司
  • 网站建设的软件叫啥室内设计毕业设计代做网站
  • 丽江市网站建设海南网站建设基本流程
  • c c也能干大事网站开发设计师必须知道的十个网站
  • 科协科普网站建设中国建设银行天津分行网站
  • 南京网站优化快速排名申请了域名怎么做网站
  • 唐山门户网站建设樱花动漫做网站
  • 那些网站可以做问答中华建设网算什么级别网站
  • 登陆空间商网站wordpress 查询
  • 泉州市网站建设网站建设药店
  • 江苏网站建设联系方式wap建站php源码
  • 江门市智企互联网站建设市场营销策划ppt免费模板
  • 做网站能给公司带来什么好处郑州便宜网站建设费用
  • 网站模板红黑app网站开发定制