Issue
I'm new to OPC UA and Python, but with the asyncua examples, I created the example that I need for a real project. Now I need to add security to the server and client, and for now, using a username and password is fine.
Here is my functional code without security. If someone knows what functions I need to use to create two users, one with admin privileges and the other with none, please let me know.
import asyncio
import hmac

from asyncua import Server, ua
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_struct_field
from asyncua.server.users import User, UserRole
@uamethod
def func(parent, value):
    """Return ``value`` multiplied by two.

    ``parent`` is accepted but not used by the computation.
    """
    doubled = value * 2
    return doubled
async def main():
    """Build the demo OPC UA server and run it, printing values periodically.

    The server gets one namespace, an object ``MyObject`` holding a writable
    float variable and a writable ``MyStruct`` variable, and a method
    ``ServerMethod`` backed by ``func``.  Once started, the loop below never
    exits on its own: every 0.5 seconds it reads both variables and prints
    the float followed by the five struct fields, one value per line.
    """
    server = Server()
    await server.init()
    server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")

    uri = "http://examples.freeopcua.github.io"
    idx = await server.register_namespace(uri)

    myobj = await server.nodes.objects.add_object(idx, "MyObject")
    myvar = await myobj.add_variable(idx, "MyVariable", 0.0)

    await server.nodes.objects.add_method(
        ua.NodeId("ServerMethod", idx),
        ua.QualifiedName("ServerMethod", idx),
        func,
        [ua.VariantType.Int64],
        [ua.VariantType.Int64],
    )

    # The field names are listed once and reused for both the type
    # definition and the periodic print-out, so the two cannot drift apart.
    field_names = (
        "FirstValue",
        "SecondValue",
        "ThirdValue",
        "FourthValue",
        "FifthValue",
    )
    await new_struct(
        server,
        idx,
        "MyStruct",
        [new_struct_field(name, ua.VariantType.Float, 0.0) for name in field_names],
    )

    # Must run before ``ua.MyStruct`` is referenced on the next statement;
    # the returned mapping is not needed here.
    await server.load_data_type_definitions()
    mystruct = await myobj.add_variable(
        idx,
        "my_struct",
        ua.Variant(ua.MyStruct(), ua.VariantType.ExtensionObject),
    )

    await mystruct.set_writable()
    await myvar.set_writable()

    print("Starting server!")
    async with server:
        while True:
            await asyncio.sleep(0.5)
            # Use read_value() for both nodes (get_value() is the legacy alias).
            n_struct = await mystruct.read_value()
            var = await myvar.read_value()
            values = [var, *(getattr(n_struct, name) for name in field_names)]
            # Same output as before: each value on its own line, "%f" style.
            print("".join(f"\n{value:f}" for value in values))
# Decide how to launch main(): if an event loop is already running (as in some
# interactive environments), schedule main() on it; otherwise start a new loop.
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No loop is running in this thread.
    loop = None


def _report_finished(t):
    """Print the finished task's result (re-raises if the task failed)."""
    print(f'Task done with result={t.result()} << return val of main()')


if not loop or not loop.is_running():
    print('Starting new event loop')
    result = asyncio.run(main(), debug=True)
else:
    print('Async event loop already running. Adding coroutine to the event loop.')
    tsk = loop.create_task(main())
    tsk.add_done_callback(_report_finished)
I tried to use the encryption example from asyncua, but I couldn't make it work. I then read through the asyncua server source code and tried to put something together myself, but I only got errors.
Thank you.
Solution
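You have to create a user manager class that implements get_user (User and UserRole are imported from asyncua.server.users). Return a User with the appropriate role when the credentials are valid, and None to reject the login: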
class CustomUserManager:
    """Username/password user manager with one admin and one regular user.

    NOTE(review): the passwords are hard-coded for demonstration only.  A real
    deployment should load them (preferably as salted hashes) from
    configuration or a secret store instead of keeping them in source code.
    """

    @staticmethod
    def _password_matches(supplied, expected):
        """Return True if ``supplied`` equals ``expected``, in constant time.

        Anything that is not a ``str`` (for example ``None`` when the client
        sent no password) never matches.  Both sides are encoded to bytes
        because ``hmac.compare_digest`` rejects non-ASCII ``str`` arguments.
        """
        if not isinstance(supplied, str):
            return False
        return hmac.compare_digest(
            supplied.encode("utf-8"), expected.encode("utf-8")
        )

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """Return the ``User`` for the given credentials, or ``None``.

        Args:
            iserver: Passed in by the caller; not used here.
            username: Login name supplied by the client.
            password: Password supplied by the client.
            certificate: Passed in by the caller; not used here.

        Returns:
            ``User(role=UserRole.Admin)`` for the admin account,
            ``User(role=UserRole.User)`` for the regular account, and ``None``
            for an unknown user name or a wrong or missing password.
        """
        if username == "admin":
            if self._password_matches(password, 'secret_admin_pw'):
                return User(role=UserRole.Admin)
        elif username == "user":
            if self._password_matches(password, 'secret_pw'):
                return User(role=UserRole.User)
        return None
To use the manager in your code:
user_manager = CustomUserManager()
server = Server(user_manager=cert_user_manager)
await server.init()
server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
Answered By - Schroeder
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.