from __future__ import absolute_import
import os
import logging
try:
from urllib import urlopen # py2
except ImportError:
from urllib.request import urlopen # py3
from traitlets import (
observe,
Bool, Bytes, Dict, Instance, Int, List, TraitError, Unicode, validate
)
from ipywidgets import DOMWidget, Image, Video, Audio, register, widget_serialization
from ipython_genutils.py3compat import string_types
import ipywebrtc._version
import traitlets
import ipywidgets as widgets
logger = logging.getLogger("jupyter-webrtc")
semver_range_frontend = "~" + ipywebrtc._version.__version_js__
# for backwards compatibility with ipyvolume
HasStream = MediaStream
[docs]@register
class ImageStream(MediaStream):
"""Represent a media stream by a static image"""
_model_name = Unicode('ImageStreamModel').tag(sync=True)
image = Instance(
Image,
help="An ipywidgets.Image instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
[docs] @classmethod
def from_file(cls, filename, **kwargs):
"""Create a `ImageStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the value from disk.
**kwargs:
Extra keyword arguments for `ImageStream`
Returns an `ImageStream`.
"""
return cls(image=Image.from_file(filename), **kwargs)
[docs] @classmethod
def from_url(cls, url, **kwargs):
"""Create a `ImageStream` from a url.
This will create a `ImageStream` from an Image using its url
Parameters
----------
url: str
The url of the file that will be used for the .image trait.
**kwargs:
Extra keyword arguments for `ImageStream`
Returns an `ImageStream`.
"""
return cls(image=Image.from_url(url), **kwargs)
[docs] @classmethod
def from_download(cls, url, **kwargs):
"""Create a `ImageStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs:
Extra keyword arguments for `ImageStream`
Returns an `ImageStream` with the value set from the content of a url.
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
image = Image(value=urlopen(url).read(), format=format)
return cls(image=image, **kwargs)
[docs]@register
class VideoStream(MediaStream):
"""Represent a stream of a video element"""
_model_name = Unicode('VideoStreamModel').tag(sync=True)
video = Instance(
Video,
allow_none=False,
help="An ipywidgets.Video instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
playing = Bool(True, help='Plays the videostream or pauses it.').tag(sync=True)
[docs] @classmethod
def from_file(cls, filename, **kwargs):
"""Create a `VideoStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the value from disk.
**kwargs:
Extra keyword arguments for `VideoStream`
Returns an `VideoStream`.
"""
video = Video.from_file(filename, autoplay=False, controls=False)
return cls(video=video, **kwargs)
[docs] @classmethod
def from_url(cls, url, **kwargs):
"""Create a `VideoStream` from a url.
This will create a `VideoStream` from a Video using its url
Parameters
----------
url: str
The url of the file that will be used for the .video trait.
**kwargs:
Extra keyword arguments for `VideoStream`
Returns an `VideoStream`.
"""
video = Video.from_url(url, autoplay=False, controls=False)
return cls(video=video, **kwargs)
[docs] @classmethod
def from_download(cls, url, **kwargs):
"""Create a `VideoStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs:
Extra keyword arguments for `VideoStream`
Returns an `VideoStream` with the value set from the content of a url.
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
video = Video(value=urlopen(url).read(), format=format, autoplay=False, controls=False)
return cls(video=video, **kwargs)
@register
class AudioStream(MediaStream):
"""Represent a stream of an audio element"""
_model_name = Unicode('AudioStreamModel').tag(sync=True)
_view_name = Unicode('AudioStreamView').tag(sync=True)
audio = Instance(
Audio,
help="An ipywidgets.Audio instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
playing = Bool(True, help='Plays the audiostream or pauses it.').tag(sync=True)
@classmethod
def from_file(cls, filename, **kwargs):
"""Create a `AudioStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the audio value from disk.
**kwargs:
Extra keyword arguments for `AudioStream`
Returns an `AudioStream`.
"""
audio = Audio.from_file(filename, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
@classmethod
def from_url(cls, url, **kwargs):
"""Create a `AudioStream` from a url.
This will create a `AudioStream` from an Audio using its url
Parameters
----------
url: str
The url of the file that will be used for the .audio trait.
**kwargs:
Extra keyword arguments for `AudioStream`
Returns an `AudioStream`.
"""
audio = Audio.from_url(url, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
@classmethod
def from_download(cls, url, **kwargs):
"""Create a `AudioStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs:
Extra keyword arguments for `AudioStream`
Returns an `AudioStream` with the value set from the content of a url.
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
audio = Audio(value=urlopen(url).read(), format=format, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
[docs]@register
class CameraStream(MediaStream):
"""Represents a media source by a camera/webcam/microphone using
getUserMedia. See
https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
for more detail.
The constraints trait can be set to specify constraints for the camera or
microphone, which is described in the documentation of getUserMedia, such
as in the link above,
Two convenience methods are avaiable to easily get access to the 'front'
and 'back' camera, when present
>>> CameraStream.facing_user(audio=False)
>>> CameraStream.facing_environment(audio=False)
"""
_model_name = Unicode('CameraStreamModel').tag(sync=True)
# Specify audio constraint and video constraint
# see https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
constraints = Dict(
{'audio': True, 'video': True},
help='Constraints for the camera, see https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia for details.'
).tag(sync=True)
[docs] @classmethod
def facing_user(cls, audio=True, **kwargs):
"""Convenience method to get the camera facing the user (often front)
Parameters
----------
audio: bool
Capture audio or not
kwargs:
Extra keyword arguments passed to the `CameraStream`
"""
return cls._facing(facing_mode='user', audio=audio, **kwargs)
[docs] @classmethod
def facing_environment(cls, audio=True, **kwargs):
"""Convenience method to get the camera facing the environment (often the back)
Parameters
----------
audio: bool
Capture audio or not
kwargs:
Extra keyword arguments passed to the `CameraStream`
"""
return cls._facing(facing_mode='environment', audio=audio, **kwargs)
@staticmethod
def _facing(facing_mode, audio=True, **kwargs):
kwargs = dict(kwargs)
constraints = kwargs.pop('constraints', {})
if 'audio' not in constraints:
constraints['audio'] = audio
if 'video' not in constraints:
constraints['video'] = {}
constraints['video']['facingMode'] = facing_mode
return CameraStream(constraints=constraints, **kwargs)
class Recorder(DOMWidget):
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
stream = Instance(MediaStream, allow_none=True, help="An instance of :class:`MediaStream` that is the source for recording.")\
.tag(sync=True, **widget_serialization)
filename = Unicode('record', help='The filename used for downloading or auto saving.').tag(sync=True)
format = Unicode('webm', help='The format of the recording.').tag(sync=True)
recording = Bool(False, help='(boolean) Indicator and controller of the recorder state, i.e. putting the value to True will start recording.').tag(sync=True)
autosave = Bool(False, help='If true, will save the data to a file once the recording is finished (based on filename and format)').tag(sync=True)
_data_src = Unicode('').tag(sync=True)
def download(self):
"""Download the recording (usually a popup appears in the browser)"""
self.send({'msg': 'download'})
@register
class ImageRecorder(Recorder):
"""Creates a recorder which allows to grab an Image from a MediaStream widget.
"""
_model_name = Unicode('ImageRecorderModel').tag(sync=True)
_view_name = Unicode('ImageRecorderView').tag(sync=True)
image = Instance(Image).tag(sync=True, **widget_serialization)
format = Unicode('png', help='The format of the image.').tag(sync=True)
_width = Unicode().tag(sync=True)
_height = Unicode().tag(sync=True)
@traitlets.default('image')
def _default_image(self):
return Image(width=self._width, height=self._height, format=self.format)
@observe('_width')
def _update_image_width(self, change):
self.image.width = self._width
@observe('_height')
def _update_image_height(self, change):
self.image.height = self._height
@observe('format')
def _update_image_format(self, change):
self.image.format = self.format
@observe('image')
def _bind_image(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.image.value) and self.autosave:
self.save()
def save(self, filename=None):
"""Save the image to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = ImageRecorder(filename='test', format='png')
>>> ...
>>> recorder.save() # will save to test.png
>>> recorder.save('foo') # will save to foo.png
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.image.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.image.value)
@register
class VideoRecorder(Recorder):
"""Creates a recorder which allows to record a MediaStream widget, play the
record in the Notebook, and download it or turn it into a Video widget.
"""
_model_name = Unicode('VideoRecorderModel').tag(sync=True)
_view_name = Unicode('VideoRecorderView').tag(sync=True)
video = Instance(Video).tag(sync=True, **widget_serialization)
@traitlets.default('video')
def _default_video(self):
return Video(format=self.format, controls=True)
@observe('format')
def _update_video_format(self, change):
self.video.format = self.format
@observe('video')
def _bind_video(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.video.value) and self.autosave:
self.save()
def save(self, filename=None):
"""Save the video to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = VideoRecorder(filename='test', format='mp4')
>>> ...
>>> recorder.save() # will save to test.mp4
>>> recorder.save('foo') # will save to foo.mp4
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.video.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.video.value)
@register
class AudioRecorder(Recorder):
"""Creates a recorder which allows to record the Audio of a MediaStream widget, play the
record in the Notebook, and download it or turn it into an Audio widget.
"""
_model_name = Unicode('AudioRecorderModel').tag(sync=True)
_view_name = Unicode('AudioRecorderView').tag(sync=True)
audio = Instance(Audio).tag(sync=True, **widget_serialization)
@traitlets.default('audio')
def _default_audio(self):
return Audio(format=self.format, controls=True)
@observe('format')
def _update_audio_format(self, change):
self.audio.format = self.format
@observe('audio')
def _bind_video(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.audio.value) and self.autosave:
self.save()
def save(self, filename=None):
"""Save the audio to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = AudioRecorder(filename='test', format='mp3')
>>> ...
>>> recorder.save() # will save to test.mp3
>>> recorder.save('foo') # will save to foo.mp3
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.audio.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.audio.value)
[docs]@register
class WebRTCPeer(DOMWidget):
"""A peer-to-peer webrtc connection"""
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_name = Unicode('WebRTCPeerView').tag(sync=True)
_model_name = Unicode('WebRTCPeerModel').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
stream_local = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
stream_remote = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
id_local = Unicode('').tag(sync=True)
id_remote = Unicode('').tag(sync=True)
connected = Bool(False, read_only=True).tag(sync=True)
failed = Bool(False, read_only=True).tag(sync=True)
[docs] def connect(self):
self.send({'msg': 'connect'})
[docs]@register
class WebRTCRoom(DOMWidget):
"""A 'chatroom', which consists of a list of :`WebRTCPeer` connections
"""
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_model_name = Unicode('WebRTCRoomModel').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
room = Unicode('room').tag(sync=True)
stream = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
id = Unicode(read_only=True).tag(sync=True)
nickname = Unicode('anonymous').tag(sync=True)
peers = List(Instance(WebRTCPeer), [], allow_none=False).tag(sync=True, **widget_serialization)
streams = List(Instance(MediaStream), [], allow_none=False).tag(sync=True, **widget_serialization)
[docs]@register
class WebRTCRoomLocal(WebRTCRoom):
_model_name = Unicode('WebRTCRoomLocalModel').tag(sync=True)
[docs]@register
class WebRTCRoomMqtt(WebRTCRoom):
"""Use a mqtt server to connect to other peers"""
_model_name = Unicode('WebRTCRoomMqttModel').tag(sync=True)
server = Unicode('wss://iot.eclipse.org:443/ws').tag(sync=True)
# add all help strings to the __doc__ for the api docstrings
for name, cls in list(vars().items()):
try:
if issubclass(cls, traitlets.HasTraits):
for trait_name, trait in cls.class_traits().items():
if 'help' in trait.metadata:
trait.__doc__ = trait.metadata['help']
except TypeError:
pass