Serveur d'impression
Exemple Python: Code serveur Python (server.py) – Bien choisir son serveur d impression
Cette section fournit le code du serveur Python décrit dans Exemple Python (client HTML5 et Python
Serveur).
"" "
Exemple d'application Python 2.7 + / 3.3 +
Cette application se compose d'un serveur HTTP 1.1 utilisant le transfert HTTP fragmenté
codage (https://tools.ietf.org/html/rfc2616#section-3.6.1) et un HTML5 minimal
interface utilisateur qui interagit avec elle.
Le but de cet exemple est de commencer à diffuser le discours vers le client (le
Interface Web HTML5) dès que le premier morceau de discours consommable est renvoyé dans
afin de commencer à lire l'audio dès que possible.
Pour les cas d'utilisation où une faible latence et réactivité sont des exigences fortes,
c'est l'approche recommandée.
La documentation du service contient des exemples de cas d'utilisation sans streaming où
attendre la fin de la synthèse vocale et récupérer tout le flux audio
sont à la fois une option.
Pour tester l'application, exécutez 'python server.py' puis ouvrez l'URL
affiché dans le terminal dans un navigateur Web (voir index.html pour une liste des
navigateurs pris en charge). L'adresse et le port du serveur peuvent être transmis comme
paramètres à server.py. Pour plus d'informations, exécutez: 'python server.py -h'
"" "
depuis argparse import ArgumentParser
à partir des collections importées namedtuple
de la fermeture de l'import contextlib
depuis io import BytesIO
depuis les vidages d'importation json en tant que json_encode
importer os
importer sys
si sys.version_info> = (3, 0):
depuis http.server import BaseHTTPRequestHandler, HTTPServer
depuis socketserver importation ThreadingMixIn
depuis urllib.parse importez parse_qs
autre:
depuis BaseHTTPServer, importez BaseHTTPRequestHandler, HTTPServer
depuis SocketServer importation ThreadingMixIn
depuis urlparse import parse_qs
de la session d'importation boto3
à partir de botocore.exceptions, importez BotoCoreError, ClientError
ResponseStatus = namedtuple ("HTTPStatus",
["code", "message"])
ResponseData = namedtuple ("ResponseData",
["status", "content_type", "data_stream"])
# Mappage du format de sortie utilisé dans le client au type de contenu pour le
# réponse
AUDIO_FORMATS = "ogg_vorbis": "audio / ogg",
"mp3": "audio / mpeg",
"pcm": "audio / onde; codecs = 1"
CHUNK_SIZE = 1024
HTTP_STATUS = "OK": ResponseStatus (code = 200, message = "OK"),
"BAD_REQUEST": ResponseStatus (code = 400, message = "Bad request"),
"NOT_FOUND": ResponseStatus (code = 404, message = "Not found"),
"INTERNAL_SERVER_ERROR": ResponseStatus (code = 500, message = "Erreur de serveur interne")
PROTOCOL = "http"
ROUTE_INDEX = "/index.html"
ROUTE_VOICES = "/ voices"
ROUTE_READ = "/ lire"
# Créez un client en utilisant les informations d'identification et la région définies dans l'administrateur
# section des informations d'identification AWS et des fichiers de configuration
session = Session (profile_name = "adminuser")
polly = session.client ("polly")
classe HTTPStatusError (Exception):
"" "Exception encapsulant une valeur de http.server.HTTPStatus" ""
def __init __ (auto, état, description = Aucun):
"" "
Construit une instance d'erreur à partir d'un tuple de
(code, message, description), voir http.server.HTTPStatus
"" "
super (HTTPStatusError, self) .__ init __ ()
self.code = status.code
self.message = status.message
self.explain = description
classe ThreadedHTTPServer (ThreadingMixIn, HTTPServer):
"" "Un serveur HTTP qui gère chaque demande dans un nouveau thread" ""
daemon_threads = True
classe ChunkedHTTPRequestHandler (BaseHTTPRequestHandler):
"" "" HTTP 1.1 Gestionnaire de demande de codage en morceaux "" "
# Utiliser HTTP 1.1 car 1.0 ne prend pas en charge le codage en morceaux
protocol_version = "HTTP / 1.1"
def query_get (self, queryData, key, default = ""):
"" "Aide pour obtenir des valeurs à partir d'une chaîne de requête pré-analysée" ""
return queryData.get (clé, [default])[0]
def do_GET (auto):
"" "Gère les requêtes GET" ""
# Extraire des valeurs de la chaîne de requête
path, _, query_string = self.path.partition ('?')
query = parse_qs (query_string)
réponse = Aucune
imprimer (u "[START]: GET reçu pour% s avec requête:% s "% (chemin, requête))
essayer:
# Gérer les chemins de requête possibles
si chemin == ROUTE_INDEX:
response = self.route_index (chemin, requête)
chemin elif == ROUTE_VOICES:
response = self.route_voices (chemin, requête)
chemin elif == ROUTE_READ:
response = self.route_read (chemin, requête)
autre:
response = self.route_not_found (chemin, requête)
self.send_headers (response.status, response.content_type)
self.stream_data (response.data_stream)
sauf HTTPStatusError comme err:
# Répondre avec une erreur et déboguer le journal
# information
si sys.version_info> = (3, 0):
self.send_error (err.code, err.message, err.explain)
autre:
self.send_error (err.code, err.message)
self.log_error (u "% s% s% s - [%d] % s ", self.client_address[0],
self.command, self.path, err.code, err.explain)
impression("[END]")
def route_not_found (auto, chemin, requête):
"" "Gère le routage pour les chemins inattendus" ""
lever HTTPStatusError (HTTP_STATUS["NOT_FOUND"], "Page non trouvée")
def route_index (auto, chemin, requête):
"" "Gère le routage pour le point d'entrée de l'application" "" "
essayer:
return ResponseData (status = HTTP_STATUS["OK"], content_type = "text_html",
# Ouvrir un flux binaire pour lire l'index
# Fichier HTML
data_stream = open (os.path.join (sys.path[0],
chemin[1:]), "rb"))
sauf IOError comme err:
# Impossible d'ouvrir le flux
lever HTTPStatusError (HTTP_STATUS["INTERNAL_SERVER_ERROR"],
str (err))
def route_voices (auto, chemin, requête):
"" "Gère le routage pour répertorier les voix disponibles" ""
params =
voix = []
tandis que True:
essayer:
# Demander la liste des voix disponibles, si un jeton de continuation
# a été retourné par l'appel précédent, puis utilisez-le pour continuer
# référencement
response = polly.describe_voices (** params)
sauf (BotoCoreError, ClientError) comme err:
# Le service a renvoyé une erreur
lever HTTPStatusError (HTTP_STATUS["INTERNAL_SERVER_ERROR"],
str (err))
# Recueillez toutes les voix
voices.extend (response.get ("Voices", []))
# Si un jeton de continuation a été renvoyé continuez, arrêtez d'itérer
# autrement
si "NextToken" en réponse:
params = "NextToken": réponse["NextToken"]
autre:
Pause
json_data = json_encode (voix)
bytes_data = octets (json_data, "utf-8") si sys.version_info> = (3, 0)
sinon octets (json_data)
return ResponseData (status = HTTP_STATUS["OK"],
content_type = "application / json",
# Créer un flux binaire pour les données JSON
data_stream = BytesIO (bytes_data))
def route_read (auto, chemin, requête):
"" "Gère le routage pour la lecture de texte (synthèse vocale)" ""
# Récupère les paramètres de la chaîne de requête
text = self.query_get (requête, "texte")
voiceId = self.query_get (requête, "voiceId")
outputFormat = self.query_get (requête, "outputFormat")
# Validez les paramètres, définissez le drapeau d'erreur en cas d'inattendu
# valeurs
si len (texte) == 0 ou len (voiceId) == 0 ou
outputFormat pas dans AUDIO_FORMATS:
lever HTTPStatusError (HTTP_STATUS["BAD_REQUEST"],
"Paramètres incorrects")
autre:
essayer:
# Demander la synthèse vocale
response = polly.synthesize_speech (Text = text,
VoiceId = voiceId,
OutputFormat = outputFormat)
sauf (BotoCoreError, ClientError) comme err:
# Le service a renvoyé une erreur
lever HTTPStatusError (HTTP_STATUS["INTERNAL_SERVER_ERROR"],
str (err))
return ResponseData (status = HTTP_STATUS["OK"],
content_type = AUDIO_FORMATS[outputFormat],
# Accéder au flux audio dans la réponse
data_stream = response.get ("AudioStream"))
def send_headers (self, status, content_type):
"" "Envoyer le groupe d'en-têtes pour une demande réussie" ""
# Envoyer des en-têtes HTTP
self.send_response (status.code, status.message)
self.send_header ('Content-type', content_type)
self.send_header ('Transfer-Encoding', 'chunked')
self.send_header ('Connexion', 'fermer')
self.end_headers ()
def stream_data (self, stream):
"" "Consomme un flux en morceaux pour produire la sortie de la réponse" "" "
print ("Le streaming a commencé ...")
si flux:
# Remarque: la fermeture du flux est importante car le service se réduit
# le nombre de connexions parallèles. Ici, nous utilisons
# contextlib.closing pour garantir la méthode close de l'objet stream
# sera appelé automatiquement à la fin de l'instruction with
# portée.
avec fermeture (flux) en tant que flux géré:
# Poussez le contenu du flux en morceaux
tandis que True:
data = managed_stream.read (CHUNK_SIZE)
self.wfile.write (b "% X r n% s r n"% (len (data), data))
# S'il n'y a plus de données à lire, arrêtez la diffusion
sinon données:
Pause
# Assurez-vous que toute sortie en mémoire tampon a été transmise et fermez
# stream
self.wfile.flush ()
imprimer ("Streaming terminé.")
autre:
# Le flux transmis est vide
self.wfile.write (b "0 r n r n")
print ("Rien à diffuser.")
# Définir et analyser les arguments de la ligne de commande
cli = ArgumentParser (description = 'Exemple d'application Python')
cli.add_argument (
"-p", "--port", type = int, metavar = "PORT", dest = "port", par défaut = 8000)
cli.add_argument (
"--host", type = str, metavar = "HOST", dest = "host", default = "localhost")
arguments = cli.parse_args ()
# Si le module est appelé directement, initialisez l'application
si __name__ == '__main__':
# Créer et configurer l'instance de serveur HTTP
server = ThreadedHTTPServer ((arguments.host, arguments.port),
ChunkedHTTPRequestHandler)
print ("Démarrage du serveur, utilisez arrêter...")
imprimer (u "ouvert 0: //1:23 dans un navigateur Web. ". Format (PROTOCOLE,
arguments.host,
arguments.port,
ROUTE_INDEX))
essayer:
# Écoutez les demandes indéfiniment
server.serve_forever ()
sauf KeyboardInterrupt:
# Une demande de résiliation a été reçue, arrêtez le serveur
print (" nArrêt ...")
server.socket.close ()
Click to rate this post!
[Total: 0 Average: 0]
Commentaires
Laisser un commentaire