博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于RabbitMQ rpc实现的主机管理
阅读量:6543 次
发布时间:2019-06-24

本文共 4155 字,大约阅读时间需要 13 分钟。

可以对指定机器异步的执行多个命令

    例子:
    >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 
        task id: 45334
    >>: check_task 45334 
    >>:
    注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果

 

基于rabbitmq的rpc,输入执行命令,直接返回ID,进入到rpc中,通过指定的IP地址建立socket,建立queue(“rpc”) 然后执行命令,返回结果,返回的结果和id组成字典,输入ID就可以查看结果

测试:在windows下
备注:本实验在windows下进行的所以在执行命令是要使用windows的命令

 

 

1 import pika 2 import subprocess 3  4 connection =pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 5 channel = connection.channel() 6 channel.queue_declare(queue="rpc") 7  8 def command(cmd): 9     res = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)10     # print(res)11     msg = res.stdout.read().decode("gbk")12     print(msg)13     return msg14 15 def on_request(ch,method,props,body):16     cmd = body.decode()17     response = command(cmd)18     ch.basic_publish(exchange="",19                      routing_key=props.reply_to,20                      properties=pika.BasicProperties(correlation_id=props.correlation_id),21                      body=response22                      )23     ch.basic_ack(delivery_tag=method.delivery_tag)24 25 channel.basic_qos(prefetch_count=1)26 channel.basic_consume(on_request,queue="rpc")27 print("Waiting RPC requests")28 channel.start_consuming()
server
1 import pika 2 import uuid 3 import random 4  5 class Client(object): 6     def __init__(self): 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 8         self.channel = self.connection.channel() 9 10         result = self.channel.queue_declare(exclusive=True)11         self.callback_queue = result.method.queue12 13         self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue)14 15     def on_response(self,ch,method,props,body):16         if self.corr_id == props.correlation_id:17             self.response = body18 19     def call(self,command,host):20         self.response = None21         self.corr_id = str(uuid.uuid4())22         self.channel.basic_publish(exchange="",23                                    routing_key="rpc",24                                    properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),25                                    body=str(command)26                                    )27         while self.response is None:28             self.connection.process_data_events()29 30         self.corr_id = random.randint(10000,99999)31         id = self.corr_id32         res = self.response.decode()33         dict[id] =res34         print('task_id: %s host: %s cmd: %s ' % (self.corr_id, host, command))35         return self.corr_id, self.response.decode()36 37 def help():38     print("run 'df -h' --hosts 127.0.0.1  10.10.10.1")39     print("check_task id")40     print("check_task_all")41 42 def start(command_input):43     command_list = command_input.split()44     if command_list[0]== "run":45         try:46             obj = command_input.split("run")47             host_obj =obj[1].split("--hosts")48             hosts = host_obj[1].strip().split()49             command= command_input.split("\"")[1]50             for host in hosts:51                 try:52                     rpc_client.call(command,host)53                 except TypeError and AssertionError:54                     break55         except IndexError:56             print('-bash: %s command not found' % command_input)57             help()58     elif command_list[0] =="check_task":59         try:60             print(dict[int(command_list[1])])61         except IndexError:62             help()63     elif command_list[0] =="check_task_all":64         for index, key in enumerate(dict.keys()):65             print(index,key)66     elif command_list[0]=="help":67         help()68     else:69         print('-bash: %s command not found' % command_input)70         help()71 72 73 rpc_client = Client()74 dict={}75 help()76 while True:77     command_input =input("请输入命令>>>:").strip()78     if len(command_input)==0:79         continue80     else:81         start(command_input)
client

 

转载于:https://www.cnblogs.com/garrett0220/articles/8403510.html

你可能感兴趣的文章
hadoop无法启动DataNode问题
查看>>
java泛型中<?>和<T>区别
查看>>
这里是指推送通知跟NSNotification有区别:
查看>>
用户ID的代码生成
查看>>
win7经常出现“关闭xxxx前您必须关闭所有会话框”
查看>>
SNMP安全配置的两种方法(也可同一时候兼顾配置两种方法)
查看>>
MongoDB 自己定义函数
查看>>
Summary Day30
查看>>
逆向输出回环数组
查看>>
高清摄像头MIPI CSI2接口浅解【转】
查看>>
C# CancellationTokenSource和CancellationToken的实现
查看>>
PCIE BAR空间
查看>>
如何用数学课件制作工具画角平分线
查看>>
VS2015 中统计整个项目的代码行数
查看>>
UWP控件与DataBind
查看>>
bash: php: command not found
查看>>
XVIII Open Cup named after E.V. Pankratiev. Eastern Grand Prix
查看>>
数据恢复软件如何换机使用?
查看>>
《高性能mysql》到手
查看>>
(转)关于如何学好游戏3D引擎编程的一些经验
查看>>