Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

import os 

import struct 

import glob 

import json 

from collections import deque 

 

 

class FifoMemoryQueue(object): 

    """Memory FIFO queue.""" 

 

    def __init__(self): 

        self.q = deque() 

 

    def push(self, obj): 

        self.q.appendleft(obj) 

 

    def pop(self): 

        if self.q: 

            return self.q.pop() 

 

    def close(self): 

        pass 

 

    def __len__(self): 

        return len(self.q) 

 

 

class LifoMemoryQueue(FifoMemoryQueue): 

    """Memory LIFO queue.""" 

 

    def push(self, obj): 

        self.q.append(obj) 

 

 

class FifoDiskQueue(object): 

    """Persistent FIFO queue.""" 

 

    szhdr_format = ">L" 

    szhdr_size = struct.calcsize(szhdr_format) 

 

    def __init__(self, path, chunksize=100000): 

        self.path = path 

        if not os.path.exists(path): 

            os.makedirs(path) 

        self.info = self._loadinfo(chunksize) 

        self.chunksize = self.info['chunksize'] 

        self.headf = self._openchunk(self.info['head'][0], 'ab+') 

        self.tailf = self._openchunk(self.info['tail'][0]) 

        self.tailf.seek(self.info['tail'][2]) 

 

    def push(self, string): 

        hnum, hpos = self.info['head'] 

        hpos += 1 

        szhdr = struct.pack(self.szhdr_format, len(string)) 

        os.write(self.headf.fileno(), szhdr + string) 

        if hpos == self.chunksize: 

            hpos = 0 

            hnum += 1 

            self.headf.close() 

            self.headf = self._openchunk(hnum, 'ab+') 

        self.info['size'] += 1 

        self.info['head'] = hnum, hpos 

 

    def _openchunk(self, number, mode='r'): 

        return open(os.path.join(self.path, 'q%05d' % number), mode) 

 

    def pop(self): 

        tnum, tcnt, toffset = self.info['tail'] 

        if [tnum, tcnt] >= self.info['head']: 

            return 

        tfd = self.tailf.fileno() 

        szhdr = os.read(tfd, self.szhdr_size) 

        if not szhdr: 

            return 

        size, = struct.unpack(self.szhdr_format, szhdr) 

        data = os.read(tfd, size) 

        tcnt += 1 

        toffset += self.szhdr_size + size 

        if tcnt == self.chunksize and tnum <= self.info['head'][0]: 

            tcnt = toffset = 0 

            tnum += 1 

            self.tailf.close() 

            os.remove(self.tailf.name) 

            self.tailf = self._openchunk(tnum) 

        self.info['size'] -= 1 

        self.info['tail'] = tnum, tcnt, toffset 

        return data 

 

    def close(self): 

        self.headf.close() 

        self.tailf.close() 

        self._saveinfo(self.info) 

        if len(self) == 0: 

            self._cleanup() 

 

    def __len__(self): 

        return self.info['size'] 

 

    def _loadinfo(self, chunksize): 

        infopath = self._infopath() 

        if os.path.exists(infopath): 

            with open(infopath) as f: 

                info = json.load(f) 

        else: 

            info = { 

                'chunksize': chunksize, 

                'size': 0, 

                'tail': [0, 0, 0], 

                'head': [0, 0], 

            } 

        return info 

 

    def _saveinfo(self, info): 

        with open(self._infopath(), 'w') as f: 

            json.dump(info, f) 

 

    def _infopath(self): 

        return os.path.join(self.path, 'info.json') 

 

    def _cleanup(self): 

        for x in glob.glob(os.path.join(self.path, 'q*')): 

            os.remove(x) 

        os.remove(os.path.join(self.path, 'info.json')) 

exit        if not os.listdir(self.path): 

            os.rmdir(self.path) 

 

 

 

class LifoDiskQueue(object): 

    """Persistent LIFO queue.""" 

 

    SIZE_FORMAT = ">L" 

    SIZE_SIZE = struct.calcsize(SIZE_FORMAT) 

 

    def __init__(self, path): 

        self.path = path 

        if os.path.exists(path): 

            self.f = open(path, 'rb+') 

            qsize = self.f.read(self.SIZE_SIZE) 

            self.size, = struct.unpack(self.SIZE_FORMAT, qsize) 

            self.f.seek(0, os.SEEK_END) 

        else: 

            self.f = open(path, 'wb+') 

            self.f.write(struct.pack(self.SIZE_FORMAT, 0)) 

            self.size = 0 

 

    def push(self, string): 

        self.f.write(string) 

        ssize = struct.pack(self.SIZE_FORMAT, len(string)) 

        self.f.write(ssize) 

        self.size += 1 

 

    def pop(self): 

        if not self.size: 

            return 

        self.f.seek(-self.SIZE_SIZE, os.SEEK_END) 

        size, = struct.unpack(self.SIZE_FORMAT, self.f.read()) 

        self.f.seek(-size-self.SIZE_SIZE, os.SEEK_END) 

        data = self.f.read(size) 

        self.f.seek(-size, os.SEEK_CUR) 

        self.f.truncate() 

        self.size -= 1 

        return data 

 

    def close(self): 

        if self.size: 

            self.f.seek(0) 

            self.f.write(struct.pack(self.SIZE_FORMAT, self.size)) 

        self.f.close() 

        if not self.size: 

            os.remove(self.path) 

 

    def __len__(self): 

        return self.size