計劃做一個工業(yè)二級系統(tǒng),上連Mes,下連一級plc,遂學習。
與plc鏈接和數(shù)據(jù)庫的部分基礎是寫完了,遇到的問題是數(shù)據(jù)庫服務需要先寫入一條總數(shù)據(jù),數(shù)據(jù)的objectid唯一,然后將objectid發(fā)送給plc讀寫服務,plc讀寫服務才能寫入對應的數(shù)據(jù)。涉及到服務之間的通信,沒有想到更好的方式,遂選擇RabbitMQ作為信息中間件來進行進程間的交互。
安裝RabbitMQ的時候踩了一個坑,因為計算機的名稱是中文導致了管理模塊進不去,修改了計算機名稱之后能進去了,但是沒有監(jiān)聽端口,找了一圈才發(fā)現(xiàn)是需要重新修復一下。
進入rabbitmq安裝目錄的sbin文件夾后,先關閉rabbitmq服務,在cmd里面按順序執(zhí)行下列操作:
rabbitmq-service.bat removeset
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
然后就修好了。
回到正題,測試用的生產(chǎn)者初始化很簡單:
user_info = pika.PlainCredentials('guest', 'guest') # 用戶名和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', user_info))
channel = connection.channel()
queue = 'testqueue'
channel.exchange_declare(exchange='exchangedirect',exchange_type='direct',passive=True)
定義了一個類來發(fā)送信息:
class Producer:
def __init__(self):
self.message = {
'fuc':self.run,
'you':'2k'
}
json_str = json.dumps(self.message)
self.bytes_data = json_str.encode('utf-8')#轉(zhuǎn)為bytes
def send(self):
channel.basic_publish(exchange='exchangedirect',
routing_key='wang',
body=self.bytes_data,
)
channel.basic_publish(exchange='exchangedirect',
routing_key='jiang',
body=self.bytes_data,
)
if __name__ == "__main__":
producer = Producer()
producer.send()
想要使用json來作為信息類型,比較方便,因為RabbitMQ傳輸時使用的是Bytes格式數(shù)據(jù),因此要先把json轉(zhuǎn)為bytes字符串。隨便定義了兩個路由'wang'和'jiang'用來測試。
#消費者
import pika
import json
import queue
user_info = pika.PlainCredentials('guest', 'guest') # 用戶名和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', user_info))
channel = connection.channel()
channel.exchange_declare(exchange='exchangedirect',exchange_type='direct',passive=True)
queue_name = "testqueue"
channel.queue_bind(exchange='exchangedirect',
queue=queue_name,
routing_key='wang')
channel.queue_bind(exchange='exchangedirect',
queue=queue_name,
routing_key='chara')
class Customer:
def __init__(self):
super().__init__()
self.recdata = {}
self.task = ''
self.queue = queue.Queue(maxsize=100)
def run(self):
print('MQ祈禱中.....')
channel.basic_consume(queue=queue_name,
auto_ack=False,
on_message_callback=self.callback,
)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
# 構建回調(diào)函數(shù)
def callback(self,ch, method, properties, body):
print(" 接收消息成功,消息為:{} .......".format(body))
encoding = properties.content_encoding or 'utf-8'
json_str = body.decode(encoding)
data = json.loads(json_str)
print(f"收到JSON數(shù)據(jù): {data}")
self.recdata = data
self.puttask()
ch.basic_ack(delivery_tag=method.delivery_tag)
def puttask(self):
task = self.recdata.get('task')
if not len(task) == 0:
task = self.task
self.queue.put(task,block=True, timeout=1)
print(self.task)
if __name__ == "__main__":
customer = Customer()
customer.run()
測試用的消費者,鏈接方面和生產(chǎn)者一樣,run函數(shù)啟動監(jiān)聽,回調(diào)函數(shù)處理收到的信息轉(zhuǎn)為json格式,然后調(diào)用puttask方法將方法加入隊列。還差個任務完成的還沒寫。
浙公網(wǎng)安備 33010602011771號