可以对指定的机器异步地执行多个命令。例子: >>: run "df -h" --hosts 192.168.3.55 10.4.3.4 task id: 45334 >>: check_task 45334 >>: 注意：每执行一条命令，即立刻生成一个任务ID，不需等待结果返回；通过命令 check_task TASK_ID 来得到任务结果。
基于RabbitMQ的RPC：输入要执行的命令后，直接返回任务ID；请求进入到RPC中，通过指定的IP地址建立socket，建立queue（“rpc”），然后执行命令并返回结果；返回的结果和ID组成字典，输入ID就可以查看结果。测试：在Windows下。备注：本实验是在Windows下进行的，所以在执行命令时要使用Windows的命令。
"""RabbitMQ RPC server: execute shell commands received on the "rpc" queue.

Each request body is a command line.  The command's output is published to
the queue named in the request's ``reply_to`` property, tagged with the
request's ``correlation_id`` so the client can match reply to request.
"""
import subprocess

import pika


def command(cmd, encoding="gbk"):
    """Run *cmd* through the system shell and return its decoded output.

    Args:
        cmd: Command line to execute.
        encoding: Codec used to decode the child's output.  Defaults to
            "gbk", the value that was previously hard-coded.

    Returns:
        The child's stdout followed by its stderr, decoded to ``str``.
        Undecodable bytes are replaced instead of raising.
    """
    # SECURITY: `cmd` arrives from the message queue and is handed to the
    # shell verbatim.  That is the purpose of this tool, but it means anyone
    # who can publish to the "rpc" queue can run arbitrary commands here.
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes concurrently; reading only stdout can
    # block forever once the child fills the stderr pipe buffer.
    stdout_data, stderr_data = proc.communicate()
    # Include stderr so a failing command reports its error to the caller
    # instead of producing an empty reply.
    msg = (stdout_data + stderr_data).decode(encoding, errors="replace")
    print(msg)
    return msg


def on_request(ch, method, props, body):
    """Consumer callback: execute the requested command and send the reply.

    Args:
        ch: Channel the request was delivered on.
        method: Delivery metadata; supplies the delivery tag for the ack.
        props: Request properties; supplies ``reply_to`` and
            ``correlation_id``.
        body: Raw request payload (the command line, as bytes).
    """
    cmd = body.decode()
    response = command(cmd)
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=response,
    )
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    """Connect to the local broker and serve RPC requests until stopped."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="rpc")

    # Take one unacknowledged request at a time.
    channel.basic_qos(prefetch_count=1)
    # NOTE(review): positional-callback form is the pika 0.x signature;
    # pika >= 1.0 expects basic_consume(queue=..., on_message_callback=...).
    channel.basic_consume(on_request, queue="rpc")
    print("Waiting RPC requests")
    channel.start_consuming()


if __name__ == "__main__":
    main()
"""Interactive RabbitMQ RPC client.

Commands understood at the prompt:

    run "<command>" --hosts <host> [<host> ...]
    check_task <task_id>
    check_task_all
    help
"""
import random
import uuid

import pika

# Maps task id (int) -> decoded command output, filled in by Client.call().
task_results = {}


class Client(object):
    """RPC client that publishes commands to the "rpc" queue.

    Replies are received on a private, exclusive callback queue and matched
    to the outstanding request by correlation id.
    """

    def __init__(self):
        """Connect to the local broker and start consuming the reply queue."""
        # Initialised up front so on_response() never sees missing attributes.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # NOTE(review): positional callback and `no_ack` are the pika 0.x
        # signature; pika >= 1.0 uses on_message_callback= and auto_ack=.
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store *body* if it answers the request currently in flight."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, command, host):
        """Send *command*, wait for the reply and record it under a task id.

        Args:
            command: Command line to execute remotely.
            host: Host label shown in the task summary.  Every request is
                published to the single "rpc" queue regardless of this value.

        Returns:
            A ``(task_id, output)`` tuple; the output is also stored in
            ``task_results[task_id]``.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(command),
        )
        # Blocks until the matching reply has been delivered.
        while self.response is None:
            self.connection.process_data_events()

        output = self.response.decode()
        task_id = _new_task_id()
        task_results[task_id] = output
        print('task_id: %s host: %s cmd: %s ' % (task_id, host, command))
        return task_id, output


def _new_task_id():
    """Return a random 5-digit task id that is not already in use."""
    task_id = random.randint(10000, 99999)
    # Re-draw on collision so an earlier task's result is never overwritten.
    while task_id in task_results:
        task_id = random.randint(10000, 99999)
    return task_id


def _parse_run(command_input):
    """Split a ``run`` line into ``(command, hosts)``.

    The command is the text between the leading ``run`` keyword and the last
    ``--hosts`` marker; one surrounding pair of single or double quotes is
    removed.  Returns ``None`` when the line is malformed (no ``--hosts``,
    no command, unterminated quote, or no hosts).
    """
    head, marker, hosts_part = command_input.rpartition("--hosts")
    if not marker:
        return None

    # Drop the leading "run" token; whatever follows is the command.
    pieces = head.split(None, 1)
    if len(pieces) < 2:
        return None
    command = pieces[1].strip()
    if not command:
        return None

    quote = command[0]
    if quote in "\"'":
        closing = command.rfind(quote)
        if closing == 0:
            return None
        command = command[1:closing]

    hosts = hosts_part.split()
    if not command or not hosts:
        return None
    return command, hosts


# NOTE: this name shadows the builtin help(); kept so existing callers work.
def help():
    """Print a short usage summary."""
    print("run 'df -h' --hosts 127.0.0.1 10.10.10.1")
    print("check_task id")
    print("check_task_all")


def start(command_input):
    """Parse one line typed at the prompt and carry out the command.

    Args:
        command_input: The raw command line entered by the user.
    """
    command_list = command_input.split()
    if not command_list:
        return
    action = command_list[0]

    if action == "run":
        parsed = _parse_run(command_input)
        if parsed is None:
            print('-bash: %s command not found' % command_input)
            help()
            return
        command, hosts = parsed
        for host in hosts:
            try:
                rpc_client.call(command, host)
            # Tuple form is required: `TypeError and AssertionError`
            # evaluates to AssertionError alone.
            except (TypeError, AssertionError) as err:
                print('failed to send command to %s: %s' % (host, err))
                break
    elif action == "check_task":
        try:
            print(task_results[int(command_list[1])])
        except IndexError:
            # No task id was given.
            help()
        except (ValueError, KeyError):
            # Non-numeric or unknown id: report it instead of crashing.
            print('task %s not found' % command_list[1])
    elif action == "check_task_all":
        for index, key in enumerate(task_results):
            print(index, key)
    elif action == "help":
        help()
    else:
        print('-bash: %s command not found' % command_input)
        help()


if __name__ == "__main__":
    rpc_client = Client()
    help()
    while True:
        try:
            command_input = input("请输入命令>>>:").strip()
        except (EOFError, KeyboardInterrupt):
            # Leave the prompt cleanly on Ctrl-D / Ctrl-C.
            print()
            break
        if command_input:
            start(command_input)
    rpc_client.connection.close()