import socket
import threading
import time
import cv2
from PIL import Image,ImageDraw,ImageFont
import xgoscreen.LCD_2inch as LCD_2inch
import numpy as np
import struct
class CAMERAXGO():
def __init__(self, conn):
self.display = LCD_2inch.LCD_2inch()
self.display.Init()
self.display.clear()
self.splash = Image.new("RGB", (320, 240), "black")
self.display.ShowImage(self.splash)
self.camera_still = False
self.cap = None
self.conn = conn
def open_camera(self):
if self.cap == None:
self.cap = cv2.VideoCapture(0)
self.cap.set(3, 320)
self.cap.set(4, 240)
def xgoCamera(self,switch):
global camera_still
if switch:
self.open_camera()
self.camera_still=True
t = threading.Thread(target=self.camera_mode)
t.start()
else:
self.camera_still=False
time.sleep(0.5)
splash = Image.new("RGB",(320,240),"black")
self.display.ShowImage(splash)
def camera_mode(self):
self.camera_still=True
while 1:
success,frame = self.cap.read()
b,g,r = cv2.split(frame)
image = cv2.merge((r,g,b))
image = cv2.flip(image,1)
imgok = Image.fromarray(image)
self.display.ShowImage(imgok)
#socket发送数据
img_encode = cv2.imencode('.jpg', frame)[1]
str_encode = img_encode.tobytes()
image_length = len(str_encode)
packed_length = struct.pack('!I', image_length)
self.conn.sendall(packed_length)
self.conn.sendall(str_encode)
# data_encode = np.array(img_encode)
# str_encode = data_encode.tobytes() # tostring()
# image_length = len(str_encode)
# packed_length = struct.pack('!I', image_length)
# conn.sendall(packed_length)
# conn.sendall(str_encode)
if not self.camera_still:
break
if __name__ == '__main__':
address = ('0.0.0.0', 5005) # 服务端地址和端口
ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ser.bind(address)
ser.listen(5)
print('waiting。。。')
conn, addr = ser.accept()
# 调用上面定义的函数
ca = CAMERAXGO(conn)
ca.xgoCamera(True)
# time.sleep(3)
# ca.xgoCamera(False)
# print("This script is running as the main program.")