123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- #!/usr/bin/env python3
- """Take screen captures from DS1000Z-series oscilloscopes
-
- This program captures either the waveform or the whole screen of a Rigol
- DS1000Z series oscilloscope, then saves it on the computer as a CSV, PNG
- or BMP file.
-
- The program uses the LXI protocol, so the computer must have a LAN
- connection with the oscilloscope.
- """
-
- from enum import Enum, auto
- import argparse
- import logging
- import os
- import platform
- import subprocess
- import sys
- import time
-
- from Rigol_functions import *
- from telnetlib_receive_all import Telnet
-
- __version__ = 'v1.1.0'
- # Added TMC Blockheader decoding
- # Added possibility to manually allow run for scopes other than DS1000Z
- __author__ = 'RoGeorge'
-
- #
- # TODO: Write all SCPI commands in their short name, with capitals
- # TODO: Add ignore instrument model switch instead of asking
- #
- # TODO: Detect if the scope is in RUN or in STOP mode (looking at the length of data extracted)
- # TODO: Add logic for 1200/mdep points to avoid displaying the 'Invalid Input!' message
- # TODO: Add message for csv data points: mdep (all) or 1200 (screen), depending on RUN/STOP state, MATH and WAV:MODE
- # TODO: Add STOP scope switch
- #
- # TODO: Add debug switch
- # TODO: Clarify info, warning, error, debug and print messages
- #
- # TODO: Add automated version increase
- #
- # TODO: Extract all memory datapoints. For the moment, CSV is limited to the displayed 1200 datapoints.
- # TODO: Use arrays instead of strings and lists for csv mode.
- #
- # TODO: variables/functions name refactoring
- # TODO: Fine tune maximum chunk size request
- # TODO: Investigate scaling. Sometimes 3.0e-008 instead of expected 3.0e-000
- # TODO: Add timestamp and mark the trigger point as t0
- # TODO: Use channels label instead of chan1, chan2, chan3, chan4, math
- # TODO: Add command line parameters file path
- # TODO: Speed-up the transfer, try to replace Telnet with direct TCP
- # TODO: Add GUI
- # TODO: Add browse and custom filename selection
- # TODO: Create executable distributions
- #
-
# Configure logging. Change 'level' to the desired verbosity
# (DEBUG, INFO, WARNING, ERROR, CRITICAL).
# The log file is named after the running script and is overwritten on
# every run (filemode 'w').
_log_file_name = os.path.basename(sys.argv[0]) + '.log'
logging.basicConfig(filename=_log_file_name,
                    filemode='w',
                    level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Record the start of the run and the environment it runs in.
logging.info("***** New run started...")
_platform_description = str(platform.uname())
logging.info("OS Platform: " + _platform_description)
log_running_python_versions()
-
# Default settings -- adjust these to match your own setup.
# 'path_to_save' is prepended as-is to automatically generated file names.
path_to_save = "captures/"
IP_DS1104Z = "192.168.1.3"

# Rigol/LXI specific constants
# TCP port used to open the Telnet session with the oscilloscope.
port = 5555

# Timeouts; 'smallWait' is the read timeout (in seconds) used when
# collecting leftover data chunks.
big_wait, smallWait = 10, 1

# Positions of the fields in the comma-separated '*IDN?' reply.
company, model, serial = range(3)
-
-
- # Read/verify file type
class FileType(Enum):
    """Output file formats selectable on the command line.

    The member names double as the file extensions and as the accepted
    values of the ``--type`` option.
    """

    png = 1
    bmp = 2
    csv = 3
-
-
- # Check network response (ping)
def test_ping(hostname):
    """Ping *hostname* once and print a warning if it does not answer.

    The check is purely advisory: the function never raises because of an
    unreachable host, it only tells the user what to verify. It returns
    None in every case.

    Args:
        hostname: Hostname or IP address to ping.
    """
    # The "send N packets" option is spelled differently on Windows.
    count_option = '-n' if platform.system() == "Windows" else '-c'
    ping_command = ['ping', count_option, '1', hostname]

    try:
        completed = subprocess.run(ping_command,
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.DEVNULL)
    except OSError as err:
        # No usable 'ping' executable (e.g. a minimal container). Since the
        # ping is only a courtesy check, do not abort the capture over it.
        logging.warning("Could not run ping: %s", err)
        return

    if completed.returncode != 0:
        print()
        print("WARNING! No response pinging", hostname)
        print("Check network cables and settings.")
        print("You should be able to ping the oscilloscope.")
-
def _identify_instrument(tn, hostname):
    """Query '*IDN?' and return the reply split into its comma-separated fields.

    Exits the program if the instrument rejects LAN commands, or if it is
    not a Rigol DS1000Z and the user does not explicitly type 'Yes'.
    """
    # Surrounding whitespace (such as a line terminator) is stripped so it
    # cannot leak into the comparisons below.
    instrument_id = command(tn, "*IDN?").decode().strip()

    # Check if instrument is set to accept LAN commands
    if instrument_id == "command error":
        print("Instrument reply:", instrument_id)
        print("Check the oscilloscope settings.")
        print("Utility -> IO Setting -> RemoteIO -> LAN must be ON")
        sys.exit("ERROR")

    # Pad a short/malformed reply so indexing by company/model/serial is
    # always safe; such a reply then simply fails the model check below.
    id_fields = instrument_id.split(",")
    id_fields += [""] * (serial + 1 - len(id_fields))

    # Check if instrument is indeed a Rigol DS1000Z series
    is_ds1000z = (id_fields[company] == "RIGOL TECHNOLOGIES"
                  and id_fields[model].startswith("DS1")
                  and id_fields[model].endswith("Z"))
    if not is_ds1000z:
        print("Found instrument model '{}' from '{}'".format(id_fields[model], id_fields[company]))
        print("WARNING: No Rigol from series DS1000Z found at", hostname)
        print()
        # input() is the Python 3 name of Python 2's raw_input().
        typed = input("ARE YOU SURE YOU WANT TO CONTINUE? (No/Yes):")
        if typed != 'Yes':
            sys.exit('Nothing done. Bye!')

    print("Instrument ID:", instrument_id)
    return id_fields


def _capture_screen(tn, filetype):
    """Return the raw PNG or BMP bytes of the oscilloscope display.

    Exits the program if the transfer stays incomplete after reading all
    leftover chunks.
    """
    print("Receiving screen capture...")

    if filetype is FileType.png:
        buff = command(tn, ":DISP:DATA? ON,OFF,PNG")
    else:
        buff = command(tn, ":DISP:DATA? ON,OFF,BMP8")

    expected_buff_len = expected_buff_bytes(buff)
    # Just in case the transfer did not complete in the expected time,
    # read the remaining 'buff' chunks
    while len(buff) < expected_buff_len:
        logging.warning("Received LESS data then expected! (" +
                        str(len(buff)) + " out of " + str(expected_buff_len) + " expected 'buff' bytes.)")
        tmp = tn.read_until(b"\n", smallWait)
        if not tmp:
            break
        buff += tmp
        logging.warning(str(len(tmp)) + " leftover bytes added to 'buff'.")

    if len(buff) < expected_buff_len:
        logging.error("After reading all data chunks, 'buff' is still shorter then expected! (" +
                      str(len(buff)) + " out of " + str(expected_buff_len) + " expected 'buff' bytes.)")
        sys.exit("ERROR")

    # Strip TMC Blockheader and keep only the data
    header_len = tmc_header_bytes(buff)
    return buff[header_len:header_len + expected_data_bytes(buff)]


def _capture_waveforms(tn):
    """Return a list of (channel, points) pairs for every displayed channel.

    'points' is the list of sample values, as text, exactly as sent by the
    oscilloscope in ASCII waveform format.
    """
    # Scan for displayed channels. Replies arrive as bytes, so decode them
    # before comparing with text.
    active_channels = []
    for channel in ("CHAN1", "CHAN2", "CHAN3", "CHAN4", "MATH"):
        reply = command(tn, ":" + channel + ":DISP?")
        if reply.decode().strip() == "1":
            active_channels.append(channel)

    # TODO: Change mode to MAX
    # TODO: Add command line switch for MAX/NORM
    # (command sequence kept exactly as it was, including the repeated
    # mode command)
    command(tn, ":WAV:MODE NORM")
    command(tn, ":WAV:STAR 0")
    command(tn, ":WAV:MODE NORM")

    columns = []
    for channel in active_channels:
        print()

        # Set WAVE parameters
        command(tn, ":WAV:SOUR " + channel)
        command(tn, ":WAV:FORM ASC")

        # MATH channel does not allow START and STOP to be set.
        # They are always 0 and 1200
        if channel != "MATH":
            command(tn, ":WAV:STAR 1")
            command(tn, ":WAV:STOP 1200")

        print("Data from channel '" + str(channel) + "', points " + str(1) + "-" + str(1200) + ": Receiving...")
        chunk = command(tn, ":WAV:DATA?")

        # Just in case the transfer did not complete in the expected time.
        # endswith() also copes with an empty reply, unlike indexing [-1].
        while not chunk.endswith(b"\n"):
            logging.warning("The data transfer did not complete in the expected time of " +
                            str(smallWait) + " second(s).")
            tmp = tn.read_until(b"\n", smallWait)
            if not tmp:
                break
            chunk += tmp
            logging.warning(str(len(tmp)) + " leftover bytes added to 'buff_chunks'.")

        # Strip TMC Blockheader and terminator, then convert to text once.
        payload = chunk[tmc_header_bytes(chunk):].decode().strip() if chunk else ""
        columns.append((channel, payload.split(",") if payload else []))

    return columns


def _write_csv(filename, columns):
    """Write (channel, points) columns to 'filename', one channel per column.

    The first row holds the channel names. Columns of different lengths are
    padded with empty cells so that no sample is dropped.
    """
    from itertools import zip_longest

    lines = []
    if columns:
        lines.append(",".join(name for name, _ in columns))
        lines.extend(",".join(row) for row in
                     zip_longest(*(points for _, points in columns), fillvalue=""))
    else:
        logging.warning("No displayed channel found, the CSV file will be empty.")

    # newline="" disables newline translation, so os.linesep is written as is.
    with open(filename, "w", newline="", encoding="utf-8") as csv_file:
        csv_file.writelines(line + os.linesep for line in lines)


def run(hostname, filename, filetype):
    """Capture the screen or the displayed waveforms and save them to a file.

    Args:
        hostname: Hostname or IP address of the oscilloscope.
        filename: Path of the output file. If None, the name is built as
            <path_to_save><MODEL>_<SERIAL>_<YYYY-MM-DD_HH.MM.SS>.<type>.
        filetype: FileType member. png/bmp save a screen capture,
            csv saves the displayed data points of all active channels.

    Raises:
        SystemExit: if the instrument refuses LAN commands, if the user
            declines to continue with an unrecognised instrument, or if a
            screen capture transfer remains incomplete.
    """
    test_ping(hostname)

    # Open a modified telnet session
    # The default telnetlib drops 0x00 characters,
    # so a modified library 'telnetlib_receive_all' is used instead
    tn = Telnet(hostname, port)
    try:
        id_fields = _identify_instrument(tn, hostname)

        # Prepare default filename as MODEL_SERIAL_YYYY-MM-DD_HH.MM.SS
        if filename is None:
            timestamp = time.strftime("%Y-%m-%d_%H.%M.%S", time.localtime())
            filename = "{}{}_{}_{}.{}".format(path_to_save, id_fields[model],
                                              id_fields[serial], timestamp,
                                              filetype.name)
            # Make sure the default capture directory exists.
            target_dir = os.path.dirname(filename)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

        if filetype in {FileType.png, FileType.bmp}:
            image = _capture_screen(tn, filetype)

            # Write raw data to file
            with open(filename, 'wb') as f:
                f.write(image)
            print('Saved raw data to {}'.format(filename))

        # TODO: Change WAV:FORM from ASC to BYTE
        elif filetype is FileType.csv:
            # The scope is not put in STOP mode here - for the moment,
            # deal with it by manually stopping the scope
            # TODO: Add command line switch and code logic for 1200 vs ALL memory data points
            _write_csv(filename, _capture_waveforms(tn))
            print("Saved file:", "'" + filename + "'")
    finally:
        # Release the connection on every path, including sys.exit().
        tn.close()
-
def main(argv=None):
    """Parse the command line and run the capture.

    Args:
        argv: List of command line arguments without the program name.
            None (the default) makes argparse read sys.argv.
    """
    parser = argparse.ArgumentParser(description="Take screen captures from"
                                     " DS1000Z-series oscilloscopes")
    parser.add_argument("-t", "--type",
                        choices=FileType.__members__,
                        help="Optional type of file to save")
    parser.add_argument("hostname",
                        help="Hostname or IP address of the oscilloscope")
    parser.add_argument("filename", nargs="?",
                        help="Optional name of output file")

    args = parser.parse_args(argv)

    # If no type is specified, auto-detect from the filename
    type_name = args.type
    shown_name = type_name
    if type_name is None:
        if args.filename is None:
            parser.error("Either a file type or a filename must be specified")
        shown_name = os.path.splitext(args.filename)[1][1:]
        # Extensions are matched case-insensitively, so 'shot.PNG' works too.
        type_name = shown_name.lower()

    try:
        filetype = FileType[type_name]
    except KeyError:
        # parser.error() prints the message and exits the program.
        parser.error("Unknown file type: {}".format(shown_name))

    run(args.hostname, args.filename, filetype)


if __name__ == "__main__":
    main()
|