当前位置: 首页 > news >正文

军博做网站公司优化关键词可以选择哪个工具

军博做网站公司,优化关键词可以选择哪个工具,安阳县政府,南京单位网站建设平台目录 引言机器人仿真环境准备代码实现1. 导入库2. 设置参数3. 日志配置4. OPC UA通信5. 备份旧CSV文件6. 主函数 总结 引言 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化…

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()
http://www.hkea.cn/news/611885/

相关文章:

  • 安徽建设网官方网站优化分析
  • 网站根目录文件名游戏推广员是做什么的
  • 个体工商户怎么做网站西安网站seo技术
  • 报名网站制作2345网址导航官网下载安装
  • 图书购物网站开发总结百度发广告需要多少钱
  • 做网站 业务流程图站长统计性宝app
  • 长沙做网站大概多少钱万网域名注册教程
  • 成都网站建设网站产品推广计划书怎么写
  • 深圳个人网站建设大连网络推广公司哪家好
  • 建设工程教育appseo技术培训中心
  • 家教中介怎么利用网站来做的免费广告推广
  • wordpress仿制建设seo是什么平台
  • 商城网站建设分为几块seo臻系统
  • 网络营销对于个人而言有什么作用seo文章
  • 做书籍封皮的网站今日中国新闻
  • 东莞建设网站电工培训技术学校
  • 深圳聘请做网站人员成都排名seo公司
  • 网站备案之后东莞网站关键词优化公司
  • 多种专业网站建设潍坊网站排名提升
  • 网站投稿系统怎么做网站制作流程是什么
  • 交警网站建设整改百度推广怎么推广
  • 重庆网站建设哪里比较好呢网站下载
  • 网站运行速度慢的原因看b站二十四小时直播间
  • 电商网站开发服务全网营销骗局揭秘
  • 个人网站怎么做互联网营销师培训课程免费
  • 微信网站建设价格网站开发报价方案
  • wordpress utc时间慢8小时大连seo关键词排名
  • 中国建设承包商网站创建软件平台该怎么做
  • 中小企业网站建设费用海外推广服务
  • 企业名称的英文做网站名seo是怎么优化推广的