summaryrefslogtreecommitdiffstats
path: root/DeDRM_plugin/subasyncio.py
blob: de084d303fcc72ce25e9213ab87f8f1d91bfdb03 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab

import os, sys
import signal
import threading
import subprocess
from subprocess import Popen, PIPE, STDOUT

# **heavily** chopped up and modified version of asyncproc.py
# to make it actually work on Windows as well as Mac/Linux
# For the original see:
# "http://www.lysator.liu.se/~bellman/download/"
# author is  "Thomas Bellman <[email protected]>"
# available under GPL version 3 or Later

# create an asynchronous subprocess whose output can be collected in
# a non-blocking manner

# What a mess!  Have to use threads just to get non-blocking io
# in a cross-platform manner

# luckily all thread use is hidden within this class

class Process(object):
    """A subprocess whose pipes are serviced by background threads.

    One daemon thread is started per pipe that exists: a feeder that
    writes queued chunks to the child's stdin, and a reader for each of
    stdout and stderr that collects whatever the child emits.  The
    read(), readerr() and write() methods only touch in-memory lists, so
    they do not wait on the child.

    Collected output is returned exactly as os.read() delivered it, i.e.
    as byte strings (``str`` on Python 2, ``bytes`` on Python 3).
    """

    def __init__(self, *params, **kwparams):
        """Start the subprocess and its pipe-servicing threads.

        All arguments are forwarded to subprocess.Popen.  stdin, stdout
        and stderr default to subprocess.PIPE unless the caller supplied
        them, either by keyword or positionally (they are Popen's
        positional arguments number 3, 4 and 5, counting from 0).
        """
        if len(params) <= 3:
            kwparams.setdefault('stdin', subprocess.PIPE)
        if len(params) <= 4:
            kwparams.setdefault('stdout', subprocess.PIPE)
        if len(params) <= 5:
            kwparams.setdefault('stderr', subprocess.PIPE)
        self.__pending_input = []
        self.__collected_outdata = []
        self.__collected_errdata = []
        self.__exitstatus = None
        # Guards the three lists above and the __quit flag.
        self.__lock = threading.Lock()
        # Counts events the feeder thread still has to handle: one per
        # queued chunk and one per closeinput() call.
        self.__inputsem = threading.Semaphore(0)
        self.__quit = False

        self.__process = subprocess.Popen(*params, **kwparams)

        # A thread is only created for a stream that is actually a pipe.
        if self.__process.stdin:
            self.__stdin_thread = self.__spawn(
                "stdin-thread", self.__feeder,
                self.__pending_input, self.__process.stdin)

        if self.__process.stdout:
            self.__stdout_thread = self.__spawn(
                "stdout-thread", self.__reader,
                self.__collected_outdata, self.__process.stdout)

        if self.__process.stderr:
            self.__stderr_thread = self.__spawn(
                "stderr-thread", self.__reader,
                self.__collected_errdata, self.__process.stderr)

    # start a daemon thread running target(buf, stream)
    def __spawn(self, name, target, buf, stream):
        worker = threading.Thread(name=name, target=target,
                                  args=(buf, stream))
        # Daemon threads do not keep the interpreter alive at exit.
        worker.daemon = True
        worker.start()
        return worker

    def pid(self):
        """Return the process id of the subprocess."""
        return self.__process.pid

    def kill(self, signal):
        """Send the given signal to the subprocess."""
        self.__process.send_signal(signal)

    # check on subprocess (pass in 'nowait') to act like poll
    def wait(self, flag):
        """Check for, or block until, termination of the subprocess.

        flag is a string; 'nowait' (in any letter case) polls without
        blocking on the child, any other value blocks until it exits.

        Once the child has exited, its stdin is closed and the reader
        threads are joined, so everything it wrote is available from
        read() / readerr() when this returns.

        Returns the exit code, or None if the child is still running.
        """
        if flag.lower() == 'nowait':
            rc = self.__process.poll()
        else:
            rc = self.__process.wait()
        if rc is not None:
            if self.__process.stdin:
                self.closeinput()
            if self.__process.stdout:
                self.__stdout_thread.join()
            if self.__process.stderr:
                self.__stderr_thread.join()
        return self.__process.returncode

    def terminate(self):
        """Close the subprocess's stdin (if piped) and terminate it."""
        if self.__process.stdin:
            self.closeinput()
        self.__process.terminate()

    # thread gets data from subprocess stdout
    def __reader(self, collector, source):
        fd = source.fileno()
        while True:
            data = os.read(fd, 65536)
            with self.__lock:
                collector.append(data)
            # os.read() signals end of file with an empty result.  Test
            # truthiness rather than comparing with "": on Python 3 the
            # result is bytes and b"" != "", which would loop forever.
            if not data:
                source.close()
                break

    # thread feeds data to subprocess stdin
    def __feeder(self, pending, drain):
        while True:
            # Sleep until write() or closeinput() signals new work.
            self.__inputsem.acquire()
            with self.__lock:
                finished = not pending and self.__quit
                if not finished:
                    data = pending.pop(0)
            if finished:
                drain.close()
                break
            drain.write(data)
            # The pipe object may be buffered; push the chunk to the
            # child now instead of when the pipe is eventually closed.
            drain.flush()

    # non-blocking read of data from subprocess stdout
    def read(self):
        """Return and discard all stdout data collected so far.

        Returns a byte string; it is empty when nothing new has arrived.
        """
        with self.__lock:
            outdata = b"".join(self.__collected_outdata)
            del self.__collected_outdata[:]
        return outdata

    # non-blocking read of data from subprocess stderr
    def readerr(self):
        """Return and discard all stderr data collected so far.

        Returns a byte string; it is empty when nothing new has arrived.
        """
        with self.__lock:
            errdata = b"".join(self.__collected_errdata)
            del self.__collected_errdata[:]
        return errdata

    # non-blocking write to stdin of subprocess
    def write(self, data):
        """Queue data for delivery to the subprocess's stdin.

        The chunk is handed to the feeder thread, which performs the
        actual write on the pipe.

        Raises ValueError if the subprocess's stdin is not a pipe.
        """
        if self.__process.stdin is None:
            raise ValueError("Writing to process with stdin not a pipe")
        with self.__lock:
            self.__pending_input.append(data)
            self.__inputsem.release()

    # close stdinput of subprocess
    def closeinput(self):
        """Ask the feeder thread to close the subprocess's stdin.

        The feeder closes the pipe once every chunk queued before this
        call has been written.
        """
        with self.__lock:
            self.__quit = True
            self.__inputsem.release()