Skip to content

Server

penvm.server.kernelmanager

All server-side kernels are registered with the manager and accessible by name.

KernelManager

Bases: BaseObject

Manage server-side kernels.

Source code in penvm/src/server/penvm/server/kernelmanager.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class KernelManager(BaseObject):
    """Manage server-side kernels."""

    def __init__(self, machine: "Machine"):
        """Initialize.

        Args:
            machine: Machine.
        """
        super().__init__(None, logger)
        self.machine = machine
        self.kernels = {}

    def drop(self, name: str):
        """Drop kernel by name.

        Args:
            name: Kernel name.
        """
        self.kernels.pop(name)

    def get(self, name: str) -> "Kernel":
        """Get kernel by name.

        Args:
            name: Kernel name.
        """
        return self.kernels.get(name)

    def list(self) -> List[str]:
        """Get list of kernel names.

        Returns:
            Kernel names.
        """
        return list(self.kernels.keys())

    def set(self, name: str, kernel: "Kernel"):
        """Register kernel by name.

        Args:
            name: Kernel name.
            kernel: Kernel.
        """
        self.kernels[name] = kernel

    def start(self):
        """Start.

        No background threads."""
        pass

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object.
        """
        try:
            return State(
                "kernel-manager",
                None,
                {
                    "kernel-ids": self.list(),
                    "nkernels": len(self.kernels),
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__(machine)

Initialize.

Parameters:

Name Type Description Default
machine Machine

Machine.

required
Source code in penvm/src/server/penvm/server/kernelmanager.py
38
39
40
41
42
43
44
45
46
def __init__(self, machine: "Machine"):
    """Initialize.

    Args:
        machine: Machine.
    """
    super().__init__(None, logger)
    self.machine = machine
    self.kernels = {}

drop(name)

Drop kernel by name.

Parameters:

Name Type Description Default
name str

Kernel name.

required
Source code in penvm/src/server/penvm/server/kernelmanager.py
48
49
50
51
52
53
54
def drop(self, name: str):
    """Drop kernel by name.

    Args:
        name: Kernel name.
    """
    self.kernels.pop(name)

get(name)

Get kernel by name.

Parameters:

Name Type Description Default
name str

Kernel name.

required
Source code in penvm/src/server/penvm/server/kernelmanager.py
56
57
58
59
60
61
62
def get(self, name: str) -> "Kernel":
    """Get kernel by name.

    Args:
        name: Kernel name.
    """
    return self.kernels.get(name)

list()

Get list of kernel names.

Returns:

Type Description
List[str]

Kernel names.

Source code in penvm/src/server/penvm/server/kernelmanager.py
64
65
66
67
68
69
70
def list(self) -> List[str]:
    """Get list of kernel names.

    Returns:
        Kernel names.
    """
    return list(self.kernels.keys())

set(name, kernel)

Register kernel by name.

Parameters:

Name Type Description Default
name str

Kernel name.

required
kernel Kernel

Kernel.

required
Source code in penvm/src/server/penvm/server/kernelmanager.py
72
73
74
75
76
77
78
79
def set(self, name: str, kernel: "Kernel"):
    """Register kernel by name.

    Args:
        name: Kernel name.
        kernel: Kernel.
    """
    self.kernels[name] = kernel

start()

Start.

No background threads.

Source code in penvm/src/server/penvm/server/kernelmanager.py
81
82
83
84
85
def start(self):
    """Start.

    No background threads."""
    pass

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/server/penvm/server/kernelmanager.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object.
    """
    try:
        return State(
            "kernel-manager",
            None,
            {
                "kernel-ids": self.list(),
                "nkernels": len(self.kernels),
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

penvm.server.machine

Machine

Bases: BaseObject

Server-side machine.

Provides access to all "managers" (e.g, connection, kernel, session, store), main incoming and outgoing message queues, and state/control to support debug mode.

Source code in penvm/src/server/penvm/server/machine.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class Machine(BaseObject):
    """Server-side machine.

    Provides access to all "managers" (e.g, connection, kernel,
    session, store), main incoming and outgoing message queues, and
    state/control to support debug mode."""

    def __init__(
        self,
        host: str,
        port: int,
        sslprofile: Union[str, None] = None,
        machineid: Union[str, None] = None,
    ):
        """Set up server-side machine.

        Args:
            host: Host address.
            port: Port.
            sslprofile: SSL profile for SSL context.
            machineid: Machine id. Generated if not provided.
        """
        try:
            super().__init__(machineid, logger)
            tlogger = self.logger.enter()

            self.sslprofile = sslprofile
            self.sslcontext = self.get_sslcontext(self.sslprofile)

            self.connmgr = ConnectionManager(self, host, port, self.sslcontext)
            self.kernelmgr = KernelManager(self)
            self.sessmgr = SessionManager(self)
            # self.storemgr = StorageManager(self)
            self.fkvstore = FileKVStore(f"/tmp/penvm-store-{self.oid}-{get_uuid1()}")

            for kernel_cls in [CoreKernel, DefaultKernel]:
                self.kernelmgr.set(kernel_cls.name, kernel_cls())

            self._background = False
            self.debug = False

            self.schedlock = threading.Lock()
            self.imq = RoutingQueue(put=self.imq_put)
            self.omq = RoutingQueue(put=self.omq_put)

        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def features(self) -> Dict:
        """Get a dictionary of features.

        Returns:
            Dictionary of features.
        """
        try:
            # languages
            d = {
                "language": "python",
                "python": {
                    "platform": sys.platform,
                    "version": list(sys.version_info),
                },
                "library": {},
            }

            # libraries
            try:
                import numpy

                d["library"]["numpy"] = numpy.version.full_version
            except:
                pass

            return d
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def get_addr_port(self) -> Tuple[str, int]:
        """Get machine listener address and port.

        Returns:
            Tuple of listener host and port.
        """
        return (self.connmgr.listener.lhost, self.connmgr.listener.lport)

    def get_session(
        self,
        sessionid: str = "default",
    ) -> "Session":
        """Get/create session.

        Args:
            sessionid (str): Session id (optional).

        Returns:
            Session.
        """
        # TODO: no default. must be provided by client
        return self.sessmgr.setdefault(sessionid)

    def get_sslcontext(
        self,
        sslprofile: Union[str, None],
    ) -> Union["ssl.SSLContext", None]:
        """Load server-side SSLContext based on named ssl profile.

        Args:
            sslprofile: SSL profile name.

        Returns:
            SSLContext.
        """
        if sslprofile:
            try:
                import ssl

                sslprofile_dir = os.path.expanduser(f"~/.penvm/ssl/{sslprofile}")
                sslcontext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
                # sslcontext.verify_mode = ssl.CERT_REQUIRED
                sslcontext.load_cert_chain(
                    certfile=f"{sslprofile_dir}/server.crt",
                    keyfile=f"{sslprofile_dir}/server.key",
                )
                return sslcontext
            except Exception as e:
                logger.debug(f"ssl required but missing for ssl profile ({sslprofile})")
                raise Exception(f"ssl required but missing for ssl profile ({sslprofile})")

    def imq_put(
        self,
        msg: "Message",
    ):
        """Triage message and put on proper session incoming message
        queue.

        Note: This is where incoming requests result in session
        creation, then request processing!

        Args:
            msg: Message to enqueue.
        """
        try:
            tlogger = self.logger.enter()

            sessionid = msg.header.get("session-id", "default")
            sess = self.sessmgr.setdefault(sessionid)
            if sess:
                sess.imq.put(msg)
            else:
                # DROP!
                tlogger.warning(f"dropped message for session {sessionid}")
                pass
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def omq_put(
        self,
        msg: "Message",
    ):
        """Triage message and put on proper connection outgoing
        message queue.

        Args:
            msg: Message to enqueue.
        """
        try:
            tlogger = self.logger.enter()

            connectionid = msg.header.get("connection-id")
            conn = self.connmgr.get(connectionid)
            if conn:
                conn.omq.put(msg)
            else:
                # DROP!
                tlogger.warning("omq_put dropped message")
                pass
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def pop_session(
        self,
        sessionid: str = "default",
    ) -> "Session":
        """Pop session.

        Args:
            sessionid: Session id.

        Returns:
            Session.
        """
        return self.sessmgr.pop(sessionid)

    def run(self):
        """Run "runnable" managers."""
        try:
            tlogger = self.logger.enter()
            self.connmgr.run()
            # self.sessmgr.run()
            # self.opsmgr.run()
        finally:
            tlogger.exit()

    def set_debug(
        self,
        enabled: bool,
    ):
        """Set debug mode

        Args:
            enabled: New state for debug mode.
        """
        try:
            tlogger = self.logger.enter()
            tlogger.debug("setting debug ({enabled})")

            self.debug = enabled
            if self.debug:
                try:
                    if self.schedlock.locked():
                        self.schedlock.release()
                except Exception as e:
                    pass
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def shutdown(
        self,
        now: bool = False,
    ):
        """Shutdown machine.

        Args:
            now: To shut down "now".
        """
        try:
            tlogger = self.logger.enter()
            tlogger.debug("SHUTTTING DOWN ...")
            if now:
                os._exit(0)
        finally:
            tlogger.exit()

    def start(self):
        """Start machine."""
        try:
            tlogger = self.logger.enter()
            self.connmgr.start()
            self.sessmgr.start()
            self.kernelmgr.start()
        finally:
            tlogger.exit()

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object.
        """
        try:
            uname = os.uname()
            return State(
                "machine",
                self.oid,
                {
                    "debug": self.debug,
                    "features": self.features(),
                    "host": socket.gethostname(),
                    "schedlock": self.schedlock.locked(),
                    "timestamp": get_timestamp(),
                    "uname": {
                        "machine": uname.machine,
                        "nodename": uname.nodename,
                        "release": uname.release,
                        "sysname": uname.sysname,
                        "version": uname.version,
                    },
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def step_debug(self):
        """Allow a step if in debug mode."""
        try:
            tlogger = self.logger.enter()
            tlogger.debug("stepping debug")
            if self.debug:
                try:
                    if self.schedlock.locked():
                        self.schedlock.release()
                except Exception as e:
                    pass
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def stop(self):
        """Step machine."""
        try:
            tlogger = self.logger.enter()
            self.connmgr.stop()
            # self.sessmgr.stop()
        finally:
            tlogger.exit()

    def wait(self):
        """Wait (indefinitely) for machine to exit."""
        try:
            tlogger = self.logger.enter()
            time.sleep(1000000000)
        finally:
            tlogger.exit()

__init__(host, port, sslprofile=None, machineid=None)

Set up server-side machine.

Parameters:

Name Type Description Default
host str

Host address.

required
port int

Port.

required
sslprofile Union[str, None]

SSL profile for SSL context.

None
machineid Union[str, None]

Machine id. Generated if not provided.

None
Source code in penvm/src/server/penvm/server/machine.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(
    self,
    host: str,
    port: int,
    sslprofile: Union[str, None] = None,
    machineid: Union[str, None] = None,
):
    """Set up server-side machine.

    Args:
        host: Host address.
        port: Port.
        sslprofile: SSL profile for SSL context.
        machineid: Machine id. Generated if not provided.
    """
    try:
        super().__init__(machineid, logger)
        tlogger = self.logger.enter()

        self.sslprofile = sslprofile
        self.sslcontext = self.get_sslcontext(self.sslprofile)

        self.connmgr = ConnectionManager(self, host, port, self.sslcontext)
        self.kernelmgr = KernelManager(self)
        self.sessmgr = SessionManager(self)
        # self.storemgr = StorageManager(self)
        self.fkvstore = FileKVStore(f"/tmp/penvm-store-{self.oid}-{get_uuid1()}")

        for kernel_cls in [CoreKernel, DefaultKernel]:
            self.kernelmgr.set(kernel_cls.name, kernel_cls())

        self._background = False
        self.debug = False

        self.schedlock = threading.Lock()
        self.imq = RoutingQueue(put=self.imq_put)
        self.omq = RoutingQueue(put=self.omq_put)

    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

features()

Get a dictionary of features.

Returns:

Type Description
Dict

Dictionary of features.

Source code in penvm/src/server/penvm/server/machine.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def features(self) -> Dict:
    """Get a dictionary of features.

    Returns:
        Dictionary of features.
    """
    try:
        # languages
        d = {
            "language": "python",
            "python": {
                "platform": sys.platform,
                "version": list(sys.version_info),
            },
            "library": {},
        }

        # libraries
        try:
            import numpy

            d["library"]["numpy"] = numpy.version.full_version
        except:
            pass

        return d
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

get_addr_port()

Get machine listener address and port.

Returns:

Type Description
Tuple[str, int]

Tuple of listener host and port.

Source code in penvm/src/server/penvm/server/machine.py
124
125
126
127
128
129
130
def get_addr_port(self) -> Tuple[str, int]:
    """Get machine listener address and port.

    Returns:
        Tuple of listener host and port.
    """
    return (self.connmgr.listener.lhost, self.connmgr.listener.lport)

get_session(sessionid='default')

Get/create session.

Parameters:

Name Type Description Default
sessionid str

Session id (optional).

'default'

Returns:

Type Description
Session

Session.

Source code in penvm/src/server/penvm/server/machine.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_session(
    self,
    sessionid: str = "default",
) -> "Session":
    """Get/create session.

    Args:
        sessionid (str): Session id (optional).

    Returns:
        Session.
    """
    # TODO: no default. must be provided by client
    return self.sessmgr.setdefault(sessionid)

get_sslcontext(sslprofile)

Load server-side SSLContext based on named ssl profile.

Parameters:

Name Type Description Default
sslprofile Union[str, None]

SSL profile name.

required

Returns:

Type Description
Union[SSLContext, None]

SSLContext.

Source code in penvm/src/server/penvm/server/machine.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def get_sslcontext(
    self,
    sslprofile: Union[str, None],
) -> Union["ssl.SSLContext", None]:
    """Load server-side SSLContext based on named ssl profile.

    Args:
        sslprofile: SSL profile name.

    Returns:
        SSLContext.
    """
    if sslprofile:
        try:
            import ssl

            sslprofile_dir = os.path.expanduser(f"~/.penvm/ssl/{sslprofile}")
            sslcontext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            # sslcontext.verify_mode = ssl.CERT_REQUIRED
            sslcontext.load_cert_chain(
                certfile=f"{sslprofile_dir}/server.crt",
                keyfile=f"{sslprofile_dir}/server.key",
            )
            return sslcontext
        except Exception as e:
            logger.debug(f"ssl required but missing for ssl profile ({sslprofile})")
            raise Exception(f"ssl required but missing for ssl profile ({sslprofile})")

imq_put(msg)

Triage message and put on proper session incoming message queue.

Note: This is where incoming requests result in session creation, then request processing!

Parameters:

Name Type Description Default
msg Message

Message to enqueue.

required
Source code in penvm/src/server/penvm/server/machine.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def imq_put(
    self,
    msg: "Message",
):
    """Triage message and put on proper session incoming message
    queue.

    Note: This is where incoming requests result in session
    creation, then request processing!

    Args:
        msg: Message to enqueue.
    """
    try:
        tlogger = self.logger.enter()

        sessionid = msg.header.get("session-id", "default")
        sess = self.sessmgr.setdefault(sessionid)
        if sess:
            sess.imq.put(msg)
        else:
            # DROP!
            tlogger.warning(f"dropped message for session {sessionid}")
            pass
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

omq_put(msg)

Triage message and put on proper connection outgoing message queue.

Parameters:

Name Type Description Default
msg Message

Message to enqueue.

required
Source code in penvm/src/server/penvm/server/machine.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def omq_put(
    self,
    msg: "Message",
):
    """Triage message and put on proper connection outgoing
    message queue.

    Args:
        msg: Message to enqueue.
    """
    try:
        tlogger = self.logger.enter()

        connectionid = msg.header.get("connection-id")
        conn = self.connmgr.get(connectionid)
        if conn:
            conn.omq.put(msg)
        else:
            # DROP!
            tlogger.warning("omq_put dropped message")
            pass
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

pop_session(sessionid='default')

Pop session.

Parameters:

Name Type Description Default
sessionid str

Session id.

'default'

Returns:

Type Description
Session

Session.

Source code in penvm/src/server/penvm/server/machine.py
230
231
232
233
234
235
236
237
238
239
240
241
242
def pop_session(
    self,
    sessionid: str = "default",
) -> "Session":
    """Pop session.

    Args:
        sessionid: Session id.

    Returns:
        Session.
    """
    return self.sessmgr.pop(sessionid)

run()

Run "runnable" managers.

Source code in penvm/src/server/penvm/server/machine.py
244
245
246
247
248
249
250
251
252
def run(self):
    """Run "runnable" managers."""
    try:
        tlogger = self.logger.enter()
        self.connmgr.run()
        # self.sessmgr.run()
        # self.opsmgr.run()
    finally:
        tlogger.exit()

set_debug(enabled)

Set debug mode

Parameters:

Name Type Description Default
enabled bool

New state for debug mode.

required
Source code in penvm/src/server/penvm/server/machine.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def set_debug(
    self,
    enabled: bool,
):
    """Set debug mode

    Args:
        enabled: New state for debug mode.
    """
    try:
        tlogger = self.logger.enter()
        tlogger.debug("setting debug ({enabled})")

        self.debug = enabled
        if self.debug:
            try:
                if self.schedlock.locked():
                    self.schedlock.release()
            except Exception as e:
                pass
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

shutdown(now=False)

Shutdown machine.

Parameters:

Name Type Description Default
now bool

To shut down "now".

False
Source code in penvm/src/server/penvm/server/machine.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def shutdown(
    self,
    now: bool = False,
):
    """Shutdown machine.

    Args:
        now: To shut down "now".
    """
    try:
        tlogger = self.logger.enter()
        tlogger.debug("SHUTTTING DOWN ...")
        if now:
            os._exit(0)
    finally:
        tlogger.exit()

start()

Start machine.

Source code in penvm/src/server/penvm/server/machine.py
296
297
298
299
300
301
302
303
304
def start(self):
    """Start machine."""
    try:
        tlogger = self.logger.enter()
        self.connmgr.start()
        self.sessmgr.start()
        self.kernelmgr.start()
    finally:
        tlogger.exit()

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/server/penvm/server/machine.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object.
    """
    try:
        uname = os.uname()
        return State(
            "machine",
            self.oid,
            {
                "debug": self.debug,
                "features": self.features(),
                "host": socket.gethostname(),
                "schedlock": self.schedlock.locked(),
                "timestamp": get_timestamp(),
                "uname": {
                    "machine": uname.machine,
                    "nodename": uname.nodename,
                    "release": uname.release,
                    "sysname": uname.sysname,
                    "version": uname.version,
                },
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

step_debug()

Allow a step if in debug mode.

Source code in penvm/src/server/penvm/server/machine.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def step_debug(self):
    """Allow a step if in debug mode."""
    try:
        tlogger = self.logger.enter()
        tlogger.debug("stepping debug")
        if self.debug:
            try:
                if self.schedlock.locked():
                    self.schedlock.release()
            except Exception as e:
                pass
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

stop()

Step machine.

Source code in penvm/src/server/penvm/server/machine.py
351
352
353
354
355
356
357
358
def stop(self):
    """Step machine."""
    try:
        tlogger = self.logger.enter()
        self.connmgr.stop()
        # self.sessmgr.stop()
    finally:
        tlogger.exit()

wait()

Wait (indefinitely) for machine to exit.

Source code in penvm/src/server/penvm/server/machine.py
360
361
362
363
364
365
366
def wait(self):
    """Wait (indefinitely) for machine to exit."""
    try:
        tlogger = self.logger.enter()
        time.sleep(1000000000)
    finally:
        tlogger.exit()

penvm.server.processor

Processor

Bases: BaseObject

Runs kernel operations.

Kernel operations are scheduled by the session and run on the processor. Operations are run on a selected kernel. Up to max_threads messages can be processsed at a time.

The number of running "threads" can be adjusted dynamically with max_threads limiting the number of running thread. Scheduling of new threads resumes when the number of running threads goes below max_threads.

Source code in penvm/src/server/penvm/server/processor.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class Processor(BaseObject):
    """Runs kernel operations.

    Kernel operations are scheduled by the session and run on the
    processor. Operations are run on a selected kernel. Up to
    `max_threads` messages can be processsed at a time.

    The number of running "threads" can be adjusted dynamically with
    `max_threads` limiting the number of running thread. Scheduling of
    new threads resumes when the number of running threads goes below
    `max_threads`."""

    def __init__(
        self,
        session: "Session",
        kernelname: str = "default",
    ):
        """Initialize.

        Args:
            session: Owning session.
            kernelname: Kernel name.
        """
        try:
            super().__init__(None, logger)
            tlogger = self.logger.enter()

            self.session = session
            self.kernelname = kernelname

            self.kill = False
            self.max_threads = 1
            self.kernel = self.session.machine.kernelmgr.get(kernelname)
            self.runsem = AdjustableSemaphore(self.max_threads)
            self.th = None
            self.threads = set()
            self.reqid2thread = {}
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def __repr__(self):
        return f"<Processor id={self.oid} max_threads={self.max_threads} nthreads={self.active_count()}>"

    def active_count(self) -> int:
        """Get number of running threads.

        Returns:
            Number of running threads.
        """
        return len(self.threads)

    def get_thread_reqids(self) -> List[str]:
        """Get list of request ids for each running thread.

        Returns:
            List of request ids.
        """
        return list(self.reqid2thread.keys())

    def run_thread(self, fn: Callable, fargs: List):
        """Runs a thread (to handle a request).

        Machinery manages creating a context, starting a thread,
        adhering to the thread limit, `ThreadInterrupt` handling,
        tracking information, and `runsem` semaphore.

        Args:
            fn: Function to call.
            fargs: Arguments to pass to function.
        """

        def _run(reqid, target, *args, **kwargs):
            try:
                # tlogger = self.logger.enter()
                try:
                    tlogger.debug(f"_run calling target={target} args={args}")
                    target(*args)
                    tlogger.lap(f"target {target} run completed ")
                except ThreadInterrupt as e:
                    tlogger.info(f"thread interrupted {threading.current_thread()}")
                except Exception as e:
                    tlogger.debug(f"_run exception ({traceback.format_exc()}")

                try:
                    tlogger.debug(f"removing thread {threading.current_thread()}...")
                    self.threads.discard(threading.current_thread())
                    self.reqid2thread.pop(reqid, None)
                    tlogger.debug(f"removed thread {threading.current_thread()}")

                    if 0:
                        # clean up "empty" sessions unless pinned
                        tlogger.debug("checking to clean up")
                        self.session.machine.sessmgr.cleanup(self.session.oid)

                except Exception as e:
                    tlogger.debug(f"thread cleanup EXCEPTION ({e})")
                tlogger.debug(f"releasing runsem ({self.session.oid=})...")
                self.runsem.release()
                tlogger.debug(f"release runsem ({self.session.oid=})")
            except Exception as e:
                self.logger.warning(f"EXCEPTION ({e})")
            finally:
                # tlogger.exit()
                pass

        try:
            tlogger = self.logger.enter()

            tlogger.debug(f"acquiring runsem ({self.session.oid=})...")
            self.runsem.acquire()
            tlogger.debug(f"acquired runsem ({self.session.oid=})")

            _opname, _ctxt, _req = fargs
            reqid = _req.header.get("id")
            sessionid = _ctxt.session.oid

            args = [reqid, fn] + list(fargs)
            th = Thread(target=_run, name=reqid, args=args)
            th.daemon = True
            self.threads.add(th)
            self.reqid2thread[reqid] = th
            th.start()
            # cleanup is done in _run()
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def schedule(self, ctxt: "OpContext", req: "Message"):
        """Schedule thread to run the requested kernel operation.

        Args:
            ctxt: Context in which to run operation.
            req: Request message.
        """
        try:
            tlogger = self.logger.enter()

            opname = req.payload.get("op")
            if opname:
                tlogger.info(f"opname={opname}")
                self.run_thread(self.kernel.run, (opname, ctxt, req))
            else:
                tlogger.warning(f"opname={opname} not found")
                # TODO: does this hang here?
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def set_max_threads(self, n: int):
        """Set the upper limit of the number of threads to run.

        Updates the `runsem` semaphore to allow acquisition, if
        possible.

        Args:
            n: Maximum number of threads.
        """
        try:
            self.max_threads = max(1, n)
            self.runsem.adjust(self.max_threads)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def start(self):
        """Start main processor thread."""
        try:
            tlogger = self.logger.enter()
            return

            # TODO: are start()/run() necessary?
            if not self.th:
                self.th = Thread(target=self.run)
                self.th.daemon = True
            self.th.start()

        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object.
        """
        try:
            return State(
                "processor",
                self.oid,
                {
                    "kernel": self.kernelname,
                    "max-threads": self.max_threads,
                    "nthreads": len(self.threads),
                    "runsem": {
                        "_lock-locked": self.runsem._lock.locked(),
                        "_waitlock-locked": self.runsem._waitlock.locked(),
                        "count": self.runsem.count(),
                        "max": self.runsem.max(),
                    },
                    "threads": [th.state() for th in self.threads],
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

    def terminate_thread(self, reqid: str):
        """Forcefully terminate a running thread.

        A `ThreadInterrupt` exception is raised for the thread.

        Args:
            reqid: Request id.
        """
        try:
            tlogger = self.logger.enter()
            tlogger.info("terminating thread ({reqid})")

            th = self.reqid2thread.get(reqid)
            th.terminate()
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

__init__(session, kernelname='default')

Initialize.

Parameters:

Name Type Description Default
session Session

Owning session.

required
kernelname str

Kernel name.

'default'
Source code in penvm/src/server/penvm/server/processor.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(
    self,
    session: "Session",
    kernelname: str = "default",
):
    """Initialize.

    Args:
        session: Owning session.
        kernelname: Kernel name.
    """
    try:
        super().__init__(None, logger)
        tlogger = self.logger.enter()

        self.session = session
        self.kernelname = kernelname

        self.kill = False
        self.max_threads = 1
        self.kernel = self.session.machine.kernelmgr.get(kernelname)
        self.runsem = AdjustableSemaphore(self.max_threads)
        self.th = None
        self.threads = set()
        self.reqid2thread = {}
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

active_count()

Get number of running threads.

Returns:

Type Description
int

Number of running threads.

Source code in penvm/src/server/penvm/server/processor.py
80
81
82
83
84
85
86
def active_count(self) -> int:
    """Get number of running threads.

    Returns:
        Number of running threads.
    """
    return len(self.threads)

get_thread_reqids()

Get list of request ids for each running thread.

Returns:

Type Description
List[str]

List of request ids.

Source code in penvm/src/server/penvm/server/processor.py
88
89
90
91
92
93
94
def get_thread_reqids(self) -> List[str]:
    """Get list of request ids for each running thread.

    Returns:
        List of request ids.
    """
    return list(self.reqid2thread.keys())

run_thread(fn, fargs)

Runs a thread (to handle a request).

Machinery manages creating a context, starting a thread, adhering to the thread limit, ThreadInterrupt handling, tracking information, and runsem semaphore.

Parameters:

Name Type Description Default
fn Callable

Function to call.

required
fargs List

Arguments to pass to function.

required
Source code in penvm/src/server/penvm/server/processor.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def run_thread(self, fn: Callable, fargs: List):
    """Runs a thread (to handle a request).

    Machinery manages creating a context, starting a thread,
    adhering to the thread limit, `ThreadInterrupt` handling,
    tracking information, and `runsem` semaphore.

    Args:
        fn: Function to call.
        fargs: Arguments to pass to function.
    """

    def _run(reqid, target, *args, **kwargs):
        try:
            # tlogger = self.logger.enter()
            try:
                tlogger.debug(f"_run calling target={target} args={args}")
                target(*args)
                tlogger.lap(f"target {target} run completed ")
            except ThreadInterrupt as e:
                tlogger.info(f"thread interrupted {threading.current_thread()}")
            except Exception as e:
                tlogger.debug(f"_run exception ({traceback.format_exc()}")

            try:
                tlogger.debug(f"removing thread {threading.current_thread()}...")
                self.threads.discard(threading.current_thread())
                self.reqid2thread.pop(reqid, None)
                tlogger.debug(f"removed thread {threading.current_thread()}")

                if 0:
                    # clean up "empty" sessions unless pinned
                    tlogger.debug("checking to clean up")
                    self.session.machine.sessmgr.cleanup(self.session.oid)

            except Exception as e:
                tlogger.debug(f"thread cleanup EXCEPTION ({e})")
            tlogger.debug(f"releasing runsem ({self.session.oid=})...")
            self.runsem.release()
            tlogger.debug(f"release runsem ({self.session.oid=})")
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            # tlogger.exit()
            pass

    try:
        tlogger = self.logger.enter()

        tlogger.debug(f"acquiring runsem ({self.session.oid=})...")
        self.runsem.acquire()
        tlogger.debug(f"acquired runsem ({self.session.oid=})")

        _opname, _ctxt, _req = fargs
        reqid = _req.header.get("id")
        sessionid = _ctxt.session.oid

        args = [reqid, fn] + list(fargs)
        th = Thread(target=_run, name=reqid, args=args)
        th.daemon = True
        self.threads.add(th)
        self.reqid2thread[reqid] = th
        th.start()
        # cleanup is done in _run()
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

schedule(ctxt, req)

Schedule thread to run the requested kernel operation.

Parameters:

Name Type Description Default
ctxt OpContext

Context in which to run operation.

required
req Message

Request message.

required
Source code in penvm/src/server/penvm/server/processor.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def schedule(self, ctxt: "OpContext", req: "Message"):
    """Schedule thread to run the requested kernel operation.

    Args:
        ctxt: Context in which to run operation.
        req: Request message.
    """
    try:
        tlogger = self.logger.enter()

        opname = req.payload.get("op")
        if opname:
            tlogger.info(f"opname={opname}")
            self.run_thread(self.kernel.run, (opname, ctxt, req))
        else:
            tlogger.warning(f"opname={opname} not found")
            # TODO: does this hang here?
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

set_max_threads(n)

Set the upper limit of the number of threads to run.

Updates the runsem semaphore to allow acquisition, if possible.

Parameters:

Name Type Description Default
n int

Maximum number of threads.

required
Source code in penvm/src/server/penvm/server/processor.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def set_max_threads(self, n: int):
    """Set the upper limit of the number of threads to run.

    Updates the `runsem` semaphore to allow acquisition, if
    possible.

    Args:
        n: Maximum number of threads.
    """
    try:
        self.max_threads = max(1, n)
        self.runsem.adjust(self.max_threads)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

start()

Start main processor thread.

Source code in penvm/src/server/penvm/server/processor.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def start(self):
    """Start main processor thread."""
    try:
        tlogger = self.logger.enter()
        return

        # TODO: are start()/run() necessary?
        if not self.th:
            self.th = Thread(target=self.run)
            self.th.daemon = True
        self.th.start()

    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/server/penvm/server/processor.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object.
    """
    try:
        return State(
            "processor",
            self.oid,
            {
                "kernel": self.kernelname,
                "max-threads": self.max_threads,
                "nthreads": len(self.threads),
                "runsem": {
                    "_lock-locked": self.runsem._lock.locked(),
                    "_waitlock-locked": self.runsem._waitlock.locked(),
                    "count": self.runsem.count(),
                    "max": self.runsem.max(),
                },
                "threads": [th.state() for th in self.threads],
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

terminate_thread(reqid)

Forcefully terminate a running thread.

A ThreadInterrupt exception is raised for the thread.

Parameters:

Name Type Description Default
reqid str

Request id.

required
Source code in penvm/src/server/penvm/server/processor.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def terminate_thread(self, reqid: str):
    """Forcefully terminate a running thread.

    A `ThreadInterrupt` exception is raised for the thread.

    Args:
        reqid: Request id.
    """
    try:
        tlogger = self.logger.enter()
        tlogger.info("terminating thread ({reqid})")

        th = self.reqid2thread.get(reqid)
        th.terminate()
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

penvm.server.session

A (server-side) session isolates communication, operations, and processing. Except for special kernel operations which allow for one session to affect another, there are no inter-session dependencies.

Each session has its own incoming and outgoing message queues.

Each session has and manages its own Processor which performs all session-related processing.

Concurrency is supported by the Processor (dynamically adjustable number of threads) and between sessions. Which means that while one/some session(s) may be blocked, others may not be.

Session

Bases: BaseObject

Server-side session.

Source code in penvm/src/server/penvm/server/session.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class Session(BaseObject):
    """Server-side session."""

    def __init__(
        self,
        machine: "Machine",
        sessionid: str,
        kernelname: str = "default",
    ):
        """Initialize.

        Args:
            machine: Owning machine.
            sessionid: Session id.
            kernelname: Kernel name.
        """
        try:
            super().__init__(sessionid, logger)
            tlogger = self.logger.enter()

            self.machine = machine

            self.exit = False
            # TODO: should session/sessionid be passed to Processor()?
            self.imq = MessageQueue()
            self.omq = MessageQueue()
            self.proc = Processor(self, kernelname)
            self.th = None
            self.pinned = False
        finally:
            tlogger.exit()

    def __repr__(self):
        return f"<Session id={self.oid} machine={self.machine}>"

    def is_empty(self) -> bool:
        """Return if session is "empty", and therefore deletable.

        Returns:
            "Emptiness" status.
        """
        # TODO: verify that active_count is an appropriate check
        if self.proc.active_count() == 0 and self.imq.size() == 0 and self.omq.size() == 0:
            return True
        return False

    def is_running(self) -> bool:
        """Return running status.

        Returns:
            Running status.
        """
        return self.th != None

    def run(self):
        """Run session.

        Loops until `self.exit` is `False`.
        """
        try:
            tlogger = self.logger.enter()

            self.proc.start()
            ctxt = OpContext()
            ctxt.machine = self.machine
            ctxt.processor = self.proc
            ctxt.session = self

            try:
                while not self.exit:
                    tlogger.debug("waiting for message ...")
                    req = self.imq.pop()
                    if req == None:
                        # reject dummy "message"
                        continue

                    tlogger.debug(f"popped message and scheduling ({req.payload.get('op')}) ...")
                    if self.machine.debug and not self.oid.startswith("-debug-"):
                        self.machine.schedlock.acquire()
                    self.proc.schedule(ctxt, req)

                    tlogger.debug(f"message scheduled ({req.payload.get('op')})")
            except Exception as e:
                tlogger.warning(f"EXCEPTION ({e})")
                raise

            self.th = None
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def set_kernel(
        self,
        kernel: "Kernel",
    ):
        """Set the kernel to use.

        Args:
            kernel: Kernel.
        """
        self.logger.debug("set kernel")
        self.proc.kernel = kernel

    def start(self):
        """Start the main session thread."""
        try:
            tlogger = self.logger.enter()

            if not self.th:
                try:
                    self.th = Thread(target=self.run)
                    self.th.daemon = True
                    self.th.start()
                except Exception as e:
                    tlogger.debug(f"failed to start ({e})")
                    self.th = None
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def stop(self):
        """Stop the `run` method running in a thread."""
        self.exit = True
        # dummy "message" to wake up thread
        self.imq.put(None)

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object.
        """
        try:
            return State(
                "session",
                self.oid,
                {
                    "imq": self.imq.state(),
                    "nimq": self.imq.size(),
                    "nomq": self.omq.size(),
                    "omq": self.omq.state(),
                    "processor": self.proc.state(),
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__(machine, sessionid, kernelname='default')

Initialize.

Parameters:

Name Type Description Default
machine Machine

Owning machine.

required
sessionid str

Session id.

required
kernelname str

Kernel name.

'default'
Source code in penvm/src/server/penvm/server/session.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(
    self,
    machine: "Machine",
    sessionid: str,
    kernelname: str = "default",
):
    """Initialize.

    Args:
        machine: Owning machine.
        sessionid: Session id.
        kernelname: Kernel name.
    """
    try:
        super().__init__(sessionid, logger)
        tlogger = self.logger.enter()

        self.machine = machine

        self.exit = False
        # TODO: should session/sessionid be passed to Processor()?
        self.imq = MessageQueue()
        self.omq = MessageQueue()
        self.proc = Processor(self, kernelname)
        self.th = None
        self.pinned = False
    finally:
        tlogger.exit()

is_empty()

Return if session is "empty", and therefore deletable.

Returns:

Type Description
bool

"Emptiness" status.

Source code in penvm/src/server/penvm/server/session.py
83
84
85
86
87
88
89
90
91
92
def is_empty(self) -> bool:
    """Return if session is "empty", and therefore deletable.

    Returns:
        "Emptiness" status.
    """
    # TODO: verify that active_count is an appropriate check
    if self.proc.active_count() == 0 and self.imq.size() == 0 and self.omq.size() == 0:
        return True
    return False

is_running()

Return running status.

Returns:

Type Description
bool

Running status.

Source code in penvm/src/server/penvm/server/session.py
 94
 95
 96
 97
 98
 99
100
def is_running(self) -> bool:
    """Return running status.

    Returns:
        Running status.
    """
    return self.th != None

run()

Run session.

Loops until self.exit is False.

Source code in penvm/src/server/penvm/server/session.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def run(self):
    """Run session.

    Loops until `self.exit` is `False`.
    """
    try:
        tlogger = self.logger.enter()

        self.proc.start()
        ctxt = OpContext()
        ctxt.machine = self.machine
        ctxt.processor = self.proc
        ctxt.session = self

        try:
            while not self.exit:
                tlogger.debug("waiting for message ...")
                req = self.imq.pop()
                if req == None:
                    # reject dummy "message"
                    continue

                tlogger.debug(f"popped message and scheduling ({req.payload.get('op')}) ...")
                if self.machine.debug and not self.oid.startswith("-debug-"):
                    self.machine.schedlock.acquire()
                self.proc.schedule(ctxt, req)

                tlogger.debug(f"message scheduled ({req.payload.get('op')})")
        except Exception as e:
            tlogger.warning(f"EXCEPTION ({e})")
            raise

        self.th = None
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

set_kernel(kernel)

Set the kernel to use.

Parameters:

Name Type Description Default
kernel Kernel

Kernel.

required
Source code in penvm/src/server/penvm/server/session.py
140
141
142
143
144
145
146
147
148
149
150
def set_kernel(
    self,
    kernel: "Kernel",
):
    """Set the kernel to use.

    Args:
        kernel: Kernel.
    """
    self.logger.debug("set kernel")
    self.proc.kernel = kernel

start()

Start the main session thread.

Source code in penvm/src/server/penvm/server/session.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def start(self):
    """Start the main session thread."""
    try:
        tlogger = self.logger.enter()

        if not self.th:
            try:
                self.th = Thread(target=self.run)
                self.th.daemon = True
                self.th.start()
            except Exception as e:
                tlogger.debug(f"failed to start ({e})")
                self.th = None
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/server/penvm/server/session.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object.
    """
    try:
        return State(
            "session",
            self.oid,
            {
                "imq": self.imq.state(),
                "nimq": self.imq.size(),
                "nomq": self.omq.size(),
                "omq": self.omq.state(),
                "processor": self.proc.state(),
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

stop()

Stop the run method running in a thread.

Source code in penvm/src/server/penvm/server/session.py
170
171
172
173
174
def stop(self):
    """Stop the `run` method running in a thread."""
    self.exit = True
    # dummy "message" to wake up thread
    self.imq.put(None)

penvm.server.sessionmanager

All sessions are managed by SessionManager. Creation, removal, cleanup, and lookups are centrally handled by the SessionManager.

SessionManager

Bases: BaseObject

Manages server-side sessions.

All sessions are managed by a SessionManager owned by the machine.

Source code in penvm/src/server/penvm/server/sessionmanager.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class SessionManager(BaseObject):
    """Manages server-side sessions.

    All sessions are managed by a SessionManager owned by the machine."""

    def __init__(
        self,
        machine: "Machine",
    ):
        """Initialize.

        Args:
            machine: Owning machine.
        """
        try:
            super().__init__(None, logger)
            tlogger = self.logger.enter()

            self.machine = machine

            self.lock = threading.Lock()
            self.sessions = {}
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def cleanup(
        self,
        sessionid: str,
    ):
        """Clean up session (if "empty").

        Args:
            sessionid: Session id.
        """
        try:
            self.lock.acquire()
            sess = self.sessions.get(sessionid)
            if sess and not sess.pinned:
                if sess.is_empty():
                    sess.stop()
                    self.pop(sessionid)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            self.lock.release()

    def delete(
        self,
        sessionid: str,
    ):
        """Delete (forced) a session by session id.

        Args:
            sessionid: Session id.
        """
        try:
            tlogger = self.logger.enter()

            sess = self.sessions.pop(sessionid)
            if sess:
                # delete
                pass
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            tlogger.exit()

    def get(
        self,
        sessionid: str,
    ) -> "Session":
        """Get a session by session id.

        Args:
            sessionid: Session id.

        Returns:
            Session.
        """
        try:
            self.lock.acquire()
            return self.sessions.get(sessionid)
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            self.lock.release()

    def list(self) -> List[str]:
        """List sessions by session id.

        Returns:
            Session ids.
        """
        return list(self.sessions.keys())

    def pop(self, sessionid: str) -> "Session":
        """Pop a session by session id.

        Args:
            sessionid: Session id.

        Returns:
            Session.
        """
        try:
            self.lock.acquire()
            sess = self.sessions.pop(sessionid)
            return sess
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            self.lock.release()

    def setdefault(
        self,
        sessionid: Union[str, None],
        default: Union["Session", None] = None,
    ) -> "Session":
        """Get a session (or create new one if not present) by session
        id.

        Args:
            sessionid: Session id.
            default: Session if session not found.

        Returns:
            Session.
        """
        try:
            self.lock.acquire()
            session = self.sessions.get(sessionid)
            if not session:
                session = self.sessions[sessionid] = default or Session(self.machine, sessionid)
                session.start()
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")
        finally:
            self.lock.release()
        return session

    def start(self):
        """Start.

        NOOP. No background threads."""
        pass

    def state(self) -> "State":
        """Get object state.

        Returns:
            `State` object.
        """
        try:
            return State(
                "session-manager",
                None,
                {
                    "nsessions": len(self.sessions),
                    "session-ids": self.list(),
                },
            )
        except Exception as e:
            self.logger.warning(f"EXCEPTION ({e})")

__init__(machine)

Initialize.

Parameters:

Name Type Description Default
machine Machine

Owning machine.

required
Source code in penvm/src/server/penvm/server/sessionmanager.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    machine: "Machine",
):
    """Initialize.

    Args:
        machine: Owning machine.
    """
    try:
        super().__init__(None, logger)
        tlogger = self.logger.enter()

        self.machine = machine

        self.lock = threading.Lock()
        self.sessions = {}
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

cleanup(sessionid)

Clean up session (if "empty").

Parameters:

Name Type Description Default
sessionid str

Session id.

required
Source code in penvm/src/server/penvm/server/sessionmanager.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def cleanup(
    self,
    sessionid: str,
):
    """Clean up session (if "empty").

    Args:
        sessionid: Session id.
    """
    try:
        self.lock.acquire()
        sess = self.sessions.get(sessionid)
        if sess and not sess.pinned:
            if sess.is_empty():
                sess.stop()
                self.pop(sessionid)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        self.lock.release()

delete(sessionid)

Delete (forced) a session by session id.

Parameters:

Name Type Description Default
sessionid str

Session id.

required
Source code in penvm/src/server/penvm/server/sessionmanager.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def delete(
    self,
    sessionid: str,
):
    """Delete (forced) a session by session id.

    Args:
        sessionid: Session id.
    """
    try:
        tlogger = self.logger.enter()

        sess = self.sessions.pop(sessionid)
        if sess:
            # delete
            pass
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        tlogger.exit()

get(sessionid)

Get a session by session id.

Parameters:

Name Type Description Default
sessionid str

Session id.

required

Returns:

Type Description
Session

Session.

Source code in penvm/src/server/penvm/server/sessionmanager.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def get(
    self,
    sessionid: str,
) -> "Session":
    """Get a session by session id.

    Args:
        sessionid: Session id.

    Returns:
        Session.
    """
    try:
        self.lock.acquire()
        return self.sessions.get(sessionid)
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        self.lock.release()

list()

List sessions by session id.

Returns:

Type Description
List[str]

Session ids.

Source code in penvm/src/server/penvm/server/sessionmanager.py
124
125
126
127
128
129
130
def list(self) -> List[str]:
    """List sessions by session id.

    Returns:
        Session ids.
    """
    return list(self.sessions.keys())

pop(sessionid)

Pop a session by session id.

Parameters:

Name Type Description Default
sessionid str

Session id.

required

Returns:

Type Description
Session

Session.

Source code in penvm/src/server/penvm/server/sessionmanager.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def pop(self, sessionid: str) -> "Session":
    """Pop a session by session id.

    Args:
        sessionid: Session id.

    Returns:
        Session.
    """
    try:
        self.lock.acquire()
        sess = self.sessions.pop(sessionid)
        return sess
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        self.lock.release()

setdefault(sessionid, default=None)

Get a session (or create new one if not present) by session id.

Parameters:

Name Type Description Default
sessionid Union[str, None]

Session id.

required
default Union[Session, None]

Session if session not found.

None

Returns:

Type Description
Session

Session.

Source code in penvm/src/server/penvm/server/sessionmanager.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def setdefault(
    self,
    sessionid: Union[str, None],
    default: Union["Session", None] = None,
) -> "Session":
    """Get a session (or create new one if not present) by session
    id.

    Args:
        sessionid: Session id.
        default: Session if session not found.

    Returns:
        Session.
    """
    try:
        self.lock.acquire()
        session = self.sessions.get(sessionid)
        if not session:
            session = self.sessions[sessionid] = default or Session(self.machine, sessionid)
            session.start()
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")
    finally:
        self.lock.release()
    return session

start()

Start.

NOOP. No background threads.

Source code in penvm/src/server/penvm/server/sessionmanager.py
177
178
179
180
181
def start(self):
    """Start.

    NOOP. No background threads."""
    pass

state()

Get object state.

Returns:

Type Description
State

State object.

Source code in penvm/src/server/penvm/server/sessionmanager.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def state(self) -> "State":
    """Get object state.

    Returns:
        `State` object.
    """
    try:
        return State(
            "session-manager",
            None,
            {
                "nsessions": len(self.sessions),
                "session-ids": self.list(),
            },
        )
    except Exception as e:
        self.logger.warning(f"EXCEPTION ({e})")

penvm.server.storage

StorageManager

Bases: BaseObject

Storage manager.

NIY.

Source code in penvm/src/server/penvm/server/storage.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class StorageManager(BaseObject):
    """Storage manager.

    NIY.
    """

    def __init__(self, machine: "Machine"):
        """Initialize.

        Args:
            machine: Owning machine.
        """
        try:
            super().__init__(None, logger)
            tlogger = self.logger.enter()

            self.machine = machine
            self.stores = {}
        finally:
            tlogger.exit()

    def create(self):
        pass

    def delete(
        self,
        storeid: str,
    ):
        """Delete by id.

        Args:
            storeid: Store id.
        """
        store = self.stores.pop(storeid)
        if store:
            # delete
            pass

    def get(self, storeid: str) -> Any:
        """Get object by id.

        Args:
            storeid: Store id.

        Returns:
            Store.
        """
        return self.store.get(storeid)

    def list(self) -> List[str]:
        """List store ids.

        Returns:
            Store ids.
        """
        return self.store.keys()

    def run(self):
        pass

__init__(machine)

Initialize.

Parameters:

Name Type Description Default
machine Machine

Owning machine.

required
Source code in penvm/src/server/penvm/server/storage.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, machine: "Machine"):
    """Initialize.

    Args:
        machine: Owning machine.
    """
    try:
        super().__init__(None, logger)
        tlogger = self.logger.enter()

        self.machine = machine
        self.stores = {}
    finally:
        tlogger.exit()

delete(storeid)

Delete by id.

Parameters:

Name Type Description Default
storeid str

Store id.

required
Source code in penvm/src/server/penvm/server/storage.py
52
53
54
55
56
57
58
59
60
61
62
63
64
def delete(
    self,
    storeid: str,
):
    """Delete by id.

    Args:
        storeid: Store id.
    """
    store = self.stores.pop(storeid)
    if store:
        # delete
        pass

get(storeid)

Get object by id.

Parameters:

Name Type Description Default
storeid str

Store id.

required

Returns:

Type Description
Any

Store.

Source code in penvm/src/server/penvm/server/storage.py
66
67
68
69
70
71
72
73
74
75
def get(self, storeid: str) -> Any:
    """Get object by id.

    Args:
        storeid: Store id.

    Returns:
        Store.
    """
    return self.store.get(storeid)

list()

List store ids.

Returns:

Type Description
List[str]

Store ids.

Source code in penvm/src/server/penvm/server/storage.py
77
78
79
80
81
82
83
def list(self) -> List[str]:
    """List store ids.

    Returns:
        Store ids.
    """
    return self.store.keys()
You are on penvm.dev